/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.MqttConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessageMetadata;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import jakarta.enterprise.inject.Instance;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;

public class MqttSink {
    private final String channel;
    private final String topic;
    private final int qos;
    private final boolean healthEnabled;
    private final Flow.Subscriber<? extends Message<?>> sink;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean alive = new AtomicBoolean();
    private final AtomicReference<Clients.ClientHolder> reference = new AtomicReference();

    public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config, Instance<MqttClientSessionOptions> instances) {
        MqttClientSessionOptions options = MqttHelpers.createClientOptions(config, instances);
        this.channel = config.getChannel();
        this.topic = config.getTopic().orElse(this.channel);
        this.qos = config.getQos();
        this.healthEnabled = config.getHealthEnabled();
        this.sink = MultiUtils.via(m -> m.onSubscription().call(() -> {
            Clients.ClientHolder client = this.reference.get();
            if (client == null) {
                client = Clients.getHolder(vertx, options);
                this.reference.set(client);
            }
            return AsyncResultUni.toUni(h -> this.reference.get().start().onComplete(h)).onItem().invoke(() -> {
                this.started.set(true);
                this.alive.set(true);
            });
        }).onItem().transformToUniAndConcatenate(this::send).onCompletion().invoke(() -> {
            Clients.ClientHolder c = this.reference.getAndSet(null);
            if (c != null) {
                c.close();
            }
            this.alive.set(false);
        }).onFailure().invoke(e -> {
            this.alive.set(false);
            MqttLogging.log.errorWhileSendingMessageToBroker((Throwable)e);
        }));
    }

    private Uni<? extends Message<?>> send(Message<?> msg) {
        boolean isRetain;
        MqttQoS actualQoS;
        String actualTopicToBeUsed;
        MqttClientSession client = this.reference.get().getClient();
        Optional metadata = msg.getMetadata(SendingMqttMessageMetadata.class);
        if (metadata.isPresent()) {
            SendingMqttMessageMetadata mm = (SendingMqttMessageMetadata)metadata.get();
            actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
            actualQoS = mm.getQosLevel() == null ? MqttQoS.valueOf((int)this.qos) : mm.getQosLevel();
            isRetain = mm.isRetain();
        } else {
            actualTopicToBeUsed = this.topic;
            isRetain = false;
            actualQoS = MqttQoS.valueOf((int)this.qos);
        }
        if (actualTopicToBeUsed == null) {
            MqttLogging.log.ignoringNoTopicSet();
            return Uni.createFrom().item(msg);
        }
        return AsyncResultUni.toUni(h -> client.publish(actualTopicToBeUsed, this.convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain).onComplete(h)).onItemOrFailure().transformToUni((s, f) -> {
            if (f != null) {
                return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
            }
            OutgoingMessageMetadata.setResultOnMessage((Message)msg, (Object)s);
            return Uni.createFrom().completionStage(msg.ack().thenApply(x -> msg));
        });
    }

    private Buffer convert(Object payload) {
        if (payload == null) {
            return Buffer.buffer();
        }
        if (payload instanceof JsonObject) {
            return new Buffer(((JsonObject)payload).toBuffer());
        }
        if (payload instanceof JsonArray) {
            return new Buffer(((JsonArray)payload).toBuffer());
        }
        if (payload instanceof String || payload.getClass().isPrimitive()) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer((String)payload.toString()));
        }
        if (payload instanceof byte[]) {
            return new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[])((byte[])payload)));
        }
        if (payload instanceof Buffer) {
            return (Buffer)payload;
        }
        if (payload instanceof io.vertx.core.buffer.Buffer) {
            return new Buffer((io.vertx.core.buffer.Buffer)payload);
        }
        return new Buffer(Json.encodeToBuffer((Object)payload));
    }

    public Flow.Subscriber<? extends Message<?>> getSink() {
        return this.sink;
    }

    private boolean isConnected() {
        return this.reference.get() != null && this.reference.get().getClient().isConnected();
    }

    public void isStarted(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.started.get());
        }
    }

    public void isReady(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.isConnected());
        }
    }

    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.healthEnabled) {
            builder.add(this.channel, this.alive.get());
        }
    }
}

