Index: src/perf/com/lmax/disruptor/support/ValueAdditionQueueProcessor.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/perf/com/lmax/disruptor/support/ValueAdditionQueueProcessor.java	(revision 603)
+++ src/perf/com/lmax/disruptor/support/ValueAdditionQueueProcessor.java	(revision )
@@ -15,6 +15,8 @@
  */
 package com.lmax.disruptor.support;
 
+import java.io.IOException;
+import java.net.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 
@@ -28,11 +30,28 @@
     private final BlockingQueue<Long> blockingQueue;
     private final long count;
 
+    private final boolean doRealWork = Boolean.getBoolean("doRealWork");
+    private final InetAddress toAddr;
+    private final DatagramSocket s;
+    private final byte[] buffer = new byte[1024];
+
     public ValueAdditionQueueProcessor(final BlockingQueue<Long> blockingQueue, final long count)
     {
         this.blockingQueue = blockingQueue;
         this.count = count;
+
+        if(doRealWork) {
+            try {
+                s = new DatagramSocket(null);
+                toAddr = InetAddress.getByName("127.0.0.1");
+            } catch(Exception e){
+                throw new RuntimeException("unable to create socket",e);
-    }
+            }
+        } else {
+            s=null;
+            toAddr=null;
+        }
+    }
 
     public long getValue()
     {
@@ -61,6 +80,19 @@
             {
                 long value = blockingQueue.take().longValue();
                 this.value += value;
+
+                if(doRealWork) {
+                    // send a simple multi-cast packet
+
+                    DatagramPacket p = new DatagramPacket(buffer, buffer.length);
+                    p.setAddress(toAddr);
+                    p.setPort(10000);
+                    try {
+                        s.send(p);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
 
                 if (sequence++ == count)
                 {
Index: src/perf/com/lmax/disruptor/support/ValueAdditionEventHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/perf/com/lmax/disruptor/support/ValueAdditionEventHandler.java	(revision 603)
+++ src/perf/com/lmax/disruptor/support/ValueAdditionEventHandler.java	(revision )
@@ -18,6 +18,7 @@
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.util.PaddedLong;
 
+import java.net.*;
 import java.util.concurrent.CountDownLatch;
 
 public final class ValueAdditionEventHandler implements EventHandler<ValueEvent>
@@ -31,6 +32,26 @@
         return value.get();
     }
 
+    private final boolean doRealWork = Boolean.getBoolean("doRealWork");
+
+    private final InetAddress toAddr;
+    private final DatagramSocket s;
+    private final byte[] buffer = new byte[1];
+
+    public ValueAdditionEventHandler() {
+        if(doRealWork) {
+            try {
+                s = new DatagramSocket(null);
+                toAddr = InetAddress.getByName("127.0.0.1");
+            } catch(Exception e){
+                throw new RuntimeException("unable to create socket",e);
+            }
+        } else {
+            s=null;
+            toAddr=null;
+        }
+    }
+
     public void reset(final CountDownLatch latch, final long expectedCount)
     {
         value.set(0L);
@@ -42,6 +63,19 @@
     public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
     {
         value.set(value.get() + event.getValue());
+
+        if(doRealWork) {
+            // send a simple multi-cast packet
+            DatagramPacket p = new DatagramPacket(buffer, buffer.length);
+            p.setAddress(toAddr);
+            p.setPort(10000);
+
+            try {
+                s.send(p);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
 
         if (count == sequence)
         {
Index: src/perf/com/lmax/disruptor/OnePublisherToOneProcessorUniCastBatchThroughputTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/perf/com/lmax/disruptor/OnePublisherToOneProcessorUniCastBatchThroughputTest.java	(revision 603)
+++ src/perf/com/lmax/disruptor/OnePublisherToOneProcessorUniCastBatchThroughputTest.java	(revision )
@@ -17,6 +17,7 @@
 
 import com.lmax.disruptor.support.PerfTestUtil;
 import com.lmax.disruptor.support.ValueAdditionEventHandler;
+import com.lmax.disruptor.support.ValueAdditionQueueProcessor;
 import com.lmax.disruptor.support.ValueEvent;
 import org.junit.Assert;
 import org.junit.Test;
@@ -69,13 +70,23 @@
  */
 public final class OnePublisherToOneProcessorUniCastBatchThroughputTest extends AbstractPerfTestQueueVsDisruptor
 {
-    private static final int BUFFER_SIZE = 1024 * 8;
-    private static final long ITERATIONS = 1000L * 1000L * 100L;
+    private static final boolean largerBuffers = Boolean.getBoolean("largerBuffers");
+    private static final int BUFFER_SIZE = 1024 * 8 * (largerBuffers ? 128 : 1);
+    private static final boolean doRealWork = Boolean.getBoolean("doRealWork");
+    private static final long ITERATIONS = (1000L * 1000L * 100L) / (doRealWork ? 1000 : 1);
     private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
     private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS);
 
     ///////////////////////////////////////////////////////////////////////////////////////////////
 
+    ///////////////////////////////////////////////////////////////////////////////////////////////
+
+    private final BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<Long>(BUFFER_SIZE);
+    private final ValueAdditionQueueProcessor queueProcessor = new ValueAdditionQueueProcessor(blockingQueue, ITERATIONS - 1);
+
+    ///////////////////////////////////////////////////////////////////////////////////////////////
+
+
     private final RingBuffer<ValueEvent> ringBuffer =
         new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
@@ -105,8 +116,24 @@
     @Override
     protected long runQueuePass() throws InterruptedException
     {
-        // Same expected results as UniCast scenario
-        return 0L;
+        final CountDownLatch latch = new CountDownLatch(1);
+        queueProcessor.reset(latch);
+        Future<?> future = EXECUTOR.submit(queueProcessor);
+        long start = System.currentTimeMillis();
+
+        for (long i = 0; i < ITERATIONS; i++)
+        {
+            blockingQueue.put(Long.valueOf(i));
+        }
+
+        latch.await();
+        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
+        queueProcessor.halt();
+        future.cancel(true);
+
+        Assert.assertEquals(expectedResult, queueProcessor.getValue());
+
+        return opsPerSecond;
     }
 
     @Override
Index: src/perf/com/lmax/disruptor/SimpleHandOffTest.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- src/perf/com/lmax/disruptor/SimpleHandOffTest.java	(revision )
+++ src/perf/com/lmax/disruptor/SimpleHandOffTest.java	(revision )
@@ -0,0 +1,68 @@
+package com.lmax.disruptor;
+
+import com.lmax.disruptor.support.PerfTestUtil;
+import org.junit.Assert;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+public class SimpleHandOffTest {
+    final int NELEMENTS = 8*1024;
+    final int MASK = NELEMENTS-1;
+    private static final long ITERATIONS = (1000L * 1000L * 100L);
+    final AtomicLongArray array = new AtomicLongArray(NELEMENTS);
+    private final long expectedResult = PerfTestUtil.accumulatedAddition(ITERATIONS);
+    private volatile long result;
+
+    public static void main(String[] args) throws InterruptedException {
+        SimpleHandOffTest test = new SimpleHandOffTest();
+        for(int i=0;i<20;i++){
+            test.doTest(i);
+            Thread.sleep(1000);
+        }
+    }
+
+    private void doTest(int pass) throws InterruptedException {
+        for(int i=0;i<NELEMENTS;i++) {
+            array.set(i,Long.MAX_VALUE);
+        }
+        final CountDownLatch latch = new CountDownLatch(1);
+        new Consumer(latch).start();
+        long start = System.currentTimeMillis();
+        for(int i=0;i<ITERATIONS;i++) {
+            while(!array.compareAndSet(i&MASK,Long.MAX_VALUE,i)){
+                Thread.yield();
+            }
+        }
+        latch.await();
+        long opsPerSecond = (ITERATIONS * 1000L) / (System.currentTimeMillis() - start);
+        System.out.format("run " + pass + ", ops per second %,d%n", opsPerSecond);
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    private class Consumer extends Thread {
+        int count;
+        long total;
+        private final CountDownLatch latch;
+
+        public Consumer(CountDownLatch latch) {
+            this.latch=latch;
+        }
+
+        public void run() {
+            while(true) {
+                long val;
+                while((val=array.getAndSet(count&MASK,Long.MAX_VALUE))==Long.MAX_VALUE){
+                    Thread.yield();
+                }
+                ++count;
+                total+=val;
+                if(count==ITERATIONS) {
+                    result=total;
+                    latch.countDown();
+                    return;
+                }
+            }
+        }
+    }
+}
