package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.2.jar:org/apache/kafka/streams/state/internals/CachingKeyValueStore.class */
class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> {
    private final KeyValueStore<Bytes, byte[]> underlying;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private CacheFlushListener<K, V> flushListener;
    private boolean sendOldValues;
    private String cacheName;
    private ThreadCache cache;
    private InternalProcessorContext context;
    private StateSerdes<K, V> serdes;
    private Thread streamThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingKeyValueStore(KeyValueStore<Bytes, byte[]> keyValueStore, Serde<K> serde, Serde<V> serde2) {
        super(keyValueStore);
        this.underlying = keyValueStore;
        this.keySerde = serde;
        this.valueSerde = serde2;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.underlying.init(processorContext, stateStore);
        initInternal(processorContext);
        this.streamThread = Thread.currentThread();
    }

    private void initInternal(final ProcessorContext processorContext) {
        this.context = (InternalProcessorContext) processorContext;
        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(processorContext.applicationId(), this.underlying.name()), this.keySerde == null ? processorContext.keySerde() : this.keySerde, this.valueSerde == null ? processorContext.valueSerde() : this.valueSerde);
        this.cache = this.context.getCache();
        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(processorContext.taskId().toString(), this.underlying.name());
        this.cache.addDirtyEntryFlushListener(this.cacheName, new ThreadCache.DirtyEntryFlushListener() { // from class: org.apache.kafka.streams.state.internals.CachingKeyValueStore.1
            @Override // org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener
            public void apply(List<ThreadCache.DirtyEntry> list) {
                Iterator<ThreadCache.DirtyEntry> it = list.iterator();
                while (it.hasNext()) {
                    CachingKeyValueStore.this.putAndMaybeForward(it.next(), (InternalProcessorContext) processorContext);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void putAndMaybeForward(ThreadCache.DirtyEntry dirtyEntry, InternalProcessorContext internalProcessorContext) {
        RecordContext recordContext = internalProcessorContext.recordContext();
        try {
            internalProcessorContext.setRecordContext(dirtyEntry.recordContext());
            if (this.flushListener != null) {
                this.flushListener.apply(this.serdes.keyFrom(dirtyEntry.key().get()), this.serdes.valueFrom(dirtyEntry.newValue()), this.sendOldValues ? this.serdes.valueFrom(this.underlying.get(dirtyEntry.key())) : null);
            }
            this.underlying.put(dirtyEntry.key(), dirtyEntry.newValue());
            internalProcessorContext.setRecordContext(recordContext);
        } catch (Throwable th) {
            internalProcessorContext.setRecordContext(recordContext);
            throw th;
        }
    }

    @Override // org.apache.kafka.streams.state.internals.CachedStateStore
    public void setFlushListener(CacheFlushListener<K, V> cacheFlushListener, boolean z) {
        this.flushListener = cacheFlushListener;
        this.sendOldValues = z;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public synchronized void flush() {
        this.cache.flush(this.cacheName);
        this.underlying.flush();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public void close() {
        flush();
        this.underlying.close();
        this.cache.close(this.cacheName);
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return this.underlying.persistent();
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        return this.underlying.isOpen();
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized byte[] get(Bytes bytes) {
        validateStoreOpen();
        Objects.requireNonNull(bytes);
        return getInternal(bytes);
    }

    private byte[] getInternal(Bytes bytes) {
        LRUCacheEntry lRUCacheEntry = this.cache.get(this.cacheName, bytes);
        if (lRUCacheEntry != null) {
            if (lRUCacheEntry.value == null) {
                return null;
            }
            return lRUCacheEntry.value;
        }
        byte[] bArr = this.underlying.get(bytes);
        if (bArr == null) {
            return null;
        }
        if (Thread.currentThread().equals(this.streamThread)) {
            this.cache.put(this.cacheName, bytes, new LRUCacheEntry(bArr));
        }
        return bArr;
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.cache.range(this.cacheName, bytes, bytes2), this.underlying.range(bytes, bytes2));
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public KeyValueIterator<Bytes, byte[]> all() {
        validateStoreOpen();
        return new MergedSortedCacheKeyValueBytesStoreIterator(this.cache.all(this.cacheName), new DelegatingPeekingKeyValueIterator(name(), this.underlying.all()));
    }

    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized long approximateNumEntries() {
        validateStoreOpen();
        return this.underlying.approximateNumEntries();
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized void put(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        putInternal(bytes, bArr);
    }

    private synchronized void putInternal(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        this.cache.put(this.cacheName, bytes, new LRUCacheEntry(bArr, true, this.context.offset(), this.context.timestamp(), this.context.partition(), this.context.topic()));
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        Objects.requireNonNull(bytes, "key cannot be null");
        validateStoreOpen();
        byte[] internal = getInternal(bytes);
        if (internal == null) {
            putInternal(bytes, bArr);
        }
        return internal;
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized void putAll(List<KeyValue<Bytes, byte[]>> list) {
        for (KeyValue<Bytes, byte[]> keyValue : list) {
            put(keyValue.key, keyValue.value);
        }
    }

    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] delete(Bytes bytes) {
        validateStoreOpen();
        Objects.requireNonNull(bytes);
        byte[] internal = getInternal(bytes);
        this.cache.delete(this.cacheName, bytes);
        this.underlying.delete(bytes);
        return internal;
    }

    KeyValueStore<Bytes, byte[]> underlying() {
        return this.underlying;
    }

    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore, org.apache.kafka.streams.state.internals.WrappedStateStore
    public StateStore inner() {
        return this.underlying instanceof WrappedStateStore ? ((WrappedStateStore) this.underlying).inner() : this.underlying;
    }
}
