Originating Test:

com.metamx.emitter.core.HttpPostEmitterStressTest#eventCountBased

Algorithm:

shb

Project

Version

Commit

java-utiljava-util-1.3.8-1-g02dccb002dccb0bab36c3acaa3a8911ced34c9becf2ff50

Field Declaration:

/**
 * The time when the first event was written into this batch, needed for timeout-based batch emitting.
 */
private long firstEventTimestamp = -1;

Stack Trace 1

Stack Trace 2

Depth: 4 Depth: 5
com.metamx.emitter.core.Batch#tryAddFirstEvent:151

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private boolean tryAddFirstEvent(byte[] event)
  {
    if (!tryReserveFirstEventSizeAndLock(event)) {
      return false;
    }
    try {
      int bufferOffset = emitter.batchingStrategy.writeBatchStart(buffer);
      writeEvent(event, bufferOffset);
      eventCount.incrementAndGet();
      firstEventTimestamp = System.currentTimeMillis();
      return true;
    }
    finally {
      unlock();
    }
  }
com.metamx.emitter.core.Batch#unlockAndSealIfNeeded:216

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private void unlockAndSealIfNeeded()
  {
    if (eventCount.incrementAndGet() >= emitter.config.getFlushCount()) {
      unlockAndSeal();
    } else {
      long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp;
      if (firstEventTimestamp > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) {
        unlockAndSeal();
      } else {
        unlock();
      }
    }
  }
com.metamx.emitter.core.Batch#tryAddEvent:128

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
  /**
   * Tries to add (write) event to the batch, returns true, if successful. If fails, no subsequent attempts to add event
   * to this batch will succeed, the next batch should be taken.
   */
boolean tryAddEvent(byte[] event)
  {
    while (true) {
      long state = getState();
      if (isSealed(state)) {
        return false;
      }
      int bufferWatermark = bufferWatermark(state);
      if (bufferWatermark == 0) {
        if (tryAddFirstEvent(event)) {
          return true;
        }
      } else if (newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark) {
        if (tryAddNonFirstEvent(state, event)) {
          return true;
        }
      } else {
        seal();
        return false;
      }
    }
  }
com.metamx.emitter.core.Batch#tryAddNonFirstEvent:181

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private boolean tryAddNonFirstEvent(long state, byte[] event)
  {
    int bufferOffset = tryReserveEventSizeAndLock(state, emitter.batchingStrategy.separatorLength() + event.length);
    if (bufferOffset < 0) {
      return false;
    }
    try {
      bufferOffset = emitter.batchingStrategy.writeMessageSeparator(buffer, bufferOffset);
      writeEvent(event, bufferOffset);
      return true;
    }
    finally {
      unlockAndSealIfNeeded();
    }
  }
com.metamx.emitter.core.HttpPostEmitter#emitAndReturnBatch:244

java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java

Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@VisibleForTesting
  Batch emitAndReturnBatch(Event event)
  {
    awaitStarted();

    final byte[] eventBytes = eventToBytes(event);

    if (eventBytes.length > MAX_EVENT_SIZE) {
      log.error(
          "Event too large to emit (%,d > %,d): %s ...",
          eventBytes.length,
          MAX_EVENT_SIZE,
          StringUtils.fromUtf8(ByteBuffer.wrap(eventBytes), 1024)
      );
      return null;
    }

    if (eventBytes.length > largeEventThreshold) {
      writeLargeEvent(eventBytes);
      return null;
    }

    while (true) {
      Batch batch = concurrentBatch.get();
      if (batch == null) {
        throw new RejectedExecutionException("Service is closed.");
      }
      if (batch.tryAddEvent(eventBytes)) {
        return batch;
      }
      // Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
      // eventually, because concurrentBatch.get() is a volatile read.
    }
  }
com.metamx.emitter.core.Batch#tryAddEvent:132

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
  /**
   * Tries to add (write) event to the batch, returns true, if successful. If fails, no subsequent attempts to add event
   * to this batch will succeed, the next batch should be taken.
   */
boolean tryAddEvent(byte[] event)
  {
    while (true) {
      long state = getState();
      if (isSealed(state)) {
        return false;
      }
      int bufferWatermark = bufferWatermark(state);
      if (bufferWatermark == 0) {
        if (tryAddFirstEvent(event)) {
          return true;
        }
      } else if (newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark) {
        if (tryAddNonFirstEvent(state, event)) {
          return true;
        }
      } else {
        seal();
        return false;
      }
    }
  }
com.metamx.emitter.core.HttpPostEmitterStressTest$3#run:102

java-util/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java

Class
public class HttpPostEmitterStressTest
{
Method
@Test
  public void eventCountBased() throws InterruptedException, IOException
  {
    HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
        .setFlushMillis(100)
        .setFlushCount(4)
        .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
        .setMaxBatchSize(1024 * 1024)
        // For this test, we don't need any batches to be dropped, i. e. "gaps" in data
        .setBatchQueueSizeLimit(1000)
        .build();
    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
    int nThreads = Runtime.getRuntime().availableProcessors() * 2;
    final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
    final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
    for (int i = 0; i < nThreads; i++) {
      eventsPerThread.add(new IntArrayList());
      eventBatchesPerThread.add(new ArrayList<Batch>());
    }
    for (int i = 0; i < N; i++) {
      eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
    }
    final BitSet emittedEvents = new BitSet(N);
    httpClient.setGoHandler(new GoHandler()
    {
      @Override
      protected ListenableFuture<Response> go(Request request)
      {
        ByteBuffer batch = request.getByteBufferData().slice();
        while (batch.remaining() > 0) {
          emittedEvents.set(batch.getInt());
        }
        return GoHandlers.immediateFuture(EmitterTest.okResponse());
      }
    });
    emitter.start();
    final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
    for (int i = 0; i < nThreads; i++) {
      final int threadIndex = i;
      new Thread() {
        @Override
        public void run()
        {
          IntList events = eventsPerThread.get(threadIndex);
          List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
          IntEvent event = new IntEvent();
          for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
            event.index = events.getInt(i);
            eventBatches.add(emitter.emitAndReturnBatch(event));
            if (i % 16 == 0) {
              try {
                Thread.sleep(10);
              }
              catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
          }
          threadsCompleted.countDown();
        }
      }.start();
    }
    threadsCompleted.await();
    emitter.flush();
    System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
    for (int eventIndex = 0; eventIndex < N; eventIndex++) {
      if (!emittedEvents.get(eventIndex)) {
        for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) {
          IntList threadEvents = eventsPerThread.get(threadIndex);
          int indexOfEvent = threadEvents.indexOf(eventIndex);
          if (indexOfEvent >= 0) {
            Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
            System.err.println(batch);
            int bufferWatermark = batch.getSealedBufferWatermark();
            ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
            batchBuffer.limit(bufferWatermark);
            while (batchBuffer.remaining() > 0) {
              System.err.println(batchBuffer.getInt());
            }
            break;
          }
        }
        throw new AssertionError("event " + eventIndex);
      }
    }
  }
com.metamx.emitter.core.HttpPostEmitter#emitAndReturnBatch:244

java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java

Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@VisibleForTesting
  Batch emitAndReturnBatch(Event event)
  {
    awaitStarted();

    final byte[] eventBytes = eventToBytes(event);

    if (eventBytes.length > MAX_EVENT_SIZE) {
      log.error(
          "Event too large to emit (%,d > %,d): %s ...",
          eventBytes.length,
          MAX_EVENT_SIZE,
          StringUtils.fromUtf8(ByteBuffer.wrap(eventBytes), 1024)
      );
      return null;
    }

    if (eventBytes.length > largeEventThreshold) {
      writeLargeEvent(eventBytes);
      return null;
    }

    while (true) {
      Batch batch = concurrentBatch.get();
      if (batch == null) {
        throw new RejectedExecutionException("Service is closed.");
      }
      if (batch.tryAddEvent(eventBytes)) {
        return batch;
      }
      // Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
      // eventually, because concurrentBatch.get() is a volatile read.
    }
  }
com.metamx.emitter.core.HttpPostEmitterStressTest$3#run:102

java-util/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java

Class
public class HttpPostEmitterStressTest
{
Method
@Test
  public void eventCountBased() throws InterruptedException, IOException
  {
    HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
        .setFlushMillis(100)
        .setFlushCount(4)
        .setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
        .setMaxBatchSize(1024 * 1024)
        // For this test, we don't need any batches to be dropped, i. e. "gaps" in data
        .setBatchQueueSizeLimit(1000)
        .build();
    final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
    int nThreads = Runtime.getRuntime().availableProcessors() * 2;
    final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
    final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
    for (int i = 0; i < nThreads; i++) {
      eventsPerThread.add(new IntArrayList());
      eventBatchesPerThread.add(new ArrayList<Batch>());
    }
    for (int i = 0; i < N; i++) {
      eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
    }
    final BitSet emittedEvents = new BitSet(N);
    httpClient.setGoHandler(new GoHandler()
    {
      @Override
      protected ListenableFuture<Response> go(Request request)
      {
        ByteBuffer batch = request.getByteBufferData().slice();
        while (batch.remaining() > 0) {
          emittedEvents.set(batch.getInt());
        }
        return GoHandlers.immediateFuture(EmitterTest.okResponse());
      }
    });
    emitter.start();
    final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
    for (int i = 0; i < nThreads; i++) {
      final int threadIndex = i;
      new Thread() {
        @Override
        public void run()
        {
          IntList events = eventsPerThread.get(threadIndex);
          List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
          IntEvent event = new IntEvent();
          for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
            event.index = events.getInt(i);
            eventBatches.add(emitter.emitAndReturnBatch(event));
            if (i % 16 == 0) {
              try {
                Thread.sleep(10);
              }
              catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }
          }
          threadsCompleted.countDown();
        }
      }.start();
    }
    threadsCompleted.await();
    emitter.flush();
    System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
    for (int eventIndex = 0; eventIndex < N; eventIndex++) {
      if (!emittedEvents.get(eventIndex)) {
        for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) {
          IntList threadEvents = eventsPerThread.get(threadIndex);
          int indexOfEvent = threadEvents.indexOf(eventIndex);
          if (indexOfEvent >= 0) {
            Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
            System.err.println(batch);
            int bufferWatermark = batch.getSealedBufferWatermark();
            ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
            batchBuffer.limit(bufferWatermark);
            while (batchBuffer.remaining() > 0) {
              System.err.println(batchBuffer.getInt());
            }
            break;
          }
        }
        throw new AssertionError("event " + eventIndex);
      }
    }
  }