/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class KafkaLatestCommit
extends ContextHolder
implements KafkaCommitHandler {
    private final io.vertx.kafka.client.consumer.KafkaConsumer<?, ?> consumer;
    private final Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();

    public KafkaLatestCommit(Vertx vertx, KafkaConnectorIncomingConfiguration configuration, KafkaConsumer<?, ?> consumer) {
        super(vertx, configuration.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000));
        this.consumer = consumer.getDelegate();
    }

    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> record) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.runOnContext(() -> {
            HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
            TopicPartition key = new TopicPartition(record.getTopic(), record.getPartition());
            Long last = this.offsets.get(key);
            if (last == null || last < record.getOffset() + 1L) {
                this.offsets.put(key, record.getOffset() + 1L);
                map.put(key, new OffsetAndMetadata(record.getOffset() + 1L, null));
                this.consumer.commit(map, ar -> {
                    if (ar.failed()) {
                        future.completeExceptionally(ar.cause());
                    } else {
                        future.complete(null);
                    }
                });
            } else {
                future.complete(null);
            }
        });
        return future;
    }
}

