NotifyCompletionSubscription.java

package io.github.jonloucks.concurrency.impl;

import io.github.jonloucks.concurrency.api.Completion;
import io.github.jonloucks.concurrency.api.OnCompletion;
import io.github.jonloucks.contracts.api.AutoClose;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.github.jonloucks.concurrency.impl.Internal.onCompletionCheck;
import static io.github.jonloucks.concurrency.impl.Internal.removeExact;

/**
 * A closeable subscription that relays completion notifications to a wrapped
 * {@link OnCompletion} listener for as long as the subscription has not been closed.
 *
 * <p>Opening the subscription adds it to the list supplied at construction; closing it
 * hands it to {@code Internal.removeExact} for removal from that same list. Once closed,
 * incoming notifications are dropped instead of being forwarded.
 *
 * @param <T> the completion value type
 */
final class NotifyCompletionSubscription<T> implements OnCompletion<T> {

    /**
     * Creates a subscription that is not yet registered; call {@link #open()} to register it.
     *
     * @param referent  the listener notifications are forwarded to; it is passed through
     *                  {@code Internal.onCompletionCheck} first (presumably a validity/null
     *                  check -- confirm against Internal)
     * @param ownerList the list this subscription adds itself to when opened and is removed
     *                  from when closed; stored as given, without validation
     */
    NotifyCompletionSubscription(OnCompletion<T> referent, List<NotifyCompletionSubscription<T>> ownerList) {
        this.delegate = onCompletionCheck(referent);
        this.subscriptions = ownerList;
    }

    /**
     * Registers this subscription in the owning list.
     *
     * @return a handle whose invocation closes this subscription
     */
    AutoClose open() {
        subscriptions.add(this);
        return () -> close();
    }

    /**
     * Closes this subscription. Only the first call has any effect; later calls return
     * without touching the owning list.
     */
    void close() {
        // The atomic false -> true transition succeeds for exactly one caller,
        // so the removal below runs at most once per subscription.
        final boolean alreadyClosed = !closed.compareAndSet(false, true);
        if (alreadyClosed) {
            return;
        }
        // NOTE(review): the helper name suggests removal of this exact instance -- confirm in Internal.
        removeExact(subscriptions, this);
    }

    /**
     * Forwards the completion to the wrapped listener, unless this subscription is closed.
     *
     * @param completion the completion to forward
     */
    @Override
    public void onCompletion(Completion<T> completion) {
        if (closed.get()) {
            return; // closed subscriptions no longer deliver notifications
        }
        delegate.onCompletion(completion);
    }

    /** Listener that receives forwarded completions. */
    private final OnCompletion<T> delegate;
    /** List this subscription registers itself in while open. */
    private final List<NotifyCompletionSubscription<T>> subscriptions;
    /** Becomes {@code true} on the first {@link #close()} call and never reverts. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
}