Java并发编程框架Disruptor

Disruptor是什么?Disruptor是一个高性能的异步处理框架,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”(这有点吓人吧),并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后再交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信) 。本文将使用disruptor最新版3.3.6进行介绍,可以在https://github.com/LMAX-Exchange/disruptor/releases 下载最新的JAR包开始disruptor之旅吧 。
轮胎:RingBuffer
RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节 。如下图所示:

Java并发编程框架Disruptor

文章插图
 
数组
这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多 。
序号
【Java并发编程框架Disruptor】RingBuffer中元素拥有序号的概念,并且序号是一直增长的,这怎么理解?比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素 。也即是说通过 序号%SIZE 来定位元素,实现set/get操作 。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收 。
由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方 。
无锁的机制
在生产者/消费者模式下,disruptor号称“无锁并行框架”(要知道BlockingQueue是利用了Lock锁机制来实现的),这是怎么做到的呢?下面我们来具体分析下:
一个生产者 + 一个消费者
生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号 。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小 。
一个生产者 + 多个消费者
多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念 。此时也不需要进行锁定 。
多个生产者 + N个消费者
很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了 。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁 。
Disruptor初体验:简单的生产者和消费者
业务数据对象POJO(Event)
public class Order {//订单IDprivate long id;//订单信息private String info;//订单价格private double price;public long getId() {return id;}public void setId(long id) {this.id = id;}public String getInfo() {return info;}public void setInfo(String info) {this.info = info;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}}业务数据工厂(Factory)
public class OrderFactory implements EventFactory{@Overridepublic Object newInstance() {System.out.println("OrderFactory.newInstance");return new Order();}}事件处理器(Handler,即消费者处理逻辑)
public class OrderHandler implements EventHandler<Order>{@Overridepublic void onEvent(Order order, long l, boolean b) throws Exception {System.out.println(Thread.currentThread().getName() + " 消费者处理中:" + l);order.setInfo("info" + order.getId());order.setPrice(Math.random());}}Main
public class Main {public static void main(String[] args) throws InterruptedException {//创建订单工厂OrderFactory orderFactory = new OrderFactory();//ringbuffer的大小int RINGBUFFER_SIZE = 1024;//创建disruptorDisruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());//设置事件处理器 即消费者disruptor.handleEventsWith(new OrderHandler());disruptor.start();RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();//-------------生产数据for(int i = 0 ; i < 3 ; i++){long sequence = ringBuffer.next();Order order = ringBuffer.get(sequence);order.setId(i);ringBuffer.publish(sequence);System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);}Thread.sleep(1000);disruptor.shutdown();}}


推荐阅读