MetalogImpl.java
package io.github.jonloucks.metalog.impl;
import io.github.jonloucks.concurrency.api.Idempotent;
import io.github.jonloucks.concurrency.api.StateMachine;
import io.github.jonloucks.contracts.api.*;
import io.github.jonloucks.metalog.api.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static io.github.jonloucks.concurrency.api.Idempotent.withClose;
import static io.github.jonloucks.concurrency.api.Idempotent.withOpen;
import static io.github.jonloucks.contracts.api.Checks.*;
import static io.github.jonloucks.metalog.impl.Internal.*;
final class MetalogImpl implements Metalog {
@Override
public Outcome publish(Log log, Meta meta) {
final Log validLog = new InvokeOnlyOnce(log);
final Meta validMeta = metaCheck(meta);
if (stateMachine.getState().isRejecting()) {
return Outcome.REJECTED;
}
if (test(validMeta)) {
if (shouldTransmitNow(validMeta)) {
return transmitNow(validLog, validMeta);
} else {
return transmitLater(validMeta, validLog);
}
} else {
return Outcome.SKIPPED;
}
}
@Override
public Outcome publish(Log log, Consumer<Meta.Builder<?>> builderConsumer) {
final Meta.Builder<?> metaBuilder = metaFactory.get();
builderConsumerCheck(builderConsumer).accept(metaBuilder);
return publish(logCheck(log), metaBuilder);
}
@Override
public AutoClose addFilter(Predicate<Meta> filter) {
return filters.addFilter(filter);
}
@Override
public boolean test(Meta meta) {
final Meta validMeta = metaCheck(meta);
return filters.test(validMeta) && subscribers.stream().anyMatch(matcher(validMeta));
}
@Override
public AutoClose subscribe(Subscriber subscriber) {
final Subscriber validSubscriber = subscriberCheck(subscriber);
subscribers.add(validSubscriber);
mainDispatcher.registerSubscriber(validSubscriber);
return () -> removeSubscriber(validSubscriber);
}
@Override
public AutoClose open() {
return withOpen(stateMachine, this::realOpen);
}
MetalogImpl(Config config, Repository repository, boolean openRepository) {
final Repository validRepository = nullCheck(repository, "Repository must be present.");
this.config = configCheck(config);
this.closeRepository = openRepository ? validRepository.open() : AutoClose.NONE;
this.stateMachine = Idempotent.createStateMachine(config.contracts());
}
private AutoClose realOpen() {
metaFactory = config.contracts().claim(Meta.Builder.FACTORY);
this.mainDispatcher = config.contracts().claim(MainDispatcher.CONTRACT);
activateConsole();
return this::close;
}
private Predicate<Subscriber> matcher(Meta meta) {
if (config.keyedSubscription()) {
final Optional<String> optionalKey = meta.getKey();
if (optionalKey.isPresent()) {
final String key = optionalKey.get();
return s -> s.getKey().isPresent() && key.equals(s.getKey().get()) && s.test(meta);
} else {
return s -> !s.getKey().isPresent() && s.test(meta);
}
} else {
return s -> s.test(meta); // legacy, subscriptions can receive any log
}
}
private void activateConsole() {
config.contracts().claim(Console.CONTRACT);
}
private void close() {
withClose(stateMachine, this::realClose);
}
private void realClose() {
closeRepository.close();
}
private boolean shouldTransmitNow(Meta meta) {
return meta.isBlocking();
}
private Outcome transmitLater(Meta meta, Log log) {
return forSubscriber(meta, s -> mainDispatcher.dispatch(meta, ()-> guardedDelivery(s, log, meta)));
}
private Outcome forSubscriber(Meta meta, Function<Subscriber,Outcome> consumer) {
final AtomicReference<Outcome> outcomeReference = new AtomicReference<>(Outcome.SKIPPED);
subscribers.stream().filter(matcher(meta)).forEach( subscriber -> {
final Outcome outcome = consumer.apply(subscriber);
if (outcome != Outcome.SKIPPED) {
outcomeReference.set(outcome);
}
});
return outcomeReference.get();
}
private Outcome transmitNow(Log log, Meta meta) {
return forSubscriber(meta, s -> guardedDelivery(s, log, meta));
}
private void removeSubscriber(Subscriber subscriber) {
if (subscribers.removeIf( x -> x == subscriber)) {
mainDispatcher.unregisterSubscriber(subscriber);
}
}
private final Config config;
private final StateMachine<Idempotent> stateMachine;
private final AutoClose closeRepository;
private final List<Subscriber> subscribers = new CopyOnWriteArrayList<>();
private final Filterable filters = new FiltersImpl();
private MainDispatcher mainDispatcher;
private Supplier<Meta.Builder<?>> metaFactory;
}