MainDispatcherImpl.java

package io.github.jonloucks.metalog.impl;

import io.github.jonloucks.contracts.api.AutoClose;
import io.github.jonloucks.contracts.api.Contract;
import io.github.jonloucks.contracts.api.Repository;
import io.github.jonloucks.metalog.api.*;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static io.github.jonloucks.contracts.api.GlobalContracts.lifeCycle;
import static io.github.jonloucks.metalog.impl.Internal.*;
import static java.util.Optional.ofNullable;

final class MainDispatcherImpl implements MainDispatcher {

    @Override
    public Outcome dispatch(Meta meta, Runnable job) {
        final Optional<Dispatcher> foundDispatcher = findDispatcher(meta);
        if (foundDispatcher.isPresent()) {
            return foundDispatcher.get().dispatch(meta, job);
        }
        return Outcome.SKIPPED;
    }
    
    @Override
    public AutoClose open() {
        return AutoClose.NONE;
    }
    
    @Override
    public void registerSubscriber(Subscriber subscriber) {
        if (config.keyedSubscription()) {
            final Subscriber validSubscriber = subscriberCheck(subscriber);
            final String key = validSubscriber.getKey().orElse("");
            incrementKeyUsage(key);
        }
    }
    
    @Override
    public void unregisterSubscriber(Subscriber subscriber) {
        if (config.keyedSubscription()) {
            final Subscriber validSubscriber = subscriberCheck(subscriber);
            final String key = validSubscriber.getKey().orElse("");
            decrementKeyUsage(key);
        }
    }
    
    MainDispatcherImpl(Metalog.Config config, Repository repository) {
        this.config = config;
        this.repository = repository;
    }
    
    private Optional<Dispatcher> findDispatcher(Meta meta) {
        final Meta validMeta = metaCheck(meta);
        final String key = validMeta.getKey().orElse("");
        if (config.keyedSubscription()) {
            return ofNullable(keySmartMap.get(key)).flatMap(Smart::get);
        } else {
            return keySmartMap.computeIfAbsent(key, k -> {
                final Smart smart = new Smart(k);
                smart.incrementUsage();
                return smart;
            }).get();
        }
    }
    
    private void incrementKeyUsage(String key) {
        keySmartMap.computeIfAbsent(keyCheck(key), Smart::new).incrementUsage();
    }
    
    private void decrementKeyUsage(String key) {
        ofNullable(keySmartMap.get(keyCheck(key))).ifPresent(Smart::decrementUsage);
    }
    
    private Supplier<Dispatcher> getDispatcherFactory(String key) {
        return config.contracts().claim(key.isEmpty() ? UNKEYED_FACTORY : KEYED_FACTORY);
    }
    
    private final class Smart {
        Smart(String key) {
            this.key = key;
            this.contract = Contract.create(Dispatcher.class, n -> n.name("Dispatcher " + key));
        }
        
        Optional<Dispatcher> get() {
            final Optional<Dispatcher> optionalDispatcher = ofNullable(dispatcher);
            if (optionalDispatcher.isPresent()) {
                return optionalDispatcher;
            }
            return acquire();
        }
        
        private synchronized Optional<Dispatcher> acquire() {
            final Optional<Dispatcher> optionalDispatcher = ofNullable(dispatcher);
            if (optionalDispatcher.isPresent()) {
                return optionalDispatcher;
            }
            closeBinding = repository.store(contract, lifeCycle(getDispatcherFactory(key)::get));
            dispatcher = config.contracts().claim(contract);
            
            return Optional.of(dispatcher);
        }
        
        synchronized void incrementUsage() {
            ++usageCount;
        }
        
        synchronized void decrementUsage() {
            if (--usageCount == 0) {
                ofNullable(closeBinding).ifPresent(AutoClose::close);
                keySmartMap.remove(key, this);
            }
        }
        
        private final String key;
        private final Contract<Dispatcher> contract;
        private volatile Dispatcher dispatcher;
        private int usageCount = 0;
        private AutoClose closeBinding;
    }
    
    private final Metalog.Config config;
    private final Repository repository;
    private final ConcurrentHashMap<String, Smart> keySmartMap = new ConcurrentHashMap<>();
}