package br.pucrio.tecgraf.soma.job.infrastructure.persistence.message;

import br.pucrio.tecgraf.soma.job.JobHistoryEvent;
import br.pucrio.tecgraf.soma.job.SomaJobHistoryInfo;
import br.pucrio.tecgraf.soma.job.application.configuration.Constants;
import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Observable;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * Spring service that consumes {@link JobHistoryEvent} records from a Kafka topic and hands the
 * value of every record to the observers registered on this {@link Observable}.
 *
 * <p>The Kafka connection settings (bootstrap server, schema registry URL, topic and consumer
 * group) are read once from the {@link ServiceConfiguration} passed to the constructor. Offsets
 * are committed manually (auto-commit is disabled in {@link #buildProperties}).
 */
@Service
public class JobHistoryEventReader extends Observable {
    // NOTE(review): the logger category is SomaJobHistoryInfo rather than this class. It is kept
    // as-is because logging configuration may depend on it — confirm whether this is intentional.
    private final Logger logger = LoggerFactory.getLogger(SomaJobHistoryInfo.class);
    private final String kafkaServer;
    private final String kafkaSchemaRegistryUrl;
    private final String kafkaTopic;
    private final String kafkaGroup;

    /**
     * Reads the Kafka connection settings from the given configuration.
     *
     * @param serviceConfiguration source of the Kafka server address, schema registry URL, topic
     *     and consumer group values
     */
    @Autowired
    public JobHistoryEventReader(ServiceConfiguration serviceConfiguration) {
        this.kafkaServer = serviceConfiguration.getValue(Constants.Config.KAFKA_SERVER_ADDRESS.option.getLongName());
        this.kafkaSchemaRegistryUrl = serviceConfiguration.getValue(Constants.Config.KAFKA_SCHEMA_REGISTRY_URL.option.getLongName());
        this.kafkaTopic = serviceConfiguration.getValue(Constants.Config.KAFKA_TOPIC.option.getLongName());
        this.kafkaGroup = serviceConfiguration.getValue(Constants.Config.KAFKA_CONSUMER_GROUP.option.getLongName());
    }

    /**
     * Builds the consumer and polls it until the current thread is interrupted. Invoked when the
     * application publishes {@link ApplicationReadyEvent}; the call blocks the invoking thread for
     * as long as the loop runs.
     *
     * <p>The consumer is always closed when the loop ends, whether normally or because an
     * exception escaped from {@link #readRecords}.
     *
     * @throws IOException declared for overriding implementations of {@link #readRecords}
     * @throws InterruptedException declared for overriding implementations of {@link #readRecords}
     */
    @EventListener(ApplicationReadyEvent.class)
    public void run() throws IOException, InterruptedException {
        Consumer<String, JobHistoryEvent> consumer =
                buildKafkaConsumer(this.kafkaServer, this.kafkaSchemaRegistryUrl, this.kafkaTopic, this.kafkaGroup);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                readRecords(consumer);
            }
        } finally {
            closeQuietly(consumer);
        }
    }

    /**
     * Polls the consumer once (100 ms timeout) and processes every returned record, partition by
     * partition.
     *
     * @param consumer the consumer to poll
     * @throws InterruptedException not thrown by this implementation; kept for overriding classes
     * @throws IOException not thrown by this implementation; kept for overriding classes
     */
    protected void readRecords(Consumer<String, JobHistoryEvent> consumer) throws InterruptedException, IOException {
        ConsumerRecords<String, JobHistoryEvent> polled = consumer.poll(Duration.ofMillis(100L));
        this.logger.debug("readRecords Got {} records from Kafka", polled.count());
        for (TopicPartition partition : polled.partitions()) {
            for (ConsumerRecord<String, JobHistoryEvent> record : polled.records(partition)) {
                readRecord(consumer, partition, record);
            }
        }
    }

    /**
     * Notifies the observers with the value of one record and then commits.
     *
     * <p>Exceptions raised while notifying are logged and not rethrown, so a failing observer does
     * not stop consumption. The commit runs exactly once, in {@code finally}, whether or not the
     * notification succeeded; a failure of the commit itself propagates to the caller.
     *
     * @param consumer the consumer the record came from; used to commit
     * @param topicPartition the partition the record belongs to (used for logging)
     * @param consumerRecord the record to dispatch
     */
    private void readRecord(Consumer<String, JobHistoryEvent> consumer, TopicPartition topicPartition, ConsumerRecord<String, JobHistoryEvent> consumerRecord) {
        try {
            this.logger.info("readRecord got Job {} at Kafka topic {}:{}:{}:{}", consumerRecord.key(), consumerRecord.topic(), consumerRecord.value(), topicPartition.partition(), consumerRecord.offset());
            setChanged();
            notifyObservers(consumerRecord.value());
        } catch (MessagingException e) {
            // Must precede the generic handler below, otherwise this branch is never reached.
            this.logger.error("Error trying send message to job-info", e);
        } catch (Exception e) {
            this.logger.error("Unrecoverable error.", e);
        } finally {
            // NOTE(review): per the Kafka documentation, the no-argument commitSync() commits the
            // offsets returned by the last poll() for all subscribed partitions, so the first
            // record of a batch already commits the whole batch. If a per-record commit was
            // intended, an explicit offset map is required — confirm the desired semantics.
            consumer.commitSync();
        }
    }

    /** Closes the consumer, logging (instead of propagating) any failure raised by the close. */
    private void closeQuietly(Consumer<String, JobHistoryEvent> consumer) {
        try {
            consumer.close();
        } catch (Exception e) {
            this.logger.warn("Failed to close Kafka consumer", e);
        }
    }

    /**
     * Creates a Kafka consumer configured by {@link #buildProperties} and subscribes it to the
     * given topic.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} property
     * @param schemaRegistryUrl URL of the schema registry used by the Avro deserializer
     * @param topic the topic to subscribe to
     * @param groupId the consumer group id
     * @return a consumer subscribed to {@code topic}
     */
    protected Consumer<String, JobHistoryEvent> buildKafkaConsumer(String bootstrapServers, String schemaRegistryUrl, String topic, String groupId) {
        Consumer<String, JobHistoryEvent> consumer =
                new KafkaConsumer<>(buildProperties(bootstrapServers, schemaRegistryUrl, topic, groupId));
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    /**
     * Builds the consumer properties: String keys, Avro values read as specific records,
     * auto-commit disabled and offset reset set to {@code latest}.
     *
     * @param bootstrapServers value for the {@code bootstrap.servers} property
     * @param schemaRegistryUrl URL of the schema registry used by the Avro deserializer
     * @param topic not used by this implementation
     * @param groupId the consumer group id
     * @return the populated properties
     */
    protected Properties buildProperties(String bootstrapServers, String schemaRegistryUrl, String topic, String groupId) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    }
}
