/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.job;

import br.pucrio.tecgraf.soma.job.JobExecutingEvent;
import br.pucrio.tecgraf.soma.job.JobFinishedEvent;
import br.pucrio.tecgraf.soma.job.JobHistoryEvent;
import br.pucrio.tecgraf.soma.job.JobRescheduledEvent;
import br.pucrio.tecgraf.soma.job.JobScheduledEvent;
import br.pucrio.tecgraf.soma.job.JobStageInEvent;
import br.pucrio.tecgraf.soma.job.JobStageOutEvent;
import br.pucrio.tecgraf.soma.job.event.IJobHistoryEventFactory;
import br.pucrio.tecgraf.soma.job.event.JobHistoryEventFactory;
import csbase.server.plugin.service.IServiceManager;
import csbase.server.plugin.service.commandpersistenceservice.ICommandInfo;
import csbase.server.plugin.service.commandpersistenceservice.ICommandStatusListener;
import csbase.server.services.commandpersistenceservice.CommandPersistenceService;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SomaCommandStatusListener
implements ICommandStatusListener {
    public static final String KAFKA_SCHEMA_REGISTRY_URL = "schema_registry_url";
    public static final String KAFKA_SERVER_ADDRESS = "kafka_server_address";
    public static final String KAFKA_TOPIC = "job-events";
    private String kafkaServer;
    private String schemaRegistry;
    private Properties pluginProperties;
    private CommandPersistenceService persistenceService;
    private IJobHistoryEventFactory eventFactory;
    private IServiceManager serviceManager;
    private Producer<String, JobHistoryEvent> producer;
    protected ExecutorService notifierExecutor;

    public SomaCommandStatusListener(IServiceManager serviceManager) {
        this.serviceManager = serviceManager;
        this.notifierExecutor = Executors.newSingleThreadExecutor();
    }

    public void statusChanged(ICommandInfo commandInfo) {
        Thread.currentThread().setContextClassLoader(null);
        this.notifierExecutor.execute(() -> {
            JobHistoryEvent jobHistoryEvent = this.createJobHistoryEvent(commandInfo);
            if (jobHistoryEvent != null) {
                this.getProducer().send(new ProducerRecord(KAFKA_TOPIC, (Object)commandInfo.getCommandId(), (Object)jobHistoryEvent));
            }
        });
    }

    public void setProperties(Properties properties) {
        this.pluginProperties = properties;
        this.kafkaServer = this.pluginProperties.getProperty(KAFKA_SERVER_ADDRESS);
        this.schemaRegistry = this.pluginProperties.getProperty(KAFKA_SCHEMA_REGISTRY_URL);
    }

    private JobHistoryEvent createJobHistoryEvent(ICommandInfo commandInfo) {
        switch (commandInfo.getStatus()) {
            case UPLOADING: {
                JobStageInEvent event = this.getEventFactory().buildStageInEvent(commandInfo);
                return new JobHistoryEvent(event.getClass().getSimpleName(), (Object)event);
            }
            case DOWNLOADING: {
                JobStageOutEvent event = this.getEventFactory().buildStageOutEvent(commandInfo);
                return new JobHistoryEvent(event.getClass().getSimpleName(), (Object)event);
            }
            case EXECUTING: {
                JobExecutingEvent event = this.getEventFactory().buildExecutingEvent(commandInfo);
                return new JobHistoryEvent(event.getClass().getSimpleName(), (Object)event);
            }
            case SCHEDULED: {
                JobScheduledEvent event = this.getEventFactory().buildScheduledEvent(commandInfo);
                return new JobHistoryEvent(event.getClass().getSimpleName(), (Object)event);
            }
            case RESCHEDULED: {
                JobRescheduledEvent event = this.getEventFactory().buildRescheduledEvent(commandInfo);
                return new JobHistoryEvent(event.getClass().getSimpleName(), (Object)event);
            }
            case FINISHED: {
                JobFinishedEvent event = this.getEventFactory().buildFinishedEvent(commandInfo);
                return new JobHistoryEvent(event.getClass().getSimpleName(), (Object)event);
            }
            case INIT: {
                return null;
            }
        }
        throw new IllegalArgumentException("Unsupported ICommandInfo status");
    }

    protected Producer<String, JobHistoryEvent> getProducer() {
        if (this.producer == null) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", this.kafkaServer);
            properties.setProperty("schema.registry.url", this.schemaRegistry);
            properties.setProperty("key.serializer", StringSerializer.class.getName());
            properties.setProperty("value.serializer", KafkaAvroSerializer.class.getName());
            properties.setProperty("acks", "1");
            properties.setProperty("retries", "3");
            properties.setProperty("linger.ms", "1");
            this.producer = new KafkaProducer(properties);
        }
        return this.producer;
    }

    protected CommandPersistenceService getPersistenceService() {
        if (this.persistenceService == null) {
            this.persistenceService = (CommandPersistenceService)this.serviceManager.getService("CommandPersistenceService");
        }
        return this.persistenceService;
    }

    protected IJobHistoryEventFactory getEventFactory() {
        if (this.eventFactory == null) {
            this.eventFactory = new JobHistoryEventFactory(this.getPersistenceService());
        }
        return this.eventFactory;
    }
}

