/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.websocketnotifier.application.service;

import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import br.pucrio.tecgraf.soma.websocketnotifier.application.configuration.Constants;
import br.pucrio.tecgraf.soma.websocketnotifier.application.service.KtableService;
import br.pucrio.tecgraf.soma.websocketnotifier.infrastructure.persistence.message.KafkaEventReader;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierInfo;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierStatus;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class ProducerHandler {
    private final Logger logger = LoggerFactory.getLogger(ProducerHandler.class);
    private Producer<String, String> producer;
    private final String KAFKA_SERVER;
    private final String KAFKA_NOTIFICATION_TOPIC;
    private final List<String> KAFKA_TOPICS;
    private final String KAFKA_SCHEMA_REGISTRY_URL;
    private final List<Integer> LAST_EVENTS_MINIMUM_NUMBER;
    private KtableService ktableService;
    private KafkaEventReader eventReader;

    @Autowired
    public ProducerHandler(ServiceConfiguration serviceConfiguration, KtableService ktableService, KafkaEventReader eventReader) {
        this.KAFKA_SERVER = serviceConfiguration.getValue(Constants.Config.KAFKA_SERVER_ADDRESS.option.getLongName());
        this.KAFKA_NOTIFICATION_TOPIC = serviceConfiguration.getValue(Constants.Config.KAFKA_NOTIFICATION_TOPIC.option.getLongName());
        this.KAFKA_TOPICS = Arrays.asList(serviceConfiguration.getValue(Constants.Config.KAFKA_TOPIC.option.getLongName()).split(";"));
        this.KAFKA_SCHEMA_REGISTRY_URL = serviceConfiguration.getValue(Constants.Config.KAFKA_SCHEMA_REGISTRY_URL.option.getLongName());
        this.LAST_EVENTS_MINIMUM_NUMBER = Arrays.stream(serviceConfiguration.getValue(Constants.Config.LAST_EVENTS_MINIMUM_NUMBER.option.getLongName()).split(";")).map(Integer::parseInt).collect(Collectors.toList());
        this.ktableService = ktableService;
        this.eventReader = eventReader;
        if (this.KAFKA_TOPICS.size() != this.LAST_EVENTS_MINIMUM_NUMBER.size()) {
            String msg = String.format("List arguments value of --%s and --%s must have the same length", Constants.Config.KAFKA_TOPIC.option.getLongName(), Constants.Config.LAST_EVENTS_MINIMUM_NUMBER.option.getLongName());
            this.logger.error(msg);
            throw new IllegalArgumentException(msg);
        }
    }

    public WebsocketNotifierInfo getLastOffset(WebsocketNotifierInfo wsInfo) {
        return this.ktableService.getLastUserOffset(wsInfo);
    }

    protected Producer<String, String> getProducer() {
        if (this.producer == null) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", this.KAFKA_SERVER);
            properties.setProperty("key.serializer", StringSerializer.class.getName());
            properties.setProperty("value.serializer", StringSerializer.class.getName());
            properties.setProperty("acks", "1");
            properties.setProperty("retries", "3");
            properties.setProperty("linger.ms", "1");
            this.producer = new KafkaProducer(properties);
        }
        return this.producer;
    }

    public void publishKafkaOffset(WebsocketNotifierInfo message, WebsocketNotifierStatus status) {
        if (!status.equals((Object)WebsocketNotifierStatus.OLD)) {
            String eventMessage = null;
            JsonObject jsonObject = null;
            try {
                Gson gson = new Gson();
                eventMessage = gson.toJson((Object)message);
                JsonParser parser = new JsonParser();
                jsonObject = (JsonObject)parser.parse(eventMessage);
            }
            catch (Error err) {
                this.logger.error("Error during push event: " + message.toString(), (Throwable)err);
            }
            if (eventMessage != null && !eventMessage.isEmpty()) {
                this.getProducer().send(new ProducerRecord(this.KAFKA_NOTIFICATION_TOPIC, (Object)jsonObject.get("user").getAsString(), (Object)jsonObject.toString()));
                this.logger.debug("Published topic {} user {}", (Object)this.KAFKA_NOTIFICATION_TOPIC, (Object)jsonObject);
            }
        }
    }

    public List<WebsocketNotifierInfo> buildWebsocketInfos(String user, String application, List<String> topics) {
        ArrayList<WebsocketNotifierInfo> wsInfoList = new ArrayList<WebsocketNotifierInfo>();
        for (String topic_name : topics) {
            WebsocketNotifierInfo wsInfo = new WebsocketNotifierInfo();
            wsInfo.setTopicName((CharSequence)topic_name);
            wsInfo.setUser((CharSequence)user);
            wsInfo.setApplication((CharSequence)application);
            wsInfo = this.getLastOffset(wsInfo);
            wsInfoList.add(wsInfo);
        }
        return wsInfoList;
    }

    private int getLastMinimumNumberParam(CharSequence topic) {
        int minimumOffset = 0;
        for (int i = 0; i < this.KAFKA_TOPICS.size(); ++i) {
            if (!((String)this.KAFKA_TOPICS.get(i)).equals(topic)) continue;
            minimumOffset = (Integer)this.LAST_EVENTS_MINIMUM_NUMBER.get(i);
            break;
        }
        if (minimumOffset < 0) {
            minimumOffset = 0;
        }
        return minimumOffset;
    }

    private int getLastMinimumNumber(String topic, int currentUserOffset) {
        int minimumOffset = this.getLastMinimumNumberParam((CharSequence)topic);
        if ((currentUserOffset -= minimumOffset) > 0) {
            return currentUserOffset + 1;
        }
        return 0;
    }

    private void publishUserFirstOffset(Consumer<String, Object> consumer, WebsocketNotifierInfo wsInfo) {
        Set assignment = consumer.assignment();
        long maxOffset = 1L;
        for (TopicPartition tp : assignment) {
            if (consumer.position(tp) <= maxOffset) continue;
            maxOffset = consumer.position(tp);
        }
        wsInfo.setOffset(Integer.valueOf((int)maxOffset));
        this.publishKafkaOffset(wsInfo, WebsocketNotifierStatus.LIVE);
    }

    public List<JsonObject> getLostMessagesToUser(List<WebsocketNotifierInfo> wsInfos) {
        ArrayList<JsonObject> recordFromOffset = new ArrayList<JsonObject>();
        if (wsInfos != null) {
            for (WebsocketNotifierInfo wsInfo : wsInfos) {
                int beginOffset;
                Consumer consumer = this.eventReader.buildKafkaConsumer(this.KAFKA_SERVER, this.KAFKA_SCHEMA_REGISTRY_URL, (String)wsInfo.getTopicName(), wsInfo.getTopicName() + "-lostevents-" + System.currentTimeMillis());
                List events = this.eventReader.readRecordsFromOffset(consumer, wsInfo, (long)(beginOffset = this.getLastMinimumNumber(wsInfo.getTopicName().toString(), wsInfo.getOffset().intValue())));
                if (!events.isEmpty()) {
                    recordFromOffset.addAll(events);
                } else {
                    this.publishUserFirstOffset(consumer, wsInfo);
                }
                consumer.close();
            }
        }
        return recordFromOffset;
    }
}

