/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.internals.AckMode;
import reactor.kafka.receiver.internals.AtmostOnceOffsets;
import reactor.kafka.receiver.internals.ConsumerEventLoop;
import reactor.kafka.receiver.internals.KafkaSchedulers;

class ConsumerHandler<K, V> {
    private static final Set<String> DELEGATE_METHODS = new HashSet<String>(Arrays.asList("assignment", "subscription", "seek", "seekToBeginning", "seekToEnd", "position", "committed", "metrics", "partitionsFor", "listTopics", "paused", "pause", "resume", "offsetsForTimes", "beginningOffsets", "endOffsets", "currentLag"));
    private static final AtomicInteger COUNTER = new AtomicInteger();
    final AtomicBoolean awaitingTransaction = new AtomicBoolean();
    private final AtmostOnceOffsets atmostOnceOffsets = new AtmostOnceOffsets();
    private final ReceiverOptions<K, V> receiverOptions;
    final Consumer<K, V> consumer;
    private final Scheduler eventScheduler;
    private final ConsumerEventLoop<K, V> consumerEventLoop;
    private final Sinks.Many<ConsumerRecords<K, V>> sink = Sinks.many().unicast().onBackpressureBuffer();
    private final ReceiverOptions.ConsumerListener consumerListener;
    private final String consumerId;
    private Consumer<K, V> consumerProxy;

    ConsumerHandler(ReceiverOptions<K, V> receiverOptions, Consumer<K, V> consumer, Predicate<Throwable> isRetriableException, AckMode ackMode) {
        this.receiverOptions = receiverOptions;
        this.consumer = consumer;
        this.consumerListener = receiverOptions.consumerListener();
        this.consumerId = "reactor-kafka-" + receiverOptions.groupId() + "-" + COUNTER.incrementAndGet();
        if (this.consumerListener != null) {
            this.consumerListener.consumerAdded(this.consumerId, consumer);
        }
        this.eventScheduler = KafkaSchedulers.newEvent(receiverOptions.groupId());
        this.consumerEventLoop = new ConsumerEventLoop<K, V>(ackMode, this.atmostOnceOffsets, receiverOptions, this.eventScheduler, consumer, isRetriableException, this.sink, this.awaitingTransaction);
        this.eventScheduler.init();
    }

    public Flux<ConsumerRecords<K, V>> receive() {
        return this.sink.asFlux().doOnRequest(this.consumerEventLoop::onRequest);
    }

    public Mono<Void> close() {
        if (this.consumerListener != null) {
            this.consumerListener.consumerRemoved(this.consumerId, this.consumer);
        }
        return this.consumerEventLoop.stop().doFinally(__ -> this.eventScheduler.dispose());
    }

    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return Mono.create(monoSink -> {
            Disposable disposable = this.eventScheduler.schedule(() -> {
                try {
                    Object result = function.apply(this.consumerProxy());
                    monoSink.success(result);
                }
                catch (Exception e) {
                    monoSink.error((Throwable)e);
                }
            });
            monoSink.onCancel(disposable);
        });
    }

    public Mono<Void> commit(ConsumerRecord<K, V> record) {
        long offset = record.offset();
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        long committedOffset = this.atmostOnceOffsets.committedOffset(partition);
        this.atmostOnceOffsets.onDispatch(partition, offset);
        long commitAheadSize = this.receiverOptions.atmostOnceCommitAheadSize();
        CommittableOffset committable = new CommittableOffset(partition, offset + commitAheadSize, this.consumerEventLoop.commitEvent, this.receiverOptions.commitBatchSize());
        if (offset >= committedOffset) {
            return committable.commit();
        }
        if (committedOffset - offset >= commitAheadSize / 2L) {
            committable.commit().subscribe();
        }
        return Mono.empty();
    }

    public void acknowledge(ConsumerRecord<K, V> record) {
        this.toCommittableOffset(record).acknowledge();
    }

    public CommittableOffset<K, V> toCommittableOffset(ConsumerRecord<K, V> record) {
        return new CommittableOffset(new TopicPartition(record.topic(), record.partition()), record.offset(), this.consumerEventLoop.commitEvent, this.receiverOptions.commitBatchSize());
    }

    private Consumer<K, V> consumerProxy() {
        if (this.consumerProxy != null) {
            return this.consumerProxy;
        }
        Class[] interfaces = new Class[]{Consumer.class};
        InvocationHandler handler = (proxy, method, args) -> {
            if (DELEGATE_METHODS.contains(method.getName())) {
                try {
                    if (method.getName().equals("pause")) {
                        this.consumerEventLoop.paused((Collection)args[0]);
                    } else if (method.getName().equals("resume")) {
                        this.consumerEventLoop.resumed((Collection)args[0]);
                    }
                    return method.invoke(this.consumer, args);
                }
                catch (InvocationTargetException e) {
                    throw e.getCause();
                }
            }
            throw new UnsupportedOperationException("Method is not supported: " + method);
        };
        this.consumerProxy = (Consumer)Proxy.newProxyInstance(Consumer.class.getClassLoader(), interfaces, handler);
        return this.consumerProxy;
    }

    private static class CommittableOffset<K, V>
    implements ReceiverOffset {
        private final TopicPartition topicPartition;
        private final long commitOffset;
        private final ConsumerEventLoop.CommitEvent commitEvent;
        private final int commitBatchSize;
        private final AtomicBoolean acknowledged = new AtomicBoolean(false);

        public CommittableOffset(TopicPartition topicPartition, long nextOffset, ConsumerEventLoop.CommitEvent commitEvent, int commitBatchSize) {
            this.topicPartition = topicPartition;
            this.commitOffset = nextOffset;
            this.commitEvent = commitEvent;
            this.commitBatchSize = commitBatchSize;
        }

        @Override
        public Mono<Void> commit() {
            if (this.maybeUpdateOffset() > 0) {
                return this.scheduleCommit();
            }
            return Mono.empty();
        }

        @Override
        public void acknowledge() {
            long uncommittedCount = this.maybeUpdateOffset();
            if (this.commitBatchSize > 0 && uncommittedCount >= (long)this.commitBatchSize) {
                this.commitEvent.scheduleIfRequired();
            }
        }

        @Override
        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        @Override
        public long offset() {
            return this.commitOffset;
        }

        private int maybeUpdateOffset() {
            if (this.acknowledged.compareAndSet(false, true)) {
                return this.commitEvent.commitBatch.updateOffset(this.topicPartition, this.commitOffset);
            }
            return this.commitEvent.commitBatch.batchSize();
        }

        private Mono<Void> scheduleCommit() {
            return Mono.create(emitter -> {
                this.commitEvent.commitBatch.addCallbackEmitter((MonoSink<Void>)emitter);
                this.commitEvent.scheduleIfRequired();
            });
        }

        public String toString() {
            return this.topicPartition + "@" + this.commitOffset;
        }
    }
}

