ConsoleImpl.java
package io.github.jonloucks.metalog.impl;
import io.github.jonloucks.concurrency.api.Idempotent;
import io.github.jonloucks.concurrency.api.StateMachine;
import io.github.jonloucks.contracts.api.AutoClose;
import io.github.jonloucks.contracts.api.AutoOpen;
import io.github.jonloucks.metalog.api.*;
import java.io.PrintStream;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static io.github.jonloucks.concurrency.api.Idempotent.withClose;
import static io.github.jonloucks.concurrency.api.Idempotent.withOpen;
import static io.github.jonloucks.contracts.api.Checks.builderConsumerCheck;
import static io.github.jonloucks.contracts.api.Checks.configCheck;
import static io.github.jonloucks.metalog.impl.Internal.logCheck;
import static io.github.jonloucks.metalog.impl.Internal.metaCheck;
import static java.util.Optional.ofNullable;
final class ConsoleImpl implements Console, AutoOpen {
@Override
public Outcome output(Log log) {
return publish(log, outputMeta);
}
@Override
public Outcome error(Log log) {
return publish(log, errorMeta);
}
@Override
public Outcome publish(Log log) {
return publish(log, outputMeta);
}
@Override
public Outcome publish(Log log, Meta meta) {
if (test(meta)) {
final Outcome outcome = metalog.publish(log, meta);
if (outcome == Outcome.REJECTED) {
// directly process message
return receive(log,meta);
}
return outcome;
} else {
return Outcome.SKIPPED;
}
}
@Override
public Outcome publish(Log log, Consumer<Meta.Builder<?>> builderConsumer) {
final Meta.Builder<?> metaBuilder = metaFactory.get();
metaBuilder.channel(CONSOLE_OUTPUT_CHANNEL);
builderConsumerCheck(builderConsumer).accept(metaBuilder);
metaBuilder.key(CONSOLE_KEY);
return publish(logCheck(log), metaBuilder);
}
@Override
public Outcome receive(Log log, Meta meta) {
final Log validLog = logCheck(log);
final Meta validMeta = metaCheck(meta);
if (test(validMeta)) {
return getPrintStream(validMeta).map(toPrint(validLog, validMeta)).orElse(Outcome.SKIPPED);
}
return Outcome.SKIPPED;
}
@Override
public Optional<String> getKey() {
return Optional.of(CONSOLE_KEY);
}
private static Function<PrintStream, Outcome> toPrint(Log log, Meta meta) {
return printStream -> {
final CharSequence text = log.get();
if (meta.newLine()) {
printStream.println(text);
} else {
printStream.print(text);
}
return Outcome.CONSUMED;
};
}
private Optional<PrintStream> getPrintStream(Meta meta) {
return ofNullable(channelPrintStreamMap.get(meta.getChannel()));
}
@Override
public AutoClose addFilter(Predicate<Meta> filter) {
return filters.addFilter(filter);
}
@Override
public boolean test(Meta meta) {
return isSupported(metaCheck(meta)) && filters.test(meta);
}
@Override
public AutoClose open() {
return withOpen(stateMachine, this::realOpen);
}
ConsoleImpl(Metalog.Config config) {
this.config = configCheck(config);
this.stateMachine = Idempotent.createStateMachine(config.contracts());
this.metaFactory = config.contracts().claim(Meta.Builder.FACTORY);
this.errorMeta = metaFactory.get().key(CONSOLE_KEY).channel(CONSOLE_ERROR_CHANNEL);
this.outputMeta = metaFactory.get().key(CONSOLE_KEY).channel(CONSOLE_OUTPUT_CHANNEL);
fillChannelPrintStreamMap();
}
private void fillChannelPrintStreamMap() {
channelPrintStreamMap.put(SYSTEM_OUT_CHANNEL, System.out);
channelPrintStreamMap.put(SYSTEM_ERR_CHANNEL, System.err);
channelPrintStreamMap.put(CONSOLE_OUTPUT_CHANNEL, System.out);
channelPrintStreamMap.put(CONSOLE_ERROR_CHANNEL, System.err);
}
private AutoClose realOpen() {
metalog = config.contracts().claim(Metalog.CONTRACT);
closeSubscription = config.contracts().claim(Metalog.CONTRACT).subscribe(this);
return this::close;
}
private void close() {
withClose(stateMachine, this::realClose);
}
private void realClose() {
ofNullable(closeSubscription).ifPresent(close -> {
closeSubscription = null;
close.close();
});
}
private boolean isSupported(Meta meta) {
return channelPrintStreamMap.containsKey(meta.getChannel());
}
private static final String CONSOLE_KEY = "Console";
private static final String CONSOLE_OUTPUT_CHANNEL = "Console.output";
private static final String CONSOLE_ERROR_CHANNEL = "Console.error";
private static final String SYSTEM_ERR_CHANNEL = "System.err";
private static final String SYSTEM_OUT_CHANNEL = "System.out";
private final Map<String, PrintStream> channelPrintStreamMap = new HashMap<>();
private final Filterable filters = new FiltersImpl();
private final StateMachine<Idempotent> stateMachine;
private final Metalog.Config config;
private final Meta errorMeta;
private final Meta outputMeta;
private Metalog metalog;
private AutoClose closeSubscription;
private final Supplier<Meta.Builder<?>> metaFactory;
}