天天看点

如何实现漏桶算法与令牌桶算法

作者:大数据孟小鹏(Java架构沉思录做了部分修改)

Java中对于生产者消费者模型,或者小米手机营销(1分钟卖多少台手机)等都存在限流的思想在里面。

关于限流目前存在两大类:从线程并发数角度(jdk1.5 Semaphore)限流和从速率限流(guava)。

Semaphore:从线程并发数限流。

RateLimiter:从速率限流。目前常见的算法是漏桶算法和令牌算法。

令牌桶算法。相比漏桶算法而言区别在于,令牌桶是会去匀速的生成令牌,拿到令牌才能够进行处理,类似于匀速往桶里放令牌。

漏桶算法是:生产者消费者模型,生产者往木桶里生产数据,消费者按照预先定义的速度去消费数据。

应用场景:

漏桶算法:必须读写分离的情况下,限制读取的速度。

令牌桶算法:必须读写分离的情况下,限制写的速率。

实现的方法都是一样的,通过RateLimiter来实现。

对于多线程场景下,很多时候使用的类都是原子性的,但是由于代码逻辑的原因,也可能发生线程安全问题。

1. 关于RateLimter和Semphore简单用法

package concurrent; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.*; import java.util.stream.IntStream; import static java.lang.Thread.currentThread; /** * ${DESCRIPTION} * 关于限流 目前存在两大类,从线程个数(jdk1.5 Semaphore)和RateLimiter速率(guava) * Semaphore:从线程个数限流 * RateLimiter:从速率限流  目前常见的算法是漏桶算法和令牌算法,下面会具体介绍 * * @author mengxp * @version 1.0 * @create 2018-01-15 22:44 **/ public class RateLimiterExample {   //Guava  0.5的意思是 1秒中0.5次的操作,2秒1次的操作  从速度来限流,从每秒中能够执行的次数来    private final static RateLimiter limiter=RateLimiter.create(0.5d);    //同时只能有三个线程工作 Java1.5  从同时处理的线程个数来限流    private final static Semaphore sem=new Semaphore(3);    private static void testSemaphore(){        try {            sem.acquire();            System.out.println(currentThread().getName()+" is doing work...");            TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(10));        } catch (InterruptedException e) {            e.printStackTrace();        }finally {            sem.release();            System.out.println(currentThread().getName()+" release the semephore..other thread can get and do job");        }    }    public static void runTestSemaphore(){        ExecutorService service = Executors.newFixedThreadPool(10);        IntStream.range(0,10).forEach((i)->{            //RateLimiterExample::testLimiter 这种写法是创建一个线程            service.submit(RateLimiterExample::testSemaphore);        });    }    /**     * Guava的RateLimiter     */    private static void testLimiter(){        System.out.println(currentThread().getName()+" waiting  " +limiter.acquire());    }    //Guava的RateLimiter    public static void runTestLimiter(){        ExecutorService service = Executors.newFixedThreadPool(10);        IntStream.range(0,10).forEach((i)->{            //RateLimiterExample::testLimiter 这种写法是创建一个线程            service.submit(RateLimiterExample::testLimiter);        });    }    public static void main(String[] args) {        IntStream.range(0,10).forEach((a)-> System.out.println(a));//从0-9        //runTestLimiter();        runTestSemaphore();    } }

2. 实现漏桶算法

package concurrent.BucketAl; import com.google.common.util.concurrent.Monitor; import com.google.common.util.concurrent.RateLimiter; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static java.lang.Thread.currentThread; /** * ${DESCRIPTION} * * @author mengxp * @version 1.0 * @create 2018-01-20 22:42 * 实现漏桶算法 实现多线程生产者消费者模型 限流 **/ public class Bucket {    //定义桶的大小    private final ConcurrentLinkedQueue<Integer> container=new ConcurrentLinkedQueue<>();    private final static int  BUCKET_LIMIT=1000;    //消费者 不论多少个线程,每秒最大的处理能力是1秒中执行10次    private final RateLimiter consumerRate=RateLimiter.create(10d);    //往桶里面放数据时,确认没有超过桶的最大的容量    private Monitor offerMonitor=new Monitor();    //从桶里消费数据时,桶里必须存在数据    private Monitor consumerMonitor=new Monitor();    /**     * 往桶里面写数据     * @param data     */    public void submit(Integer data){        if (offerMonitor.enterIf(offerMonitor.newGuard(()->container.size()<BUCKET_LIMIT))){            try {                container.offer(data);                System.out.println(currentThread()+" submit.."+data+" container size is :["+container.size()+"]");            } finally {                offerMonitor.leave();            }        }else {            //这里时候采用降级策略了。消费速度跟不上产生速度时,而且桶满了,抛出异常            //或者存入MQ DB等后续处理            throw new IllegalStateException(currentThread().getName()+"The bucket is ful..Pls latter can try...");        }    }    /**     * 从桶里面消费数据     * @param consumer     */    public void takeThenConsumer(Consumer<Integer> consumer){        if (consumerMonitor.enterIf(consumerMonitor.newGuard(()->!container.isEmpty()))){            try {                //不打印时 写 consumerRate.acquire();                System.out.println(currentThread()+"  waiting"+consumerRate.acquire());                Integer data = container.poll();                //container.peek() 只是去取出来不会删掉                consumer.accept(data);            }finally {                consumerMonitor.leave();            }        }else {            //当木桶的消费完后,可以消费那些降级存入MQ或者DB里面的数据            System.out.println("will consumer Data from MQ...");            try {                TimeUnit.SECONDS.sleep(10);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    } }

2.1 漏桶算法测试类

package concurrent.BucketAl; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import static java.lang.Thread.currentThread; /** * ${DESCRIPTION} * * @author mengxp * @version 1.0 * @create 2018-01-20 23:11 * 漏桶算法测试 * 实现漏桶算法 实现多线程生产者消费者模型 限流 **/ public class BuckerTest {    public static void main(String[] args) {        final Bucket bucket = new Bucket();        final AtomicInteger DATA_CREATOR = new AtomicInteger(0);        //生产线程 10个线程 每秒提交 50个数据  1/0.2s*10=50个        IntStream.range(0, 10).forEach(i -> {            new Thread(() -> {                for (; ; ) {                    int data = DATA_CREATOR.incrementAndGet();                    try {                        bucket.submit(data);                        TimeUnit.MILLISECONDS.sleep(200);                    } catch (Exception e) {                        //对submit时,如果桶满了可能会抛出异常                        if (e instanceof IllegalStateException) {                            System.out.println(e.getMessage());                            //当满了后,生产线程就休眠1分钟                            try {                                TimeUnit.SECONDS.sleep(60);                            } catch (InterruptedException e1) {                                e1.printStackTrace();                            }                        }                    }                }            }).start();        });        //消费线程  采用RateLimiter每秒处理10个  综合的比率是5:1        IntStream.range(0, 10).forEach(i -> {            new Thread(                    () -> {                        for (; ; ) {                            bucket.takeThenConsumer(x -> {                                System.out.println(currentThread()+"C.." + x);                            });                        }                    }            ).start();        });    } }

3. 令牌桶算法