Java并发编程框架Disruptor( 二 )

运行结果:

Java并发编程框架Disruptor

文章插图
 
说明:
其实上面的结果已经很明显的说明了,在初始阶段构造Disruptor的时候,会调用工厂Factory去实例化RingBuffer中的Event数据对象 。
另外在构造Disruptor的时候,在3.3.6之前使用的是API:
Java并发编程框架Disruptor

文章插图
 
到了3.3.6这些API都不推荐使用了,即不再推荐传入Executor这样的线程池,而是推荐传入ThreadFactory线程工厂 。这样的话,关闭disruptor就会自动关闭Executor线程池,而不需要像以前那样必须在关闭disruptor的时候再关闭线程池了 。
构造Disruptor时,需要注意ProducerType(SINGLE or MULTI 指示是单个生产者还是多个生产者模式)、WaitStrategy(策略选择,决定了消费者如何等待生产者) 。
Java并发编程框架Disruptor

文章插图
 
单独使用RingBuffer:WorkerPool
如果场景比较简单,我们完全可以不用创建Disruptor,而是仅仅使用RingBuffer功能 。
public static void main(String[] args) throws InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(3);RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.SINGLE,new OrderFactory(),1024,new YieldingWaitStrategy());WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,ringBuffer.newBarrier(),new IgnoreExceptionHandler(),new OrderHandler());workerPool.start(executor);//-------------生产数据for(int i = 0 ; i < 30 ; i++){long sequence = ringBuffer.next();Order order = ringBuffer.get(sequence);order.setId(i);ringBuffer.publish(sequence);System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);}Thread.sleep(1000);workerPool.halt();executor.shutdown();}实际上是利用WorkerPool辅助连接消费者 。
一个生产者+多个消费者
Java并发编程框架Disruptor

文章插图
 
public static void main(String[] args) throws InterruptedException {//创建订单工厂OrderFactory orderFactory = new OrderFactory();//ringbuffer的大小int RINGBUFFER_SIZE = 1024;//创建disruptorDisruptor<Order> disruptor = new Disruptor<Order>(orderFactory,RINGBUFFER_SIZE,Executors.defaultThreadFactory());//设置事件处理器 即消费者EventHandlerGroup<Order> eventHandlerGroup = disruptor.handleEventsWith(new OrderHandler(),new OrderHandler2());eventHandlerGroup.then(new OrderHandler3());disruptor.start();RingBuffer<Order> ringBuffer = disruptor.getRingBuffer();//-------------生产数据for(int i = 0 ; i < 3 ; i++){long sequence = ringBuffer.next();Order order = ringBuffer.get(sequence);order.setId(i);ringBuffer.publish(sequence);System.out.println(Thread.currentThread().getName() + " 生产者发布一条数据:" + sequence + " 订单ID:" + i);}Thread.sleep(1000);disruptor.shutdown();}运行结果:
Java并发编程框架Disruptor

文章插图
 
生产者生产了3条消息,一个消费者线程1消费了这3条数据,另一个消费者线程2也消费了这3条数据,2者是并行的,待消费者线程1和2完毕后,3条数据交给消费者线程3处理 。
如果我们想顺序的按照A->B->C呢?
disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());如果我们想六边形操作呢?
Java并发编程框架Disruptor

文章插图
 
Handler1 h1 = new Handler1();Handler2 h2 = new Handler2();Handler3 h3 = new Handler3();Handler4 h4 = new Handler4();Handler5 h5 = new Handler5();disruptor.handleEventsWith(h1, h2);disruptor.after(h1).handleEventsWith(h4);disruptor.after(h2).handleEventsWith(h5);disruptor.after(h4, h5).handleEventsWith(h3);到这里相信你对Disruptor已经有所了解了,那么多个生产者多个消费者如何实现呢,其实和上面的代码非常类似,无非是多个生产者都持有RingBuffer可以publish而已 。




推荐阅读