WaitableImpl.java

package io.github.jonloucks.concurrency.impl;

import io.github.jonloucks.concurrency.api.Waitable;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static io.github.jonloucks.concurrency.impl.Internal.*;
import static io.github.jonloucks.contracts.api.Checks.nullCheck;

final class WaitableImpl<T> implements Waitable<T> {

    @Override
    public void shutdown() {
        synchronized (simpleLock) {
            isShutdown = true;
            wakeUpWaitingThreads();
        }
    }
    
    @Override
    public Optional<T> getWhen(Predicate<T> predicate) {
        return getWhen(predicate, Duration.ofSeconds(Long.MAX_VALUE));
    }
    
    @Override
    public Optional<T> getWhen(Predicate<T> predicate, Duration timeout) {
        final Predicate<T> validPredicate = predicateCheck(predicate);
        final Duration validDuration = timeoutCheck(timeout);
        
        synchronized (simpleLock) {
            final T currentValue = reference.get();
            if (validPredicate.test(currentValue)) {
                return Optional.of(currentValue);
            } else if (isShutdown || validDuration.isZero()) {
                return Optional.empty();
            }
            return waitForLoop(validPredicate, validDuration);
        }
    }
    
    @Override
    public void accept(T value) {
        final T validValue = valueCheck(value);
        synchronized (simpleLock) {
            setAndNotifyIfChanged(validValue);
        }
    }

    @Override
    public Optional<T> acceptIf(Predicate<T> predicate, T value) {
        final T validValue = valueCheck(value);
        final Predicate<T> validPredicate = predicateCheck(predicate);
        
        synchronized (simpleLock) {
            final T currentValue = reference.get();
            if (validPredicate.test(currentValue)) {
                setAndNotifyIfChanged(validValue);
                return Optional.of(currentValue);
            } else {
                return Optional.empty();
            }
        }
    }
    
    @Override
    public Optional<T> getIf(Predicate<T> predicate) {
        final Predicate<T> validPredicate = predicateCheck(predicate);
        synchronized (simpleLock) {
            final T currentValue = reference.get();
            return validPredicate.test(currentValue) ? Optional.of(currentValue) : Optional.empty();
        }
    }
    
    @Override
    public T get() {
        synchronized (simpleLock) {
            return reference.get();
        }
    }
    
    WaitableImpl(T initialValue) {
        reference.set(valueCheck(initialValue));
    }
    
    private Optional<T> waitForLoop(Predicate<T> predicate, Duration timeout) {
        final Instant start = Instant.now();
        do {
            runWithIgnore(()-> simpleLock.wait(getWaitMillis(timeout, start)));
            final T value = reference.get();
            if (predicate.test(value)) {
                return Optional.of(value);
            }
        } while (shouldKeepWaiting(timeout, start));
        
        return Optional.empty();
    }
    
    private static long getWaitMillis(Duration timeout, Instant start) {
        return Long.max(1, timeout.minus(Duration.between(start, Instant.now())).toMillis());
    }
    
    private boolean shouldKeepWaiting(Duration timeout, Instant start) {
        if (isShutdown) {
            return false;
        }
        return Duration.between(start, Instant.now()).compareTo(timeout) < 0;
    }
    
    private void setAndNotifyIfChanged(T newValue) {
        final T oldValue = reference.getAndSet(newValue);
        if (oldValue != newValue) {
            wakeUpWaitingThreads();
        }
    }
    
    private void wakeUpWaitingThreads() {
        simpleLock.notifyAll();
    }
    
    private static <T> T valueCheck(T t) {
        return nullCheck(t, "Value must be present.");
    }
    
    private final Object simpleLock = new Object();
    private final AtomicReference<T> reference = new AtomicReference<>();
    private volatile boolean isShutdown = false;
}