首先定义一个抽象类 task,实现 Runnable 接口:
/**
 * Base type for units of work handed to the disruptor helper.
 *
 * <p>Subclasses supply the work by implementing {@link Runnable#run()}.
 */
public abstract class task implements Runnable {
    /** Creates a task; subclasses add whatever state their {@code run()} needs. */
    public task() {
    }
}
public class taskevent {
private task tk;
public task gettask() {
return tk;
public void settask(task tk) {
this.tk = tk;
public final static eventfactory<taskevent> event_factory = new eventfactory<taskevent>() {
public taskevent newinstance() {
return new taskevent();
};
public class taskeventhandler implements eventhandler<taskevent> {
// 执行接口函数onevent执行
public void onevent(taskevent event, long sequence,
boolean endofbatch) throws exception {
event.gettask().run();
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.zhenhai.bonecp.customthreadfactory;
import com.zhenhai.disruptor.batcheventprocessor;
import com.zhenhai.disruptor.ringbuffer;
import com.zhenhai.disruptor.sequencebarrier;
import com.zhenhai.disruptor.yieldingwaitstrategy;
import com.zhenhai.disruptor.dsl.producertype;
/**
* 使用方法
disruptorhelper.initandstart();
task tt=new taska();
disruptorhelper.produce(tt);
disruptorhelper.shutdown();
*
*/
public class disruptorhelper {
* ringbuffer容量,最好是2的n次方
private static final int buffer_size = 1024 * 1;
private static int group=2;
private ringbuffer<taskevent> ringbuffer[];
private sequencebarrier sequencebarrier[];
private taskeventhandler handler[];
private batcheventprocessor<taskevent> batcheventprocessor[];
private static disruptorhelper instance;
private static boolean inited = false;
private static scheduledexecutorservice tasktimer=null;
//jdk 创建一个使用单个 worker 线程的 executor,以无界队列方式来运行该线程。
private executorservice execute[];
//启动监视线程
static {
system.out.println("init disruptorhelper!!!!!!!!!!!!!!!!!");
instance = new disruptorhelper();
instance.init();
inited = true;
system.out.println("init disruptorhelper end!!!!!!!!!!!!!!!!!");
**
* 静态类
* @return
private disruptorhelper(){ }
* 初始化
private void init(){
execute=new executorservice[group];
ringbuffer=new ringbuffer[group];
sequencebarrier=new sequencebarrier[group];
handler=new taskeventhandler[group];
batcheventprocessor=new batcheventprocessor[group];
////////////////定时执行////////////////
//初始化ringbuffer,存放event
for(int i=0;i<group;i++){
ringbuffer[i] = ringbuffer.create(producertype.single, taskevent.event_factory, buffer_size, new yieldingwaitstrategy());
sequencebarrier[i] = ringbuffer[i].newbarrier();
handler[i] = new taskeventhandler();
batcheventprocessor[i] = new batcheventprocessor<taskevent>(ringbuffer[i], sequencebarrier[i], handler[i]);
ringbuffer[i].addgatingsequences(batcheventprocessor[i].getsequence());
execute[i]= executors.newsinglethreadexecutor();
execute[i].submit(instance.batcheventprocessor[i]);
this.tasktimer = executors.newscheduledthreadpool(10, new customthreadfactory("disruptorhelper-scheduler", true));
* 执行定时器
* @param tk
private void produce(int index,task tk){
//system.out.println("index:="+index);
if(index<0||index>=group) {
system.out.println("out of group index:="+index);
return;
// if capacity less than 10%, don't use ringbuffer anymore
system.out.println("capacity:="+ringbuffer[index].remainingcapacity());
if(ringbuffer[index].remainingcapacity() < buffer_size * 0.1) {
system.out.println("disruptor:ringbuffer avaliable capacity is less than 10 %");
// do something
}else {
long sequence = ringbuffer[index].next();
//将状态报告存入ringbuffer的该序列号中
ringbuffer[index].get(sequence).settask(tk);
//通知消费者该资源可以消费
ringbuffer[index].publish(sequence);
* 获得容器的capacity的数量
* @param index
private long remainingcapacity(int index){
return 0l;
long capacity= ringbuffer[index].remainingcapacity();
return capacity;
private void shutdown0(){
execute[i].shutdown();
////////////////////////////////下面是静态方法提供调用////////////////////////////////////////////////////////
* 直接消费
public static void addtask(int priority,task tk){
instance.produce(priority,tk);
* 定时消费
* @param delay
* @param period
public static void scheduletask(int priority,task tk,long delay,long period){
runnable timertask = new scheduledtask(priority, tk);
tasktimer.scheduleatfixedrate(timertask, delay, period, timeunit.milliseconds);
* 定点执行
* @param hourse
* @param minus
* @param sec
public static runnable scheduletask(int priority,task tk, int hourse,int minus,int sec)
{
//每天2:30分执行
long delay = helper.calcdelay(hourse,minus,sec);
long period = helper.one_day;
system.out.println("delay:"+(delay/1000)+"secs");
return timertask;
//对定时执行的程序进行分装
private static class scheduledtask implements runnable
private int priority;
private task task;
scheduledtask(int priority, task task)
this.priority = priority;
this.task = task;
public void run()
try{
instance.produce(priority,task);
}catch(exception e){
system.out.println("catch exception in disruptorhelper!");
public static long getremainingcapatiye(int index){
return instance.getremainingcapatiye(index);
public static void shutdown(){
if(!inited){
throw new runtimeexception("disruptor还没有初始化!");
instance.shutdown0();
最新内容请见作者的 GitHub 页:http://qaseven.github.io/