Redis应用实践之资源抢占的思考
- 场景描述
- 实现思路
-
- 抢占过程
- 代码片段
- 总结
场景描述
如果在某个应用中写了一个轮训的定时任务,每30秒执行一次。在集群模式下(假设3台),就相当于3台应用都会执行这个定时任务,这个时候问题就来了:
- 如何保证只有一个节点获得执行权限,其他节点处于备胎状态。
- 一旦获得执行权限的节点挂掉了,从其他备胎节点挑选一个负责执行。
接下来带着这些思考如何通过redis去实现
实现思路
抢占过程
- 通过setnx来抢占该key的值。k = key ,value = ip_时间戳
- 如果抢占成功
- 设置过期时间。
- 如果抢占不成功
- 获取当前key的值
- 判断值是否是当前线程的
- 如果是持锁的线程的,刷新一下时间戳
- 如果非当前线程的
- 比较时间戳和当前时间是否过期(
),过期则删除,重新再开始抢。这里只是为了避免设置过期时间失败的情况
- 比较值如果非当前线程直接返回抢占失败
- 比较时间戳和当前时间是否过期(
- 判断值是否是当前线程的
- 获取当前key的值
- 如果抢占成功
代码片段
实现代码
/**
* 是否是当前资源抢占者
*
* @param key key名称
* @param value 当前线程表示
* @param expire 过期时间(ms)
* @return
* @throws Exception
*/
public boolean isHoldLock(String key, String value, Long expire) throws Exception {
long currentTimeMillis = System.currentTimeMillis();
// 如果不存在缓存中出现的值
NXModel nxModel = new NXModel();
nxModel.setTime(currentTimeMillis);
nxModel.setValue(value);
// 利用setNX设置值
boolean result = cacheTemplate.string().setIfAbsent(key, nxModel);
// 不成功说明当前key已经有值了
if (!result) {
// 判断缓存内的值和当前值比对
NXModel oldValue = (NXModel) cacheTemplate.string().get(key);
if (oldValue == null) { //极端时间过期情况
logger.info("触发极端时间过期情况");
return isHoldLock(key, value, expire);
}
// 如果是value已经过期的情况,这种情况通常发生在设置redis过期时间失败的情况
if ((oldValue.getTime() + expire) < currentTimeMillis) {
logger.info(" {} 触发过期时间计算淘汰..", oldValue.getValue());
cacheTemplate.getRedisTemplate().delete(key);
return isHoldLock(key, value, expire);
}
if (!value.equals(oldValue.getValue())) {
return false;
}
}
// 说明缓存内的值就是当前值
cacheTemplate.string().set(key, nxModel, expire, TimeUnit.MILLISECONDS);
return true;
}
测试代码
@Test
public void testUnionInvoke() throws Exception {
// 模拟6个不同的线程
List<String> ipList = Arrays.asList("liukx_A", "liukx_B", "liukx_C", "liukx_D", "liukx_E", "liukx_F");
// 抢占同一个key
String redisKey = "{USER}_LOCK_UNION_PROCESS";
// 每5秒抢占一次
int defaultSleep = 5000;
AtomicInteger count = new AtomicInteger();
for (int i = 0; i < 6; i++) {
String key = ipList.get(i);
new Thread(() -> {
int index = 0;
while (true) {
try {
String name = Thread.currentThread().getName();
boolean cacheValue = isHoldLock(redisKey, name, (long) (defaultSleep));
index++;
if (cacheValue) {
// 这里表示抢占成功的线程
int invokeCount = count.incrementAndGet();
logger.info("执行业务操作 ==> {} ==> 当前锁被抢次数 : {}", name, invokeCount);
// 抢占3次成功之后,立马挂掉.
if (invokeCount % 3 == 0) {
// count.incrementAndGet(); // 跳过一个批次
logger.info(" 当前批次 :{} {} =================释放执行权利,线程结束", index, name);
break;
}
Thread.sleep(defaultSleep);
} else {
// 抢不到的线程
logger.info("当前批次 :{} - {} 该线程没有抢到,下次再抢. ", index, name);
Thread.sleep((long) (defaultSleep));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, key).start();
}
System.in.read();
}
下面是执行结果:
2020-08-22 11:41:49.438 INFO 39564 --- [liukx_C] : 执行业务操作 ==> liukx_C ==> 当前锁被抢次数 : 1
2020-08-22 11:41:49.473 INFO 39564 --- [liukx_F] : 当前批次 :1 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:41:49.473 INFO 39564 --- [liukx_A] : 当前批次 :1 - liukx_A 该线程没有抢到,下次再抢.
2020-08-22 11:41:49.473 INFO 39564 --- [liukx_B] : 当前批次 :1 - liukx_B 该线程没有抢到,下次再抢.
2020-08-22 11:41:49.473 INFO 39564 --- [liukx_E] : 当前批次 :1 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:41:49.474 INFO 39564 --- [liukx_D] : 当前批次 :1 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:41:54.461 INFO 39564 --- [liukx_C] : 执行业务操作 ==> liukx_C ==> 当前锁被抢次数 : 2
2020-08-22 11:41:54.504 INFO 39564 --- [liukx_A] : 当前批次 :2 - liukx_A 该线程没有抢到,下次再抢.
2020-08-22 11:41:54.513 INFO 39564 --- [liukx_F] : 当前批次 :2 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:41:54.513 INFO 39564 --- [liukx_B] : 当前批次 :2 - liukx_B 该线程没有抢到,下次再抢.
2020-08-22 11:41:54.513 INFO 39564 --- [liukx_E] : 当前批次 :2 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:41:54.513 INFO 39564 --- [liukx_D] : 当前批次 :2 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:41:59.488 INFO 39564 --- [liukx_C] : 执行业务操作 ==> liukx_C ==> 当前锁被抢次数 : 3
2020-08-22 11:41:59.488 INFO 39564 --- [liukx_C] : 当前批次 :3 liukx_C =================释放执行权利,线程结束
2020-08-22 11:41:59.540 INFO 39564 --- [liukx_A] : 当前批次 :3 - liukx_A 该线程没有抢到,下次再抢.
2020-08-22 11:41:59.549 INFO 39564 --- [liukx_F] : 当前批次 :3 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:41:59.549 INFO 39564 --- [liukx_B] : 当前批次 :3 - liukx_B 该线程没有抢到,下次再抢.
2020-08-22 11:41:59.549 INFO 39564 --- [liukx_E] : 当前批次 :3 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:41:59.549 INFO 39564 --- [liukx_D] : 当前批次 :3 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:04.561 INFO 39564 --- [liukx_A] : 执行业务操作 ==> liukx_A ==> 当前锁被抢次数 : 4
2020-08-22 11:42:04.580 INFO 39564 --- [liukx_B] : 当前批次 :4 - liukx_B 该线程没有抢到,下次再抢.
2020-08-22 11:42:04.589 INFO 39564 --- [liukx_D] : 当前批次 :4 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:04.589 INFO 39564 --- [liukx_E] : 当前批次 :4 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:04.589 INFO 39564 --- [liukx_F] : 当前批次 :4 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:42:09.582 INFO 39564 --- [liukx_A] : 执行业务操作 ==> liukx_A ==> 当前锁被抢次数 : 5
2020-08-22 11:42:09.608 INFO 39564 --- [liukx_B] : 当前批次 :5 - liukx_B 该线程没有抢到,下次再抢.
2020-08-22 11:42:09.634 INFO 39564 --- [liukx_F] : 当前批次 :5 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:42:09.642 INFO 39564 --- [liukx_D] : 当前批次 :5 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:09.642 INFO 39564 --- [liukx_E] : 当前批次 :5 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:14.604 INFO 39564 --- [liukx_A] : 执行业务操作 ==> liukx_A ==> 当前锁被抢次数 : 6
2020-08-22 11:42:14.604 INFO 39564 --- [liukx_A] : 当前批次 :6 liukx_A =================释放执行权利,线程结束
2020-08-22 11:42:14.631 INFO 39564 --- [liukx_B] : 当前批次 :6 - liukx_B 该线程没有抢到,下次再抢.
2020-08-22 11:42:14.677 INFO 39564 --- [liukx_F] : 当前批次 :6 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:42:14.686 INFO 39564 --- [liukx_D] : 当前批次 :6 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:14.686 INFO 39564 --- [liukx_E] : 当前批次 :6 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:19.656 INFO 39564 --- [liukx_B] : 执行业务操作 ==> liukx_B ==> 当前锁被抢次数 : 7
2020-08-22 11:42:19.754 INFO 39564 --- [liukx_F] : 当前批次 :7 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:42:19.766 INFO 39564 --- [liukx_D] : 当前批次 :7 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:19.766 INFO 39564 --- [liukx_E] : 当前批次 :7 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:24.679 INFO 39564 --- [liukx_B] : 执行业务操作 ==> liukx_B ==> 当前锁被抢次数 : 8
2020-08-22 11:42:24.779 INFO 39564 --- [liukx_F] : 当前批次 :8 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:42:24.801 INFO 39564 --- [liukx_D] : 当前批次 :8 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:24.812 INFO 39564 --- [liukx_E] : 当前批次 :8 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:29.706 INFO 39564 --- [liukx_B] : 执行业务操作 ==> liukx_B ==> 当前锁被抢次数 : 9
2020-08-22 11:42:29.707 INFO 39564 --- [liukx_B] : 当前批次 :9 liukx_B =================释放执行权利,线程结束
2020-08-22 11:42:29.814 INFO 39564 --- [liukx_F] : 当前批次 :9 - liukx_F 该线程没有抢到,下次再抢.
2020-08-22 11:42:29.841 INFO 39564 --- [liukx_D] : 当前批次 :9 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:29.855 INFO 39564 --- [liukx_E] : 当前批次 :9 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:34.839 INFO 39564 --- [liukx_F] : 执行业务操作 ==> liukx_F ==> 当前锁被抢次数 : 10
2020-08-22 11:42:34.879 INFO 39564 --- [liukx_D] : 当前批次 :10 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:34.887 INFO 39564 --- [liukx_E] : 当前批次 :10 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:39.860 INFO 39564 --- [liukx_F] : 执行业务操作 ==> liukx_F ==> 当前锁被抢次数 : 11
2020-08-22 11:42:39.907 INFO 39564 --- [liukx_D] : 当前批次 :11 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:39.915 INFO 39564 --- [liukx_E] : 当前批次 :11 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:44.897 INFO 39564 --- [liukx_F] : 执行业务操作 ==> liukx_F ==> 当前锁被抢次数 : 12
2020-08-22 11:42:44.897 INFO 39564 --- [liukx_F] : 当前批次 :12 liukx_F =================释放执行权利,线程结束
2020-08-22 11:42:44.952 INFO 39564 --- [liukx_D] : 当前批次 :12 - liukx_D 该线程没有抢到,下次再抢.
2020-08-22 11:42:44.961 INFO 39564 --- [liukx_E] : 当前批次 :12 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:49.983 INFO 39564 --- [liukx_D] : 执行业务操作 ==> liukx_D ==> 当前锁被抢次数 : 13
2020-08-22 11:42:49.989 INFO 39564 --- [liukx_E] : 当前批次 :13 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:42:55.013 INFO 39564 --- [liukx_D] : 执行业务操作 ==> liukx_D ==> 当前锁被抢次数 : 14
2020-08-22 11:42:55.022 INFO 39564 --- [liukx_E] : 当前批次 :14 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:43:00.049 INFO 39564 --- [liukx_D] : 执行业务操作 ==> liukx_D ==> 当前锁被抢次数 : 15
2020-08-22 11:43:00.049 INFO 39564 --- [liukx_D] : 当前批次 :15 liukx_D =================释放执行权利,线程结束
2020-08-22 11:43:00.055 INFO 39564 --- [liukx_E] : 当前批次 :15 - liukx_E 该线程没有抢到,下次再抢.
2020-08-22 11:43:05.077 INFO 39564 --- [liukx_E] : 执行业务操作 ==> liukx_E ==> 当前锁被抢次数 : 16
2020-08-22 11:43:10.100 INFO 39564 --- [liukx_E] : 执行业务操作 ==> liukx_E ==> 当前锁被抢次数 : 17
2020-08-22 11:43:15.125 INFO 39564 --- [liukx_E] : 执行业务操作 ==> liukx_E ==> 当前锁被抢次数 : 18
2020-08-22 11:43:15.125 INFO 39564 --- [liukx_E] : 当前批次 :18 liukx_E =================释放执行权利,线程结束
实际业务中可以将value的值替换成机器的唯一标识,我这里只是做测试。
总结
集群模式下:
- 假设A获得了执行权限,那么BCDEF都默认抢不到。
- 抢到的A在下一次执行的时间比BCDEF都要快那么一丢丢,同时把这个锁的有效期延长。(
)最好是将这个周期错开,避免同一时刻所有线程都在抢
- 如果A在锁延长成功之后,挂掉了。要么redis的key过期了,要么就是比较value的时间戳的时候过期。
- BCDEF开始抢占执行权限,回到第一步。
合理的控制有效时间,来控制你希望下一个周期是同一个人还是其他人同时来抢。当然也要把业务执行时间算到里面去喔~
这是本人的一些思考及实践,如果你有什么更好的方式,欢迎留言,我会在第一时间回复!
谢谢观看。