public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); }}
//定义事件消费者public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消费者:"+event.getValue()); }}
//定义生产者public class LongEventProducer { public final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 1.ringBuffer 事件队列 下一个槽 long sequence = ringBuffer.next(); Long data = null; try { //2.取出空的事件队列 LongEvent longEvent = ringBuffer.get(sequence); data = byteBuffer.getLong(0); //3.获取事件队列传递的数据 longEvent.setValue(data); try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } finally { System.out.println("生产这准备发送数据"); //4.发布事件 ringBuffer.publish(sequence); } }}
public class DisruptorMain { public static void main(String[] args) { // 1.创建一个可缓存的线程 提供线程来出发Consumer 的事件处理 ExecutorService executor = Executors.newCachedThreadPool(); // 2.创建工厂 EventFactory<LongEvent> eventFactory = new LongEventFactory(); // 3.创建ringBuffer 大小 int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方 // 4.创建Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 5.连接消费端方法 disruptor.handleEventsWith(new LongEventHandler()); // 6.启动 disruptor.start(); // 7.创建RingBuffer容器 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); // 8.创建生产者 LongEventProducer producer = new LongEventProducer(ringBuffer); // 9.指定缓冲区大小 ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (int i = 1; i <= 100; i++) { byteBuffer.putLong(0, i); producer.onData(byteBuffer); } //10.关闭disruptor和executor disruptor.shutdown(); executor.shutdown(); }}
推荐阅读
- Tinyid 深度解密滴滴的高性能ID生成器
- Java 并发基础之内存模型
- 网络高并发
- 10大高性能开发宝石,我要消灭一半程序员
- Linux定时器 - 高性能定时器
- 抢鲜!阿里架构师私藏并发编程笔记,公开前半段秒获8K标星
- JAVA并发-AtomicInteger
- 如何进行Jmeter多接口指定TPS同时并发?
- 喝什么茶能提高性能力,喝什么茶能软化血管呢
- M.2接口SSD就是高性能存储的代名词?不同插槽代表不同速度