Depth: 4 |
Depth: 5 |
com.metamx.emitter.core.Batch#tryAddFirstEvent:151
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) may arrive, increment the bufferWatermark, and
* write data into the buffer, as long as the sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} may emit the buffer when the sealed flag is true and parties = 0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment the number of parties by 1", i.e. prevent the emitting thread from
* emitting this batch. "Unlock" means "decrement the number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private boolean tryAddFirstEvent(byte[] event)
{
if (!tryReserveFirstEventSizeAndLock(event)) {
return false;
}
try {
int bufferOffset = emitter.batchingStrategy.writeBatchStart(buffer);
writeEvent(event, bufferOffset);
eventCount.incrementAndGet();
firstEventTimestamp = System.currentTimeMillis();
return true;
}
finally {
unlock();
}
}
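
The state layout described in the class javadoc can be made concrete with a small decoding sketch. The helper names below are hypothetical stand-ins for Batch's real accessors (bufferWatermark, parties, isSealed, isEmittingAllowed), whose bodies are not shown in this snippet; only the bit positions come from the javadoc.

final class BatchStateSketch
{
  static int bufferWatermark(long state)
  {
    return (int) (state & 0x7FFF_FFFFL);            // bits 0-30
  }

  static int parties(long state)
  {
    return (int) ((state >>> 32) & 0x7FFF_FFFFL);   // bits 32-62
  }

  static boolean isSealed(long state)
  {
    return state < 0;                               // bit 63 set
  }

  // The emitting thread may emit only when the batch is sealed and all writers have finished.
  static boolean isEmittingAllowed(long state)
  {
    return isSealed(state) && parties(state) == 0;
  }
}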
|
com.metamx.emitter.core.Batch#unlockAndSealIfNeeded:216
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) may arrive, increment the bufferWatermark, and
* write data into the buffer, as long as the sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} may emit the buffer when the sealed flag is true and parties = 0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment the number of parties by 1", i.e. prevent the emitting thread from
* emitting this batch. "Unlock" means "decrement the number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private void unlockAndSealIfNeeded()
{
if (eventCount.incrementAndGet() >= emitter.config.getFlushCount()) {
unlockAndSeal();
} else {
long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp;
if (firstEventTimestamp > 0 && timeSinceFirstEvent > emitter.config.getFlushMillis()) {
unlockAndSeal();
} else {
unlock();
}
}
}
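
A minimal sketch of the two flush triggers applied above, using hypothetical names and example values; in the real code the thresholds come from HttpEmitterConfig via getFlushCount() and getFlushMillis().

final class FlushDecisionSketch
{
  static boolean shouldSeal(int eventCount, long firstEventTimestamp, int flushCount, long flushMillis)
  {
    if (eventCount >= flushCount) {
      return true;                                   // count-based flush
    }
    long timeSinceFirstEvent = System.currentTimeMillis() - firstEventTimestamp;
    return firstEventTimestamp > 0 && timeSinceFirstEvent > flushMillis;   // time-based flush
  }

  public static void main(String[] args)
  {
    long firstEventTimestamp = System.currentTimeMillis() - 150;
    // 3 events < flushCount of 4, but the first event is ~150 ms old > flushMillis of 100, so seal.
    System.out.println(shouldSeal(3, firstEventTimestamp, 4, 100));
  }
}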
|
com.metamx.emitter.core.Batch#tryAddEvent:128
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) may arrive, increment the bufferWatermark, and
* write data into the buffer, as long as the sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} may emit the buffer when the sealed flag is true and parties = 0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment the number of parties by 1", i.e. prevent the emitting thread from
* emitting this batch. "Unlock" means "decrement the number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
/**
* Tries to add (write) an event to the batch; returns true if successful. If it fails, no subsequent attempts to add
* an event to this batch will succeed; the next batch should be taken.
*/
boolean tryAddEvent(byte[] event)
{
while (true) {
long state = getState();
if (isSealed(state)) {
return false;
}
int bufferWatermark = bufferWatermark(state);
if (bufferWatermark == 0) {
if (tryAddFirstEvent(event)) {
return true;
}
} else if (newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark) {
if (tryAddNonFirstEvent(state, event)) {
return true;
}
} else {
seal();
return false;
}
}
}
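
The capacity check in the loop above, newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark, plausibly amounts to adding the separator and event lengths to the current watermark; the sketch below illustrates that reading with assumed names and values, not the actual Batch helpers.

final class WatermarkCheckSketch
{
  static int newBufferWatermark(int bufferWatermark, int separatorLength, byte[] event)
  {
    return bufferWatermark + separatorLength + event.length;
  }

  public static void main(String[] args)
  {
    int maxBufferWatermark = 1024 * 1024;       // assumed to be roughly the configured max batch size
    int currentWatermark = 1_048_570;
    byte[] event = new byte[]{0, 0, 0, 7};      // a 4-byte event, like the stress test's IntEvent
    boolean fits = newBufferWatermark(currentWatermark, 1, event) <= maxBufferWatermark;
    System.out.println(fits ? "write the event" : "seal() and move to the next batch");
  }
}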
|
com.metamx.emitter.core.Batch#tryAddNonFirstEvent:181
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) may arrive, increment the bufferWatermark, and
* write data into the buffer, as long as the sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} may emit the buffer when the sealed flag is true and parties = 0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment the number of parties by 1", i.e. prevent the emitting thread from
* emitting this batch. "Unlock" means "decrement the number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
private boolean tryAddNonFirstEvent(long state, byte[] event)
{
int bufferOffset = tryReserveEventSizeAndLock(state, emitter.batchingStrategy.separatorLength() + event.length);
if (bufferOffset < 0) {
return false;
}
try {
bufferOffset = emitter.batchingStrategy.writeMessageSeparator(buffer, bufferOffset);
writeEvent(event, bufferOffset);
return true;
}
finally {
unlockAndSealIfNeeded();
}
}
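
A hedged sketch of the reservation step used above, with an AtomicLong standing in for the AbstractQueuedLongSynchronizer state: it atomically re-checks the seal flag, advances the watermark by the requested size, and adds one party, returning the old watermark as the write offset or -1 when the reservation cannot be made. The field and method names here are assumptions for illustration, not Batch's actual implementation.

import java.util.concurrent.atomic.AtomicLong;

final class ReserveSketch
{
  private static final long ONE_PARTY = 1L << 32;

  private final AtomicLong state = new AtomicLong();
  private final int maxBufferWatermark;

  ReserveSketch(int maxBufferWatermark)
  {
    this.maxBufferWatermark = maxBufferWatermark;
  }

  int tryReserveEventSizeAndLock(int size)
  {
    while (true) {
      long s = state.get();
      if (s < 0) {                                   // sealed: no more writes into this batch
        return -1;
      }
      int watermark = (int) (s & 0x7FFF_FFFFL);
      if (watermark + size > maxBufferWatermark) {
        return -1;
      }
      long next = s + size + ONE_PARTY;              // advance the watermark and add one party
      if (state.compareAndSet(s, next)) {
        return watermark;                            // the old watermark is the offset to write at
      }
    }
  }
}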
|
com.metamx.emitter.core.HttpPostEmitter#emitAndReturnBatch:244
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@VisibleForTesting
Batch emitAndReturnBatch(Event event)
{
awaitStarted();
final byte[] eventBytes = eventToBytes(event);
if (eventBytes.length > MAX_EVENT_SIZE) {
log.error(
"Event too large to emit (%,d > %,d): %s ...",
eventBytes.length,
MAX_EVENT_SIZE,
StringUtils.fromUtf8(ByteBuffer.wrap(eventBytes), 1024)
);
return null;
}
if (eventBytes.length > largeEventThreshold) {
writeLargeEvent(eventBytes);
return null;
}
while (true) {
Batch batch = concurrentBatch.get();
if (batch == null) {
throw new RejectedExecutionException("Service is closed.");
}
if (batch.tryAddEvent(eventBytes)) {
return batch;
}
// Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
// eventually, because concurrentBatch.get() is a volatile read.
}
}
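
The spin loop above relies on a hand-off: once a batch is sealed, a fresh Batch is installed into concurrentBatch (by the thread that runs onSealExclusive()), and retrying writers eventually observe it through the volatile read. The sketch below models that retry pattern in a deliberately simplified form; the types, fields, and sealing policy are stand-ins, not HttpPostEmitter's actual internals.

import java.util.concurrent.atomic.AtomicReference;

final class BatchHandoffSketch
{
  static final class SimpleBatch
  {
    private final StringBuilder data = new StringBuilder();
    private boolean sealed;

    synchronized boolean tryAddEvent(String event)
    {
      if (sealed) {
        return false;                 // this batch is full; the caller must retry on a newer batch
      }
      data.append(event).append('\n');
      if (data.length() > 64) {
        sealed = true;
      }
      return true;
    }
  }

  private final AtomicReference<SimpleBatch> concurrentBatch = new AtomicReference<>(new SimpleBatch());

  SimpleBatch emitAndReturnBatch(String event)
  {
    while (true) {
      SimpleBatch batch = concurrentBatch.get();            // volatile read
      if (batch.tryAddEvent(event)) {
        return batch;
      }
      // The add failed because the batch was sealed; install a replacement (in the real emitter this
      // is done by the sealing thread, which also queues the sealed batch for emission) and retry.
      concurrentBatch.compareAndSet(batch, new SimpleBatch());
    }
  }
}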
|
com.metamx.emitter.core.Batch#tryAddEvent:132
java-util/src/main/java/com/metamx/emitter/core/Batch.java
Class
/**
* Buffer for batched data + synchronization state.
*
* The state structure ({@link AbstractQueuedLongSynchronizer#state}):
* Bits 0-30 - bufferWatermark
* Bit 31 - always 0
* Bits 32-62 - "parties" (the number of concurrent writers)
* Bit 63 - sealed flag
*
* Writer threads (callers of {@link HttpPostEmitter#emit(Event)}) may arrive, increment the bufferWatermark, and
* write data into the buffer, as long as the sealed flag is false.
*
* {@link HttpPostEmitter#emittingThread} may emit the buffer when the sealed flag is true and parties = 0 (all
* writes are completed). See {@link #isEmittingAllowed(long)}.
*
* In this class, "lock" means "increment the number of parties by 1", i.e. prevent the emitting thread from
* emitting this batch. "Unlock" means "decrement the number of parties by 1".
*/
class Batch extends AbstractQueuedLongSynchronizer
{
Method
/**
* Tries to add (write) an event to the batch; returns true if successful. If it fails, no subsequent attempts to add
* an event to this batch will succeed; the next batch should be taken.
*/
boolean tryAddEvent(byte[] event)
{
while (true) {
long state = getState();
if (isSealed(state)) {
return false;
}
int bufferWatermark = bufferWatermark(state);
if (bufferWatermark == 0) {
if (tryAddFirstEvent(event)) {
return true;
}
} else if (newBufferWatermark(bufferWatermark, event) <= emitter.maxBufferWatermark) {
if (tryAddNonFirstEvent(state, event)) {
return true;
}
} else {
seal();
return false;
}
}
}
|
com.metamx.emitter.core.HttpPostEmitterStressTest$3#run:102
java-util/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java
Class
public class HttpPostEmitterStressTest
{
Method
@Test
public void eventCountBased() throws InterruptedException, IOException
{
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
// For this test, we don't need any batches to be dropped, i.e. "gaps" in the data
.setBatchQueueSizeLimit(1000)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
for (int i = 0; i < nThreads; i++) {
eventsPerThread.add(new IntArrayList());
eventBatchesPerThread.add(new ArrayList<Batch>());
}
for (int i = 0; i < N; i++) {
eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
}
final BitSet emittedEvents = new BitSet(N);
httpClient.setGoHandler(new GoHandler()
{
@Override
protected ListenableFuture<Response> go(Request request)
{
ByteBuffer batch = request.getByteBufferData().slice();
while (batch.remaining() > 0) {
emittedEvents.set(batch.getInt());
}
return GoHandlers.immediateFuture(EmitterTest.okResponse());
}
});
emitter.start();
final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
final int threadIndex = i;
new Thread() {
@Override
public void run()
{
IntList events = eventsPerThread.get(threadIndex);
List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
IntEvent event = new IntEvent();
for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
event.index = events.getInt(i);
eventBatches.add(emitter.emitAndReturnBatch(event));
if (i % 16 == 0) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
threadsCompleted.countDown();
}
}.start();
}
threadsCompleted.await();
emitter.flush();
System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
for (int eventIndex = 0; eventIndex < N; eventIndex++) {
if (!emittedEvents.get(eventIndex)) {
for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) {
IntList threadEvents = eventsPerThread.get(threadIndex);
int indexOfEvent = threadEvents.indexOf(eventIndex);
if (indexOfEvent >= 0) {
Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
System.err.println(batch);
int bufferWatermark = batch.getSealedBufferWatermark();
ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
batchBuffer.limit(bufferWatermark);
while (batchBuffer.remaining() > 0) {
System.err.println(batchBuffer.getInt());
}
break;
}
}
throw new AssertionError("event " + eventIndex);
}
}
}
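
The GoHandler above recovers event indices with ByteBuffer.getInt(), which works only if, under BatchingStrategy.ONLY_EVENTS, events are concatenated with no separators and each IntEvent serializes to exactly its 4-byte big-endian index. A small self-contained sketch of that decoding assumption:

import java.nio.ByteBuffer;
import java.util.BitSet;

final class BatchDecodeSketch
{
  public static void main(String[] args)
  {
    // Pretend this is the body of one POSTed batch: three IntEvents with indices 3, 5 and 8.
    ByteBuffer batch = ByteBuffer.allocate(12).putInt(3).putInt(5).putInt(8);
    batch.flip();

    BitSet emittedEvents = new BitSet();
    while (batch.remaining() > 0) {
      emittedEvents.set(batch.getInt());
    }
    System.out.println(emittedEvents);   // prints {3, 5, 8}
  }
}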
|
com.metamx.emitter.core.HttpPostEmitter#emitAndReturnBatch:244
java-util/src/main/java/com/metamx/emitter/core/HttpPostEmitter.java
Class
public class HttpPostEmitter implements Flushable, Closeable, Emitter
{
Method
@VisibleForTesting
Batch emitAndReturnBatch(Event event)
{
awaitStarted();
final byte[] eventBytes = eventToBytes(event);
if (eventBytes.length > MAX_EVENT_SIZE) {
log.error(
"Event too large to emit (%,d > %,d): %s ...",
eventBytes.length,
MAX_EVENT_SIZE,
StringUtils.fromUtf8(ByteBuffer.wrap(eventBytes), 1024)
);
return null;
}
if (eventBytes.length > largeEventThreshold) {
writeLargeEvent(eventBytes);
return null;
}
while (true) {
Batch batch = concurrentBatch.get();
if (batch == null) {
throw new RejectedExecutionException("Service is closed.");
}
if (batch.tryAddEvent(eventBytes)) {
return batch;
}
// Spin loop, until the thread calling onSealExclusive() updates the concurrentBatch. This update becomes visible
// eventually, because concurrentBatch.get() is a volatile read.
}
}
|
com.metamx.emitter.core.HttpPostEmitterStressTest$3#run:102
java-util/src/test/java/com/metamx/emitter/core/HttpPostEmitterStressTest.java
Class
public class HttpPostEmitterStressTest
{
Method
@Test
public void eventCountBased() throws InterruptedException, IOException
{
HttpEmitterConfig config = new HttpEmitterConfig.Builder("http://foo.bar")
.setFlushMillis(100)
.setFlushCount(4)
.setBatchingStrategy(BatchingStrategy.ONLY_EVENTS)
.setMaxBatchSize(1024 * 1024)
// For this test, we don't need any batches to be dropped, i.e. "gaps" in the data
.setBatchQueueSizeLimit(1000)
.build();
final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, objectMapper);
int nThreads = Runtime.getRuntime().availableProcessors() * 2;
final List<IntList> eventsPerThread = new ArrayList<>(nThreads);
final List<List<Batch>> eventBatchesPerThread = new ArrayList<>(nThreads);
for (int i = 0; i < nThreads; i++) {
eventsPerThread.add(new IntArrayList());
eventBatchesPerThread.add(new ArrayList<Batch>());
}
for (int i = 0; i < N; i++) {
eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i);
}
final BitSet emittedEvents = new BitSet(N);
httpClient.setGoHandler(new GoHandler()
{
@Override
protected ListenableFuture<Response> go(Request request)
{
ByteBuffer batch = request.getByteBufferData().slice();
while (batch.remaining() > 0) {
emittedEvents.set(batch.getInt());
}
return GoHandlers.immediateFuture(EmitterTest.okResponse());
}
});
emitter.start();
final CountDownLatch threadsCompleted = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
final int threadIndex = i;
new Thread() {
@Override
public void run()
{
IntList events = eventsPerThread.get(threadIndex);
List<Batch> eventBatches = eventBatchesPerThread.get(threadIndex);
IntEvent event = new IntEvent();
for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) {
event.index = events.getInt(i);
eventBatches.add(emitter.emitAndReturnBatch(event));
if (i % 16 == 0) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
threadsCompleted.countDown();
}
}.start();
}
threadsCompleted.await();
emitter.flush();
System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers());
for (int eventIndex = 0; eventIndex < N; eventIndex++) {
if (!emittedEvents.get(eventIndex)) {
for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) {
IntList threadEvents = eventsPerThread.get(threadIndex);
int indexOfEvent = threadEvents.indexOf(eventIndex);
if (indexOfEvent >= 0) {
Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent);
System.err.println(batch);
int bufferWatermark = batch.getSealedBufferWatermark();
ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer);
batchBuffer.limit(bufferWatermark);
while (batchBuffer.remaining() > 0) {
System.err.println(batchBuffer.getInt());
}
break;
}
}
throw new AssertionError("event " + eventIndex);
}
}
}
|