Depth: 11 |
Depth: 3 |
com.metamx.emitter.core.Batch#tryAddFirstEvent:151
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
* write data into the buffer, as long as sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
* batch. "Unlock" means "decrement number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private boolean tryAddFirstEvent(byte[] event)
{
if (!tryReserveFirstEventSizeAndLock(event)) {
return false;
}
try {
int bufferOffset = emitter.batchingStrategy.writeBatchStart(buffer);
writeEvent(event, bufferOffset);
eventCount.incrementAndGet();
firstEventTimestamp = System.currentTimeMillis();
return true;
}
finally {
unlock();
}
}
|
com.metamx.emitter.core.Batch#sealIfFlushNeeded:226
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
* write data into the buffer, as long as sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
* batch. "Unlock" means "decrement number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
void sealIfFlushNeeded() {
long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp;
if (firstEventTimestamp > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) {
seal();
}
}
|
com.metamx.emitter.core.Batch#tryAddEvent:128
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) are eligible to come, increment bufferWatermark and
* write data into the buffer, as long as sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} is eligible to emit the buffer, when sealed flag=true and parties=0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment number of parties by 1", i. e. lock the emitter thread from emitting this
* batch. "Unlock" means "decrement number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
/**
* Tries to add (write) event to the batch, returns true, if successful. If fails, no subsequent attempts to add event
* to this batch will succeed, the next batch should be taken.
*/
boolean tryAddEvent(byte[] event)
{
while (true) {
long state = getState();
if (isSealed(state)) {
return false;
}
int bufferWatermark = bufferWatermark(state);
if (bufferWatermark == 0) {
if (tryAddFirstEvent(event)) {
return true;
}
} else if (newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark) {
if (tryAddNonFirstEvent(state, event)) {
return true;
}
} else {
seal();
return false;
}
}
}
|
com.metamx.emitter.core.HttpPostEmitter$EmittingThread#needsToShutdown:472
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
private boolean needsToShutdown()
{
boolean needsToShutdown = Thread.interrupted() || shuttingDown;
if (needsToShutdown) {
Batch lastBatch = concurrentBatch.getAndSet(null);
if (lastBatch != null) {
lastBatch.seal();
}
} else {
Batch batch = concurrentBatch.get();
if (batch != null) {
batch.sealIfFlushNeeded();
} else {
needsToShutdown = true;
}
}
return needsToShutdown;
}
|
com.metamx.emitter.core.HttpPostEmitter#emitAndReturnBatch:244
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@VisibleForTesting
Batch emitAndReturnBatch(Event event)
{
awaitStarted();
final byte[] eventBytes = eventToBytes(event);
if (eventBytes.length > MAX_EVENT_SIZE) {
log.error(
"Event too large to emit (%,d > %,d): %s ...",
eventBytes.length,
MAX_EVENT_SIZE,
StringUtils.fromUtf8(ByteBuffer.wrap(eventBytes), 1024)
);
return null;
}
if (eventBytes.length > largeEventThreshold) {
writeLargeEvent(eventBytes);
return null;
}
while (true) {
Batch batch = concurrentBatch.get();
if (batch == null) {
throw new RejectedExecutionException("Service is closed.");
}
if (batch.tryAddEvent(eventBytes)) {
return batch;
}
// Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
// eventually, because concurrentBatch.get() is a volatile read.
}
}
|
com.metamx.emitter.core.HttpPostEmitter$EmittingThread#run:436
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@Override
public void run()
{
while (true) {
boolean needsToShutdown = needsToShutdown();
try {
emitLargeEvents();
emitBatches();
tryEmitOneFailedBuffer();
if (needsToShutdown) {
tryEmitAndDrainAllFailedBuffers();
// Make GC life easier
drainBuffersToReuse();
return;
}
}
catch (Throwable t) {
log.error(t, "Uncaught exception in EmittingThread.run()");
}
if (failedBuffers.isEmpty()) {
// Waiting for 1/2 of config.getFlushMillis() in order to flush events not more than 50% later than specified.
// If nanos=0 parkNanos() doesn't wait at all, then we don't want.
long waitNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(config.getFlushMillis()) / 2, 1);
LockSupport.parkNanos(HttpPostEmitter.this, waitNanos);
}
}
}
|
com.metamx.emitter.core.HttpPostEmitter#emit:214
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@Override
public void emit(Event event)
{
emitAndReturnBatch(event);
}
| |
com.metamx.emitter.core.ParametrizedUriEmitter#emit:142
java-util/src/main/java/com/metamx/emitter/core/ParametrizedUriEmitter.java
Class
public class ParametrizedUriEmitter implements Flushable, Closeable, Emitter
{
Method
@Override
public void emit(Event event)
{
if (bwListEventFilter.isNotListed(event)) {
log.trace("Event won't be emitted since one of its field isn't whitelisted or is blacklisted");
return;
}
try {
URI uri = uriExtractor.apply(event);
HttpPostEmitter emitter = emitters.get(uri);
if (emitter == null) {
try {
emitter = emitters.computeIfAbsent(uri, u -> {
try {
return innerLifecycle.addMaybeStartManagedInstance(
new HttpPostEmitter(
config.buildHttpEmitterConfig(u.toString()),
client,
jsonMapper
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
});
}
catch (RuntimeException e) {
log.error(e, "Error while creating or starting an HttpPostEmitter for URI[%s]", uri);
return;
}
}
emitter.emit(event);
}
catch (URISyntaxException e) {
log.error(e, "Failed to extract URI for event[%s]", event.toMap());
}
}
| |
com.metamx.emitter.core.ParametrizedUriEmitterTest#testEmitterWithParametrizedUriExtractor:170
java-util/src/test/java/com/metamx/emitter/core/ParametrizedUriEmitterTest.java
Class
public class ParametrizedUriEmitterTest
{
Method
@Test
public void testEmitterWithParametrizedUriExtractor() throws Exception
{
Emitter emitter = parametrizedEmmiter("http://example.com/{key1}/{key2}");
final List<UnitEvent> events = Arrays.asList(
new UnitEvent("test", 1, ImmutableMap.of("key1", "val1", "key2", "val2")),
new UnitEvent("test", 2, ImmutableMap.of("key1", "val1", "key2", "val2"))
);
httpClient.setGoHandler(
new GoHandler()
{
@Override
protected ListenableFuture<Response> go(Request request) throws JsonProcessingException
{
Assert.assertEquals("http://example.com/val1/val2", request.getUrl());
Assert.assertEquals(
String.format(
"[%s,%s]\n",
jsonMapper.writeValueAsString(events.get(0)),
jsonMapper.writeValueAsString(events.get(1))
),
Charsets.UTF_8.decode(request.getByteBufferData().slice()).toString()
);
return GoHandlers.immediateFuture(okResponse());
}
}.times(1)
);
for (UnitEvent event : events) {
emitter.emit(event);
}
emitter.flush();
Assert.assertTrue(httpClient.succeeded());
}
| |