记录一次千万级别数据通过接口在线同步迁移的case
首先描述一下背景:
应大王要求,周边各个寨子需要像大王统一汇聚本寨子已实名注册用户资质数据,云南寨已注册自然人用户总数2800余万,陕西寨1500余万,皆需要通过在线接口同步至大王的平台,是的 你没听错,总共4300多w的数据,都要通过接口的方式,单条单条的给大王怀里塞。疯了?不能通过库对库的方式推数吗?是的,不能,因为不安全。ok,那让我们先理下思路。
首先,肯定要通过多线程的方式编写程序,不然单条单条同步的话,估计一个月都不用干其他的事了,我倒是没问题,领导能活劈了我。
好的,基于多线程,那么我们继续往下走
- 第一,要考虑到任务分配的问题,不同的节点各同步哪些数据
- 第二,要考虑到断点续传,谁也不能保证服务器会不会开小差,传了几百万,崩一下子就重新开始了?
- 第三,要考虑到异常暂停,失败数量到了指定阀值就暂停程序,如果跟国办之间没商量好就开始传,全部传完了发现国办那边一条没收着!掀桌ing,玩不起玩不起
- 第四,剩下的就是一些使程序更加人性化的功能,添加开关对整个程序的运行加已控制。添加时间控制器,指定每天开始同步的时间和结束同步的时间。添加异常分析日志。
ok,该考虑到的差不多都考虑到了,实际研发中肯定还有各种问题,go on!
包结构

流程图
大概描述下上传流程
代码实现
知道了大概流程,那么我们一起来分析实现
- 首先进入请求控制,确保当前节点上传任务不会重复执行
- 然后打开监控,时间监控,开关监控,以及异常阀值监控
监控线程包括当前监控线程,以及主线程工具类,以及当前线程执行动作,该动作在初始化监控时指定为Consumer动作函数
打开监控
Consumer函数举一个开关的例子吧
- 然后获取到本节点开始上传位置以及每批数量后,根据偏移量offset计算出当前节点上传开始位置
开始上传
这里是真正的上传动作
uploadHandler中包含当前批次的信息,如下
- 上传者根据当前批次信息查到需要上传的用户数据后,交由传输者进行上传
这里是通过循环开始之前开始创建的多线程service来执行上传的,upload是当前批次单条用户数据的传输者,里面包含用户所有资质信息
为了确保当前批次所有数据上传完毕再进行下一个批次,这里使用了一个闭锁
同步完成后,注意把当前批次的上传状态同步到数据库里,每批次的上传结果是放到一个线程安全的list中的(如下图),UploadResult保存着本次上传用户的行号以及上传状态,在本批次批量同步到数据库后进行clear清空操作,确保不会与下一批次同步状态混淆。
uploader是一个用来执行上传动作的线程,由于架构设计本工程不做网络交互,所以将上传动作放到另一个工程里
另一个工程中就只有上传动作了,通过单例获取上传client
记得再client连接失败时进行重试
- 然后等所有批次用户都上传完毕后,进行最后的收尾动作
收尾主要包括关闭上传线程池,结束监控线程,重置上传各项阀值
关闭监控
至此,代码部分已经讲解完毕。
上传效率
有的同学就要讲了,每个服务器的性能不一样,怎么控制上传的效率呢,效率太高服务器撑不住,效率太低又跑的很慢。你且听我慢慢道来
控制上传的效率有两个点
- 在打开监听线程时会初始化本次上传用到的线程池,这里我用的是一个固定数量线程时,线程的数量存放再在库中,根据top(linux)查看服务器性能,酌情配置
千万级多线程接口同步 - 最重要的控制性能的办法,是在当前批次真正开启线程执行上传的时候,通过sleep()适当的对cpu的时间片进行干扰,设置正在(50-100)之前较为合理。这里设置为-1就是不干扰cpu时间片,此时服务器如一头凶猛的野兽,cpu飙升至100%,每分钟上传10w左右, 2000w数据3小时上传完毕。
结语
以上就是本次千万量级数据多线程接口上传的全部内容,如果有不对的地方,还请大神指出,不胜感激。
天道酬勤