天天看点

通过scan方式优雅取出redis的数据

创建jedis连接池

@Configuration
@Slf4j
public class RedisConfig {
    @Autowired
    private RedisProperties redisProperties; //从配置文件获取redis连接配置

    private JedisPool jedisPool;

    @Autowired
    private JedisPoolConfig jedisPoolConfig;

    @Bean
    @Scope("prototype")
    /**
     * 多例模式返回redis客户端,但是在依赖注入时,建议不要使用
     * 因为jedis客户端是阻塞式IO的,而且线程不安全,
     * 所以在操作数据库的时候不要使用单例,建议从连接池获取
     */
    public redis.clients.jedis.Jedis jedis(){
        if (jedisPool!=null){
            return jedisPool.getResource();
        }else {
            return jedisPool().getResource();
        }
//        return jedisPool.getResource();
    }

    @Bean
    /**
     * 单例模式返回redis连接池
     * 依赖注入时,建议注入jedisPool,获取连接池,然后调用getResource方法获取jedis客户端
     * 使用完后,调用jedis.close()方法归还连接,节约资源
     */
    public JedisPool jedisPool(){
        synchronized (this){
            if(jedisPool==null){
                synchronized (this){
                    
                    if (StringUtils.isEmpty(redisProperties.getPassword())){
                        //无密码
                        jedisPool = new JedisPool(jedisPoolConfig,redisProperties.getHost(),redisProperties.getPort(),(int)redisProperties.getTimeout().toMillis());
                    }else {//有密码
                        jedisPool = new JedisPool(jedisPoolConfig,redisProperties.getHost(),redisProperties.getPort(),(int)redisProperties.getTimeout().toMillis(),redisProperties.getPassword());
                    }
                         
                }
            }
        }
        return jedisPool;
    }
    

    
    @Bean
    public JedisPoolConfig jedisPoolConfig(){
        // Jedis连接池配置
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive());
        // 最大空闲连接数, 默认8个
//        jedisPoolConfig.setMaxIdle(50);
        jedisPoolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle());
        // 最大连接数, 默认8个
//        jedisPoolConfig.setMaxTotal(300);
        //最小空闲连接数, 默认0
        jedisPoolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle());
//        jedisPoolConfig.setMinIdle(20);
        // 获取连接时的最大等待毫秒数(如果设置为阻塞时BlockWhenExhausted),如果超时就抛异常, 小于零:阻塞不确定的时间,  默认-1
        jedisPoolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis()); // 设置10秒


//        jedisPoolConfig.setMaxWaitMillis(3000); // 设置2秒
        //对拿到的connection进行validateObject校验
        
        jedisPoolConfig.setTestOnBorrow(true);
        return jedisPoolConfig;
    }
}      

jedis工具类

@Component
@Slf4j
public class RedisUtil {
    @Autowired
    private JedisPool jedisPool;

    /**
     * findValueByKeyOfPipeline和 findValueByKeyOfPipelineMget 的区别在于 前者根据阈值获取多次 后者是直接一次性获取
     */


    /**
     * 根据(不完整)key值通过Pipeline的方式获取值
     * 先通过scan获取全部的key,再通过Pipeline获取全部的值
     * 根据countVPT 这个 阈值来控制 获取多次
     * (通过建立一个管道一次提交)
     * @param redisKey
     * @Auther erpangshou
     * @Date:2021/6/17 10:36
     **/
    public List<String> findValueByKeyOfPipeline(String redisKey) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        ArrayList<Response<List<String>>> responses = new ArrayList<>();
        int countVPT=ConstantCom.BATCH_SIZE;
        try {
            jedis = jedisPool.getResource();
            //从redis获取值刷新航迹信息
            List<String> allKeys = findAllKeys(redisKey);
            if (Objects.isNull(allKeys) || allKeys.isEmpty()) {
                return values;
            }
            Pipeline pipelined = jedis.pipelined();
            ArrayList<String> strings = new ArrayList<>();
            for (String key : allKeys) {
                strings.add(key);
                if (strings.size()==countVPT){
                    ArrayList<String> subList = new ArrayList<>(strings);
                    String[] keys = subList.toArray(new String[subList.size()]);
                    Response<List<String>> mget = pipelined.mget(keys);
                    responses.add(mget);
                    strings=new ArrayList<>();
                }
            }
            //最后的数据
            String[] keys = strings.toArray(new String[strings.size()]);
            Response<List<String>> mget = pipelined.mget(keys);
            responses.add(mget);
            pipelined.sync();
            for (Response<List<String>> respons : responses) {
                values.addAll(respons.get());
            }

        } catch (Exception e) {
            e.printStackTrace();
            log.error("redis查询异常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }

    /**
     * 根据(不完整)key值通过Pipeline的方式获取值
     * 先通过scan获取全部的key,再通过Pipeline获取全部的值
     * (通过Pipeline.mget直接获取)
     * @param redisKey
     * @Auther erpangshou
     * @Date:2021/6/17 10:36
     **/
    public List<String> findValueByKeyOfPipelineMget(String redisKey) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        ArrayList<Response<List<String>>> responses = new ArrayList<>();
        try {
            jedis = jedisPool.getResource();
            //从redis获取值刷新航迹信息
            List<String> allKeys = findAllKeys(redisKey);
            if (Objects.isNull(allKeys) || allKeys.isEmpty()) {
                return values;
            }
//            allKeys=allKeys.subList(0,5000);
            String[] keys = allKeys.toArray(new String[allKeys.size()]);
            Pipeline pipelined = jedis.pipelined();
            Response<List<String>> mget = pipelined.mget(keys);
            pipelined.sync();
            values=mget.get();
        } catch (Exception e) {
            log.error("redis查询异常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }


    /**
     * jedis的met 
     * @param redisKey
     * @Auther erpangshou
     * @return
     */
    public List<String> mget(String redisKey) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        try {
            jedis = jedisPool.getResource();
            //从redis获取值刷新航迹信息
            List<String> allKeys = findAllKeys(redisKey);
            if (Objects.isNull(allKeys) || allKeys.isEmpty()) {
                return values;
            }
            String[] keys = allKeys.toArray(new String[allKeys.size()]);
            List<String> mget1 = jedis.mget(keys);
            values=mget1;

        } catch (Exception e) {
            log.error("redis查询异常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }

    /**
     * 根据(不完整)key值通过Pipeline的方式获取值   --使用传入jedis的方式
     * 先通过scan获取全部的key,再通过Pipeline获取全部的值
     *
     * @param redisKey
     * @Auther erpangshou
     * @Date:2021/6/17 10:36
     **/
    public List<String> findValueByKeyOfPipeline(String redisKey, Jedis jedis) {
        List<String> values = new ArrayList<>();
        try {
            //从redis获取值刷新航迹信息
            List<String> allKeys = findAllKeys(redisKey);
            if (allKeys == null || allKeys.isEmpty()) {
                return values;
            }
            //此处采用Pipeline,以提升性能
            Pipeline pipelined = jedis.pipelined();
            Response<List<String>> mget = pipelined.mget(allKeys.toArray(new String[allKeys.size()]));
            pipelined.sync();
            values = mget.get();
        } catch (Exception e) {
            log.error("redis查询异常:" + e.getMessage());
        }
        return values;
    }

    /**
     * 根据多个完整(精确)key值通过Pipeline的方式获取值
     * 使用完整的key通过Pipeline获取全部的值
     *
     * @param allKeys
     * @Auther erpangshou
     * @Date 2021/6/17  10:33
     */
    public List<String> findAllkeysByKeysOfPipeline(String[] allKeys) {
        Jedis jedis = null;
        List<String> values = new ArrayList<>();
        int countVPT=ConstantCom.BATCH_SIZE;

        try {
            jedis = jedisPool.getResource();
            ArrayList<Response<List<String>>> responses = new ArrayList<>();
            //从redis获取值刷新航迹信息
            if (Objects.isNull(allKeys) || allKeys.length == 0) {
                return values;
            }
            //此处采用Pipeline,以提升性能
            Pipeline pipelined = jedis.pipelined();
            ArrayList<String> strings = new ArrayList<>();
            for (String key : allKeys) {
                strings.add(key);
                if (strings.size()==countVPT){
                    ArrayList<String> subList = new ArrayList<>(strings);
                    String[] keys = subList.toArray(new String[subList.size()]);
                    Response<List<String>> mget = pipelined.mget(keys);
                    responses.add(mget);
                    strings=new ArrayList<>();
                }
            }
            //最后的数据
            String[] keys = strings.toArray(new String[strings.size()]);
            Response<List<String>> mget = pipelined.mget(keys);
            responses.add(mget);
            pipelined.sync();

            for (Response<List<String>> respons : responses) {
                values.addAll(respons.get());
            }

        } catch (Exception e) {
            log.error("redis查询异常:" + e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return values;
    }



    /**
     * 根据多个完整(精确)key值通过Pipeline的方式获取值  --使用传入jedis的方式
     * 使用完整的key通过Pipeline获取全部的值
     *
     * @param redisKeys
     * @Auther erpangshou
     * @Date 2021/6/17  10:33
     */
    public List<String> findAllkeysByKeysOfPipeline(String[] redisKeys, Jedis jedis) {
        List<String> values = new ArrayList<>();
        try {
            //从redis获取值刷新航迹信息
            if (redisKeys == null || redisKeys.length == 0) {
                return values;
            }
            //此处采用Pipeline,以提升性能
            Pipeline pipelined = jedis.pipelined();
            Response<List<String>> mget = pipelined.mget(redisKeys);
            pipelined.sync();
            values = mget.get();
        } catch (Exception e) {
            log.error("redis查询异常:" + e.getMessage());
        }
        return values;
    }

    /**
     * 根据keys 获取value
     *
     * @param pattern
     * @Auther erpangshou
     */
    public List<String> findValueByKey(String pattern) {
        List<String> values = null;
        List<String> keys = findAllKeys(pattern);
        if (keys != null && keys.size() > 0) {
            String[] strings = keys.toArray(new String[keys.size()]);
            values = findAllkeysByKeysOfPipeline(strings);
        }
        return values;
    }

    /**
     * 根据keys 获取value  --使用传入jedis的方式
     *
     * @param pattern
     * @Auther erpangshou
     */
    public List<String> findValueByKey(String pattern, Jedis jedis) {
        List<String> values = null;
        List<String> keys = findAllKeys(pattern);
        if (keys != null && keys.size() > 0) {
            String[] strings = keys.toArray(new String[keys.size()]);
            values = findAllkeysByKeysOfPipeline(strings, jedis);
        }
        return values;
    }

    /**
     * 根据keys 获取value 已map形式返回
     *
     * @param pattern
     * @Auther erpangshou
     */
    public List<Map> findValuetoMapByKey(String pattern, Jedis jedis) {
        List<String> values = new ArrayList<>();
        ArrayList<Map> mapList = new ArrayList<>();
        try {
            List<String> keys = findAllKeys(pattern);
            if (keys != null && keys.size() > 0) {
                String[] strings = keys.toArray(new String[keys.size()]);
                values = findAllkeysByKeysOfPipeline(strings);
            }

            if (ObjectUtil.isNotEmpty(values)) {
                for (String value : values) {
                    Map<String, Object> map = ((JSONObject) JSONObject.parse(value)).toJavaObject(Map.class);
                    mapList.add(map);
                }
            }

        } catch (Exception e) {
            jedis.close();
            e.printStackTrace();
        }
        return mapList;
    }

    /**
     * 因为Redis是单线程jedis.keys()方法会导致数据库阻塞,
     * 可能导致Redis其它业务无法操作,所有采用迭代模式(scan)获取数据
     * 并且keys()方法的时间复杂度为O(n),scan()时间复杂度为O(1)
     */
    public List<String> findAllKeys(String pattern) {
        Jedis jedis = null;
        List<String> total = null;
        try {
            jedis = jedisPool.getResource();
            String cursor = String.valueOf(0);
            total = new ArrayList<>();
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量获取redis的key失败");
        } finally {
            if (ObjectUtil.isNotEmpty(jedis)) {
                jedis.close();
            }
        }
        return total;
    }

    /**
     * 获取全都的key --使用传入jedis的方式
     * 因为Redis是单线程jedis.keys()方法会导致数据库阻塞,
     * 可能导致Redis其它业务无法操作,所有采用迭代模式(scan)获取数据
     * 并且keys()方法的时间复杂度为O(n),scan()时间复杂度为O(1)
     */
    public List<String> findAllKeys(String pattern, Jedis jedis) {
        List<String> total = null;
        try {
            String cursor = String.valueOf(0);
            total = new ArrayList<>();
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量获取redis的key失败");
        }
        return total;
    }

    /**
     * 取代jedis.keys(*)
     *
     * @param pattern
     * @return
     */
    public Set<String> findAllKeysToSet(String pattern) {
        Jedis jedis = null;
        Set<String> total = null;
        try {
            jedis = jedisPool.getResource();
            String cursor = String.valueOf(0);
            total = new HashSet<String>() {
            };
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量获取redis的key失败");

        } finally {
            if (ObjectUtil.isNotEmpty(jedis)) {
                jedis.close();
            }
        }
        return total;
    }

    /**
     * 取代jedis.keys(*)  --使用传入jedis的方式
     * @param pattern
     * @return
     */
    public Set<String> findAllKeysToSet(String pattern, Jedis jedis) {
        Set<String> total = null;
        try {
            String cursor = String.valueOf(0);
            total = new HashSet<String>();
            ScanParams params = new ScanParams();
            params.match(pattern);
            do {
                params.count(ConstantCom.BATCH_SIZE);
                ScanResult<String> result = jedis.scan(cursor, params);
                cursor = result.getStringCursor();
                total.addAll(result.getResult());
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量获取redis的key失败");

        }
        return total;
    }

    /**
     * 实时数据取出,封装成指定的对象集合
     *
     * @param list        redis取出的value集合
     * @param entityClass 需要封装的对象类型
     * @param <T>
     * @return
     */
    public <T> List<T> valueToClass(List<String> list, Class<T> entityClass) {
        List<T> result = new ArrayList<>();
        for (String value : list) {
            Map<String, Object> map = ((JSONObject) JSONObject.parse(value)).toJavaObject(Map.class);
            Set<String> keySet = map.keySet();
            // 将所有key转换为小写
            Map<String, Object> map1 = new HashMap<>();
            for (String set : keySet) {
                map1.put(set.toLowerCase(), map.get(set));
            }
            T instance = null;
            try {
                instance = entityClass.newInstance();
                BeanUtils.populate(instance, map1);
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            }
            result.add(instance);
        }
        return result;

    }

    public List<Map<String, Object>> valueToMap(List<String> list) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (String value : list) {
            if (org.apache.commons.lang3.StringUtils.isNotBlank(value)) {
                Map<String, Object> map = ((JSONObject) JSONObject.parse(value)).toJavaObject(Map.class);
                result.add(map);
            }
        }
        return result;
    }

    /**
     * @description redis常用方法封装get
     * @author erpangshou
     * @date 2021/6/11
     **/
    public String get(String key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!Objects.isNull(jedis)) {
                jedis.close();
            }
        }
        return null;
    }

    /**
     * @description redis常用方法封装set
     * @author erpangshou
     * @date 2021/6/11
     **/
    public String set(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.set(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!Objects.isNull(jedis)) {
                jedis.close();
            }
        }
        return null;
    }
    

    public String hget(String key, String field) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hget(key, field);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!Objects.isNull(jedis)) {
                jedis.close();
            }
        }
        return null;
    }

    /**
     * 获取redis的hash值
     * 用scan方式模仿hgetAll
     * @param pattern key
     * @Auther erpangshou
     * @return
     */

    public Map<String, String> hScan(String pattern) {
        long startTime = System.currentTimeMillis();
        Jedis jedis = null;

        Map<String, String> result = new HashMap<>();
        List<Map.Entry<String, String>> results = new ArrayList<Map.Entry<String, String>>();
        try {
            jedis = jedisPool.getResource();
            String cursor = String.valueOf(0);
            ScanParams params = new ScanParams();
            params.count(ConstantCom.BATCH_SIZE);

            do {
                ScanResult<Map.Entry<String, String>> scanResult = jedis.hscan(pattern, cursor, params);
                cursor = scanResult.getStringCursor();
                scanResult.getResult().stream().forEach(entry->result.put(entry.getKey(),entry.getValue()));
            } while (Integer.valueOf(cursor) > 0);
        } catch (NumberFormatException e) {
            e.printStackTrace();
            log.error("批量获取redis的key失败");
        } finally {
            if (ObjectUtil.isNotEmpty(jedis)) {
                jedis.close();
            }
        }
        System.out.println("hscansize:"+result.size());
        System.out.println("hscantime:"+(System.currentTimeMillis()-startTime));
        return result;
    }

}