
Vert.x Reactive Microservice Design: Cache Design

Author: ai暴走de碼農

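Design approach

The idea is simple: serve cached data first, and when the cache is empty, load the data from the database and cache it. Because this is a reactive microservice built on Vert.x, every one of these steps has to run asynchronously, and wiring the asynchronous steps together is the hard part. We use the reactive database client vertx-jdbc-client and the reactive Redis client vertx-redis-client; both offer rich APIs that make this goal easy to reach.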

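For the reactive database client, vertx-jdbc-client, we mainly use its RxJava-based API, for example rxQueryWithParams. The RxJava variant builds on RxJava's publish-subscribe mechanism, so the code is more concise, intuitive, and elegant, and it avoids the callback hell of the plain callback API, which greatly improves readability and maintainability.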

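For the reactive Redis client, vertx-redis-client, we mainly use three APIs: get, getset, and expire, which respectively read cached data, write cached data, and set the expiry time of a cache key. Note that getset writes the new value and returns the previous one; the returned value is ignored here.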
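To make the semantics of these three operations concrete, here is a minimal in-memory sketch that uses only the JDK. It is not the vertx-redis-client API: the class name ExpiringCacheSketch, the injected clock, and the synchronous method signatures are assumptions made for illustration, and the behaviour only roughly mirrors the Redis commands of the same name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** In-memory stand-in that mimics GET, GETSET and EXPIRE (hypothetical, not Redis itself). */
class ExpiringCacheSketch {
    private static final class Entry {
        final String value;
        long expiresAtMillis = Long.MAX_VALUE; // no expiry until expire() is called

        Entry(String value) {
            this.value = value;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final LongSupplier clock; // injected so that time can be controlled

    ExpiringCacheSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Like GET: returns the value, or null if the key is absent or expired. */
    String get(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() >= entry.expiresAtMillis) {
            entries.remove(key); // drop the expired entry lazily
            return null;
        }
        return entry.value;
    }

    /** Like GETSET: stores the new value and returns the previous one (or null). */
    String getset(String key, String value) {
        String previous = get(key);
        entries.put(key, new Entry(value)); // the new entry starts without an expiry
        return previous;
    }

    /** Like EXPIRE: sets a time to live in seconds; false if the key does not exist. */
    boolean expire(String key, long seconds) {
        if (get(key) == null) {
            return false;
        }
        entries.get(key).expiresAtMillis = clock.getAsLong() + seconds * 1000L;
        return true;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        ExpiringCacheSketch cache = new ExpiringCacheSketch(() -> now[0]);
        cache.getset("k", "v");
        cache.expire("k", 60);
        System.out.println(cache.get("k")); // still within the time to live
        now[0] = 60_000L;
        System.out.println(cache.get("k")); // the time to live has elapsed
    }
}
```

Because the write and the expiry are two separate calls, a key stays without an expiry if the second call never happens; this is worth keeping in mind when reading the code that follows.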

The core interaction works as follows:

1. Read from the cache

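// Read from the cache first; on a miss, query the database and cache the result, otherwise use the cached value.
        RedisAPIHolder.getRedisAPI().get(cacheKey)
                .onSuccess(val -> {
                    // Cache miss: load the data from the database and cache it
                    if(val == null){
                        LOGGER.info("Prepared SQL: {}, params: {}",finalSql,dataAll);
                        // The observable (publisher)
                        Single<ResultSet> resultSet = dbClient.rxQueryWithParams(finalSql, dataAll);
                        // The observer (subscriber)
                        SingleObserver<List<JsonObject>> singleObserver = RedisAPIHolder.toObserver(cacheKey, resultHandler);
                        // Bind them: the observer subscribes to the events published by the observable
                        resultSet.map(ResultSet::getRows).subscribe(singleObserver);
                    }else{// Cache hit: use the cached value
                        LOGGER.info("Value read from cache: key->{}", cacheKey);
                        List<JsonObject> jsonObjectList = toJsonObjectList(val.toString());
                        resultHandler.handle(Future.succeededFuture(jsonObjectList));
                    }
                })
                .onFailure(err -> {
                    LOGGER.error("Failed to read value from cache: key->{}", cacheKey, err);
                    // Report the failure so that the caller is not left waiting for a result
                    resultHandler.handle(Future.failedFuture(err));
                });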
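The same read path can be sketched without Vert.x, which makes the control flow easier to follow in isolation. The following is only an illustration under stated assumptions: CompletableFuture stands in for the Vert.x Future and the RxJava Single, a ConcurrentHashMap stands in for Redis, and the names CacheAsideSketch, cacheGet, load, and dbLoads are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Cache-aside read path, sketched with JDK types only (hypothetical names). */
class CacheAsideSketch {
    // Stand-in for Redis: a plain in-memory map.
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    // Counts how often the "database" loader is actually invoked.
    final AtomicInteger dbLoads = new AtomicInteger();

    /** Stand-in for an asynchronous cache read; completes with null on a miss. */
    CompletableFuture<String> cacheGet(String key) {
        return CompletableFuture.supplyAsync(() -> cache.get(key));
    }

    /** Returns the cached value, or loads it asynchronously and caches it. */
    CompletableFuture<String> load(String key, Supplier<CompletableFuture<String>> dbLoader) {
        return cacheGet(key).thenCompose(cached -> {
            if (cached != null) {
                // Cache hit: no database access at all.
                return CompletableFuture.completedFuture(cached);
            }
            // Cache miss: query the database, then store the result.
            dbLoads.incrementAndGet();
            return dbLoader.get().thenApply(value -> {
                cache.put(key, value);
                return value;
            });
        });
    }

    public static void main(String[] args) {
        CacheAsideSketch sketch = new CacheAsideSketch();
        Supplier<CompletableFuture<String>> loader =
                () -> CompletableFuture.supplyAsync(() -> "[{\"total\":42}]");
        System.out.println(sketch.load("k", loader).join()); // loaded from the "database"
        System.out.println(sketch.load("k", loader).join()); // served from the cache
        System.out.println("database loads: " + sketch.dbLoads.get());
    }
}
```

The sketch does not guard against several concurrent misses for the same key, each of which would trigger its own database query; the original code has the same property.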

2. Cache the data and set an expiry time

/**
     * Observer for the asynchronous result.
     *
     * @author javacoo
     * @date 2023/6/11 22:25
     * @param cacheKey cache key
     * @param handler handler for the asynchronous result
     * @return SingleObserver<T>
     */
    public static <T> SingleObserver<T> toObserver(String cacheKey,Handler<AsyncResult<T>> handler) {
        AtomicBoolean completed = new AtomicBoolean();
        return new SingleObserver<T>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onSuccess(@NonNull T item) {
                if (completed.compareAndSet(false, true)) {
                    // Return the result to the caller
                    handler.handle(Future.succeededFuture(item));
                    // Cache the data, then set its expiry time
                    redisAPI.getset(cacheKey, JSONUtil.toJsonStr(item))
                            .onSuccess(val -> {
                                LOGGER.info("Value loaded from database and cached: key->{}, expireTime->{}s",cacheKey,redisProperties.getExpireTime());
                                // Set the expiry time
                                List<String> command = new ArrayList<>();
                                command.add(cacheKey);
                                command.add(String.valueOf(redisProperties.getExpireTime()));
                                redisAPI.expire(command);
                            })
                            .onFailure(err -> LOGGER.error("Failed to cache data: key->{}",cacheKey,err));

                }
            }
            @Override
            public void onError(Throwable error) {
                if (completed.compareAndSet(false, true)) {
                    handler.handle(Future.failedFuture(error));
                }
            }
        };
    }
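Two details of this observer are easy to miss: the AtomicBoolean makes sure the handler receives exactly one terminal signal, and the caller is answered before the cache write, so a failed write never affects the response. A minimal JDK-only sketch of that pattern follows; the class OnceOnlyObserverSketch, the BiConsumer-style handler, and the cacheWriter callback are assumptions made for illustration and are not part of RxJava or Vert.x.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Once-only result delivery followed by a best-effort cache write (hypothetical names). */
class OnceOnlyObserverSketch<T> {
    private final AtomicBoolean completed = new AtomicBoolean();
    private final BiConsumer<T, Throwable> handler; // receives (value, null) or (null, error)
    private final Consumer<T> cacheWriter;          // stand-in for the Redis write

    OnceOnlyObserverSketch(BiConsumer<T, Throwable> handler, Consumer<T> cacheWriter) {
        this.handler = handler;
        this.cacheWriter = cacheWriter;
    }

    void onSuccess(T item) {
        // Only the first terminal signal wins; later ones are ignored.
        if (completed.compareAndSet(false, true)) {
            handler.accept(item, null); // answer the caller first
            try {
                cacheWriter.accept(item); // then cache; a failure here is only reported
            } catch (RuntimeException e) {
                System.err.println("cache write failed: " + e.getMessage());
            }
        }
    }

    void onError(Throwable error) {
        if (completed.compareAndSet(false, true)) {
            handler.accept(null, error);
        }
    }

    public static void main(String[] args) {
        OnceOnlyObserverSketch<String> observer = new OnceOnlyObserverSketch<>(
                (value, error) -> System.out.println("handled: " + value + " / " + error),
                value -> System.out.println("cached: " + value));
        observer.onSuccess("rows");
        observer.onError(new IllegalStateException("late error")); // ignored
    }
}
```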

Configuration and core implementation

1: Import dependencies

<dependency>
            <groupId>org.javacoo.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java2</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-jdbc-client</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-config-yaml</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-pg-client</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.2</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-redis-client</artifactId>
            <version>${vertx.version}</version>
        </dependency>           

2: Configuration file

redis:
  urls:
    - redis://127.0.0.1:6379/0
  clientType: STANDALONE
  poolName: p-red
  poolCleanerInterval: 30000
  maxPoolSize: 8
  maxPoolWaiting: 32
  password: 123456           

3: Redis configuration object, nested in the ConfigProperties configuration object

public static class RedisProperties {
        private RedisClientType clientType = RedisClientType.STANDALONE;
        private String[] urls;
        private String password;
        private String poolName = "redis-p";
        private int poolCleanerInterval = 30_000;
        private int maxPoolSize = 8;
        private int maxPoolWaiting = 32;
        private String masterName = "mymaster";
        private RedisRole role = RedisRole.MASTER;

        private Integer expireTime = 60 * 60;
    // getters and setters omitted
    ...
}           

4: Load configuration (system-related settings)

private ConfigRetrieverOptions initOptions() {
        // Do not include the default config stores; only the stores added below are used
        ConfigRetrieverOptions options = new ConfigRetrieverOptions().setIncludeDefaultStores(false);
        // Disable periodic configuration refresh
        options.setScanPeriod(-1);
        // Load the main configuration
        options.addStore(new ConfigStoreOptions()
                .setType("file")
                .setFormat("yaml")
                .setOptional(true)
                .setConfig(new JsonObject().put("path", "application.yaml")));
        String envFile = new StringBuilder("application-").append(env).append(".yaml").toString();
        // Load the environment-specific configuration
        options.addStore(new ConfigStoreOptions()
                .setType("file")
                .setFormat("yaml")
                .setOptional(true)
                .setConfig(new JsonObject().put("path", envFile)));
        // Disable caching
        options.getStores().forEach(store -> {
            store.getConfig().put("cache", "false");
        });
        return options;
    }           
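The two file stores are applied in the order in which they are added, so a value in application-{env}.yaml takes precedence over the same key in application.yaml. The following JDK-only sketch illustrates that layering with flat maps. It is a simplification: the class LayeredConfigSketch and the dotted key names are hypothetical, and nested structures are not merged here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Later configuration layers override earlier ones (flat keys only, hypothetical names). */
class LayeredConfigSketch {

    /** Merges the layers in order; a key in a later layer replaces the earlier value. */
    static Map<String, Object> overlay(List<Map<String, Object>> layers) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map<String, Object> layer : layers) {
            merged.putAll(layer);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for application.yaml
        Map<String, Object> main = Map.of("redis.maxPoolSize", 8, "redis.poolName", "p-red");
        // Stand-in for application-{env}.yaml
        Map<String, Object> env = Map.of("redis.maxPoolSize", 16);

        Map<String, Object> merged = overlay(List.of(main, env));
        System.out.println(merged.get("redis.maxPoolSize")); // value from the environment layer
        System.out.println(merged.get("redis.poolName"));    // value kept from the main layer
    }
}
```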

5: Initialize the system configuration object

public class ConfigPropertiesHolder {
    private static ConfigProperties configProperties;
    public static void init(JsonObject envConfig){
        Objects.requireNonNull(envConfig, "envConfig has not been initialized");
        configProperties = envConfig.mapTo(ConfigProperties.class);
    }
    public static ConfigProperties getConfigProperties() {
        Objects.requireNonNull(configProperties, "ConfigProperties has not been initialized");
        return configProperties;
    }
}           

6: Redis cache component

Initialization

// Initialize RedisAPIHolder
                RedisAPIHolder.init(mainVertx, envConfig, redisConnectionAsyncResult -> {
                    if(redisConnectionAsyncResult.succeeded()){
                        LOGGER.info("Redis initialized successfully");
                    }else{
                        LOGGER.error("Redis initialization failed",redisConnectionAsyncResult.cause());
                    }
                });

RedisAPIHolder

/**
 * Redis holder.
 *
 * @author javacoo
 * @version 1.0
 * @date 2023/6/11 21:55
 */
public class RedisAPIHolder {
    protected static final Logger LOGGER = LoggerFactory.getLogger(RedisAPIHolder.class);
    private static RedisAPI redisAPI;
    private static Redis redis;

    private static ConfigProperties.RedisProperties redisProperties;
    public static void init(Vertx vertx, JsonObject envConfig,Handler<AsyncResult<RedisConnection>> resultHandler) {
        Objects.requireNonNull(envConfig, "envConfig has not been initialized");
        // Parse the configuration
        redisProperties = envConfig.mapTo(ConfigProperties.class).getRedis();
        RedisOptions options = new RedisOptions()
                .setType(redisProperties.getClientType())
                .setPoolName(redisProperties.getPoolName())
                .setMaxPoolSize(redisProperties.getMaxPoolSize())
                .setMaxPoolWaiting(redisProperties.getMaxPoolWaiting())
                .setPoolCleanerInterval(redisProperties.getPoolCleanerInterval());

        // password
        if (StrUtil.isNotBlank(redisProperties.getPassword())) {
            options.setPassword(redisProperties.getPassword());
        }
        // connect address [redis://localhost:7000, redis://localhost:7001]
        for (String url : redisProperties.getUrls()) {
            options.addConnectionString(url);
        }
        // sentinel
        if (redisProperties.getClientType().equals(RedisClientType.SENTINEL)) {
            options.setRole(redisProperties.getRole()).setMasterName(redisProperties.getMasterName());
        }
        // Create the Redis client instance
        redis = Redis.createClient(vertx, options);
        redis.connect(resultHandler);
    }

    public static RedisAPI getRedisAPI() {
        Objects.requireNonNull(redis, "Redis has not been initialized");
        redisAPI = RedisAPI.api(redis);
        return redisAPI;
    }
    /**
     * Observer for the asynchronous result.
     *
     * @author javacoo
     * @date 2023/6/11 22:25
     * @param cacheKey cache key
     * @param handler handler for the asynchronous result
     * @return SingleObserver<T>
     */
    public static <T> SingleObserver<T> toObserver(String cacheKey,Handler<AsyncResult<T>> handler) {
        AtomicBoolean completed = new AtomicBoolean();
        return new SingleObserver<T>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
            @Override
            public void onSuccess(@NonNull T item) {
                if (completed.compareAndSet(false, true)) {
                    // Return the result to the caller
                    handler.handle(Future.succeededFuture(item));
                    // Cache the data, then set its expiry time
                    redisAPI.getset(cacheKey, JSONUtil.toJsonStr(item))
                            .onSuccess(val -> {
                                LOGGER.info("Value loaded from database and cached: key->{}, expireTime->{}s",cacheKey,redisProperties.getExpireTime());
                                // Set the expiry time
                                List<String> command = new ArrayList<>();
                                command.add(cacheKey);
                                command.add(String.valueOf(redisProperties.getExpireTime()));
                                redisAPI.expire(command);
                            })
                            .onFailure(err -> LOGGER.error("Failed to cache data: key->{}",cacheKey,err));

                }
            }
            @Override
            public void onError(Throwable error) {
                if (completed.compareAndSet(false, true)) {
                    handler.handle(Future.failedFuture(error));
                }
            }
        };
    }
}           

7: Using the component

/**
     * Loads a list of rows, using the cache when possible.
     *
     * @author javacoo
     * @date 2023/6/10 22:42
     * @param dbClient dbClient
     * @param resultHandler resultHandler
     * @param finalSql prepared SQL statement
     * @param dataAll SQL parameters
     * @param cacheKey cache key
     */
    public static void loadListData(JDBCClient dbClient,Handler<AsyncResult<List<JsonObject>>> resultHandler, String finalSql, JsonArray dataAll, String cacheKey) {
        // Read from the cache first; on a miss, query the database and cache the result, otherwise use the cached value.
        RedisAPIHolder.getRedisAPI().get(cacheKey)
                .onSuccess(val -> {
                    // Cache miss: load the data from the database and cache it
                    if(val == null){
                        LOGGER.info("Prepared SQL: {}, params: {}",finalSql,dataAll);
                        // The observable (publisher)
                        Single<ResultSet> resultSet = dbClient.rxQueryWithParams(finalSql, dataAll);
                        // The observer (subscriber)
                        SingleObserver<List<JsonObject>> singleObserver = RedisAPIHolder.toObserver(cacheKey, resultHandler);
                        // Bind them: the observer subscribes to the events published by the observable
                        resultSet.map(ResultSet::getRows).subscribe(singleObserver);
                    }else{// Cache hit: use the cached value
                        LOGGER.info("Value read from cache: key->{}", cacheKey);
                        List<JsonObject> jsonObjectList = toJsonObjectList(val.toString());
                        resultHandler.handle(Future.succeededFuture(jsonObjectList));
                    }
                })
                .onFailure(err -> {
                    LOGGER.error("Failed to read value from cache: key->{}", cacheKey, err);
                    // Report the failure so that the caller is not left waiting for a result
                    resultHandler.handle(Future.failedFuture(err));
                });
    }

8: Testing

First query:

[2023-06-17 21:13:28.126] [level: INFO] [Thread: vert.x-eventloop-thread-1] [ Class:org.javacoo.vertx.core.factory.RouterHandlerFactory >> Method: lambda$createRouter$1:92 ]
INFO: Executing transaction -> appid: null, transaction serial number: null, method: http://127.0.0.1:8331/api/basicEducation/getBasicEduSpecialTeachersTotalByQualificationType, params: {"areaCode":"510000000000","level":"1"}

[2023-06-17 21:13:28.246] [level: DEBUG] [Thread: vert.x-worker-thread-3] [ Class:io.vertx.redis.client.impl.RedisConnectionManager >> Method: ?:? ]
DEBUG:{mode: standalone, server: redis, role: master, proto: 3, id: 95923, version: 7.0.4, modules: []}

[2023-06-17 21:13:28.344] [level: INFO] [Thread: vert.x-worker-thread-3] [ Class:edu.sc.gis.api.utils.ServiceUtils >> Method: lambda$loadListData$0:40 ]
INFO: Prepared SQL: SELECT sum(g.zrjss) total , sum(g.jzgs) empTotal , g.eduType, g.qualificationType FROM(SELECT t1.zrjss, t1.jzgs, replace(t1.xl, 'XL@GJ@','') qualificationType, t.xxbxlx eduType FROM gis_js_xx_jbxx t LEFT JOIN gis_js_xlxb_rs t1 ON t1.gis_xx_jbxx = t.gis_xx_jbxx WHERE t1.zrjss is NOT null AND t1.xl is NOT null AND t.sszgjyxzd LIKE ?) g GROUP BY g.eduType ,g.qualificationType, params: ["51%"]

[2023-06-17 21:13:28.407] [level: DEBUG] [Thread: vert.x-worker-thread-1] [ Class:io.vertx.ext.jdbc.spi.JDBCEncoder >> Method: ?:? ]
DEBUG:Convert JDBC column [JDBCColumnDescriptor[columnName=(0), jdbcTypeWrapper=(JDBCTypeWrapper[vendorTypeNumber=(12), vendorTypeName=(text), vendorTypeClass=(class java.lang.String), jdbcType=(VARCHAR)])]][java.lang.String]

[2023-06-17 21:13:28.980] [level: INFO] [Thread: vert.x-worker-thread-3] [ Class:edu.sc.gis.api.cache.RedisAPIHolder >> Method: lambda$onSuccess$0:83 ]
INFO: Value loaded from database and cached: key->totalTeacherByQueryType_f.qualificationType_51%, expireTime->3600s

Second query:

[2023-06-17 21:18:18.121] [level: INFO] [Thread: vert.x-eventloop-thread-1] [ Class:org.javacoo.vertx.core.factory.RouterHandlerFactory >> Method: lambda$createRouter$1:92 ]
INFO: Executing transaction -> appid: null, transaction serial number: null, method: http://127.0.0.1:8331/api/basicEducation/getBasicEduSpecialTeachersTotalByQualificationType, params: {"areaCode":"510000000000","level":"1"}

[2023-06-17 21:18:18.212] [level: DEBUG] [Thread: vert.x-worker-thread-3] [ Class:io.vertx.redis.client.impl.RedisConnectionManager >> Method: ?:? ]
DEBUG:{mode: standalone, server: redis, role: master, proto: 3, id: 95924, version: 7.0.4, modules: []}

[2023-06-17 21:18:18.392] [level: INFO] [Thread: vert.x-worker-thread-1] [ Class:edu.sc.gis.api.utils.ServiceUtils >> Method: lambda$loadListData$0:44 ]
INFO: Value read from cache: key->totalTeacherByQueryType_f.qualificationType_51%
