天天看点

利用BlockingQueue进行批量操作

1 背景

我曾接触一个项目,业务数据用Hbase存储,为了方便查询统计,采用elasticsearch作为查询库,但直接操作elasticsearch对性能有致命的影响。于是引用了rabbitmq + spring-stream来实现削峰操作。

一开始,接收到一条消息后便将其插入到elasticsearch,但消费太慢,于是改用调用ES批量操作的接口,且用异步多线程的方式插入。

2 方案

用生产者消费者的方式实现,接收到消息后,立即将消息插入到本地消息队列。另启几个线程消费线程消费消息,消费线程在消息达到一定数量,或等待一定时间后,会执行插入操作。

3 实现

java.util.concurrent.BlockingQueue,因为其自身是线程安全类,所以是实现生产者消费者方案的首选,而且除了阻塞功能外,还自带有timeout功能。两种功能相互配合,可以实现很好的批量操作功能。

以下是api文档中对BlockingQueue的描述。你可以看到三种类型的操作:insert,remove, examine。且每种都有在达到条件时采取的四种策略。

Summary of BlockingQueue methods

Throws exception Special value Blocks Times out
Insert add add(e) offer offer(e) put put(e) offer(Object, long, TimeUnit) offer(e, time, unit)
Remove remove remove() poll poll() take take() poll(long, TimeUnit) poll(time, unit)
Examine element element() peek peek() not applicable not applicable

我在里需要用到的是 Remove 的Block与Timeout

4 代码展示

import jdk.nashorn.internal.ir.Block;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class TestBolckQueue {

    public static class Message{
        private int id;
        private String name;

        public Message(int id, String name) {
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Message{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }

    /**
     * 消息队列
      */
   public final static BlockingQueue<Message> queue = new LinkedBlockingDeque(100);

    /***
     * 消息生产者
     */
    public  static Runnable product = ()->{
        int i =0;
        Random random = new Random();
        while(true){
            try {
                long sleep = random.nextInt(1000);
                System.out.println("睡眠:"+sleep+"豪秒");
                Thread.sleep(sleep);
                queue.put(new Message(i++,"msg"+i));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };


    /**
     * 消费者
     */
    public static class Consumer implements Runnable{

        /***
         * 批量操作的最大条数
         */
        private static int MAX_BATCH_SIZE = 10;

        /**
         * 等徒时长
         */
        private static long MAX_WAIT_TIME = 700;

        private BlockingQueue<Message> queue;

        public Consumer(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                loop();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void loop() throws InterruptedException {
            Message message ;
            List<Message> willSend ;
            while(true){
                willSend = new ArrayList<>(10);
                message = this.queue.take();    //take() 是个阻塞操作,当队列无消息时,线程在此阻塞
                willSend.add(message);
                int i = 0;
                while( ++i < MAX_BATCH_SIZE &&
                        (message = queue.poll(MAX_WAIT_TIME, TimeUnit.MILLISECONDS)) != null){ // poll是一个等待的操作,当等
                    willSend.add(message);                                                     // 待 MAX_WAIT_TIME 后未能获
                }                                                                              // 取消息,返回 null , 停止等待

                send(willSend);

            }


        }

        protected void send(List<Message> willSend){
            System.out.println("发送"+willSend.size()+"条消息");
            willSend.forEach(System.out::println);
            System.out.println("=============发送完成========");
        }
    }




    public static void main(String[] args) {
        new Thread(product).start();
        new Thread(new Consumer(queue)).start();
    }
    
}

           
利用BlockingQueue进行批量操作