/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.runtime;

import io.quarkus.arc.AlternativePriority;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry;
import io.smallrye.reactive.messaging.providers.helpers.Validation;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.WorkerExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.event.Observes;
import javax.enterprise.event.Reception;
import javax.inject.Inject;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.LoggerFactory;

/**
 * Application-scoped {@link WorkerPoolRegistry} alternative that keeps track of named
 * worker pools and lazily creates one Vert.x {@link WorkerExecutor} per pool name.
 *
 * <p>Pool names are registered through {@link #defineWorker(String, String, String)},
 * which reads the pool's concurrency from the configuration key
 * {@code smallrye.messaging.worker.<poolName>.max-concurrency}. Executors are created on
 * first use by {@link #getWorker(String)} and closed by {@link #terminate(Object)}.
 *
 * <p>NOTE(review): the methods below presumably override same-named methods of
 * {@link WorkerPoolRegistry}; the superclass is not visible here, so {@code @Override}
 * has not been added — confirm and annotate.
 */
@AlternativePriority(1)
@ApplicationScoped
public class QuarkusWorkerPoolRegistry extends WorkerPoolRegistry {
    /** Prefix of the configuration keys describing worker pools. */
    private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";

    /** Last segment of the configuration key holding a pool's concurrency. */
    private static final String WORKER_CONCURRENCY = "max-concurrency";

    /**
     * Pool name for which {@link #defineWorker(String, String, String)} registers nothing.
     * Presumably mirrors the default value of the {@code @Blocking} annotation — confirm.
     */
    private static final String DEFAULT_WORKER_POOL = "<no-value>";

    @Inject
    ExecutionHolder executionHolder;

    /** Configured concurrency per pool name, filled by {@link #defineWorker(String, String, String)}. */
    private final Map<String, Integer> workerConcurrency = new HashMap<>();

    /** Executors created so far, keyed by pool name. */
    private final Map<String, WorkerExecutor> workerExecutors = new ConcurrentHashMap<>();

    /**
     * Closes every worker executor created by this registry. Observes the
     * {@code @BeforeDestroyed(ApplicationScoped.class)} event, and is only notified if
     * this bean already exists.
     *
     * @param event the observed event; not inspected
     */
    public void terminate(@Observes(notifyObserver = Reception.IF_EXISTS) @Priority(100) @BeforeDestroyed(ApplicationScoped.class) Object event) {
        // Iterating an empty map is a no-op, so no emptiness guard is needed.
        for (WorkerExecutor executor : this.workerExecutors.values()) {
            // NOTE(review): if this Mutiny binding's close() returns a lazy Uni, the
            // result has to be subscribed/awaited for the close to happen — confirm.
            executor.close();
        }
    }

    /**
     * Runs {@code uni} as blocking work.
     *
     * <ul>
     * <li>Without a worker name, the work is handed to {@code currentContext} when one is
     * given, otherwise to the Vert.x instance of the injected {@link ExecutionHolder}.</li>
     * <li>With a worker name, the work is handed to the executor returned by
     * {@link #getWorker(String)}. When {@code currentContext} is given, the resulting item
     * or failure is emitted from within {@code currentContext.runOnContext(...)}.</li>
     * </ul>
     *
     * @param currentContext context to run on / switch back to; may be {@code null}
     * @param uni the action to execute; must not be {@code null}
     * @param workerName name of the worker pool, or {@code null} for no named pool
     * @param ordered passed through unchanged to {@code executeBlocking}
     * @param <T> type of the item produced by {@code uni}
     * @return a {@link Uni} producing the outcome of the blocking execution
     * @throws NullPointerException if {@code uni} is {@code null}
     * @throws IllegalArgumentException if {@code workerName} is not a defined pool
     */
    public <T> Uni<T> executeWork(Context currentContext, Uni<T> uni, String workerName, boolean ordered) {
        Objects.requireNonNull(uni, "Action to execute not provided");

        if (workerName == null) {
            if (currentContext != null) {
                return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered);
            }
            return this.executionHolder.vertx().executeBlocking(uni, ordered);
        }

        Uni<T> blockingResult = this.getWorker(workerName).executeBlocking(uni, ordered);
        if (currentContext == null) {
            return blockingResult;
        }
        // Re-emit the outcome through the caller's context.
        return blockingResult.onItemOrFailure().transformToUni((item, failure) -> Uni.createFrom().emitter(emitter -> {
            if (failure != null) {
                currentContext.runOnContext(() -> emitter.fail(failure));
            } else {
                currentContext.runOnContext(() -> emitter.complete(item));
            }
        }));
    }

    /**
     * Returns the executor for {@code workerName}, creating it on first use with the
     * concurrency registered by {@link #defineWorker(String, String, String)}.
     *
     * @param workerName name of the worker pool; must not be {@code null}
     * @return the executor registered under {@code workerName}
     * @throws NullPointerException if {@code workerName} is {@code null}
     * @throws IllegalArgumentException if no concurrency was registered for {@code workerName}
     * @throws RuntimeException if the executor could not be created
     */
    public WorkerExecutor getWorker(String workerName) {
        Objects.requireNonNull(workerName, "Worker Name not specified");

        WorkerExecutor executor = this.workerExecutors.get(workerName);
        if (executor != null) {
            return executor;
        }

        Integer concurrency = this.workerConcurrency.get(workerName);
        if (concurrency == null) {
            throw new IllegalArgumentException("@Blocking referred to invalid worker name.");
        }

        synchronized (this) {
            // Look again under the lock: another thread may have created it meanwhile.
            executor = this.workerExecutors.get(workerName);
            if (executor == null) {
                executor = this.executionHolder.vertx().createSharedWorkerExecutor(workerName, concurrency);
                if (executor == null) {
                    // Checked before put(): ConcurrentHashMap rejects null values, which
                    // would otherwise surface as an uninformative NullPointerException.
                    throw new RuntimeException("Failed to create Worker for " + workerName);
                }
                LoggerFactory.getLogger(WorkerPoolRegistry.class)
                        .info("Created worker pool named {} with concurrency of {}", workerName, concurrency);
                this.workerExecutors.put(workerName, executor);
            }
        }
        return executor;
    }

    /**
     * Registers the worker pool {@code poolName} used by {@code className#method}, reading
     * its concurrency from {@code smallrye.messaging.worker.<poolName>.max-concurrency}.
     * Nothing is registered when {@code poolName} equals {@code "<no-value>"}.
     *
     * @param className name of the class declaring the method; used in error messages
     * @param method name of the method; used in error messages
     * @param poolName name of the worker pool
     * @throws NullPointerException if {@code className} or {@code method} is {@code null}
     * @throws IllegalArgumentException if {@code poolName} is {@code null} or blank, or if
     *         the concurrency configuration key is not defined
     */
    public void defineWorker(String className, String method, String poolName) {
        Objects.requireNonNull(className, "className was empty");
        Objects.requireNonNull(method, "Method was empty");

        // Constant-first comparison so a null poolName reaches the validation below
        // instead of failing with a NullPointerException.
        if (DEFAULT_WORKER_POOL.equals(poolName)) {
            return;
        }
        if (poolName == null || Validation.isBlank(poolName)) {
            throw this.getBlockingError(className, method, "value is blank or null");
        }

        String workerConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY;
        Optional<Integer> concurrency = ConfigProvider.getConfig().getOptionalValue(workerConfigKey, Integer.class);
        this.workerConcurrency.put(poolName, concurrency.orElseThrow(
                () -> this.getBlockingError(className, method, workerConfigKey + " was not defined")));
    }

    /**
     * Builds the exception reported for an invalid {@code @Blocking} usage.
     *
     * @param className name of the class declaring the method
     * @param method name of the offending method
     * @param message description of the problem
     * @return the exception to throw; this method does not throw it itself
     */
    private IllegalArgumentException getBlockingError(String className, String method, String message) {
        return new IllegalArgumentException("Invalid method annotated with @Blocking: " + className + "#" + method + " - " + message);
    }
}

