CompletableImpl.java

package io.github.jonloucks.concurrency.impl;

import io.github.jonloucks.concurrency.api.*;
import io.github.jonloucks.concurrency.api.Completion.State;
import io.github.jonloucks.contracts.api.AutoClose;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;

import static io.github.jonloucks.concurrency.api.Idempotent.withClose;
import static io.github.jonloucks.concurrency.api.Idempotent.withOpen;
import static io.github.jonloucks.concurrency.impl.Internal.completionCheck;
import static java.util.Optional.ofNullable;

final class CompletableImpl<T> implements Completable<T>, OnCompletion<T> {
    
    @Override
    public boolean isCompleted() {
        return completionStateMachine.getState().isCompleted();
    }
    
    @Override
    public WaitableNotify<State> notifyState() {
        return completionStateMachine;
    }
    
    @Override
    public WaitableNotify<T> notifyValue() {
        return waitableValue;
    }
    
    @Override
    public Optional<Completion<T>> getCompletion() {
        return ofNullable(completion);
    }
    
    @Override
    public void onCompletion(Completion<T> completion) {
        if (idempotentStateMachine.getState().isRejecting()) {
            throw new IllegalStateException("Completable must be open.");
        }
        final Completion<T> validCompletion = completionCheck(completion);
        if (completionStateMachine.setState("onCompletion", validCompletion.getState())) {
            this.completion = validCompletion;
            this.waitableValue.accept(validCompletion.getValue().orElse(null));
            subscriptions.forEach(s -> s.onCompletion(validCompletion));
        }
    }

    @Override
    public AutoClose notify(OnCompletion<T> onCompletion) {
        return new NotifyCompletionSubscription<>(onCompletion, subscriptions).open();
    }
    
    @Override
    public AutoClose open() {
        return withOpen(idempotentStateMachine, this::realOpen);
    }
  
    void close() {
        withClose(idempotentStateMachine, this::realClose);
    }
    
    CompletableImpl(Concurrency.Config concurrencyConfig, Completable.Config<T> ignored) {
        this.completionStateMachine = State.createStateMachine(concurrencyConfig.contracts());
        this.idempotentStateMachine = Idempotent.createStateMachine(concurrencyConfig.contracts());
        this.waitableValue = GlobalConcurrency.createWaitable(null);
    }
    
    private AutoClose realOpen() {
        return this::close;
    }
    
    private void realClose() {
        subscriptions.forEach(NotifyCompletionSubscription::close);
    }
    
    private final StateMachine<State> completionStateMachine;
    private final StateMachine<Idempotent> idempotentStateMachine;
    private Completion<T> completion;
    private final Waitable<T> waitableValue;
    private final List<NotifyCompletionSubscription<T>> subscriptions = new CopyOnWriteArrayList<>();
}