高性能无锁并发框架Disruptor,太强了( 二 )

public class LongEventFactory implements EventFactory<LongEvent> {    public LongEvent newInstance() {        return new LongEvent();    }}//定义事件消费者public class LongEventHandler implements EventHandler<LongEvent>  {    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {         System.out.println("消费者:"+event.getValue());    }}//定义生产者public class LongEventProducer {    public final RingBuffer<LongEvent> ringBuffer;    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {        this.ringBuffer = ringBuffer;    }    public void onData(ByteBuffer byteBuffer) {        // 1.ringBuffer 事件队列 下一个槽        long sequence = ringBuffer.next();        Long data = null;        try {            //2.取出空的事件队列            LongEvent longEvent = ringBuffer.get(sequence);            data = byteBuffer.getLong(0);            //3.获取事件队列传递的数据            longEvent.setValue(data);            try {                Thread.sleep(10);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        } finally {            System.out.println("生产这准备发送数据");            //4.发布事件            ringBuffer.publish(sequence);        }    }}public class DisruptorMain {    public static void main(String[] args) {        // 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理        ExecutorService executor = Executors.newCachedThreadPool();        // 2.创建工厂        EventFactory<LongEvent> eventFactory = new LongEventFactory();        // 3.创建ringBuffer 大小        int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方        // 4.创建Disruptor        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,                ProducerType.SINGLE, new YieldingWaitStrategy());        // 5.连接消费端方法        disruptor.handleEventsWith(new LongEventHandler());        // 6.启动        disruptor.start();        // 7.创建RingBuffer容器        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();        // 8.创建生产者        LongEventProducer producer = new LongEventProducer(ringBuffer);        // 9.指定缓冲区大小        ByteBuffer byteBuffer = ByteBuffer.allocate(8);        for (int i = 1; i <= 100; i++) {            byteBuffer.putLong(0, i);            producer.onData(byteBuffer);        }        //10.关闭disruptor和executor        disruptor.shutdown();        executor.shutdown();    }}


推荐阅读