/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.reactivemessaging.http.runtime;

import io.quarkus.reactivemessaging.http.runtime.HttpMessage;
import io.quarkus.reactivemessaging.http.runtime.IncomingHttpMetadata;
import io.quarkus.reactivemessaging.http.runtime.ReactiveHandlerBeanBase;
import io.quarkus.reactivemessaging.http.runtime.StrictQueueSizeGuard;
import io.quarkus.reactivemessaging.http.runtime.config.HttpStreamConfig;
import io.quarkus.reactivemessaging.http.runtime.config.ReactiveHttpConfig;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import org.jboss.logging.Logger;

/**
 * Handler bean for the Reactive Messaging HTTP endpoints.
 *
 * <p>Turns each incoming HTTP request into an {@link HttpMessage} carrying the request body and
 * pushes it to the emitter supplied by the base class. Streams are identified by a
 * {@code "<path>:<method>"} key built by {@link #key(String, HttpMethod)}.
 */
@Singleton
public class ReactiveHttpHandlerBean extends ReactiveHandlerBeanBase<HttpStreamConfig, HttpMessage<?>> {
    private static final Logger log = Logger.getLogger(ReactiveHttpHandlerBean.class);

    /** Source of the HTTP stream configurations returned by {@link #configs()}. */
    @Inject
    ReactiveHttpConfig config;

    /**
     * Looks up the message stream registered for the given path and HTTP method.
     *
     * <p>NOTE(review): the lookup result is dereferenced without a null check; presumably
     * {@code processors} is a map populated by the base class, in which case an unregistered
     * path/method pair would fail with a {@link NullPointerException} - confirm against
     * {@code ReactiveHandlerBeanBase}.
     *
     * @param path the endpoint path
     * @param method the HTTP method
     * @return the processor of the bundle stored under the {@code "<path>:<method>"} key
     */
    Multi<HttpMessage<?>> getProcessor(String path, HttpMethod method) {
        return ((ReactiveHandlerBeanBase.Bundle)this.processors.get(this.key(path, method))).getProcessor();
    }

    /**
     * @return the HTTP stream configurations exposed by the injected {@link ReactiveHttpConfig}
     */
    @Override
    protected Collection<HttpStreamConfig> configs() {
        return this.config.getHttpConfigs();
    }

    /**
     * @param streamConfig the stream configuration
     * @return the {@code "<path>:<method>"} key built from the configuration's path and method
     */
    @Override
    protected String key(HttpStreamConfig streamConfig) {
        return this.key(streamConfig.path, streamConfig.method);
    }

    /**
     * @param context the routing context of the current request
     * @return the {@code "<path>:<method>"} key built from the matched route's path and the
     *         request's HTTP method
     */
    @Override
    protected String key(RoutingContext context) {
        return this.key(context.currentRoute().getPath(), context.request().method());
    }

    /**
     * @param streamConfig the stream configuration
     * @return a human-readable description containing the configuration's path and method
     */
    @Override
    protected String description(HttpStreamConfig streamConfig) {
        return String.format("path: %s, method %s", streamConfig.path, streamConfig.method);
    }

    /**
     * Converts the request into an {@link HttpMessage} and emits it.
     *
     * <ul>
     * <li>No emitter: the request is answered with status 500 and the problem is logged.</li>
     * <li>{@code guard.prepareToEmit()} returns {@code false}: the request is answered with
     * status 503 (presumably the queue is full - confirm against {@code StrictQueueSizeGuard}).</li>
     * <li>Otherwise the message is emitted; if building or emitting it throws, the guard slot is
     * released via {@code guard.dequeue()} and the request is answered with status 500.</li>
     * </ul>
     *
     * @param event the routing context of the request being handled
     * @param emitter the emitter to push the message to; may be {@code null}
     * @param guard the guard consulted before emitting
     * @param path the endpoint path, used only in the error message when there is no emitter
     */
    @Override
    protected void handleRequest(RoutingContext event, MultiEmitter<? super HttpMessage<?>> emitter, StrictQueueSizeGuard guard, String path) {
        if (emitter == null) {
            this.onUnexpectedError(event, null, "No consumer subscribed for messages sent to Reactive Messaging HTTP endpoint on path: " + path);
        } else if (guard.prepareToEmit()) {
            try {
                // First callback answers 202 unless the response has already ended; second callback
                // reports a failure. Presumably these are the message's success/failure hooks -
                // confirm against HttpMessage.
                HttpMessage<Buffer> message = new HttpMessage<Buffer>(event.getBody(), new IncomingHttpMetadata(event), () -> {
                    if (!event.response().ended()) {
                        event.response().setStatusCode(202).end();
                    }
                }, error -> this.onUnexpectedError(event, (Throwable)error, "Failed to process message."));
                emitter.emit(message);
            }
            catch (Exception any) {
                // The message was not handed over successfully, so give back the slot taken by
                // prepareToEmit() before reporting the failure.
                guard.dequeue();
                this.onUnexpectedError(event, any, "Emitting message failed");
            }
        } else {
            event.response().setStatusCode(503).end();
        }
    }

    /**
     * Logs the failure and, if the response is still open, answers it with status 500.
     *
     * <p>The log statement is deliberately outside the {@code ended()} check: a failure reported
     * after the response has already been completed can no longer be signalled to the client,
     * but it must still be visible in the logs instead of being silently dropped.
     *
     * @param event the routing context of the failed request
     * @param error the cause of the failure; may be {@code null}
     * @param message the message to log
     */
    private void onUnexpectedError(RoutingContext event, Throwable error, String message) {
        log.error(message, error);
        if (!event.response().ended()) {
            event.response().setStatusCode(500).end("Unexpected error while processing the message");
        }
    }

    /**
     * @param path the endpoint path
     * @param method the HTTP method
     * @return the lookup key in the form {@code "<path>:<method>"}
     */
    private String key(String path, HttpMethod method) {
        return String.format("%s:%s", path, method);
    }
}

