/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.mqtt;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mutiny.core.Vertx;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Clients {
    private static final Map<String, ClientHolder> clients = new ConcurrentHashMap<String, ClientHolder>();

    private Clients() {
    }

    static ClientHolder getHolder(Vertx vertx, MqttClientSessionOptions options) {
        String host = options.getHostname();
        int port = options.getPort();
        String clientId = options.getClientId();
        String server = options.getServerName().orElse(null);
        String username = options.getUsername();
        String password = options.getPassword();
        String id = username + ":" + password + "@" + host + ":" + port + "<" + (server == null ? "" : server) + ">-[" + (clientId != null ? clientId : "") + "]";
        return clients.computeIfAbsent(id, key -> {
            MqttLogging.log.infof("Create MQTT Client for %s.", id);
            MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options);
            return new ClientHolder(client);
        });
    }

    public static void clear() {
        clients.values().forEach(ClientHolder::close);
        clients.clear();
    }

    public static class ClientHolder {
        private final MqttClientSession client;
        private final BroadcastProcessor<io.vertx.mutiny.mqtt.messages.MqttPublishMessage> messages;

        public ClientHolder(MqttClientSession client) {
            this.client = client;
            this.messages = BroadcastProcessor.create();
            client.messageHandler((Handler<MqttPublishMessage>)((Handler)m -> this.messages.onNext((Object)io.vertx.mutiny.mqtt.messages.MqttPublishMessage.newInstance((MqttPublishMessage)m))));
        }

        public Future<Void> start() {
            return this.client.start();
        }

        public Future<Void> close() {
            return this.client.stop();
        }

        public Multi<io.vertx.mutiny.mqtt.messages.MqttPublishMessage> stream() {
            return this.messages;
        }

        public MqttClientSession getClient() {
            return this.client;
        }
    }
}

