Originating Test:

com.metamx.emitter.core.ParametrizedUriEmitterTest#testEmitterWithParametrizedUriExtractor

Algorithm:

shb

Project

Version

Commit

java-utiljava-util-1.3.8-1-g02dccb002dccb0bab36c3acaa3a8911ced34c9becf2ff50

Field Declaration:

/**
 * The time when the first event was written into this batch, needed for timeout-based batch emitting.
 */
private long firstEventTimestamp = -1;

Stack Trace 1

Stack Trace 2

Depth: 11 Depth: 3
com.metamx.emitter.core.Batch#tryAddFirstEvent:151

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private boolean tryAddFirstEvent(byte[] event)
  {
    if (!tryReserveFirstEventSizeAndLock(event)) {
      return false;
    }
    try {
      int bufferOffset = emitter.batchingStrategy.writeBatchStart(buffer);
      writeEvent(event, bufferOffset);
      eventCount.incrementAndGet();
      firstEventTimestamp = System.currentTimeMillis();
      return true;
    }
    finally {
      unlock();
    }
  }
com.metamx.emitter.core.Batch#sealIfFlushNeeded:226

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
void sealIfFlushNeeded() {
    long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp;
    if (firstEventTimestamp > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) {
      seal();
    }
  }
com.metamx.emitter.core.Batch#tryAddEvent:128

java-util/src/main/java/com/metamx/emitter/core/Batch.java

Class
/**
 * Buffer for batched data + synchronization state.
 *
 * The state structure ({@link AbstractQueuedLongSynchronizer#state}):
 * Bits 0-30 - bufferWatermark
 * Bit 31 - always 0
 * Bits 32-62 - "parties" (the number of concurrent writers)
 * Bit 63 - sealed flag
 *
 * Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
 * write data into the buffer, as long as sealed flag is false.
 *
 * {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
 * writes are completed). See {@link #isEmittingAllowed(long)}.
 *
 * In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
 * batch. "Unlock" means "decrement number of parties by 1".
 */
class Batch extends AbstractQueuedLongSynchronizer
{
Method
  /**
   * Tries to add (write) event to the batch, returns true, if successful. If fails, no subsequent attempts to add event
   * to this batch will succeed, the next batch should be taken.
   */
boolean tryAddEvent(byte[] event)
  {
    while (true) {
      long state = getState();
      if (isSealed(state)) {
        return false;
      }
      int bufferWatermark = bufferWatermark(state);
      if (bufferWatermark == 0) {
        if (tryAddFirstEvent(event)) {
          return true;
        }
      } else if (newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark) {
        if (tryAddNonFirstEvent(state, event)) {
          return true;
        }
      } else {
        seal();
        return false;
      }
    }
  }
com.metamx.emitter.core.HttpPostEmitter$EmittingThread#needsToShutdown:472

java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java

Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
private boolean needsToShutdown()
    {
      boolean needsToShutdown = Thread.interrupted() || shuttingDown;
      if (needsToShutdown) {
        Batch lastBatch = concurrentBatch.getAndSet(null);
        if (lastBatch != null) {
          lastBatch.seal();
        }
      } else {
        Batch batch = concurrentBatch.get();
        if (batch != null) {
          batch.sealIfFlushNeeded();
        } else {
          needsToShutdown = true;
        }
      }
      return needsToShutdown;
    }
com.metamx.emitter.core.HttpPostEmitter#emitAndReturnBatch:244

java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java

Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@VisibleForTesting
  Batch emitAndReturnBatch(Event event)
  {
    awaitStarted();

    final byte[] eventBytes = eventToBytes(event);

    if (eventBytes.length > MAX_EVENT_SIZE) {
      log.error(
          "Event too large to emit (%,d > %,d): %s ...",
          eventBytes.length,
          MAX_EVENT_SIZE,
          StringUtils.fromUtf8(ByteBuffer.wrap(eventBytes), 1024)
      );
      return null;
    }

    if (eventBytes.length > largeEventThreshold) {
      writeLargeEvent(eventBytes);
      return null;
    }

    while (true) {
      Batch batch = concurrentBatch.get();
      if (batch == null) {
        throw new RejectedExecutionException("Service is closed.");
      }
      if (batch.tryAddEvent(eventBytes)) {
        return batch;
      }
      // Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
      // eventually, because concurrentBatch.get() is a volatile read.
    }
  }
com.metamx.emitter.core.HttpPostEmitter$EmittingThread#run:436

java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java

Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@Override
    public void run()
    {
      while (true) {
        boolean needsToShutdown = needsToShutdown();
        try {
          emitLargeEvents();
          emitBatches();
          tryEmitOneFailedBuffer();

          if (needsToShutdown) {
            tryEmitAndDrainAllFailedBuffers();
            // Make GC life easier
            drainBuffersToReuse();
            return;
          }
        }
        catch (Throwable t) {
          log.error(t, "Uncaught exception in EmittingThread.run()");
        }
        if (failedBuffers.isEmpty()) {
          // Waiting for 1/2 of config.getFlushMillis() in order to flush events not more than 50% later than specified.
          // If nanos=0 parkNanos() doesn't wait at all, then we don't want.
          long waitNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(config.getFlushMillis()) / 2, 1);
          LockSupport.parkNanos(HttpPostEmitter.this, waitNanos);
        }
      }
    }
com.metamx.emitter.core.HttpPostEmitter#emit:214

java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java

Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@Override
  public void emit(Event event)
  {
    emitAndReturnBatch(event);
  }
com.metamx.emitter.core.ParametrizedUriEmitter#emit:142

java-util/src/main/java/com/metamx/emitter/core/ParametrizedUriEmitter.java

Class
public class ParametrizedUriEmitter implements Flushable, Closeable, Emitter
{
Method
@Override
  public void emit(Event event)
  {
    if (bwListEventFilter.isNotListed(event)) {
      log.trace("Event won't be emitted since one of its field isn't whitelisted or is blacklisted");
      return;
    }
    try {
      URI uri = uriExtractor.apply(event);
      HttpPostEmitter emitter = emitters.get(uri);
      if (emitter == null) {
        try {
          emitter = emitters.computeIfAbsent(uri, u -> {
            try {
              return innerLifecycle.addMaybeStartManagedInstance(
                  new HttpPostEmitter(
                      config.buildHttpEmitterConfig(u.toString()),
                      client,
                      jsonMapper
                  )
              );
            }
            catch (Exception e) {
              throw Throwables.propagate(e);
            }
          });
        }
        catch (RuntimeException e) {
          log.error(e, "Error while creating or starting an HttpPostEmitter for URI[%s]", uri);
          return;
        }
      }
      emitter.emit(event);
    }
    catch (URISyntaxException e) {
      log.error(e, "Failed to extract URI for event[%s]", event.toMap());
    }
  }
com.metamx.emitter.core.ParametrizedUriEmitterTest#testEmitterWithParametrizedUriExtractor:170

java-util/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterTest.java

Class
public class ParametrizedUriEmitterTest
{
Method
@Test
  public void testEmitterWithParametrizedUriExtractor() throws Exception
  {
    Emitter emitter = parametrizedEmmiter("http://example.com/{key1}/{key2}");
    final List<UnitEvent> events = Arrays.asList(
        new UnitEvent("test", 1, ImmutableMap.of("key1", "val1", "key2", "val2")),
        new UnitEvent("test", 2, ImmutableMap.of("key1", "val1", "key2", "val2"))
    );

    httpClient.setGoHandler(
        new GoHandler()
        {
          @Override
          protected ListenableFuture<Response> go(Request request) throws JsonProcessingException
          {
            Assert.assertEquals("http://example.com/val1/val2", request.getUrl());
            Assert.assertEquals(
                String.format(
                    "[%s,%s]\n",
                    jsonMapper.writeValueAsString(events.get(0)),
                    jsonMapper.writeValueAsString(events.get(1))
                ),
                Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
            );

            return GoHandlers.immediateFuture(okResponse());
          }
        }.times(1)
    );

    for (UnitEvent event : events) {
      emitter.emit(event);
    }
    emitter.flush();
    Assert.assertTrue(httpClient.succeeded());
  }