UnkeyedDispatcherImpl.java
package io.github.jonloucks.metalog.impl;
import io.github.jonloucks.concurrency.api.Idempotent;
import io.github.jonloucks.concurrency.api.StateMachine;
import io.github.jonloucks.contracts.api.AutoClose;
import io.github.jonloucks.contracts.api.AutoOpen;
import io.github.jonloucks.metalog.api.Dispatcher;
import io.github.jonloucks.metalog.api.Meta;
import io.github.jonloucks.metalog.api.Metalog;
import io.github.jonloucks.metalog.api.Outcome;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.github.jonloucks.concurrency.api.Idempotent.withClose;
import static io.github.jonloucks.concurrency.api.Idempotent.withOpen;
import static io.github.jonloucks.metalog.impl.Internal.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
final class UnkeyedDispatcherImpl implements Dispatcher, AutoOpen {
@Override
public AutoClose open() {
return withOpen(stateMachine, ()-> this::close);
}
@Override
public Outcome dispatch(Meta meta, Runnable work) {
final Runnable validCommand = commandCheck(work);
if (stateMachine.getState().isRejecting()) {
validCommand.run();
return Outcome.CONSUMED;
}
executor.execute(validCommand);
return Outcome.DISPATCHED;
}
UnkeyedDispatcherImpl(Metalog.Config config) {
// this.config = config;
this.executor = new ThreadPoolExecutor(
1,
config.unkeyedThreadCount(),
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(config.unkeyedFairness())
);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
stateMachine = Idempotent.createStateMachine(config.contracts());
}
private void close() {
withClose(stateMachine, this::realClose);
}
private void realClose() {
// final Instant start = Instant.now();
initiateShutdown();
//noinspection LoopStatementThatDoesntLoop
while (!checkIfShutdown()) {
// final Duration duration = Duration.between(start, Instant.now());
// if (isTimeToGiveUp(duration)) {
// forceShutdown();
return;
// }
}
}
// private boolean isTimeToGiveUp(Duration duration) {
// return duration.compareTo(config.shutdownTimeout()) > 0;
// }
private void initiateShutdown() {
executor.shutdown();
}
private boolean checkIfShutdown() {
final AtomicBoolean shutdown = new AtomicBoolean(false);
runWithIgnore(() -> shutdown.set(executor.awaitTermination(1, MILLISECONDS)));
return shutdown.get();
}
// private void forceShutdown() {
// executor.shutdownNow();
// }
private final StateMachine<Idempotent> stateMachine;
// private final Metalog.Config config;
private final ThreadPoolExecutor executor;
}