KeyedDispatcherImpl.java
package io.github.jonloucks.metalog.impl;
import io.github.jonloucks.concurrency.api.Idempotent;
import io.github.jonloucks.concurrency.api.StateMachine;
import io.github.jonloucks.contracts.api.AutoClose;
import io.github.jonloucks.contracts.api.AutoOpen;
import io.github.jonloucks.metalog.api.Dispatcher;
import io.github.jonloucks.metalog.api.Meta;
import io.github.jonloucks.metalog.api.Metalog;
import io.github.jonloucks.metalog.api.Outcome;
import java.time.Duration;
import java.util.concurrent.*;
import static io.github.jonloucks.concurrency.api.Idempotent.withClose;
import static io.github.jonloucks.concurrency.api.Idempotent.withOpen;
import static io.github.jonloucks.metalog.impl.Internal.commandCheck;
import static io.github.jonloucks.metalog.impl.Internal.runWithIgnore;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
final class KeyedDispatcherImpl implements Dispatcher, AutoOpen {
@Override
public AutoClose open() {
return withOpen(stateMachine, this::realOpen);
}
@Override
public Outcome dispatch(Meta meta, Runnable work) {
final Runnable validCommand = commandCheck(work);
if (stateMachine.getState().isRejecting()) {
validCommand.run();
return Outcome.CONSUMED;
}
inflightSemaphore.acquireUninterruptibly();
try {
workQueue.put(validCommand);
} catch (InterruptedException ignored) {
inflightSemaphore.release();
}
return Outcome.DISPATCHED;
}
KeyedDispatcherImpl(Metalog.Config config) {
this.inflightSemaphore = new Semaphore(config.keyedQueueLimit());
this.workQueue = new ArrayBlockingQueue<>(config.keyedQueueLimit());
this.workerThread = new Thread(this::consumeLoop);
this.shutdownTimeout = config.shutdownTimeout();
this.stateMachine = Idempotent.createStateMachine(config.contracts());
}
private AutoClose realOpen() {
workerThread.start();
return this::close;
}
private void close() {
withClose(stateMachine, this::realClose);
}
private void realClose() {
triggerWorkerExit.countDown();
runWithIgnore(() -> {
if (!workerExitedLatch.await(shutdownTimeout.toMillis(), MILLISECONDS)) {
workerThread.interrupt();
}
});
// taking over emptying the queue
while (!workQueue.isEmpty()) {
runQueueJob(workQueue.poll());
}
}
private void runQueueJob(Runnable command) {
if (ofNullable(command).isPresent()) {
try {
runWithIgnore(command::run);
} finally {
inflightSemaphore.release();
}
}
}
private void consumeLoop() {
try {
runWithIgnore(() -> {
while (!triggerWorkerExit.await(1, MILLISECONDS)) {
runQueueJob(workQueue.poll(1, MILLISECONDS));
}
});
} finally {
workerExitedLatch.countDown();
}
}
private final StateMachine<Idempotent> stateMachine;
private final Duration shutdownTimeout;
private final Semaphore inflightSemaphore;
private final Thread workerThread;
private final ArrayBlockingQueue<Runnable> workQueue;
private final CountDownLatch triggerWorkerExit = new CountDownLatch(1);
private final CountDownLatch workerExitedLatch = new CountDownLatch(1);
}