NotifyValueSubscription.java
package io.github.jonloucks.concurrency.impl;
import io.github.jonloucks.contracts.api.AutoClose;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import static io.github.jonloucks.concurrency.impl.Internal.*;
final class NotifyValueSubscription<T> {
void process(T value) {
if (isActive() && predicate.test(value)) {
listener.accept(value);
}
}
NotifyValueSubscription(Predicate<T> predicate, Consumer<T> listener, List<NotifyValueSubscription<T>> ownerList) {
this.predicate = predicateCheck(predicate);
this.listener = listenerCheck(listener);
this.ownerList = ownerList;
}
AutoClose open() {
ownerList.add(this);
return this::close;
}
void close() {
if (isClosed.compareAndSet(false, true)) {
removeExact(ownerList,this);
}
}
private boolean isActive() {
return !isClosed.get();
}
private final Predicate<T> predicate;
private final Consumer<T> listener;
private final List<NotifyValueSubscription<T>> ownerList;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
}