package br.pucrio.tecgraf.soma.job.infrastructure.persistence.message;

import br.pucrio.tecgraf.soma.job.JobHistoryEvent;
import br.pucrio.tecgraf.soma.job.SomaJobHistoryConsumer;
import br.pucrio.tecgraf.soma.job.application.appservice.JobHistoryEventService;
import br.pucrio.tecgraf.soma.job.application.appservice.LostEventAppService;
import br.pucrio.tecgraf.soma.job.domain.model.LostEvent;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import javax.persistence.RollbackException;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.hibernate.exception.JDBCConnectionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Reads {@link JobHistoryEvent} records from Kafka and forwards each one to the
 * {@link JobHistoryEventService}.
 *
 * <p>Offsets are committed manually (auto-commit is disabled in the consumer properties): after a
 * record is processed, or after it has been given up on and handled as a lost event, the offset
 * following that record is committed synchronously.
 */
@Service
public class JobHistoryEventReader {
    /**
     * Delay between attempts while the database connection is unavailable. Five minutes is the
     * value of the Kafka-internal {@code Metadata.TOPIC_EXPIRY_MS} constant this code used to
     * reference; a local constant avoids depending on a Kafka implementation detail.
     */
    private static final Duration DB_RETRY_DELAY = Duration.ofMinutes(5);

    /** Maximum time a single {@code poll} call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);

    private final JobHistoryEventService jobHistoryEventService;
    private final LostEventAppService lostEventAppService;
    // NOTE(review): the logger category is SomaJobHistoryConsumer rather than this class. Kept
    // unchanged so existing logging configuration keeps matching; confirm it is intentional.
    private final Logger logger = LoggerFactory.getLogger(SomaJobHistoryConsumer.class);
    private final RecordReader recordReader = new RecordReader();

    /** Processes a single Kafka record, including retry and lost-event handling. */
    public class RecordReader {
        private RecordReader() {
        }

        /**
         * Processes one record and commits the offset that follows it.
         *
         * <p>Outcome by failure type:
         * <ul>
         *   <li>{@link IOException} / {@link RollbackException}: when the record comes from topic
         *       {@code str}, a {@link LostEvent} is saved; otherwise the failure is only logged.
         *       In both cases the offset is committed so the record is not read again.</li>
         *   <li>{@link JDBCConnectionException}: waits {@link #DB_RETRY_DELAY} and tries the same
         *       record again, until the thread is interrupted.</li>
         *   <li>Any other exception: logged as unrecoverable; nothing is committed.</li>
         * </ul>
         *
         * @param kafkaConsumer consumer used to commit offsets
         * @param topicPartition partition the record was read from
         * @param str topic whose failed records are persisted as lost events
         * @param list all records polled for the partition; no longer consulted, retained only to
         *     keep the signature stable
         * @param consumerRecord the record to process
         * @throws InterruptedException if the thread is interrupted while waiting to retry
         * @throws IOException if thrown while handling a failed record
         */
        public void readRecord(KafkaConsumer<String, JobHistoryEvent> kafkaConsumer, TopicPartition topicPartition, String str, List<ConsumerRecord<String, JobHistoryEvent>> list, ConsumerRecord<String, JobHistoryEvent> consumerRecord) throws InterruptedException, IOException {
            // Use this record's own offset. Taking the offset of the last record in the batch
            // (as was done before) reported the wrong position for every record but the last and
            // committed the whole batch as soon as its first record had been handled.
            final long offset = consumerRecord.offset();
            final int partition = topicPartition.partition();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    logger.info("Job {} at Kafka topic {}:{}:{}", consumerRecord.key(), consumerRecord.topic(), partition, offset);
                    jobHistoryEventService.process(consumerRecord.value(), partition, offset);
                    commitNextOffset(kafkaConsumer, topicPartition, offset);
                    return;
                } catch (IOException | RollbackException e) {
                    if (consumerRecord.topic().equals(str)) {
                        LostEvent lostEvent = new LostEvent(getEventId(consumerRecord.value().getEvent()), partition, offset, consumerRecord.value().toString());
                        lostEventAppService.saveLostEvent(lostEvent, partition, offset);
                        logger.error("Error trying to process a kafka message. A lost event with id {} was persisted in the database for further inspection.", lostEvent.getId(), e);
                    } else {
                        logger.error("Error trying to process a kafka message. The event was not persisted.", e);
                    }
                    // Skip past the failed record so it is not delivered again.
                    commitNextOffset(kafkaConsumer, topicPartition, offset);
                    return;
                } catch (JDBCConnectionException e) {
                    // The message states seconds, so log seconds rather than milliseconds.
                    logger.warn("No DB connection. Retrying in {} seconds.", DB_RETRY_DELAY.getSeconds());
                    Thread.sleep(DB_RETRY_DELAY.toMillis());
                } catch (Exception e) {
                    logger.error("Unrecoverable error.", e);
                    return;
                }
            }
        }

        /** Synchronously commits {@code offset + 1}, the position of the next record to read. */
        private void commitNextOffset(KafkaConsumer<String, JobHistoryEvent> kafkaConsumer, TopicPartition topicPartition, long offset) {
            kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)));
        }

        /**
         * Reflectively calls a public no-argument {@code getEventId()} on the given object.
         *
         * @param obj object expected to expose {@code getEventId()}; may be {@code null}
         * @return the string form of the returned id, or {@code null} when {@code obj} is
         *     {@code null}, the method is missing or inaccessible, the call fails, or it
         *     returns {@code null}
         */
        private String getEventId(Object obj) {
            if (obj == null) {
                return null;
            }
            try {
                Object eventId = obj.getClass().getMethod("getEventId").invoke(obj);
                // toString() instead of a String cast so a non-String CharSequence id cannot
                // raise ClassCastException while a lost event is being recorded.
                return eventId == null ? null : eventId.toString();
            } catch (ReflectiveOperationException | IllegalArgumentException | SecurityException e) {
                logger.debug("Could not read the event id from {}", obj.getClass().getName(), e);
                return null;
            }
        }
    }

    /**
     * Creates the reader.
     *
     * @param jobHistoryEventService service that processes each event
     * @param lostEventAppService service that stores events whose processing failed
     */
    @Autowired
    public JobHistoryEventReader(JobHistoryEventService jobHistoryEventService, LostEventAppService lostEventAppService) {
        this.jobHistoryEventService = jobHistoryEventService;
        this.lostEventAppService = lostEventAppService;
    }

    /**
     * Subscribes to two topics and processes records until the current thread is interrupted.
     * The consumer is closed when the loop ends or an exception escapes.
     *
     * @param str Kafka bootstrap servers
     * @param str2 schema registry URL
     * @param str3 first topic to subscribe to; failed records from this topic are persisted as
     *     lost events
     * @param str4 second topic to subscribe to
     * @param str5 consumer group id
     * @throws IOException if thrown while handling a failed record
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     */
    public void run(String str, String str2, String str3, String str4, String str5) throws IOException, InterruptedException {
        KafkaConsumer<String, JobHistoryEvent> consumer = buildKafkaConsumer(str, str2, str5);
        try {
            List<String> topics = new ArrayList<>();
            topics.add(str3);
            topics.add(str4);
            consumer.subscribe(topics);
            while (!Thread.currentThread().isInterrupted()) {
                readRecords(consumer, str3);
            }
        } finally {
            closeQuietly(consumer);
        }
    }

    /** Closes the consumer, logging instead of throwing so an in-flight exception is not masked. */
    private void closeQuietly(KafkaConsumer<String, JobHistoryEvent> consumer) {
        try {
            consumer.close();
        } catch (RuntimeException e) {
            logger.warn("Failed to close the Kafka consumer cleanly.", e);
        }
    }

    /**
     * Polls once and processes every returned record, partition by partition.
     *
     * @param kafkaConsumer consumer to poll and commit with
     * @param str topic whose failed records are persisted as lost events
     * @throws InterruptedException if the thread is interrupted while waiting to retry
     * @throws IOException if thrown while handling a failed record
     */
    protected void readRecords(KafkaConsumer<String, JobHistoryEvent> kafkaConsumer, String str) throws InterruptedException, IOException {
        ConsumerRecords<String, JobHistoryEvent> polled = kafkaConsumer.poll(POLL_TIMEOUT);
        logger.debug("Got {} records from Kafka", polled.count());
        for (TopicPartition topicPartition : polled.partitions()) {
            List<ConsumerRecord<String, JobHistoryEvent>> partitionRecords = polled.records(topicPartition);
            for (ConsumerRecord<String, JobHistoryEvent> consumerRecord : partitionRecords) {
                recordReader.readRecord(kafkaConsumer, topicPartition, str, partitionRecords, consumerRecord);
            }
        }
    }

    /**
     * Creates the Kafka consumer.
     *
     * @param str Kafka bootstrap servers
     * @param str2 schema registry URL
     * @param str3 consumer group id
     * @return a new consumer configured by {@link #buildProperties(String, String, String)}
     */
    protected KafkaConsumer<String, JobHistoryEvent> buildKafkaConsumer(String str, String str2, String str3) {
        return new KafkaConsumer<>(buildProperties(str, str2, str3));
    }

    /**
     * Builds the consumer configuration: String keys, Avro values read as specific records,
     * auto-commit disabled, and offset reset set to {@code earliest}.
     *
     * @param str Kafka bootstrap servers
     * @param str2 schema registry URL
     * @param str3 consumer group id
     * @return the consumer properties
     */
    private Properties buildProperties(String str, String str2, String str3) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, str2);
        properties.setProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, str3);
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return properties;
    }
}
