ContractsImpl.java

package io.github.jonloucks.contracts.impl;

import io.github.jonloucks.contracts.api.*;

import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import static io.github.jonloucks.contracts.api.Checks.*;
import static java.util.Optional.ofNullable;

/**
 * Implementation for {@link io.github.jonloucks.contracts.api.Contracts}
 * @see io.github.jonloucks.contracts.api.Contracts
 */
final class ContractsImpl implements Contracts {

    @Override
    public AutoClose open() {
        if (openState.transitionToOpen()) {
            closeRepository = repository.open();
            return this::close;
        }
        return AutoClose.NONE;
    }

    @Override
    public <T> T claim(Contract<T> contract) {
        final Contract<T> validContract = contractCheck(contract);
        final Optional<Promisor<?>> promisor = getFromPromisorMap(validContract);
        
        if (promisor.isPresent()) {
            return validContract.cast(promisor.get().demand());
        } else {
            return claimFromPartners(validContract);
        }
    }
    
    @Override
    public <T> boolean isBound(Contract<T> contract) {
        final Contract<?> validContract = contractCheck(contract);
        final Optional<Promisor<?>> promisor = getFromPromisorMap(validContract);
        
        return promisor.isPresent() || isAnyPartnerBound(contract);
    }
    
    @Override
    public <T> AutoClose bind(Contract<T> contract, Promisor<T> promisor, BindStrategy bindStrategy) {
        final Contract<T> validContract = contractCheck(contract);
        final Promisor<T> validPromisor = promisorCheck(promisor);
        final BindStrategy validBindStrategy = nullCheck(bindStrategy, "Bind strategy must be present.");
        
        return maybeBind(validContract, validPromisor, validBindStrategy);
    }
    
    ContractsImpl(Contracts.Config config) {
        final Contracts.Config validConfig = configCheck(config);
        
        // keeping the promises open permanently
        repository.keep(Promisors.CONTRACT, PromisorsImpl::new);
        repository.keep(Repository.FACTORY, () -> () -> new RepositoryImpl(this));
        
        partners.addAll(nullCheck(validConfig.getPartners(), "Partners must be present."));
        
        if (validConfig.useShutdownHooks()) {
            Runtime.getRuntime().addShutdownHook(new Thread(this::close));
        }
    }
    
    private void close() {
        if (openState.transitionToClosed()) {
            try {
                for (int attempts = 1, broken = breakAllBindings(); broken > 0; broken = breakAllBindings(), attempts++) {
                    if (attempts > 5) {
                        throw newCloseDidNotCompleteException();
                    }
                }
            } finally {
                ofNullable(closeRepository).ifPresent( close -> {
                    closeRepository = null;
                    close.close();
                });
                
            }
        }
    }
    
    private <T> AutoClose maybeBind(Contract<T> contract, Promisor<T> newPromisor, BindStrategy bindStrategy) {
        if (checkBind(contract, newPromisor, bindStrategy)) {
            return doBind(contract, newPromisor);
        } else {
            return AutoClose.NONE;
        }
    }
    
    private boolean checkBind(Contract<?> contract, Promisor<?> newPromisor, BindStrategy bindStrategy) {
        final Optional<Promisor<?>> optionalCurrent = getFromPromisorMap(contract);
        
        //noinspection OptionalIsPresent
        if (optionalCurrent.isPresent()) {
            return checkReplacement(contract, newPromisor, bindStrategy, optionalCurrent.get());
        } else {
            return true;
        }
    }
    
    private static boolean checkReplacement(Contract<?> contract, Promisor<?> newPromisor, BindStrategy bindStrategy, Promisor<?> currentPromisor) {
        // Double bind of same promisor, do not rebind
        if (currentPromisor == newPromisor) {
            return false;
        }
        
        switch (bindStrategy) {
            case ALWAYS:
                if (contract.isReplaceable()) {
                    return true;
                }
                throw newContractNotReplaceableException(contract);
            case IF_NOT_BOUND:
                return false;
            case IF_ALLOWED:
            default:
                return contract.isReplaceable();
        }
    }
    
    private <T> AutoClose doBind(Contract<T> contract, Promisor<T> promisor) {
        // Since ReentrantReadWriteLock does not support lock upgrade, there are opportunities
        // for changes by other threads between the reads and writes.
        // This is mitigated by always incrementing the new value and decrementing the old value.
        promisor.incrementUsage();
        return applyWithLock(mapLock.writeLock(), () -> {
            ofNullable(promisorMap.put(contract, promisor)).ifPresent(Promisor::decrementUsage);
            final IdempotentImpl breakBindingOnce = new IdempotentImpl();
            breakBindingOnce.transitionToOpen();
            return () -> {
                if (breakBindingOnce.transitionToClosed()) {
                    breakBinding(contract, promisor);
                }
            };
        });
    }
    
    private void breakBinding(Contract<?> contract, Promisor<?> promisor) {
        // it is possible the Contract has already been removed or updated with a new Promisor
        // Checking the removed promisor is required to avoid:
        //   1. Calling decrementUsage twice on Promisors already removed
        //   2. Not calling decrementUsage enough times
        // decrementing usage too many times.
        try {
            removeFromPromisorMap(contract, promisor);
        } finally {
            promisor.decrementUsage();
        }
    }
    
    private void removeFromPromisorMap(Contract<?> contract, Promisor<?> promisor) {
        applyWithLock(mapLock.writeLock(), () -> promisorMap.remove(contract, promisor));
    }
    
    private <T> Optional<Promisor<?>> getFromPromisorMap(Contract<T> validContract) {
        return ofNullable(applyWithLock(mapLock.readLock(), () -> promisorMap.get(validContract)));
    }
    
    private int breakAllBindings() {
        final Stack<Contract<?>> contracts = new Stack<>();
        final Stack<Promisor<?>> promisors = new Stack<>();
        
        final int contractCount = copyBindings(contracts, promisors);
        
        while (!contracts.isEmpty()) {
            breakBinding(contracts.pop(), promisors.pop());
        }
        return contractCount;
    }
    
    private int copyBindings(Stack<Contract<?>> contracts, Stack<Promisor<?>> promisors) {
        // During shutdown other threads should be able to acquire read and write locks
        // The following attains the write lock to attain all the current keys and values
        // in the reverse order from insertion.
        // The last to be inserted is the first to be removed.
        final AtomicInteger contractCount = new AtomicInteger();
        return applyWithLock(mapLock.writeLock(), () -> {
            promisorMap.forEach((contract, promisor) -> {
                contracts.push(contract);
                promisors.push(promisor);
                contractCount.incrementAndGet();
            });
            return contractCount.get();
        });
    }
    
    private <T> T claimFromPartners(Contract<T> contract) {
        if (!partners.isEmpty()) {
            for (Contracts partner : partners) {
                if (partner.isBound(contract)) {
                    return partner.claim(contract);
                }
            }
        }
        throw newContractNotPromisedException(contract);
    }
    
    private <T> boolean isAnyPartnerBound(Contract<T> contract) {
        if (!partners.isEmpty()) {
            return partners.stream().anyMatch(partner -> partner.isBound(contract));
        }
        return false;
    }
    
    private static <T> T applyWithLock(Lock requestedLock, Supplier<T> block) {
        requestedLock.lock();
        try {
            return block.get();
        } finally {
            requestedLock.unlock();
        }
    }
    
    private static ContractException newCloseDidNotCompleteException() {
        return new ContractException("Contracts failed to close after trying multiple times.");
    }
    
    private static <T> ContractException newContractNotPromisedException(Contract<T> contract) {
        return new ContractException("Contract " + contract + " was not promised.");
    }
    
    private static <T> ContractException newContractNotReplaceableException(Contract<T> contract) {
        return new ContractException("Contract " + contract + " is not replaceable.");
    }

    private final IdempotentImpl openState = new IdempotentImpl();
    private final ReentrantReadWriteLock mapLock = new ReentrantReadWriteLock();
    private final LinkedHashMap<Contract<?>, Promisor<?>> promisorMap = new LinkedHashMap<>();
    private final RepositoryImpl repository = new RepositoryImpl(this);
    private final List<Contracts> partners = new ArrayList<>();
    private AutoClose closeRepository;
}