/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.websocketnotifier.infrastructure.persistence.message;

import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import br.pucrio.tecgraf.soma.websocketnotifier.application.configuration.Constants;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierInfo;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierStatus;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Observable;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * Reads events from one or more Kafka topics and forwards each of them, as a
 * {@link JsonObject}, to the registered observers.
 *
 * <p>Topics and consumer groups are read from the {@link ServiceConfiguration} as
 * {@code ;}-separated lists; the i-th topic is consumed with the i-th group.
 */
@Service
public class KafkaEventReader extends Observable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventReader.class);

    private final String KAFKA_SERVER;
    private final String KAFKA_SCHEMA_REGISTRY_URL;
    private final List<String> KAFKA_TOPICS;
    private final List<String> KAFKA_GROUP;

    /**
     * Loads the Kafka connection settings from the service configuration.
     *
     * @param serviceConfiguration source of the server address, schema registry URL,
     *     topic list and consumer group list (the two lists are split on {@code ;})
     */
    @Autowired
    public KafkaEventReader(ServiceConfiguration serviceConfiguration) {
        this.KAFKA_SERVER = serviceConfiguration.getValue(Constants.Config.KAFKA_SERVER_ADDRESS.option.getLongName());
        this.KAFKA_SCHEMA_REGISTRY_URL = serviceConfiguration.getValue(Constants.Config.KAFKA_SCHEMA_REGISTRY_URL.option.getLongName());
        this.KAFKA_TOPICS = Arrays.asList(serviceConfiguration.getValue(Constants.Config.KAFKA_TOPIC.option.getLongName()).split(";"));
        this.KAFKA_GROUP = Arrays.asList(serviceConfiguration.getValue(Constants.Config.KAFKA_CONSUMER_GROUP.option.getLongName()).split(";"));
    }

    /**
     * Starts consuming once the application is ready.
     *
     * <p>Builds one consumer per topic/group pair and polls them in turn until the
     * current thread is interrupted. The method does not return while polling, so it
     * occupies the thread that delivers the {@link ApplicationReadyEvent}. Every
     * consumer that was created is closed before the method exits, whether it ends
     * normally or with an exception.
     *
     * @throws IllegalArgumentException if the topic list and the consumer group list
     *     do not have the same length
     */
    @EventListener(value={ApplicationReadyEvent.class})
    public void run() {
        if (this.KAFKA_TOPICS.size() != this.KAFKA_GROUP.size()) {
            String msg = String.format("List arguments value of --%s and --%s must have the same length", Constants.Config.KAFKA_TOPIC.option.getLongName(), Constants.Config.KAFKA_CONSUMER_GROUP.option.getLongName());
            LOGGER.error(msg);
            throw new IllegalArgumentException(msg);
        }
        List<Consumer<String, Object>> consumers = new ArrayList<>(this.KAFKA_TOPICS.size());
        try {
            // Built inside the try so that a failure on the i-th consumer still
            // releases the ones created before it.
            for (int i = 0; i < this.KAFKA_TOPICS.size(); ++i) {
                consumers.add(this.buildKafkaConsumer(this.KAFKA_SERVER, this.KAFKA_SCHEMA_REGISTRY_URL, this.KAFKA_TOPICS.get(i), this.KAFKA_GROUP.get(i)));
            }
            while (!Thread.currentThread().isInterrupted()) {
                for (Consumer<String, Object> consumer : consumers) {
                    this.readRecords(consumer);
                }
            }
        }
        finally {
            this.closeConsumers(consumers);
        }
    }

    /**
     * Closes every consumer in the list, logging (not propagating) close failures so
     * that one failing consumer does not prevent the others from being released.
     */
    private void closeConsumers(List<Consumer<String, Object>> consumers) {
        for (Consumer<String, Object> consumer : consumers) {
            try {
                consumer.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error closing Kafka consumer.", e);
            }
        }
    }

    /**
     * Builds the consumer configuration: String keys, Avro values decoded as generic
     * (non-specific) records, auto-commit disabled and reading from the earliest
     * offset when the group has no committed position.
     *
     * @param kafkaServer value for {@code bootstrap.servers}
     * @param schemaRegistryUrl value for {@code schema.registry.url}
     * @param group value for {@code group.id}
     * @return the populated properties
     */
    protected Properties buildProperties(String kafkaServer, String schemaRegistryUrl, String group) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaServer);
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", schemaRegistryUrl);
        properties.setProperty("specific.avro.reader", "false");
        properties.setProperty("group.id", group);
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.commit.interval.ms", "1000");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    /**
     * Polls the consumer once and dispatches every returned record, partition by
     * partition, with status {@link WebsocketNotifierStatus#LIVE}.
     *
     * @param consumer the consumer to poll
     */
    protected void readRecords(Consumer<String, Object> consumer) {
        ConsumerRecords<String, Object> records = consumer.poll(100L);
        if (!records.isEmpty()) {
            LOGGER.debug("readRecords Got {} records from Kafka", records.count());
        }
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, Object>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, Object> record : partitionRecords) {
                this.readRecord(consumer, record, WebsocketNotifierStatus.LIVE.name());
            }
        }
    }

    /**
     * Re-reads records starting at {@code startPositionOffset} and returns them as
     * JSON objects tagged with an event status relative to the offset in
     * {@code wsInfo}. Observers are not notified and nothing is committed here.
     *
     * @param consumer the consumer to seek and poll; its position is moved
     * @param wsInfo carries the reference offset; when that offset is 0 an empty
     *     list is returned without touching the consumer
     * @param startPositionOffset offset every assigned partition is moved to
     * @return the parsed records, possibly empty
     */
    public List<JsonObject> readRecordsFromOffset(Consumer<String, Object> consumer, WebsocketNotifierInfo wsInfo, long startPositionOffset) {
        List<JsonObject> recordFromOffset = new ArrayList<>();
        if (wsInfo.getOffset() == 0) {
            return recordFromOffset;
        }
        // Result is discarded; presumably this poll only exists so that partitions
        // are assigned before assignment() is read -- TODO confirm.
        consumer.poll(100L);
        Set<TopicPartition> assignment = consumer.assignment();
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, startPositionOffset);
            // NOTE(review): this poll is not restricted to tp, so it can also
            // return records of the other assigned partitions.
            ConsumerRecords<String, Object> records = consumer.poll(1000L);
            if (!records.isEmpty()) {
                LOGGER.info("readRecordsFromOffset Got {} records from topic {} Kafka", records.count(), tp.topic());
            }
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, Object>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, Object> record : partitionRecords) {
                    JsonObject eventMessage = this.parseKafkaRecord(record, this.getEventStatus(record, wsInfo));
                    recordFromOffset.add(eventMessage);
                }
            }
        }
        return recordFromOffset;
    }

    /**
     * Classifies a replayed record: {@code OLD} when its offset is at or before the
     * offset stored in {@code wsInfo}, {@code LOST} otherwise.
     */
    private String getEventStatus(ConsumerRecord<String, Object> record, WebsocketNotifierInfo wsInfo) {
        if (record.offset() <= wsInfo.getOffset()) {
            return WebsocketNotifierStatus.OLD.name();
        }
        return WebsocketNotifierStatus.LOST.name();
    }

    /**
     * Parses the record value's string form as a JSON object and adds the
     * {@code topic_name}, {@code offset} and {@code eventStatus} properties to it.
     */
    private JsonObject parseKafkaRecord(ConsumerRecord<String, Object> record, String eventStatus) {
        JsonObject eventMessage = new JsonParser().parse(record.value().toString()).getAsJsonObject();
        eventMessage.addProperty("topic_name", record.topic());
        eventMessage.addProperty("offset", record.offset());
        eventMessage.addProperty("eventStatus", eventStatus);
        return eventMessage;
    }

    /**
     * Converts one record to JSON, notifies the observers with it and then commits
     * the consumer.
     *
     * <p>Failures while parsing or notifying are logged and not rethrown. The commit
     * runs in every case, and a commit failure is likewise only logged.
     *
     * @param consumer the consumer the record came from; {@code commitSync()} is
     *     called on it
     * @param record the record to dispatch
     * @param eventStatus status string stored in the message's {@code eventStatus}
     */
    public void readRecord(Consumer<String, Object> consumer, ConsumerRecord<String, Object> record, String eventStatus) {
        try {
            this.setChanged();
            JsonObject eventMessage = this.parseKafkaRecord(record, eventStatus);
            LOGGER.debug("readRecord got Job {}", eventMessage);
            this.notifyObservers(eventMessage);
        }
        catch (MessagingException me) {
            LOGGER.error("Error trying send message to job-info", me);
        }
        catch (Exception e) {
            LOGGER.error("Unrecoverable error.", e);
        }
        finally {
            try {
                // NOTE(review): the no-arg commitSync() is not tied to this record's
                // offset; confirm the once-per-record commit is intended.
                consumer.commitSync();
            }
            catch (Exception e) {
                LOGGER.error("Unrecoverable error during commitSync.", e);
            }
        }
    }

    /**
     * Creates a consumer configured through {@link #buildProperties} and subscribes
     * it to the single given topic.
     *
     * @param kafkaServer Kafka bootstrap server address
     * @param schemaRegistryUrl schema registry URL
     * @param topic the topic to subscribe to
     * @param group the consumer group id
     * @return the subscribed consumer; the caller is responsible for closing it
     */
    public Consumer<String, Object> buildKafkaConsumer(String kafkaServer, String schemaRegistryUrl, String topic, String group) {
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(this.buildProperties(kafkaServer, schemaRegistryUrl, group));
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }
}

