/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    private volatile State state = State.CREATED;
    private StateListener stateListener = null;
    final PartitionGrouper partitionGrouper;
    private final StreamsMetadataState streamsMetadataState;
    public final String applicationId;
    public final String clientId;
    public final UUID processId;
    protected final StreamsConfig config;
    protected final TopologyBuilder builder;
    Producer<byte[], byte[]> threadProducer;
    private final KafkaClientSupplier clientSupplier;
    protected final Consumer<byte[], byte[]> consumer;
    final Consumer<byte[], byte[]> restoreConsumer;
    private final String logPrefix;
    private final String threadClientId;
    private final Pattern sourceTopicPattern;
    private final Map<TaskId, StreamTask> activeTasks;
    private final Map<TaskId, StandbyTask> standbyTasks;
    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
    private final Set<TaskId> prevActiveTasks;
    private final Map<TaskId, StreamTask> suspendedTasks;
    private final Map<TaskId, StandbyTask> suspendedStandbyTasks;
    private final Time time;
    private final int rebalanceTimeoutMs;
    private final long pollTimeMs;
    private final long cleanTimeMs;
    private final long commitTimeMs;
    private final StreamsMetricsThreadImpl streamsMetrics;
    final StateDirectory stateDirectory;
    private String originalReset;
    private StreamPartitionAssignor partitionAssignor;
    private long timerStartedMs;
    private long lastCleanMs;
    private long lastCommitMs;
    private Throwable rebalanceException = null;
    private final boolean eosEnabled;
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    private boolean processStandbyRecords = false;
    private final ThreadCache cache;
    private StoreChangelogReader storeChangelogReader;
    private final TaskCreator taskCreator = new TaskCreator();
    final ConsumerRebalanceListener rebalanceListener;
    private static final int UNLIMITED_RECORDS = -1;

    public StreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time, StreamsMetadataState streamsMetadataState, long cacheSizeBytes) {
        super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
        this.applicationId = applicationId;
        this.config = config;
        this.builder = builder;
        this.clientSupplier = clientSupplier;
        this.sourceTopicPattern = builder.sourceTopicPattern();
        this.clientId = clientId;
        this.processId = processId;
        this.partitionGrouper = (PartitionGrouper)config.getConfiguredInstance("partition.grouper", PartitionGrouper.class);
        this.streamsMetadataState = streamsMetadataState;
        this.threadClientId = this.getName();
        this.logPrefix = String.format("stream-thread [%s]", this.threadClientId);
        this.streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + this.threadClientId, Collections.singletonMap("client-id", this.threadClientId));
        if (config.getLong("cache.max.bytes.buffering") < 0L) {
            log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", (Object)this.logPrefix);
        }
        this.cache = new ThreadCache(this.threadClientId, cacheSizeBytes, this.streamsMetrics);
        this.eosEnabled = "exactly_once".equals(config.getString("processing.guarantee"));
        log.info("{} Creating consumer client", (Object)this.logPrefix);
        Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, this.threadClientId);
        if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
            this.originalReset = (String)consumerConfigs.get("auto.offset.reset");
            log.info("{} Custom offset resets specified updating configs original auto offset reset {}", (Object)this.logPrefix, (Object)this.originalReset);
            consumerConfigs.put("auto.offset.reset", "none");
        }
        this.consumer = clientSupplier.getConsumer(consumerConfigs);
        log.info("{} Creating restore consumer client", (Object)this.logPrefix);
        this.restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(this.threadClientId));
        this.activeTasks = new ConcurrentHashMap<TaskId, StreamTask>();
        this.standbyTasks = new HashMap<TaskId, StandbyTask>();
        this.activeTasksByPartition = new HashMap<TopicPartition, StreamTask>();
        this.standbyTasksByPartition = new HashMap<TopicPartition, StandbyTask>();
        this.prevActiveTasks = new HashSet<TaskId>();
        this.suspendedTasks = new HashMap<TaskId, StreamTask>();
        this.suspendedStandbyTasks = new HashMap<TaskId, StandbyTask>();
        this.standbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        this.stateDirectory = new StateDirectory(applicationId, this.threadClientId, config.getString("state.dir"), time);
        Object maxPollInterval = consumerConfigs.get("max.poll.interval.ms");
        this.rebalanceTimeoutMs = (Integer)ConfigDef.parseType((String)"max.poll.interval.ms", (Object)maxPollInterval, (ConfigDef.Type)ConfigDef.Type.INT);
        this.pollTimeMs = config.getLong("poll.ms");
        this.commitTimeMs = config.getLong("commit.interval.ms");
        this.cleanTimeMs = config.getLong("state.cleanup.delay.ms");
        this.time = time;
        this.timerStartedMs = time.milliseconds();
        this.lastCleanMs = Long.MAX_VALUE;
        this.lastCommitMs = this.timerStartedMs;
        this.rebalanceListener = new RebalanceListener(time, config.getInt("request.timeout.ms"));
        this.setState(State.RUNNING);
    }

    @Override
    public void run() {
        log.info("{} Starting", (Object)this.logPrefix);
        boolean cleanRun = false;
        try {
            this.runLoop();
            cleanRun = true;
        }
        catch (KafkaException e) {
            throw e;
        }
        catch (Exception e) {
            log.error("{} Streams application error during processing: ", (Object)this.logPrefix, (Object)e);
            throw e;
        }
        finally {
            this.shutdown(cleanRun);
        }
    }

    private void runLoop() {
        long recordsProcessedBeforeCommit = -1L;
        this.consumer.subscribe(this.sourceTopicPattern, this.rebalanceListener);
        while (this.stillRunning()) {
            this.timerStartedMs = this.time.milliseconds();
            ConsumerRecords<byte[], byte[]> records = this.pollRequests();
            if (records != null && !records.isEmpty() && !this.activeTasks.isEmpty()) {
                this.streamsMetrics.pollTimeSensor.record((double)this.computeLatency(), this.timerStartedMs);
                this.addRecordsToTasks(records);
                long totalProcessed = this.processAndPunctuate(this.activeTasks, recordsProcessedBeforeCommit);
                if (totalProcessed > 0L) {
                    long processLatency = this.computeLatency();
                    this.streamsMetrics.processTimeSensor.record((double)processLatency / (double)totalProcessed, this.timerStartedMs);
                    recordsProcessedBeforeCommit = this.adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed, processLatency, this.commitTimeMs);
                }
            }
            this.maybeCommit(this.timerStartedMs);
            this.maybeUpdateStandbyTasks();
            this.maybeClean(this.timerStartedMs);
        }
        log.info("{} Shutting down at user request", (Object)this.logPrefix);
    }

    private ConsumerRecords<byte[], byte[]> pollRequests() {
        ConsumerRecords records = null;
        try {
            records = this.consumer.poll(this.pollTimeMs);
        }
        catch (InvalidOffsetException e) {
            this.resetInvalidOffsets(e);
        }
        if (this.rebalanceException != null && !(this.rebalanceException instanceof ProducerFencedException)) {
            throw new StreamsException(this.logPrefix + " Failed to rebalance.", this.rebalanceException);
        }
        return records;
    }

    private void resetInvalidOffsets(InvalidOffsetException e) {
        Set partitions = e.partitions();
        HashSet<String> loggedTopics = new HashSet<String>();
        HashSet<TopicPartition> seekToBeginning = new HashSet<TopicPartition>();
        HashSet<TopicPartition> seekToEnd = new HashSet<TopicPartition>();
        for (TopicPartition partition : partitions) {
            if (this.builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToBeginning, "{} Setting topic '{}' to consume from {} offset", "earliest", loggedTopics);
                continue;
            }
            if (this.builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
                this.addToResetList(partition, seekToEnd, "{} Setting topic '{}' to consume from {} offset", "latest", loggedTopics);
                continue;
            }
            if (this.originalReset == null || !this.originalReset.equals("earliest") && !this.originalReset.equals("latest")) {
                this.setState(State.PENDING_SHUTDOWN);
                String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
                throw new StreamsException(String.format("No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)", partition.topic(), partition.partition()), e);
            }
            if (this.originalReset.equals("earliest")) {
                this.addToResetList(partition, seekToBeginning, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", loggedTopics);
                continue;
            }
            if (!this.originalReset.equals("latest")) continue;
            this.addToResetList(partition, seekToEnd, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", loggedTopics);
        }
        if (!seekToBeginning.isEmpty()) {
            this.consumer.seekToBeginning(seekToBeginning);
        }
        if (!seekToEnd.isEmpty()) {
            this.consumer.seekToEnd(seekToEnd);
        }
    }

    private void addToResetList(TopicPartition partition, Set<TopicPartition> partitions, String logMessage, String resetPolicy, Set<String> loggedTopics) {
        String topic = partition.topic();
        if (loggedTopics.add(topic)) {
            log.info(logMessage, new Object[]{this.logPrefix, topic, resetPolicy});
        }
        partitions.add(partition);
    }

    private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) {
        if (records != null && !records.isEmpty()) {
            int numAddedRecords = 0;
            for (TopicPartition partition : records.partitions()) {
                StreamTask task = this.activeTasksByPartition.get(partition);
                numAddedRecords += task.addRecords(partition, records.records(partition));
            }
            this.streamsMetrics.skippedRecordsSensor.record((double)(records.count() - numAddedRecords), this.timerStartedMs);
        }
    }

    private long processAndPunctuate(Map<TaskId, StreamTask> tasks, long recordsProcessedBeforeCommit) {
        long totalProcessedEachRound;
        long totalProcessedSinceLastMaybeCommit = 0L;
        do {
            totalProcessedEachRound = 0L;
            Iterator<Map.Entry<TaskId, StreamTask>> it = tasks.entrySet().iterator();
            while (it.hasNext()) {
                StreamTask task = it.next().getValue();
                try {
                    if (!task.process()) continue;
                    ++totalProcessedEachRound;
                    ++totalProcessedSinceLastMaybeCommit;
                }
                catch (ProducerFencedException e) {
                    this.closeZombieTask(task);
                    it.remove();
                }
            }
            if (recordsProcessedBeforeCommit == -1L || totalProcessedSinceLastMaybeCommit < recordsProcessedBeforeCommit) continue;
            totalProcessedSinceLastMaybeCommit = 0L;
            long processLatency = this.computeLatency();
            this.streamsMetrics.processTimeSensor.record((double)processLatency / (double)totalProcessedSinceLastMaybeCommit, this.timerStartedMs);
            this.maybeCommit(this.timerStartedMs);
        } while (totalProcessedEachRound != 0L);
        RuntimeException e = this.performOnStreamTasks(new StreamTaskAction(){
            private String name;

            @Override
            public String name() {
                return this.name;
            }

            @Override
            public void apply(StreamTask task) {
                this.name = "punctuate";
                StreamThread.this.maybePunctuate(task);
                if (task.commitNeeded()) {
                    this.name = "commit";
                    StreamThread.this.commitOne(task);
                }
            }
        });
        if (e != null) {
            throw e;
        }
        return totalProcessedSinceLastMaybeCommit;
    }

    private void maybePunctuate(StreamTask task) {
        try {
            if (task.maybePunctuate()) {
                this.streamsMetrics.punctuateTimeSensor.record((double)this.computeLatency(), this.timerStartedMs);
            }
        }
        catch (KafkaException e) {
            log.error("{} Failed to punctuate active task {}: ", new Object[]{this.logPrefix, task.id(), e});
            throw e;
        }
    }

    private long adjustRecordsProcessedBeforeCommit(long prevRecordsProcessedBeforeCommit, long totalProcessed, long processLatency, long commitTime) {
        long recordsProcessedBeforeCommit = -1L;
        if (processLatency > 0L && processLatency > commitTime) {
            recordsProcessedBeforeCommit = Math.max(1L, commitTime * totalProcessed / processLatency);
            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", new Object[]{this.logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit});
        } else if (prevRecordsProcessedBeforeCommit != -1L && processLatency > 0L) {
            recordsProcessedBeforeCommit = Math.max(1L, commitTime * totalProcessed / processLatency);
            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}", new Object[]{this.logPrefix, processLatency, commitTime, totalProcessed, recordsProcessedBeforeCommit});
        }
        return recordsProcessedBeforeCommit;
    }

    protected void maybeCommit(long now) {
        if (this.commitTimeMs >= 0L && this.lastCommitMs + this.commitTimeMs < now) {
            log.debug("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed by {}ms", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet(), this.commitTimeMs, now - this.lastCommitMs});
            this.commitAll();
            this.lastCommitMs = now;
            this.processStandbyRecords = true;
        }
    }

    private void commitAll() {
        RuntimeException e = this.performOnStreamTasks(new StreamTaskAction(){

            @Override
            public String name() {
                return "commit";
            }

            @Override
            public void apply(StreamTask task) {
                StreamThread.this.commitOne(task);
            }
        });
        if (e != null) {
            throw e;
        }
        for (StandbyTask task : this.standbyTasks.values()) {
            this.commitOne(task);
        }
    }

    private void commitOne(AbstractTask task) {
        log.trace("{} Committing {} {}", new Object[]{this.logPrefix, task.getClass().getSimpleName(), task.id()});
        try {
            task.commit();
        }
        catch (CommitFailedException e) {
            log.warn("{} Failed to commit {} {} state: ", new Object[]{this.logPrefix, task.getClass().getSimpleName(), task.id(), e});
        }
        catch (KafkaException e) {
            log.error("{} Failed to commit {} {} state: ", new Object[]{this.logPrefix, task.getClass().getSimpleName(), task.id(), e});
            throw e;
        }
        this.streamsMetrics.commitTimeSensor.record((double)this.computeLatency(), this.timerStartedMs);
    }

    private void maybeUpdateStandbyTasks() {
        if (!this.standbyTasks.isEmpty()) {
            ConsumerRecords records;
            List<ConsumerRecord<byte[], byte[]>> remaining;
            if (this.processStandbyRecords) {
                if (!this.standbyRecords.isEmpty()) {
                    HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
                    for (Map.Entry entry : this.standbyRecords.entrySet()) {
                        TopicPartition partition = (TopicPartition)entry.getKey();
                        remaining = (List<ConsumerRecord<byte[], byte[]>>)entry.getValue();
                        if (remaining == null) continue;
                        StandbyTask task = this.standbyTasksByPartition.get(partition);
                        remaining = task.update(partition, remaining);
                        if (remaining != null) {
                            remainingStandbyRecords.put(partition, remaining);
                            continue;
                        }
                        this.restoreConsumer.resume(Collections.singleton(partition));
                    }
                    this.standbyRecords = remainingStandbyRecords;
                }
                this.processStandbyRecords = false;
            }
            if (!(records = this.restoreConsumer.poll(0L)).isEmpty()) {
                for (TopicPartition topicPartition : records.partitions()) {
                    StandbyTask task = this.standbyTasksByPartition.get(topicPartition);
                    if (task == null) {
                        throw new StreamsException(this.logPrefix + " Missing standby task for partition " + topicPartition);
                    }
                    remaining = task.update(topicPartition, records.records(topicPartition));
                    if (remaining == null) continue;
                    this.restoreConsumer.pause(Collections.singleton(topicPartition));
                    this.standbyRecords.put(topicPartition, remaining);
                }
            }
        }
    }

    protected void maybeClean(long now) {
        if (now > this.lastCleanMs + this.cleanTimeMs) {
            this.stateDirectory.cleanRemovedTasks(this.cleanTimeMs);
            this.lastCleanMs = now;
        }
    }

    private long computeLatency() {
        long previousTimeMs = this.timerStartedMs;
        this.timerStartedMs = this.time.milliseconds();
        return Math.max(this.timerStartedMs - previousTimeMs, 0L);
    }

    public synchronized void close() {
        log.info("{} Informed thread to shut down", (Object)this.logPrefix);
        this.setState(State.PENDING_SHUTDOWN);
    }

    public synchronized boolean isInitialized() {
        return this.state == State.RUNNING;
    }

    public synchronized boolean stillRunning() {
        return this.state.isRunning();
    }

    public Map<TaskId, StreamTask> tasks() {
        return Collections.unmodifiableMap(this.activeTasks);
    }

    public Set<TaskId> prevActiveTasks() {
        return Collections.unmodifiableSet(this.prevActiveTasks);
    }

    public Set<TaskId> cachedTasks() {
        HashSet<TaskId> tasks = new HashSet<TaskId>();
        File[] stateDirs = this.stateDirectory.listTaskDirectories();
        if (stateDirs != null) {
            for (File dir : stateDirs) {
                try {
                    TaskId id = TaskId.parse(dir.getName());
                    if (!new File(dir, ".checkpoint").exists()) continue;
                    tasks.add(id);
                }
                catch (TaskIdFormatException taskIdFormatException) {
                    // empty catch block
                }
            }
        }
        return tasks;
    }

    public void setStateListener(StateListener listener) {
        this.stateListener = listener;
    }

    public synchronized State state() {
        return this.state;
    }

    private synchronized void setStateWhenNotInPendingShutdown(State newState) {
        if (this.state == State.PENDING_SHUTDOWN) {
            return;
        }
        this.setState(newState);
    }

    private synchronized void setState(State newState) {
        State oldState = this.state;
        if (!this.state.isValidTransition(newState)) {
            log.warn("{} Unexpected state transition from {} to {}.", new Object[]{this.logPrefix, oldState, newState});
        } else {
            log.info("{} State transition from {} to {}.", new Object[]{this.logPrefix, oldState, newState});
        }
        this.state = newState;
        if (this.stateListener != null) {
            this.stateListener.onChange(this, this.state, oldState);
        }
    }

    @Override
    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder().append(indent).append("StreamsThread appId: ").append(this.applicationId).append("\n").append(indent).append("\tStreamsThread clientId: ").append(this.clientId).append("\n").append(indent).append("\tStreamsThread threadId: ").append(this.getName()).append("\n");
        if (this.activeTasks != null) {
            sb.append(indent).append("\tActive tasks:\n");
            for (Map.Entry entry : this.activeTasks.entrySet()) {
                StreamTask task = (StreamTask)entry.getValue();
                sb.append(indent).append(task.toString(indent + "\t\t"));
            }
        }
        if (this.standbyTasks != null) {
            sb.append(indent).append("\tStandby tasks:\n");
            for (StandbyTask standbyTask : this.standbyTasks.values()) {
                sb.append(indent).append(standbyTask.toString(indent + "\t\t"));
            }
            sb.append("\n");
        }
        return sb.toString();
    }

    String threadClientId() {
        return this.threadClientId;
    }

    void setPartitionAssignor(StreamPartitionAssignor partitionAssignor) {
        this.partitionAssignor = partitionAssignor;
    }

    private void shutdown(boolean cleanRun) {
        log.info("{} Shutting down", (Object)this.logPrefix);
        this.shutdownTasksAndState(cleanRun);
        if (this.threadProducer != null) {
            try {
                this.threadProducer.close();
            }
            catch (Throwable e) {
                log.error("{} Failed to close producer: ", (Object)this.logPrefix, (Object)e);
            }
        }
        try {
            this.consumer.close();
        }
        catch (Throwable e) {
            log.error("{} Failed to close consumer: ", (Object)this.logPrefix, (Object)e);
        }
        try {
            this.restoreConsumer.close();
        }
        catch (Throwable e) {
            log.error("{} Failed to close restore consumer: ", (Object)this.logPrefix, (Object)e);
        }
        try {
            this.partitionAssignor.close();
        }
        catch (Throwable e) {
            log.error("{} Failed to close KafkaStreamClient: ", (Object)this.logPrefix, (Object)e);
        }
        this.removeStreamTasks();
        this.removeStandbyTasks();
        log.info("{} Stream thread shutdown complete", (Object)this.logPrefix);
        this.setState(State.DEAD);
        this.streamsMetrics.removeAllSensors();
    }

    private void shutdownTasksAndState(boolean cleanRun) {
        log.debug("{} shutdownTasksAndState: shutting downactive tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet(), this.suspendedTasks.keySet(), this.suspendedStandbyTasks.keySet()});
        for (AbstractTask task : this.allTasks()) {
            try {
                task.close(cleanRun);
            }
            catch (RuntimeException e) {
                log.error("{} Failed while closing {} {}: ", new Object[]{this.logPrefix, task.getClass().getSimpleName(), task.id(), e});
            }
        }
        this.unAssignChangeLogPartitions();
    }

    private void suspendTasksAndState() {
        log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks {}", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet()});
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        firstException.compareAndSet(null, this.performOnStreamTasks(new StreamTaskAction(){

            @Override
            public String name() {
                return "suspend";
            }

            @Override
            public void apply(StreamTask task) {
                task.suspend();
            }
        }));
        for (StandbyTask task : this.standbyTasks.values()) {
            try {
                task.suspend();
            }
            catch (RuntimeException e) {
                firstException.compareAndSet(null, e);
            }
        }
        firstException.compareAndSet(null, this.unAssignChangeLogPartitions());
        this.updateSuspendedTasks();
        if (firstException.get() != null) {
            throw new StreamsException(this.logPrefix + " failed to suspend stream tasks", firstException.get());
        }
    }

    private RuntimeException unAssignChangeLogPartitions() {
        try {
            this.restoreConsumer.assign(Collections.emptyList());
        }
        catch (RuntimeException e) {
            log.error("{} Failed to un-assign change log partitions: ", (Object)this.logPrefix, (Object)e);
            return e;
        }
        return null;
    }

    private List<AbstractTask> allTasks() {
        List<AbstractTask> tasks = this.activeAndStandbytasks();
        tasks.addAll(this.suspendedAndSuspendedStandbytasks());
        return tasks;
    }

    private List<AbstractTask> activeAndStandbytasks() {
        ArrayList<AbstractTask> tasks = new ArrayList<AbstractTask>(this.activeTasks.values());
        tasks.addAll(this.standbyTasks.values());
        return tasks;
    }

    private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
        ArrayList<AbstractTask> tasks = new ArrayList<AbstractTask>(this.suspendedTasks.values());
        tasks.addAll(this.suspendedStandbyTasks.values());
        return tasks;
    }

    private StreamTask findMatchingSuspendedTask(TaskId taskId, Set<TopicPartition> partitions) {
        if (this.suspendedTasks.containsKey(taskId)) {
            StreamTask task = this.suspendedTasks.get(taskId);
            if (task.partitions.equals(partitions)) {
                return task;
            }
        }
        return null;
    }

    private StandbyTask findMatchingSuspendedStandbyTask(TaskId taskId, Set<TopicPartition> partitions) {
        if (this.suspendedStandbyTasks.containsKey(taskId)) {
            StandbyTask task = this.suspendedStandbyTasks.get(taskId);
            if (task.partitions.equals(partitions)) {
                return task;
            }
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeNonAssignedSuspendedTasks() {
        Map<TaskId, Set<TopicPartition>> newTaskAssignment = this.partitionAssignor.activeTasks();
        Iterator<Map.Entry<TaskId, StreamTask>> suspendedTaskIterator = this.suspendedTasks.entrySet().iterator();
        while (suspendedTaskIterator.hasNext()) {
            Map.Entry<TaskId, StreamTask> next = suspendedTaskIterator.next();
            StreamTask task = next.getValue();
            Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
            if (task.partitions().equals(assignedPartitionsForTask)) continue;
            log.debug("{} Closing suspended non-assigned active task {}", (Object)this.logPrefix, (Object)task.id());
            try {
                task.close(true);
            }
            catch (Exception e) {
                log.error("{} Failed to remove suspended task {}: ", new Object[]{this.logPrefix, next.getKey(), e});
            }
            finally {
                suspendedTaskIterator.remove();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeNonAssignedSuspendedStandbyTasks() {
        Set<TaskId> currentSuspendedTaskIds = this.partitionAssignor.standbyTasks().keySet();
        Iterator<Map.Entry<TaskId, StandbyTask>> standByTaskIterator = this.suspendedStandbyTasks.entrySet().iterator();
        while (standByTaskIterator.hasNext()) {
            Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
            if (currentSuspendedTaskIds.contains(suspendedTask.getKey())) continue;
            StandbyTask task = suspendedTask.getValue();
            log.debug("{} Closing suspended non-assigned standby task {}", (Object)this.logPrefix, (Object)task.id());
            try {
                task.close(true);
            }
            catch (Exception e) {
                log.error("{} Failed to remove suspended standby task {}: ", new Object[]{this.logPrefix, task.id(), e});
            }
            finally {
                standByTaskIterator.remove();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
        StreamTask streamTask;
        log.info("{} Creating active task {} with assigned partitions [{}]", new Object[]{this.logPrefix, id, partitions});
        this.streamsMetrics.taskCreatedSensor.record();
        try {
            streamTask = new StreamTask(id, this.applicationId, partitions, this.builder.build(id.topicGroupId), this.consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, this.createProducer(id));
        }
        catch (Throwable throwable) {
            log.info("{} Created active task {} with assigned partitions {}", new Object[]{this.logPrefix, id, partitions});
            throw throwable;
        }
        log.info("{} Created active task {} with assigned partitions {}", new Object[]{this.logPrefix, id, partitions});
        return streamTask;
    }

    private Producer<byte[], byte[]> createProducer(TaskId id) {
        Producer<byte[], byte[]> producer;
        if (this.eosEnabled) {
            Map<String, Object> producerConfigs = this.config.getProducerConfigs(this.threadClientId + "-" + id);
            log.info("{} Creating producer client for task {}", (Object)this.logPrefix, (Object)id);
            producerConfigs.put("transactional.id", this.applicationId + "-" + id);
            producer = this.clientSupplier.getProducer(producerConfigs);
        } else {
            if (this.threadProducer == null) {
                Map<String, Object> producerConfigs = this.config.getProducerConfigs(this.threadClientId);
                log.info("{} Creating shared producer client", (Object)this.logPrefix);
                this.threadProducer = this.clientSupplier.getProducer(producerConfigs);
            }
            producer = this.threadProducer;
        }
        return producer;
    }

    private void addStreamTasks(Collection<TopicPartition> assignment, long start) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException(this.logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
        }
        HashMap<TaskId, Set<TopicPartition>> newTasks = new HashMap<TaskId, Set<TopicPartition>>();
        log.info("{} Adding assigned tasks as active {}", (Object)this.logPrefix, this.partitionAssignor.activeTasks());
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : this.partitionAssignor.activeTasks().entrySet()) {
            TaskId taskId = entry.getKey();
            Set<TopicPartition> partitions = entry.getValue();
            if (assignment.containsAll(partitions)) {
                try {
                    StreamTask task = this.findMatchingSuspendedTask(taskId, partitions);
                    if (task != null) {
                        this.suspendedTasks.remove(taskId);
                        task.resume();
                        this.activeTasks.put(taskId, task);
                        for (TopicPartition partition : partitions) {
                            this.activeTasksByPartition.put(partition, task);
                        }
                        continue;
                    }
                    newTasks.put(taskId, partitions);
                    continue;
                }
                catch (StreamsException e) {
                    log.error("{} Failed to create an active task {}: ", new Object[]{this.logPrefix, taskId, e});
                    throw e;
                }
            }
            log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", new Object[]{this.logPrefix, taskId, partitions, assignment});
        }
        log.debug("{} New active tasks to be created: {}", (Object)this.logPrefix, newTasks);
        this.taskCreator.retryWithBackoff(newTasks, start);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
        log.debug("{} Creating new standby task {} with assigned partitions {}", new Object[]{this.logPrefix, id, partitions});
        this.streamsMetrics.taskCreatedSensor.record();
        ProcessorTopology topology = this.builder.build(id.topicGroupId);
        if (!topology.stateStores().isEmpty()) {
            StandbyTask standbyTask;
            try {
                standbyTask = new StandbyTask(id, this.applicationId, partitions, topology, this.consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory);
            }
            catch (Throwable throwable) {
                log.info("{} Created standby task {} with assigned partitions {}", new Object[]{this.logPrefix, id, partitions});
                throw throwable;
            }
            log.info("{} Created standby task {} with assigned partitions {}", new Object[]{this.logPrefix, id, partitions});
            return standbyTask;
        }
        log.info("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", new Object[]{this.logPrefix, id, partitions});
        return null;
    }

    private void addStandbyTasks(long start) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException(this.logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen.");
        }
        HashMap<TopicPartition, Long> checkpointedOffsets = new HashMap<TopicPartition, Long>();
        HashMap<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<TaskId, Set<TopicPartition>>();
        log.info("{} Adding assigned standby tasks {}", (Object)this.logPrefix, this.partitionAssignor.standbyTasks());
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : this.partitionAssignor.standbyTasks().entrySet()) {
            Set<TopicPartition> partitions;
            TaskId taskId = entry.getKey();
            StandbyTask task = this.findMatchingSuspendedStandbyTask(taskId, partitions = entry.getValue());
            if (task != null) {
                this.suspendedStandbyTasks.remove(taskId);
                task.resume();
            } else {
                newStandbyTasks.put(taskId, partitions);
            }
            this.updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
        }
        log.debug("{} New standby tasks to be created: {}", (Object)this.logPrefix, newStandbyTasks);
        new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
        this.restoreConsumer.assign(new ArrayList(checkpointedOffsets.keySet()));
        for (Map.Entry<TaskId, Set<Object>> entry : checkpointedOffsets.entrySet()) {
            TopicPartition partition = (TopicPartition)entry.getKey();
            long offset = (Long)((Object)entry.getValue());
            if (offset >= 0L) {
                this.restoreConsumer.seek(partition, offset);
                continue;
            }
            this.restoreConsumer.seekToBeginning(Collections.singleton(partition));
        }
    }

    private void updateStandByTaskMaps(Map<TopicPartition, Long> checkpointedOffsets, TaskId taskId, Set<TopicPartition> partitions, StandbyTask task) {
        if (task != null) {
            this.standbyTasks.put(taskId, task);
            for (TopicPartition partition : partitions) {
                this.standbyTasksByPartition.put(partition, task);
            }
            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
                this.standbyTasksByPartition.put(partition, task);
            }
            checkpointedOffsets.putAll(task.checkpointedOffsets());
        }
    }

    private void updateSuspendedTasks() {
        log.info("{} Updating suspended tasks to contain active tasks {}", (Object)this.logPrefix, this.activeTasks.keySet());
        this.suspendedTasks.clear();
        this.suspendedTasks.putAll(this.activeTasks);
        this.suspendedStandbyTasks.putAll(this.standbyTasks);
    }

    private void removeStreamTasks() {
        log.info("{} Removing all active tasks {}", (Object)this.logPrefix, this.activeTasks.keySet());
        try {
            this.prevActiveTasks.clear();
            this.prevActiveTasks.addAll(this.activeTasks.keySet());
            this.activeTasks.clear();
            this.activeTasksByPartition.clear();
        }
        catch (Exception e) {
            log.error("{} Failed to remove stream tasks: ", (Object)this.logPrefix, (Object)e);
        }
    }

    private void removeStandbyTasks() {
        log.info("{} Removing all standby tasks {}", (Object)this.logPrefix, this.standbyTasks.keySet());
        this.standbyTasks.clear();
        this.standbyTasksByPartition.clear();
        this.standbyRecords.clear();
    }

    private void closeZombieTask(StreamTask task) {
        log.warn("{} Producer of task {} fenced; closing zombie task.", (Object)this.logPrefix, (Object)task.id);
        try {
            task.close(false);
        }
        catch (Exception f) {
            if (!log.isDebugEnabled() && !log.isTraceEnabled()) {
                log.warn("{} Failed to close zombie task: {}", (Object)this.logPrefix, (Object)f.getMessage());
            }
            log.debug("{} Failed to close zombie task: ", (Object)this.logPrefix, (Object)f);
        }
        this.activeTasks.remove(task.id);
    }

    private RuntimeException performOnStreamTasks(StreamTaskAction action) {
        RuntimeException firstException = null;
        Iterator<Map.Entry<TaskId, StreamTask>> it = this.activeTasks.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask task = it.next().getValue();
            try {
                action.apply(task);
            }
            catch (ProducerFencedException e) {
                this.closeZombieTask(task);
                it.remove();
            }
            catch (RuntimeException t) {
                log.error("{} Failed to {} stream task {}: ", new Object[]{this.logPrefix, action.name(), task.id(), t});
                if (firstException != null) continue;
                firstException = t;
            }
        }
        return firstException;
    }

    private class StreamsMetricsThreadImpl
    extends StreamsMetricsImpl {
        final Sensor commitTimeSensor;
        final Sensor pollTimeSensor;
        final Sensor processTimeSensor;
        final Sensor punctuateTimeSensor;
        final Sensor taskCreatedSensor;
        final Sensor tasksClosedSensor;
        final Sensor skippedRecordsSensor;

        StreamsMetricsThreadImpl(Metrics metrics, String groupName, String prefix, Map<String, String> tags) {
            super(metrics, groupName, tags);
            this.commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
            this.commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), (MeasurableStat)new Avg());
            this.commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), (MeasurableStat)new Max());
            this.commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), (MeasurableStat)new Rate((SampledStat)new Count()));
            this.pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
            this.pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), (MeasurableStat)new Avg());
            this.pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), (MeasurableStat)new Max());
            this.pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), (MeasurableStat)new Rate((SampledStat)new Count()));
            this.processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
            this.processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), (MeasurableStat)new Avg());
            this.processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), (MeasurableStat)new Max());
            this.processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), (MeasurableStat)new Rate((SampledStat)new Count()));
            this.punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), (MeasurableStat)new Avg());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), (MeasurableStat)new Max());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), (MeasurableStat)new Rate((SampledStat)new Count()));
            this.taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
            this.taskCreatedSensor.add(metrics.metricName("task-created-rate", this.groupName, "The average per-second number of newly created tasks", this.tags), (MeasurableStat)new Rate((SampledStat)new Count()));
            this.tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
            this.tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), (MeasurableStat)new Rate((SampledStat)new Count()));
            this.skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
            this.skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), (MeasurableStat)new Rate((SampledStat)new Sum()));
        }

        @Override
        public void recordLatency(Sensor sensor, long startNs, long endNs) {
            sensor.record((double)(endNs - startNs), StreamThread.this.timerStartedMs);
        }

        void removeAllSensors() {
            this.removeSensor(this.commitTimeSensor);
            this.removeSensor(this.pollTimeSensor);
            this.removeSensor(this.processTimeSensor);
            this.removeSensor(this.punctuateTimeSensor);
            this.removeSensor(this.taskCreatedSensor);
            this.removeSensor(this.tasksClosedSensor);
            this.removeSensor(this.skippedRecordsSensor);
        }
    }

    static interface StreamTaskAction {
        public String name();

        public void apply(StreamTask var1);
    }

    class StandbyTaskCreator
    extends AbstractTaskCreator {
        private final Map<TopicPartition, Long> checkpointedOffsets;

        StandbyTaskCreator(Map<TopicPartition, Long> checkpointedOffsets) {
            this.checkpointedOffsets = checkpointedOffsets;
        }

        @Override
        void createTask(TaskId taskId, Set<TopicPartition> partitions) {
            StandbyTask task = StreamThread.this.createStandbyTask(taskId, partitions);
            StreamThread.this.updateStandByTaskMaps(this.checkpointedOffsets, taskId, partitions, task);
        }
    }

    class TaskCreator
    extends AbstractTaskCreator {
        TaskCreator() {
        }

        @Override
        void createTask(TaskId taskId, Set<TopicPartition> partitions) {
            StreamTask task = StreamThread.this.createStreamTask(taskId, partitions);
            StreamThread.this.activeTasks.put(taskId, task);
            for (TopicPartition partition : partitions) {
                StreamThread.this.activeTasksByPartition.put(partition, task);
            }
        }
    }

    abstract class AbstractTaskCreator {
        static final long MAX_BACKOFF_TIME_MS = 1000L;

        AbstractTaskCreator() {
        }

        void retryWithBackoff(Map<TaskId, Set<TopicPartition>> tasksToBeCreated, long start) {
            long backoffTimeMs = 50L;
            HashSet<TaskId> retryingTasks = new HashSet<TaskId>();
            long nextLoggingTime = System.currentTimeMillis() + 10000L;
            while (true) {
                Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions = it.next();
                    TaskId taskId = newTaskAndPartitions.getKey();
                    Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
                    try {
                        this.createTask(taskId, partitions);
                        it.remove();
                        backoffTimeMs = 50L;
                        if (!retryingTasks.remove(taskId) || !log.isWarnEnabled()) continue;
                        log.info("{} Created task {}", (Object)StreamThread.this.logPrefix, (Object)taskId);
                    }
                    catch (LockException e) {
                        if (retryingTasks.contains(taskId)) continue;
                        log.warn("{} Could not create task {}. Will retry: ", new Object[]{StreamThread.this.logPrefix, taskId, e});
                        retryingTasks.add(taskId);
                    }
                }
                if (tasksToBeCreated.isEmpty() || StreamThread.this.time.milliseconds() - start > (long)StreamThread.this.rebalanceTimeoutMs) break;
                try {
                    Thread.sleep(backoffTimeMs);
                    backoffTimeMs <<= 1;
                    backoffTimeMs = Math.min(backoffTimeMs, 1000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (System.currentTimeMillis() <= nextLoggingTime) continue;
                nextLoggingTime += 10000L;
                log.warn("{} Still retrying to create tasks: {}", (Object)StreamThread.this.logPrefix, retryingTasks);
            }
        }

        abstract void createTask(TaskId var1, Set<TopicPartition> var2);
    }

    private class RebalanceListener
    implements ConsumerRebalanceListener {
        private final Time time;
        private final int requestTimeOut;

        RebalanceListener(Time time, int requestTimeOut) {
            this.time = time;
            this.requestTimeOut = requestTimeOut;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
            log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n\tassigned active tasks: {}\n\tassigned standby tasks: {}\n\tcurrent suspended active tasks: {}\n\tcurrent suspended standby tasks: {}\n\tprevious active tasks: {}", new Object[]{StreamThread.this.logPrefix, StreamThread.this.state, assignment, StreamThread.this.partitionAssignor.activeTasks().keySet(), StreamThread.this.partitionAssignor.standbyTasks().keySet(), StreamThread.this.suspendedTasks.keySet(), StreamThread.this.suspendedStandbyTasks.keySet(), StreamThread.this.prevActiveTasks});
            long start = this.time.milliseconds();
            try {
                StreamThread.this.storeChangelogReader = new StoreChangelogReader(StreamThread.this.getName(), StreamThread.this.restoreConsumer, this.time, this.requestTimeOut);
                StreamThread.this.setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                StreamThread.this.closeNonAssignedSuspendedStandbyTasks();
                StreamThread.this.closeNonAssignedSuspendedTasks();
                StreamThread.this.addStreamTasks(assignment, start);
                StreamThread.this.storeChangelogReader.restore();
                StreamThread.this.addStandbyTasks(start);
                StreamThread.this.streamsMetadataState.onChange(StreamThread.this.partitionAssignor.getPartitionsByHostState(), StreamThread.this.partitionAssignor.clusterMetadata());
                StreamThread.this.lastCleanMs = this.time.milliseconds();
                StreamThread.this.setStateWhenNotInPendingShutdown(State.RUNNING);
            }
            catch (Throwable t) {
                try {
                    StreamThread.this.rebalanceException = t;
                    throw t;
                }
                catch (Throwable throwable) {
                    log.info("{} partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}", new Object[]{StreamThread.this.logPrefix, this.time.milliseconds() - start, StreamThread.this.activeTasks.keySet(), StreamThread.this.standbyTasks.keySet()});
                    throw throwable;
                }
            }
            log.info("{} partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}", new Object[]{StreamThread.this.logPrefix, this.time.milliseconds() - start, StreamThread.this.activeTasks.keySet(), StreamThread.this.standbyTasks.keySet()});
        }

        public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
            log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n\tcurrent assigned active tasks: {}\n\tcurrent assigned standby tasks: {}\n", new Object[]{StreamThread.this.logPrefix, StreamThread.this.state, assignment, StreamThread.this.activeTasks.keySet(), StreamThread.this.standbyTasks.keySet()});
            long start = this.time.milliseconds();
            try {
                StreamThread.this.setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                StreamThread.this.lastCleanMs = Long.MAX_VALUE;
                StreamThread.this.suspendTasksAndState();
                StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
            }
            catch (Throwable t) {
                try {
                    StreamThread.this.rebalanceException = t;
                    throw t;
                }
                catch (Throwable throwable) {
                    StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
                    StreamThread.this.removeStreamTasks();
                    StreamThread.this.removeStandbyTasks();
                    log.info("{} partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{StreamThread.this.logPrefix, this.time.milliseconds() - start, StreamThread.this.suspendedTasks.keySet(), StreamThread.this.suspendedStandbyTasks.keySet(), StreamThread.this.prevActiveTasks});
                    throw throwable;
                }
            }
            StreamThread.this.removeStreamTasks();
            StreamThread.this.removeStandbyTasks();
            log.info("{} partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{StreamThread.this.logPrefix, this.time.milliseconds() - start, StreamThread.this.suspendedTasks.keySet(), StreamThread.this.suspendedStandbyTasks.keySet(), StreamThread.this.prevActiveTasks});
        }
    }

    public static interface StateListener {
        public void onChange(StreamThread var1, State var2, State var3);
    }

    public static enum State {
        CREATED(1),
        RUNNING(1, 2, 4),
        PARTITIONS_REVOKED(3, 4),
        ASSIGNING_PARTITIONS(1, 4),
        PENDING_SHUTDOWN(5),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet<Integer>();

        private State(Integer ... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        public boolean isRunning() {
            return !this.equals((Object)PENDING_SHUTDOWN) && !this.equals((Object)CREATED) && !this.equals((Object)DEAD);
        }

        public boolean isValidTransition(State newState) {
            return this.validTransitions.contains(newState.ordinal());
        }
    }
}

