WaitableImpl.java

package io.github.jonloucks.concurrency.impl;

import io.github.jonloucks.concurrency.api.Waitable;
import io.github.jonloucks.contracts.api.AutoClose;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static io.github.jonloucks.concurrency.impl.Internal.*;
import static io.github.jonloucks.contracts.api.Checks.nullCheck;
import static java.util.Optional.ofNullable;

final class WaitableImpl<T> implements Waitable<T> {

    @Override
    public void shutdown() {
        synchronized (simpleLock) {
            isShutdown = true;
            notifyValueListeners.forEach(NotifyValueSubscription::close);
            wakeUpWaitingThreads();
        }
    }
    
    @Override
    public Optional<T> getWhen(Predicate<T> predicate, Duration timeout) {
        synchronized (simpleLock) {
            return waitUntilSatisfied(predicate, timeout, ()->{});
        }
    }
    
    @Override
    public AutoClose notifyIf(Predicate<T> predicate, Consumer<T> listener) {
        final NotifyValueSubscription<T> notifyValueListener = new NotifyValueSubscription<>(predicate, listener, notifyValueListeners);
        notifyValueListener.process(get());
        return notifyValueListener.open();
    }
    
    @Override
    public void accept(T value) {
        synchronized (simpleLock) {
            setValue(value);
        }
    }
    
    @Override
    public Optional<T> acceptIf(Predicate<T> predicate, T value) {
        return acceptIf(predicate, () -> value);
    }
    
    @Override
    public Optional<T> acceptIf(Predicate<T> predicate, Supplier<T> valueSupplier) {
        final Predicate<T> validPredicate = predicateCheck(predicate);
        final Supplier<T> validValueSupplier = valueSupplierCheck(valueSupplier);
        synchronized (simpleLock) {
            final T currentValue = reference.get();
            if (validPredicate.test(currentValue)) {
                setValue(validValueSupplier.get());
                return ofNullable(currentValue);
            } else {
                return Optional.empty();
            }
        }
    }
    
    @Override
    public Optional<T> acceptWhen(Predicate<T> predicate, Supplier<T> valueSupplier, Duration timeout) {
        final Supplier<T> supplier = valueSupplierCheck(valueSupplier);
        final Runnable setValue = () -> setValue(supplier.get());
        
        synchronized (simpleLock) {
            return waitUntilSatisfied(predicate, timeout, setValue);
        }
    }
    
    @Override
    public T get() {
        synchronized (simpleLock) {
            return reference.get();
        }
    }

    WaitableImpl(T initialValue) {
        reference.set(initialValue);
    }
    
    private Optional<T> waitUntilSatisfied(Predicate<T> predicate, Duration timeout, Runnable block) {
        final Predicate<T> validPredicate = predicateCheck(predicate);
        final Duration validTimeout = timeoutCheck(timeout);
        
        final Instant start = Instant.now();
        do {
            final T value = reference.get();
            if (validPredicate.test(value)) {
                block.run();
                return ofNullable(value);
            }
        } while (keepWaiting(validTimeout, start));
        
        return Optional.empty();
    }
    
    private boolean keepWaiting(Duration validTimeout, Instant start) {
        if (shouldKeepWaiting(validTimeout, start)) {
            runWithIgnore(() -> simpleLock.wait(getWaitMillis(validTimeout, start, Instant.now())));
            return true;
        } else {
            return false;
        }
    }
    
    private boolean shouldKeepWaiting(Duration timeout, Instant start) {
        return !isShutdown && !hasTimedOut(timeout, start, Instant.now());
    }
    
    private void setValue(T newValue) {
        final T oldValue = reference.getAndSet(newValue);
        if (oldValue != newValue) {
            wakeUpWaitingThreads();
            notifyListeners(newValue);
        }
    }
    
    private void notifyListeners(T newValue) {
        if (!notifyValueListeners.isEmpty()) {
            notifyValueListeners.forEach(n -> n.process(newValue));
        }
    }
    
    private void wakeUpWaitingThreads() {
        simpleLock.notifyAll();
    }

    private static <T> T valueSupplierCheck(T t) {
        return nullCheck(t, "Value supplier must be present.");
    }
    
    private final Object simpleLock = new Object();
    private final AtomicReference<T> reference = new AtomicReference<>();
    private volatile boolean isShutdown = false;
    private final List<NotifyValueSubscription<T>> notifyValueListeners = new CopyOnWriteArrayList<>();
}