/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;

public class ProcessorNode<K, V> {
    private final List<ProcessorNode<?, ?>> children;
    private final String name;
    private final Processor<K, V> processor;
    NodeMetrics nodeMetrics;
    private Time time;
    private K key;
    private V value;
    private Runnable processDelegate = new Runnable(){

        @Override
        public void run() {
            ProcessorNode.this.processor.process(ProcessorNode.this.key, ProcessorNode.this.value);
        }
    };
    private ProcessorContext context;
    private Runnable initDelegate = new Runnable(){

        @Override
        public void run() {
            if (ProcessorNode.this.processor != null) {
                ProcessorNode.this.processor.init(ProcessorNode.this.context);
            }
        }
    };
    private Runnable closeDelegate = new Runnable(){

        @Override
        public void run() {
            if (ProcessorNode.this.processor != null) {
                ProcessorNode.this.processor.close();
            }
        }
    };
    private long timestamp;
    private Runnable punctuateDelegate = new Runnable(){

        @Override
        public void run() {
            ProcessorNode.this.processor().punctuate(ProcessorNode.this.timestamp);
        }
    };
    public final Set<String> stateStores;

    public ProcessorNode(String name) {
        this(name, null, null);
    }

    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
        this.name = name;
        this.processor = processor;
        this.children = new ArrayList();
        this.stateStores = stateStores;
        this.time = new SystemTime();
    }

    public final String name() {
        return this.name;
    }

    public final Processor<K, V> processor() {
        return this.processor;
    }

    public final List<ProcessorNode<?, ?>> children() {
        return this.children;
    }

    public void addChild(ProcessorNode<?, ?> child) {
        this.children.add(child);
    }

    public void init(ProcessorContext context) {
        this.context = context;
        try {
            this.nodeMetrics = new NodeMetrics(context.metrics(), this.name, "task." + context.taskId());
            this.nodeMetrics.metrics.measureLatencyNs(this.time, this.initDelegate, this.nodeMetrics.nodeCreationSensor);
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to initialize processor %s", this.name), e);
        }
    }

    public void close() {
        try {
            this.nodeMetrics.metrics.measureLatencyNs(this.time, this.closeDelegate, this.nodeMetrics.nodeDestructionSensor);
            this.nodeMetrics.removeAllSensors();
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to close processor %s", this.name), e);
        }
    }

    public void process(K key, V value) {
        this.key = key;
        this.value = value;
        this.nodeMetrics.metrics.measureLatencyNs(this.time, this.processDelegate, this.nodeMetrics.nodeProcessTimeSensor);
    }

    public void punctuate(long timestamp) {
        this.timestamp = timestamp;
        this.nodeMetrics.metrics.measureLatencyNs(this.time, this.punctuateDelegate, this.nodeMetrics.nodePunctuateTimeSensor);
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder(indent + this.name + ":\n");
        if (this.stateStores != null && !this.stateStores.isEmpty()) {
            sb.append(indent).append("\tstates:\t\t[");
            for (String store : this.stateStores) {
                sb.append(store);
                sb.append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }

    protected static final class NodeMetrics {
        final StreamsMetricsImpl metrics;
        final String metricGrpName;
        final Map<String, String> metricTags;
        final Sensor nodeProcessTimeSensor;
        final Sensor nodePunctuateTimeSensor;
        final Sensor sourceNodeForwardSensor;
        final Sensor nodeCreationSensor;
        final Sensor nodeDestructionSensor;

        public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) {
            String scope = "processor-node";
            String tagKey = "processor-node-id";
            String tagValue = name;
            this.metrics = (StreamsMetricsImpl)metrics;
            this.metricGrpName = "stream-processor-node-metrics";
            this.metricTags = new LinkedHashMap<String, String>();
            this.metricTags.put("processor-node-id", tagValue);
            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor("processor-node", sensorNamePrefix + "." + name, "process", Sensor.RecordingLevel.DEBUG, "processor-node-id", tagValue);
            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor("processor-node", sensorNamePrefix + "." + name, "punctuate", Sensor.RecordingLevel.DEBUG, "processor-node-id", tagValue);
            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor("processor-node", sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, "processor-node-id", tagValue);
            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor("processor-node", sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, "processor-node-id", tagValue);
            this.sourceNodeForwardSensor = metrics.addThroughputSensor("processor-node", sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, "processor-node-id", tagValue);
        }

        public void removeAllSensors() {
            this.metrics.removeSensor(this.nodeProcessTimeSensor);
            this.metrics.removeSensor(this.nodePunctuateTimeSensor);
            this.metrics.removeSensor(this.sourceNodeForwardSensor);
            this.metrics.removeSensor(this.nodeCreationSensor);
            this.metrics.removeSensor(this.nodeDestructionSensor);
        }
    }
}

