/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.job.infrastructure.persistence.message;

import br.pucrio.tecgraf.soma.job.JobHistoryEvent;
import br.pucrio.tecgraf.soma.job.SomaJobHistoryInfo;
import br.pucrio.tecgraf.soma.job.application.configuration.Constants;
import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Observable;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

@Service
public class JobHistoryEventReader
extends Observable {
    private final Logger logger = LoggerFactory.getLogger(SomaJobHistoryInfo.class);
    private final String KAFKA_SERVER;
    private final String KAFKA_SCHEMA_REGISTRY_URL;
    private final String KAFKA_TOPIC;
    private final String KAFKA_GROUP;

    @Autowired
    public JobHistoryEventReader(ServiceConfiguration serviceConfiguration) {
        this.KAFKA_SERVER = serviceConfiguration.getValue(Constants.Config.KAFKA_SERVER_ADDRESS.option.getLongName());
        this.KAFKA_SCHEMA_REGISTRY_URL = serviceConfiguration.getValue(Constants.Config.KAFKA_SCHEMA_REGISTRY_URL.option.getLongName());
        this.KAFKA_TOPIC = serviceConfiguration.getValue(Constants.Config.KAFKA_TOPIC.option.getLongName());
        this.KAFKA_GROUP = serviceConfiguration.getValue(Constants.Config.KAFKA_CONSUMER_GROUP.option.getLongName());
    }

    @EventListener(value={ApplicationReadyEvent.class})
    public void run() throws IOException, InterruptedException {
        Consumer consumer = this.buildKafkaConsumer(this.KAFKA_SERVER, this.KAFKA_SCHEMA_REGISTRY_URL, this.KAFKA_TOPIC, this.KAFKA_GROUP);
        while (!Thread.currentThread().isInterrupted()) {
            this.readRecords(consumer);
        }
    }

    protected void readRecords(Consumer<String, JobHistoryEvent> consumer) throws InterruptedException, IOException {
        ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
        this.logger.debug("readRecords Got {} records from Kafka", (Object)records.count());
        for (TopicPartition partition : records.partitions()) {
            List partitionRecords = records.records(partition);
            for (ConsumerRecord record : partitionRecords) {
                this.readRecord(consumer, partition, partitionRecords, record);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void readRecord(Consumer<String, JobHistoryEvent> consumer, TopicPartition partition, List<ConsumerRecord<String, JobHistoryEvent>> partitionRecords, ConsumerRecord<String, JobHistoryEvent> record) throws InterruptedException, IOException {
        try {
            this.logger.info("readRecord got Job {} at Kafka topic {}:{}:{}:{}", new Object[]{record.key(), record.topic(), record.value(), partition.partition(), record.offset()});
            this.setChanged();
            this.notifyObservers(record.value());
        }
        catch (MessagingException me) {
            this.logger.error("Error trying send message to job-info", (Throwable)me);
        }
        catch (Exception e) {
            this.logger.error("Unrecoverable error.", (Throwable)e);
        }
        finally {
            consumer.commitSync();
        }
    }

    protected Consumer<String, JobHistoryEvent> buildKafkaConsumer(String kafkaServer, String schemaRegistryUrl, String topic, String group) {
        KafkaConsumer consumer = new KafkaConsumer(this.buildProperties(kafkaServer, schemaRegistryUrl, topic, group));
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    protected Properties buildProperties(String kafkaServer, String schemaRegistryUrl, String topic, String group) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaServer);
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        properties.setProperty("schema.registry.url", schemaRegistryUrl);
        properties.setProperty("specific.avro.reader", "true");
        properties.setProperty("group.id", group);
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }
}

