/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.AbstractMediator;
import io.smallrye.reactive.messaging.ChannelRegistar;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.MediatorFactory;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.PublisherDecorator;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.WeavingException;
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.extension.AbstractEmitter;
import io.smallrye.reactive.messaging.extension.CollectedMediatorMetadata;
import io.smallrye.reactive.messaging.extension.EmitterConfiguration;
import io.smallrye.reactive.messaging.extension.EmitterImpl;
import io.smallrye.reactive.messaging.extension.HealthCenter;
import io.smallrye.reactive.messaging.extension.LazySource;
import io.smallrye.reactive.messaging.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.i18n.ProviderLogging;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.spi.Contextual;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.DeploymentException;
import javax.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

@ApplicationScoped
public class MediatorManager {
    /** Default emitter buffer size; equal to the {@code defaultValue} of both buffer-size properties below. */
    private static final int DEFAULT_BUFFER_SIZE = 128;

    /** Name of the system property read to decide whether strict binding is enabled. */
    public static final String STRICT_MODE_PROPERTY = "smallrye-messaging-strict-binding";

    /** Value of {@link #STRICT_MODE_PROPERTY}, read once when the instance is created; {@code false} when unset. */
    private final boolean strictMode = Boolean.parseBoolean(System.getProperty(STRICT_MODE_PROPERTY, "false"));

    /** Mediator metadata accumulated by the {@code analyze} and {@code addAnalyzed} methods. */
    private final CollectedMediatorMetadata collected = new CollectedMediatorMetadata();

    /** Subscriptions cancelled by {@link #shutdown()}; no code in this class adds to the list. */
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

    /** Every mediator produced by {@code createMediator}. */
    private final List<AbstractMediator> mediators = new ArrayList<>();

    /** Configured default emitter buffer size. */
    @Inject
    @ConfigProperty(name = "mp.messaging.emitter.default-buffer-size", defaultValue = "128")
    int defaultBufferSize;

    /** Deprecated property for the default emitter buffer size; see {@code getDefaultBufferSize()} for precedence. */
    @Inject
    @ConfigProperty(name = "smallrye.messaging.emitter.default-buffer-size", defaultValue = "128")
    @Deprecated
    int defaultBufferSizeLegacy;

    /** Registrars initialized at the start of {@code initializeAndRun()}. */
    @Inject
    @Any
    Instance<ChannelRegistar> streamRegistars;

    /** Factory used to build a mediator from its configuration. */
    @Inject
    MediatorFactory mediatorFactory;

    /** Registry in which publishers, subscribers and emitters are registered and looked up by channel name. */
    @Inject
    ChannelRegistry channelRegistry;

    /** Used to obtain the bean instance backing each mediator. */
    @Inject
    BeanManager beanManager;

    /** Passed to every mediator before it is initialized. */
    @Inject
    WorkerPoolRegistry workerPoolRegistry;

    /** Passed to every mediator before it is initialized. */
    @Inject
    Instance<PublisherDecorator> decorators;

    /** Passed to every mediator before it is initialized. */
    @Inject
    Instance<MessageConverter> converters;

    /** Passed to every mediator before it is initialized. */
    @Inject
    HealthCenter health;

    /** Set to {@code true} at the end of the weaving step. */
    private volatile boolean initialized;

    /**
     * Creates the manager and logs a notice when strict binding mode is enabled.
     */
    public MediatorManager() {
        if (!this.strictMode) {
            return;
        }
        ProviderLogging.log.strictModeEnabled();
    }

    /**
     * Tells whether the weaving step has run to completion.
     *
     * @return {@code true} once the {@code initialized} flag has been set
     */
    public boolean isInitialized() {
        return initialized;
    }

    /**
     * Scans the methods of a CDI annotated type and records every method carrying
     * {@code @Incomings}, {@code @Incoming} or {@code @Outgoing}.
     *
     * @param annotatedType the type whose methods are inspected
     * @param bean the bean associated with each recorded method
     * @param <T> the bean type
     */
    public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
        ProviderLogging.log.scanningType(annotatedType.getJavaClass());
        // Keep the element type: a raw Set would erase it and the pipeline below could not
        // resolve the hasMediatorAnnotations overload or getJavaMember().
        Set<AnnotatedMethod<? super T>> methods = annotatedType.getMethods();
        methods.stream()
                .filter(this::hasMediatorAnnotations)
                .forEach(method -> this.collected.add(method.getJavaMember(), bean));
    }

    /**
     * Checks whether a CDI annotated method carries one of the mediator annotations.
     *
     * @param method the annotated method to inspect
     * @param <T> the type declaring the method, or one of its subtypes
     * @return {@code true} if {@code @Incomings}, {@code @Incoming} or {@code @Outgoing} is present
     */
    private <T> boolean hasMediatorAnnotations(AnnotatedMethod<? super T> method) {
        if (method.isAnnotationPresent(Incomings.class)) {
            return true;
        }
        if (method.isAnnotationPresent(Incoming.class)) {
            return true;
        }
        return method.isAnnotationPresent(Outgoing.class);
    }

    /**
     * Checks whether a reflective method carries one of the mediator annotations.
     *
     * @param m the method to inspect
     * @return {@code true} if {@code @Incomings}, {@code @Incoming} or {@code @Outgoing} is present
     */
    private boolean hasMediatorAnnotations(Method m) {
        if (m.isAnnotationPresent(Incomings.class)) {
            return true;
        }
        if (m.isAnnotationPresent(Incoming.class)) {
            return true;
        }
        return m.isAnnotationPresent(Outgoing.class);
    }

    /**
     * Walks a class and its superclasses (excluding {@code Object}) and records every
     * declared method carrying {@code @Incomings}, {@code @Incoming} or {@code @Outgoing}.
     *
     * @param beanClass the class at which the walk starts; {@code null} results in no work
     * @param bean the bean associated with each recorded method
     * @param <T> the bean type
     */
    public <T> void analyze(Class<?> beanClass, Bean<T> bean) {
        Class<?> current = beanClass;
        // getSuperclass() yields null for interfaces (and for Object), so the walk must stop
        // on null as well as on Object to avoid dereferencing a null class.
        while (current != null && current != Object.class) {
            for (Method method : current.getDeclaredMethods()) {
                if (this.hasMediatorAnnotations(method)) {
                    this.collected.add(method, bean);
                }
            }
            current = current.getSuperclass();
        }
    }

    /**
     * Adds already-built mediator configurations to the collected metadata.
     *
     * @param mediators the configurations to add
     */
    public void addAnalyzed(Collection<? extends MediatorConfiguration> mediators) {
        collected.addAll(mediators);
    }

    /**
     * Cancels every tracked subscription and empties the tracking list.
     */
    @PreDestroy
    void shutdown() {
        ProviderLogging.log.cancelSubscriptions();
        for (Subscription subscription : this.subscriptions) {
            subscription.cancel();
        }
        this.subscriptions.clear();
    }

    /**
     * Initializes the channel registrars, creates and initializes a mediator for every
     * collected configuration, then weaves the mediators together.
     *
     * @throws DeploymentException if weaving fails with a {@link WeavingException}
     */
    public void initializeAndRun() {
        if (this.initialized) {
            throw ProviderExceptions.ex.illegalStateForMediatorManagerAlreadyInitialized();
        }
        ProviderLogging.log.deploymentDoneStartProcessing();
        this.streamRegistars.stream().forEach(ChannelRegistar::initialize);
        // Read before any mediator is registered below.
        // NOTE(review): presumably so that only subscribers not backed by a mediator are listed - confirm.
        Set<String> unmanagedSubscribers = this.channelRegistry.getOutgoingNames();
        ProviderLogging.log.initializingMediators();
        this.collected.mediators().forEach(this::initializeMediator);
        try {
            this.weaving(unmanagedSubscribers);
        } catch (WeavingException e) {
            throw new DeploymentException(e);
        }
    }

    /**
     * Creates the mediator for one configuration, initializes it against its bean instance
     * and registers it in the channel registry according to its shape.
     * Any failure during initialization is logged and the mediator is left unregistered.
     */
    private void initializeMediator(MediatorConfiguration configuration) {
        AbstractMediator mediator = this.createMediator(configuration);
        ProviderLogging.log.initializingMethod(mediator.getMethodAsString());
        mediator.setDecorators(this.decorators);
        mediator.setConverters(this.converters);
        mediator.setHealth(this.health);
        mediator.setWorkerPoolRegistry(this.workerPoolRegistry);
        try {
            Object beanInstance = this.beanManager.getReference(configuration.getBean(), Object.class,
                    this.beanManager.createCreationalContext(configuration.getBean()));
            if (configuration.getInvokerClass() != null) {
                try {
                    mediator.setInvoker(instantiateInvoker(configuration.getInvokerClass(), beanInstance));
                } catch (IllegalAccessException | InstantiationException e) {
                    ProviderLogging.log.unableToCreateInvoker(configuration.getInvokerClass(), e);
                    return;
                }
            }
            mediator.initialize(beanInstance);
        } catch (Throwable e) {
            // Deliberately broad: a failing mediator is reported and skipped instead of aborting the others.
            ProviderLogging.log.unableToInitializeMediator(mediator.getMethodAsString(), e);
            return;
        }
        this.registerInitializedMediator(mediator);
    }

    /**
     * Instantiates an invoker, preferring a public constructor taking the bean instance and
     * falling back to the no-argument constructor when no such constructor exists.
     */
    private static Invoker instantiateInvoker(Class<?> invokerClass, Object beanInstance)
            throws ReflectiveOperationException {
        try {
            return (Invoker) invokerClass.getConstructor(Object.class).newInstance(beanInstance);
        } catch (NoSuchMethodException noBeanInstanceConstructor) {
            // getConstructor never returns null; a missing constructor is reported by this exception,
            // so the fallback has to live here rather than behind a null check.
            return (Invoker) invokerClass.getDeclaredConstructor().newInstance();
        }
    }

    /**
     * Registers a publisher-shaped mediator under its outgoing channel and a
     * subscriber-shaped mediator under each of its incoming channels.
     */
    private void registerInitializedMediator(AbstractMediator mediator) {
        if (mediator.getConfiguration().shape() == Shape.PUBLISHER) {
            ProviderLogging.log.registeringAsPublisher(mediator.getConfiguration().methodAsString(),
                    mediator.getConfiguration().getOutgoing());
            this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
        }
        if (mediator.getConfiguration().shape() == Shape.SUBSCRIBER) {
            List<String> incoming = mediator.getConfiguration().getIncoming();
            ProviderLogging.log.registeringAsSubscriber(mediator.getConfiguration().methodAsString(), incoming);
            for (String channel : incoming) {
                this.channelRegistry.register(channel, mediator.getComputedSubscriber());
            }
        }
    }

    /**
     * Connects the mediators to their upstream channels, starts the connected subscriber
     * mediators, wires the unmanaged subscribers and finally marks the manager as initialized.
     * <p>
     * Connection passes repeat as long as the number of unconnected mediators changes between
     * two passes. When a pass makes no progress, strict mode throws and non-strict mode logs
     * the unbound mediators and moves on.
     *
     * @param unmanagedSubscribers names of the channels whose subscribers must be connected
     *        to a matching mediator or emitter
     * @throws WeavingException if a channel has several publishers and the mediator declares no merge mode
     */
    private void weaving(Set<String> unmanagedSubscribers) {
        ProviderLogging.log.connectingMediators();
        List<AbstractMediator> unsatisfied = this.getAllNonSatisfiedMediators();
        List<LazySource> lazy = new ArrayList<>();
        while (!unsatisfied.isEmpty()) {
            int unsatisfiedBeforePass = unsatisfied.size();
            for (AbstractMediator mediator : unsatisfied) {
                this.connectToUpstreams(mediator, lazy);
            }
            unsatisfied = this.getAllNonSatisfiedMediators();
            if (unsatisfied.size() != unsatisfiedBeforePass) {
                continue;
            }
            // No progress during this pass: the remaining mediators cannot be bound.
            List<String> unbound = unsatisfied.stream()
                    .map(m -> m.configuration().methodAsString())
                    .collect(Collectors.toList());
            if (this.strictMode) {
                throw ProviderExceptions.ex.weavingImposibleToBind(unbound,
                        this.channelRegistry.getIncomingNames(), this.channelRegistry.getEmitterNames());
            }
            ProviderLogging.log.impossibleToBindMediators(unbound,
                    this.channelRegistry.getIncomingNames(), this.channelRegistry.getEmitterNames());
            break;
        }
        lazy.forEach(source -> source.configure(this.channelRegistry));
        this.mediators.stream()
                .filter(m -> m.configuration().shape() == Shape.SUBSCRIBER)
                .filter(AbstractMediator::isConnected)
                .forEach(AbstractMediator::run);
        for (String name : unmanagedSubscribers) {
            this.connectUnmanagedSubscribers(name);
        }
        this.initialized = true;
    }

    /**
     * Tries to connect one mediator to the publisher(s) of its incoming channel(s); does
     * nothing when a required channel has no publisher yet.
     */
    private void connectToUpstreams(AbstractMediator mediator, List<LazySource> lazy) {
        ProviderLogging.log.attemptToResolve(mediator.getMethodAsString());
        List<String> incoming = mediator.configuration().getIncoming();
        if (incoming.size() == 1) {
            String channel = incoming.get(0);
            List<PublisherBuilder<? extends Message<?>>> sources = this.channelRegistry.getPublishers(channel);
            this.getAggregatedSource(sources, channel, mediator, lazy).ifPresent(publisher -> {
                mediator.connectToUpstream(publisher);
                ProviderLogging.log.connectingTo(mediator.getMethodAsString(), incoming, publisher);
                this.registerOutgoingStream(mediator);
            });
            return;
        }
        List<PublisherBuilder<? extends Message<?>>> upstreams = new ArrayList<>();
        for (String channel : incoming) {
            List<PublisherBuilder<? extends Message<?>>> sources = this.channelRegistry.getPublishers(channel);
            this.getAggregatedSource(sources, channel, mediator, lazy).ifPresent(upstreams::add);
        }
        // Connect only once every incoming channel has a source.
        if (upstreams.size() == incoming.size()) {
            // Raw casts kept from the original code; the merged stream is handed over untyped.
            @SuppressWarnings({ "unchecked", "rawtypes" })
            PublisherBuilder<? extends Message<?>> merged = ReactiveStreams.fromPublisher(
                    (Publisher) Multi.createBy().merging().streams(
                            (Iterable) upstreams.stream().map(PublisherBuilder::buildRs).collect(Collectors.toList())));
            mediator.connectToUpstream(merged);
            ProviderLogging.log.connectingTo(mediator.getMethodAsString(), incoming);
            this.registerOutgoingStream(mediator);
        }
    }

    /**
     * Registers the mediator's stream under its outgoing channel, when it declares one.
     */
    private void registerOutgoingStream(AbstractMediator mediator) {
        if (mediator.configuration().getOutgoing() != null) {
            this.channelRegistry.register(mediator.getConfiguration().getOutgoing(), mediator.getStream());
        }
    }

    /**
     * Connects the subscribers registered under {@code name} to every mediator producing to
     * that channel or, when there is no such mediator, to the emitter of that channel.
     * Every subscriber is connected, whatever their number.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" }) // subscribers are handed to PublisherBuilder.to(...) untyped, as before
    private void connectUnmanagedSubscribers(String name) {
        List<AbstractMediator> producers = this.lookupForMediatorsWithMatchingDownstream(name);
        AbstractEmitter emitter = (AbstractEmitter) this.channelRegistry.getEmitter(name);
        List<?> subscribers = this.channelRegistry.getSubscribers(name);
        // Any count above one is a fan-out; testing for "more than two" would leave a channel
        // with exactly two subscribers unconnected.
        boolean fanOut = subscribers.size() > 1;
        for (AbstractMediator producer : producers) {
            if (fanOut) {
                ProviderLogging.log.numberOfSubscribersConsumingStream(subscribers.size(), name);
            }
            for (Object subscriber : subscribers) {
                ProviderLogging.log.connectingMethodToSink(producer.getMethodAsString(), name);
                producer.getStream().to((SubscriberBuilder) subscriber).run();
            }
        }
        if (!producers.isEmpty() || emitter == null) {
            return;
        }
        if (fanOut) {
            ProviderLogging.log.numberOfSubscribersConsumingStream(subscribers.size(), name);
        }
        for (Object subscriber : subscribers) {
            ProviderLogging.log.connectingEmitterToSink(name);
            ReactiveStreams.fromPublisher(emitter.getPublisher()).to((SubscriberBuilder) subscriber).run();
        }
    }

    /**
     * Finds the mediators whose outgoing channel matches the given name, ignoring case.
     *
     * @param name the channel name to match
     * @return a new list of the matching mediators, in creation order; empty when none match
     */
    private List<AbstractMediator> lookupForMediatorsWithMatchingDownstream(String name) {
        List<AbstractMediator> matching = new ArrayList<>();
        for (AbstractMediator candidate : this.mediators) {
            String outgoing = candidate.configuration().getOutgoing();
            if (outgoing != null && outgoing.equalsIgnoreCase(name)) {
                matching.add(candidate);
            }
        }
        return matching;
    }

    /**
     * Collects the mediators that are not connected yet.
     *
     * @return a new list of the unconnected mediators, in creation order
     */
    private List<AbstractMediator> getAllNonSatisfiedMediators() {
        List<AbstractMediator> pending = new ArrayList<>();
        for (AbstractMediator candidate : this.mediators) {
            if (!candidate.isConnected()) {
                pending.add(candidate);
            }
        }
        return pending;
    }

    /**
     * Builds a mediator through the factory, logs its creation and records it in the
     * list of managed mediators.
     *
     * @param configuration the configuration describing the mediator
     * @return the newly created mediator
     */
    private AbstractMediator createMediator(MediatorConfiguration configuration) {
        final AbstractMediator created = mediatorFactory.create(configuration);
        ProviderLogging.log.mediatorCreated(configuration.methodAsString());
        mediators.add(created);
        return created;
    }

    /**
     * Resolves the upstream of a mediator for one channel.
     *
     * @param sources the publishers currently registered for the channel
     * @param sourceName the channel name
     * @param mediator the mediator being connected
     * @param lazy receives the lazy source created when the mediator declares a merge mode
     * @return empty when the channel has no publisher; a stream backed by a new
     *         {@link LazySource} when a merge mode is declared; otherwise the single publisher
     * @throws WeavingException if there are several publishers and no merge mode is declared
     */
    private Optional<PublisherBuilder<? extends Message<?>>> getAggregatedSource(List<PublisherBuilder<? extends Message<?>>> sources, String sourceName, AbstractMediator mediator, List<LazySource> lazy) {
        if (sources.isEmpty()) {
            return Optional.empty();
        }
        Merge.Mode mergeMode = mediator.getConfiguration().getMerge();
        if (mergeMode == null) {
            // Without a merge mode exactly one publisher is allowed.
            if (sources.size() > 1) {
                throw new WeavingException(sourceName, mediator.getMethodAsString(), sources.size());
            }
            return Optional.of(sources.get(0));
        }
        LazySource deferred = new LazySource(sourceName, mergeMode);
        lazy.add(deferred);
        return Optional.of(ReactiveStreams.fromPublisher((Publisher)deferred));
    }

    /**
     * Initializes every given emitter with the effective default buffer size.
     *
     * @param emitters the emitter configurations to initialize
     */
    public void initializeEmitters(List<EmitterConfiguration> emitters) {
        final int bufferSize = this.getDefaultBufferSize();
        emitters.forEach(config -> this.initializeEmitter(config, bufferSize));
    }

    /**
     * Picks the effective default buffer size: the legacy value wins only when the current
     * property still holds the default and the legacy property does not.
     *
     * @return the buffer size to use for emitters
     */
    private int getDefaultBufferSize() {
        boolean currentIsDefault = this.defaultBufferSize == DEFAULT_BUFFER_SIZE;
        boolean legacyIsCustom = this.defaultBufferSizeLegacy != DEFAULT_BUFFER_SIZE;
        return currentIsDefault && legacyIsCustom ? this.defaultBufferSizeLegacy : this.defaultBufferSize;
    }

    /**
     * Creates the emitter described by the configuration (Mutiny or regular flavour) and
     * registers both the emitter and its publisher under the configured channel name.
     *
     * @param emitterConfiguration the emitter description
     * @param defaultBufferSize the buffer size handed to the emitter constructor
     */
    public void initializeEmitter(EmitterConfiguration emitterConfiguration, long defaultBufferSize) {
        Publisher source;
        if (!emitterConfiguration.isMutinyEmitter) {
            EmitterImpl regular = new EmitterImpl(emitterConfiguration, defaultBufferSize);
            source = regular.getPublisher();
            this.channelRegistry.register(emitterConfiguration.name, regular);
        } else {
            MutinyEmitterImpl mutiny = new MutinyEmitterImpl(emitterConfiguration, defaultBufferSize);
            source = mutiny.getPublisher();
            this.channelRegistry.register(emitterConfiguration.name, mutiny);
        }
        // The emitter's publisher is registered after the emitter itself.
        this.channelRegistry.register(emitterConfiguration.name, ReactiveStreams.fromPublisher(source));
    }
}

