天天看点

以dubbo源码为例-使用lambda重构面向对象模块

作者:闪念基因
以dubbo源码为例-使用lambda重构面向对象模块

本文将以 Dubbo 源码为例,和您讨论如何使用 Lambda 和面向对象两种方法,对 Java 应用进行重构。并以实例展示了两者结合,写出简洁优雅的代码。

1. 使用 Lambda 改善可读性和灵活性

1.1 改善代码的可读性

Java 8 的 Lambda 新特性也可以帮助提升代码的可读性:

  • 使用 Lambda,你可以减少冗长的代码,让代码更易于理解
  • 通过方法引用和 Stream API,你的代码会变得更直观

1.2 使用场景

在回调场景,可以使用 Lambda 简化代码。

比如,我们在使用 JDBC 的时候,资源用完要马上释放,否则会资源泄漏,如代码示例。

public String query(String sql) {
        connection = getConnection();
        statement = connection.getStatement(sql);
        ResultSet rs = statement.executeQuery();
        String result;
        try {
            while (rs.next()) {
                //执行具体操作
                result = rs.getString("name");
            }
        } finally {
            rs.close();
            statement.close;
        }
    }
           

query 函数中,除了取结果的 result = rs.getString("name") 这句,其他都是样板代码。这个时候,是不是可以使用模板方法的设计模式解决,抽象一个 AbstractQuery,并使用不同的子类实现查询,如下:

public class AbstractQuery {

public String query(String sql) {
        connection = getConnection();
        statement = connection.getStatement(sql);
        ResultSet rs = statement.executeQuery();
        String result;
        try {
            while (rs.next()) {
                //执行具体操作
                result = getResult(rs);
            }
          return result;
        } finally {
            rs.close();
            statement.close;
        }
    }
  
  abstract String getResult();
}

public class NameQuery extends AbstractQuery {
  public String query(ResultSet rs) {
    return rs.getString("name");
  }
  
  public static void main(String[] args) {
        new NameQuery().query("xxxxx");
    }
}

           

显然,这种方式不可取,因为查询的 sql 是动态的,得写无数类去实现。

这个时候,我们可以使用函数式来实现。

public class JbdcUtil {
public String query(String sql, Function<ResultSet, String> resFunc) {
        connection = getConnection();
        statement = connection.getStatement(sql);
        ResultSet rs = statement.executeQuery();
        String result;
        try {
            while (rs.next()) {
                //执行具体操作
                result = resFunc.apply(rs);
            }
            return result;
        } finally {
            rs.close();
            statement.close;
        }
    }
  
   public static void main(String[] args) {
       String result = JbdcUtil.query("select xxxxx", rs ->rs.getString("name"));
    }
}
           

这个样例中,使用了 Jdk 自带的 Function 来实现,因为 Function 的 apply 方法一个参数一个返回,和场景对应。

通常,我们可以用以下几步来确定函数:

1、查看需要的方法数量和是否需要返回

2、根据1的结果,选择 jdk 自带函数,没有合适的则自己写。

@FunctionalInterface
public interface Function<T, R> {

   //根据参数和返回类型确定范型类型。上面示例中,我们需要根据 ResultSet 参数,获取 String 的返回
   //则在参数中写  Function<ResultSet, String> resFunc ,从而与 apply 方法对应
    R apply(T t); 
}
    
           

JbdcUtil.query("xxxxx", rs ->rs.getString("name")); 这里的实参 ,-> 前面的代表参数,后面则是 apply 的 override 实现,如果不习惯,可以写成以下形式,idea可以自动帮助简写。

String result = JbdcUtil.query("select xxxxx", new Function<ResultSet, String>() {
            @Override
            public String apply(ResultSet rs) {
                return rs.getString("name");
            }
        });
           

1.3 Dubbo 源码对 Lambda 的使用

Dubbo 在 3.2 中,引入了完整的可观测性特性。可观测指标,需要对业务代码进行埋点,为统一埋点形式,使用了事件机制。如服务暴露时,需要获取向注册中心的注册 rt 延时、及异常次数指标。

// Provider 启动,向注册中心注册当前实例
public void register() {
   ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
}
           

事件机制埋点

public void register() {
        MetricsEventBus.post(new RegisterEvent(System.currentMills()));
        try {
            ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
        } catch (Exception e) {
            MetricsEventBus.error(new RegisterEvent(System.currentMills()));
            throw e;
        }
        MetricsEventBus.finish(new RegisterEvent(System.currentMills()));
    }
           

可以看到,加入事件后,比初始代码多了不少样板代码。使用 lambda 优化如下

public void register() {
MetricsEventBus.post(new RegisterEvent(),
                () -> {
        ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
                }
            );
}

//使用 jdk 自带 Supplier 函数,因为没有参数,只有返回
public class MetricsEventBus {
    public static <T> T post(MetricsEvent event, Supplier<T> targetSupplier) {
        dispatcher.publishEvent(event);
        T result;
        try {
            result = targetSupplier.get();
        } catch (Throwable e) {
            dispatcher.publishErrorEvent(event);
            throw e;
        }
        return result;
    }
}
           

以上代码对 Dubbo 源码进行了精简,保留了主要逻辑。

2. 面向对象重构

2.1 单次Rpc的请求指标源码分析

Dubbo 收集的指标维度及类型非常的多,比如一次 rpc 请求,需要统计成功、失败次数,rt 方面,需要统计单次 rt,最近一次 rt,平均 rt,总 rt 等,需要一些容器来存放数据。

// lastRT, totalRT, rtCount, avgRT share a container, can utilize the system cache line
    private final ConcurrentMap<M, AtomicLongArray> rtSample = new ConcurrentHashMap<>();
    private final ConcurrentMap<M, LongAccumulator> minRT = new ConcurrentHashMap<>();
    private final ConcurrentMap<M, LongAccumulator> maxRT = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, ConcurrentMap<M, AtomicLongArray>> rtGroupSample = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, ConcurrentMap<M, LongAccumulator>> groupMinRT = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, ConcurrentMap<M, LongAccumulator>> groupMaxRT = new ConcurrentHashMap<>();
           

这些指标数据,有许多相同之处,也有部分差异,比如:

  • 初始化也不相同:不通类型的数据,初始化动作不同。如 int num = 0; long num = 0L; AtomicLongArray = new AtomicLongArray(4);
  • 计算方法不同:比较多的是自增,有取当前值(比如最近一次请求的rt), 也有比较大小(比如rt的最大值,每次需要当前值和集合中的最大值进行比较),还有计算平均值等等。
  • 导出方法不同:集合中数据,需要导出成统一格式(Dubbo 使用了 micrometer)。因为本身数据格式差异,导出方法也有相应差异。

这一系列过程中,很容易把代码写出面向过程,比如对请求 rt 指标的初始化和计算:

public void addRT(S source, Long rt) {
        MetricsCountSampleConfigurer<S, K, M> sampleConfigure = new MetricsCountSampleConfigurer<>();
        sampleConfigure.setSource(source);

        this.rtConfigure(sampleConfigure);

        M metric = sampleConfigure.getMetric();

        // 初始化 AtomicLongArray 类型(不存在时)
        AtomicLongArray rtCalculator = ConcurrentHashMapUtils.computeIfAbsent(this.rtSample, metric, k -> new AtomicLongArray(4));

        // 设置 rt 类型的值(last类型rt)
        rtCalculator.set(0, rt);

        // 自增类型的 rt 更新(累加类型rt)
        rtCalculator.addAndGet(1, rt);

        rtCalculator.incrementAndGet(2);

        // 计算 rt 最小值
        LongAccumulator min = ConcurrentHashMapUtils.computeIfAbsent(minRT, metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));
        min.accumulate(rt);

        LongAccumulator max = ConcurrentHashMapUtils.computeIfAbsent(maxRT, metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));
        max.accumulate(rt);

        sampleConfigure.setRt(rt);

        sampleConfigure.getFireEventHandler().accept(sampleConfigure);
    }
           

以上代码来自 SimpleMetricsCountSampler.addRT(),较明显的面向过程写法,把各容器相同阶段(初始化、计算,这里不包含导出)的不同计算操作(赋值、累加、平均值等),耦合在一个方法中。如果增加了一种容器及新类型计算(假如中位数),只能在addRt方法修改。面向过程的代码特点是,容易出 bug 且不易维护。

2.2 注册中心指标重构实践

1、容器类的抽象

请求指标代码中,对不同数据容器,简单地使用 Map 存储。map 能存储数据,但是没有数据的处理能力,只能依赖调用代码执行处理。我们可以使用功能更全面的独立对象来表示容器。

public class LongContainer<N extends Number> extends ConcurrentHashMap<String, N> {

    /**
     * 指标的 key 类型,比如注册、订阅、通知变更等
     */
    private final transient MetricsKeyWrapper metricsKeyWrapper;
    /**
     *  初始化函数 
     */
    private final transient Function<String, N> initFunc;
    /**
     *  计算函数 
     */
    private final transient BiConsumer<Long, N> consumerFunc;
  
    /**
     *  导出函数 
     */
    private transient Function<String, Long> valueSupplier;


    public LongContainer(MetricsKeyWrapper metricsKeyWrapper, Supplier<N> initFunc, BiConsumer<Long, N> consumerFunc) {
        this.metricsKeyWrapper = metricsKeyWrapper;
        this.initFunc = s -> initFunc.get();
        this.consumerFunc = consumerFunc;
        this.valueSupplier = k -> this.get(k).longValue();
    }

   //省略其他代码
}
           

以上容器类,包含了数据和数据处理函数,代替之前的 Map。

结合函数式编程,对容器类进行初始化,从而简化后续的计算、导出过程。

public class RegistryStatComposite implements MetricsExport {

    public Map<ApplicationType, Map<String, AtomicLong>> applicationNumStats = new ConcurrentHashMap<>();
    public Map<ServiceType, Map<ServiceKeyMetric, AtomicLong>> serviceNumStats = new ConcurrentHashMap<>();
   //rt 容器变为 LongContainer 的集合,更方便遍历及统一处理
    public List<LongContainer<? extends Number>> rtStats = new ArrayList<>();

    public RegistryStatComposite() {
        for (ApplicationType type : ApplicationType.values()) {
            // Application key and increment val
            applicationNumStats.put(type, new ConcurrentHashMap<>());
        }

        for (ServiceType type : ServiceType.values()) {
            // Service key
            serviceNumStats.put(type, new ConcurrentHashMap<>());
        }

        // App-level
        rtStats.addAll(initStats(OP_TYPE_REGISTER, false));
        rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE, false));
        rtStats.addAll(initStats(OP_TYPE_NOTIFY, false));

        // Service-level
        rtStats.addAll(initStats(OP_TYPE_REGISTER_SERVICE, true));
        rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE_SERVICE, true));
      
        //如果需要新增指标类型,此处增加即可
    }

    //初始化容器,设置初始、计算及导出函数
    private List<LongContainer<? extends Number>> initStats(String registryOpType, boolean isServiceLevel) {
        List<LongContainer<? extends Number>> singleRtStats = new ArrayList<>();
        singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_LAST, isServiceLevel)));
        singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MIN, isServiceLevel), new LongAccumulator(Long::min, Long.MAX_VALUE)));
        singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MAX, isServiceLevel), new LongAccumulator(Long::max, Long.MIN_VALUE)));
        singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_SUM, isServiceLevel), (responseTime, longAccumulator) -> longAccumulator.addAndGet(responseTime)));
        // AvgContainer 比较特殊,存储了总数,但是导出函数是平均数计算函数
        AtomicLongContainer avgContainer = new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_AVG, isServiceLevel), (k, v) -> v.incrementAndGet());
        avgContainer.setValueSupplier(applicationName -> {
            LongContainer<? extends Number> totalContainer = rtStats.stream().filter(longContainer -> longContainer.isKeyWrapper(MetricsKey.METRIC_RT_SUM, registryOpType)).findFirst().get();
            AtomicLong totalRtTimes = avgContainer.get(applicationName);
            AtomicLong totalRtSum = (AtomicLong) totalContainer.get(applicationName);
            return totalRtSum.get() / totalRtTimes.get();
        });
        singleRtStats.add(avgContainer);
        return singleRtStats;
    }
 }
           

指标计算:

public void calcApplicationRt(String applicationName, String registryOpType, Long responseTime) {
    for (LongContainer container : appRtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
        Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, applicationName, container.getInitFunc());
        //使用容器的计算函数,执行埋点后的指标计算
        container.getConsumerFunc().accept(responseTime, current);
    }
}
           

可以看出,数据容器,在使用对象代替 map 后,看上去变的精简,可维护性也上升了。

作者:林尧

来源:微信公众号:政采云技术

出处:https://mp.weixin.qq.com/s/FaBIoNsmIFjLvgXdzy2yTw

继续阅读