天天看点

谈disruptor的单线程数据库操作

  首先定义一个抽象类 task,实现 Runnable 接口:

/**
 * Base type for the units of work used throughout this file.
 * Subclasses provide the work by implementing {@link Runnable#run()}.
 */
public abstract class task implements Runnable {

    /** No-argument constructor for subclasses. */
    public task() {
    }
}

public class taskevent {

private task tk;

public task gettask() {

return tk;

public void settask(task tk) {

this.tk = tk;

public final static eventfactory<taskevent> event_factory = new eventfactory<taskevent>() {

public taskevent newinstance() {

return new taskevent();

};

public class taskeventhandler implements eventhandler<taskevent> {

//  执行接口函数onevent执行

public void onevent(taskevent event, long sequence,

boolean endofbatch) throws exception {

event.gettask().run();

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.zhenhai.bonecp.customthreadfactory;
import com.zhenhai.disruptor.batcheventprocessor;
import com.zhenhai.disruptor.ringbuffer;
import com.zhenhai.disruptor.sequencebarrier;
import com.zhenhai.disruptor.yieldingwaitstrategy;
import com.zhenhai.disruptor.dsl.producertype;

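/**
 * Usage. No explicit initialisation call is needed: the helper builds and
 * starts itself in its static initializer when the class is loaded.
 *
 *   task tt = new taska();              // taska: some concrete task subclass (not shown here)
 *   disruptorhelper.addtask(0, tt);     // first argument is the group index
 *   disruptorhelper.shutdown();
 */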

public class disruptorhelper {

* ringbuffer容量,最好是2的n次方

private static final int buffer_size = 1024 * 1;

private static int group=2;

private ringbuffer<taskevent> ringbuffer[];

private sequencebarrier sequencebarrier[];

private taskeventhandler handler[];

private batcheventprocessor<taskevent> batcheventprocessor[];

private  static disruptorhelper instance;

private static boolean inited = false;

private static scheduledexecutorservice tasktimer=null;

//jdk 创建一个使用单个 worker 线程的 executor,以无界队列方式来运行该线程。

private    executorservice execute[];

// Builds and initialises the single instance as soon as the class is loaded.
static {
    System.out.println("init disruptorhelper!!!!!!!!!!!!!!!!!");
    instance = new disruptorhelper();
    instance.init();
    inited = true;
    System.out.println("init disruptorhelper end!!!!!!!!!!!!!!!!!");
}

**

* 静态类

* @return

private disruptorhelper(){ }

* 初始化

private void init(){

execute=new executorservice[group];

ringbuffer=new ringbuffer[group];

sequencebarrier=new sequencebarrier[group];

handler=new taskeventhandler[group];

batcheventprocessor=new batcheventprocessor[group];

////////////////定时执行////////////////

//初始化ringbuffer,存放event

for(int i=0;i<group;i++){

ringbuffer[i] = ringbuffer.create(producertype.single, taskevent.event_factory, buffer_size, new yieldingwaitstrategy());

sequencebarrier[i] = ringbuffer[i].newbarrier();

handler[i] = new taskeventhandler();

batcheventprocessor[i] = new batcheventprocessor<taskevent>(ringbuffer[i], sequencebarrier[i], handler[i]);

ringbuffer[i].addgatingsequences(batcheventprocessor[i].getsequence());

execute[i]= executors.newsinglethreadexecutor();

execute[i].submit(instance.batcheventprocessor[i]);

this.tasktimer =  executors.newscheduledthreadpool(10, new customthreadfactory("disruptorhelper-scheduler", true));

* 执行定时器

* @param tk

private void produce(int index,task tk){

//system.out.println("index:="+index);

if(index<0||index>=group) {

system.out.println("out of group index:="+index);

return;

// if capacity less than 10%, don't use ringbuffer anymore

system.out.println("capacity:="+ringbuffer[index].remainingcapacity());

if(ringbuffer[index].remainingcapacity() < buffer_size * 0.1) {

system.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");

// do something

}else {

long sequence = ringbuffer[index].next();

//将状态报告存入ringbuffer的该序列号中

ringbuffer[index].get(sequence).settask(tk);

//通知消费者该资源可以消费

ringbuffer[index].publish(sequence);

* 获得容器的capacity的数量

* @param index

private long  remainingcapacity(int index){

return 0l;

long capacity= ringbuffer[index].remainingcapacity();

return capacity;

private void shutdown0(){

execute[i].shutdown();

////////////////////////////////下面是静态方法提供调用////////////////////////////////////////////////////////

* 直接消费

public static void addtask(int priority,task tk){

instance.produce(priority,tk);

* 定时消费

* @param delay

* @param period

public static void scheduletask(int priority,task tk,long delay,long period){

runnable timertask = new scheduledtask(priority, tk);

tasktimer.scheduleatfixedrate(timertask, delay, period, timeunit.milliseconds);

* 定点执行

* @param hourse

* @param minus

* @param sec

public static runnable scheduletask(int priority,task tk, int hourse,int minus,int sec)

{

//每天2:30分执行

long delay = helper.calcdelay(hourse,minus,sec);

long period = helper.one_day;

system.out.println("delay:"+(delay/1000)+"secs");

return timertask;

//对定时执行的程序进行分装

private static class scheduledtask implements runnable

private int priority;

private task task;

scheduledtask(int priority, task task)

this.priority = priority;

this.task = task;

public void run()

try{

instance.produce(priority,task);

}catch(exception e){

system.out.println("catch exception in disruptorhelper!");

public static long getremainingcapatiye(int index){

return instance.getremainingcapatiye(index);

public static void shutdown(){

if(!inited){

throw new runtimeexception("disruptor还没有初始化!");

instance.shutdown0();

最新内容请见作者的github页:http://qaseven.github.io/