/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi.overflow;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Subscription;

public class MultiOnOverflowBufferOp<T>
extends AbstractMultiOperator<T, T> {
    private final int bufferSize;
    private final boolean unbounded;
    private final Consumer<T> dropConsumer;
    private final Function<T, Uni<?>> dropUniMapper;

    public MultiOnOverflowBufferOp(Multi<T> upstream, int bufferSize, boolean unbounded, Consumer<T> dropConsumer, Function<T, Uni<?>> dropUniMapper) {
        super(upstream);
        this.bufferSize = bufferSize;
        this.unbounded = unbounded;
        this.dropConsumer = dropConsumer;
        this.dropUniMapper = dropUniMapper;
    }

    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        OnOverflowBufferProcessor subscriber = new OnOverflowBufferProcessor(downstream, this.bufferSize, this.unbounded);
        this.upstream.subscribe().withSubscriber(subscriber);
    }

    class OnOverflowBufferProcessor
    extends MultiOperatorProcessor<T, T> {
        private final Queue<T> queue;
        Throwable failure;
        private final AtomicLong requested;
        private final AtomicInteger wip;
        volatile boolean cancelled;
        volatile boolean done;

        OnOverflowBufferProcessor(MultiSubscriber<? super T> downstream, int bufferSize, boolean unbounded) {
            super(downstream);
            this.requested = new AtomicLong();
            this.wip = new AtomicInteger();
            this.queue = unbounded ? Queues.unbounded(bufferSize).get() : Queues.createStrictSizeQueue(bufferSize);
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            if (this.upstream.compareAndSet(null, subscription)) {
                this.downstream.onSubscribe(this);
                subscription.request(Long.MAX_VALUE);
            } else {
                subscription.cancel();
            }
        }

        @Override
        public void onItem(T t) {
            if (!this.queue.offer(t)) {
                BackPressureFailure bpf = new BackPressureFailure("The overflow buffer is full, which is due to the upstream sending too many items w.r.t. the downstream capacity and/or the downstream not consuming items fast enough");
                if (MultiOnOverflowBufferOp.this.dropUniMapper != null) {
                    this.notifyOnOverflowCall(t, bpf);
                } else {
                    this.notifyOnOverflowInvoke(t, bpf);
                }
            } else {
                this.drain();
            }
        }

        private void notifyOnOverflowInvoke(T t, BackPressureFailure bpf) {
            if (MultiOnOverflowBufferOp.this.dropConsumer != null) {
                try {
                    MultiOnOverflowBufferOp.this.dropConsumer.accept(t);
                }
                catch (Throwable e) {
                    bpf.addSuppressed(e);
                }
            }
            this.onFailure(bpf);
        }

        private void notifyOnOverflowCall(T t, BackPressureFailure bpf) {
            super.cancel();
            try {
                Uni uni = (Uni)ParameterValidation.nonNull(MultiOnOverflowBufferOp.this.dropUniMapper.apply(t), "uni");
                uni.subscribe().with(ignored -> this.downstream.onFailure(bpf), failure -> {
                    bpf.addSuppressed((Throwable)failure);
                    this.downstream.onFailure(bpf);
                });
            }
            catch (Throwable failure2) {
                bpf.addSuppressed(failure2);
                this.downstream.onFailure(bpf);
            }
        }

        @Override
        public void onFailure(Throwable failure) {
            this.failure = failure;
            this.done = true;
            this.drain();
        }

        @Override
        public void onCompletion() {
            this.done = true;
            this.drain();
        }

        @Override
        public void request(long n) {
            if (n > 0L) {
                Subscriptions.add(this.requested, n);
                this.drain();
            }
        }

        @Override
        public void cancel() {
            if (!this.cancelled) {
                this.cancelled = true;
                super.cancel();
                if (this.wip.getAndIncrement() == 0) {
                    this.queue.clear();
                }
            }
        }

        void drain() {
            block5: {
                if (this.wip.getAndIncrement() != 0) break block5;
                int missed = 1;
                Queue qe = this.queue;
                do {
                    boolean empty;
                    boolean d;
                    long emitted;
                    if (this.checkTerminated(this.done, qe.isEmpty())) {
                        return;
                    }
                    long req = this.requested.get();
                    for (emitted = 0L; emitted != req; ++emitted) {
                        boolean wasEmpty;
                        boolean wasDone = this.done;
                        Object item = qe.poll();
                        boolean bl = wasEmpty = item == null;
                        if (this.checkTerminated(wasDone, wasEmpty)) {
                            return;
                        }
                        if (wasEmpty) break;
                        this.downstream.onItem(item);
                    }
                    if (emitted == req && this.checkTerminated(d = this.done, empty = qe.isEmpty())) {
                        return;
                    }
                    if (emitted == 0L || req == Long.MAX_VALUE) continue;
                    this.requested.addAndGet(-emitted);
                } while ((missed = this.wip.addAndGet(-missed)) != 0);
            }
        }

        boolean checkTerminated(boolean wasDone, boolean wasEmpty) {
            if (this.cancelled) {
                this.queue.clear();
                return true;
            }
            if (wasDone) {
                if (this.failure != null) {
                    this.queue.clear();
                    if (this.failure instanceof BackPressureFailure) {
                        super.cancel();
                        this.downstream.onFailure(this.failure);
                    } else {
                        super.onFailure(this.failure);
                    }
                    return true;
                }
                if (wasEmpty) {
                    super.onCompletion();
                    return true;
                }
            }
            return false;
        }
    }
}

