/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.consumer.impl;

import io.vertx.core.Completable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.internal.ContextInternal;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.CloseHandler;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import io.vertx.kafka.client.consumer.OffsetAndTimestamp;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordImpl;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerRecordsImpl;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

/**
 * {@link KafkaConsumer} implementation that forwards every operation to a wrapped
 * {@link KafkaReadStream}.
 *
 * <p>Arguments and results are translated between the Vert.x data types and the native Kafka
 * types through {@link Helper}; closing goes through a {@link CloseHandler} so it can also be
 * triggered by a context close hook (see {@link #registerCloseHook()}).
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class KafkaConsumerImpl<K, V> implements KafkaConsumer<K, V> {

    private final KafkaReadStream<K, V> stream;
    private final CloseHandler closeHandler;

    /**
     * Wraps the given stream.
     *
     * @param stream the read stream every call is delegated to
     */
    public KafkaConsumerImpl(KafkaReadStream<K, V> stream) {
        this.stream = stream;
        this.closeHandler = new CloseHandler((timeout, ar) -> stream.close().onComplete(ar));
    }

    /**
     * Registers the close handler on the current Vert.x context, if there is one.
     *
     * @return this instance; nothing is registered when no context is current
     */
    public synchronized KafkaConsumerImpl<K, V> registerCloseHook() {
        Context context = Vertx.currentContext();
        if (context != null) {
            this.closeHandler.registerCloseHook((ContextInternal) context);
        }
        return this;
    }

    /** Sets the exception handler on the underlying stream. */
    @Override
    public KafkaConsumer<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.stream.exceptionHandler(handler);
        return this;
    }

    /**
     * Sets the per-record handler. Each native record is wrapped in a
     * {@link KafkaConsumerRecordImpl} before being handed over; a {@code null} handler is
     * forwarded as {@code null} to the underlying stream.
     */
    @Override
    public KafkaConsumer<K, V> handler(Handler<KafkaConsumerRecord<K, V>> handler) {
        if (handler == null) {
            this.stream.handler(null);
        } else {
            this.stream.handler(record -> handler.handle(new KafkaConsumerRecordImpl<>(record)));
        }
        return this;
    }

    /** Pauses the underlying stream. */
    @Override
    public KafkaConsumer<K, V> pause() {
        this.stream.pause();
        return this;
    }

    /** Resumes the underlying stream. */
    @Override
    public KafkaConsumer<K, V> resume() {
        this.stream.resume();
        return this;
    }

    /** Forwards the fetch request for {@code amount} records to the underlying stream. */
    @Override
    public KafkaConsumer<K, V> fetch(long amount) {
        this.stream.fetch(amount);
        return this;
    }

    /** Returns the demand reported by the underlying stream. */
    @Override
    public long demand() {
        return this.stream.demand();
    }

    /** Pauses the given partitions on the underlying stream. */
    @Override
    public Future<Void> pause(Set<TopicPartition> topicPartitions) {
        return this.stream.pause(Helper.to(topicPartitions));
    }

    /** Returns the paused partitions reported by the underlying stream. */
    @Override
    public Future<Set<TopicPartition>> paused() {
        return this.stream.paused().map(Helper::from);
    }

    /** Single-partition shortcut for {@link #resume(Set)}. */
    @Override
    public Future<Void> resume(TopicPartition topicPartition) {
        return this.resume(Collections.singleton(topicPartition));
    }

    /** Resumes the given partitions on the underlying stream. */
    @Override
    public Future<Void> resume(Set<TopicPartition> topicPartitions) {
        return this.stream.resume(Helper.to(topicPartitions));
    }

    /** Sets the end handler on the underlying stream. */
    @Override
    public KafkaConsumer<K, V> endHandler(Handler<Void> endHandler) {
        this.stream.endHandler(endHandler);
        return this;
    }

    /** Single-topic shortcut for {@link #subscribe(Set)}. */
    @Override
    public Future<Void> subscribe(String topic) {
        return this.subscribe(Collections.singleton(topic));
    }

    /** Subscribes the underlying stream to the given topics. */
    @Override
    public Future<Void> subscribe(Set<String> topics) {
        return this.stream.subscribe(topics);
    }

    /** Subscribes the underlying stream using the given topic pattern. */
    @Override
    public Future<Void> subscribe(Pattern pattern) {
        return this.stream.subscribe(pattern);
    }

    /** Single-partition shortcut for {@link #assign(Set)}. */
    @Override
    public Future<Void> assign(TopicPartition topicPartition) {
        return this.assign(Collections.singleton(topicPartition));
    }

    /** Assigns the given partitions on the underlying stream. */
    @Override
    public Future<Void> assign(Set<TopicPartition> topicPartitions) {
        return this.stream.assign(Helper.to(topicPartitions));
    }

    /** Returns the assignment reported by the underlying stream. */
    @Override
    public Future<Set<TopicPartition>> assignment() {
        return this.stream.assignment().map(Helper::from);
    }

    /**
     * Lists topics through the underlying stream.
     *
     * @return a future holding, per topic name, the partition descriptions converted to the
     *         Vert.x {@code PartitionInfo} type
     */
    @Override
    public Future<Map<String, List<io.vertx.kafka.client.common.PartitionInfo>>> listTopics() {
        return this.stream.listTopics().map(kafkaTopics -> {
            Map<String, List<io.vertx.kafka.client.common.PartitionInfo>> topics = new HashMap<>();
            kafkaTopics.forEach((topic, kafkaPartitions) -> topics.put(topic, toVertxPartitions(kafkaPartitions)));
            return topics;
        });
    }

    /** Unsubscribes the underlying stream. */
    @Override
    public Future<Void> unsubscribe() {
        return this.stream.unsubscribe();
    }

    /** Returns the subscription reported by the underlying stream. */
    @Override
    public Future<Set<String>> subscription() {
        return this.stream.subscription();
    }

    /** Single-partition shortcut for {@link #pause(Set)}. */
    @Override
    public Future<Void> pause(TopicPartition topicPartition) {
        return this.pause(Collections.singleton(topicPartition));
    }

    /** Sets the partitions-revoked handler, adapted through {@link Helper#adaptHandler}. */
    @Override
    public KafkaConsumer<K, V> partitionsRevokedHandler(Handler<Set<TopicPartition>> handler) {
        this.stream.partitionsRevokedHandler(Helper.adaptHandler(handler));
        return this;
    }

    /** Sets the partitions-assigned handler, adapted through {@link Helper#adaptHandler}. */
    @Override
    public KafkaConsumer<K, V> partitionsAssignedHandler(Handler<Set<TopicPartition>> handler) {
        this.stream.partitionsAssignedHandler(Helper.adaptHandler(handler));
        return this;
    }

    /** Seeks the given partition to {@code offset} on the underlying stream. */
    @Override
    public Future<Void> seek(TopicPartition topicPartition, long offset) {
        return this.stream.seek(Helper.to(topicPartition), offset);
    }

    /** Seeks the given partition using an offset-and-metadata value. */
    @Override
    public Future<Void> seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        return this.stream.seek(Helper.to(topicPartition), Helper.to(offsetAndMetadata));
    }

    /** Single-partition shortcut for {@link #seekToBeginning(Set)}. */
    @Override
    public Future<Void> seekToBeginning(TopicPartition topicPartition) {
        return this.seekToBeginning(Collections.singleton(topicPartition));
    }

    /** Forwards the seek-to-beginning request for the given partitions. */
    @Override
    public Future<Void> seekToBeginning(Set<TopicPartition> topicPartitions) {
        return this.stream.seekToBeginning(Helper.to(topicPartitions));
    }

    /** Single-partition shortcut for {@link #seekToEnd(Set)}. */
    @Override
    public Future<Void> seekToEnd(TopicPartition topicPartition) {
        return this.seekToEnd(Collections.singleton(topicPartition));
    }

    /** Forwards the seek-to-end request for the given partitions. */
    @Override
    public Future<Void> seekToEnd(Set<TopicPartition> topicPartitions) {
        return this.stream.seekToEnd(Helper.to(topicPartitions));
    }

    /** Commits through the underlying stream and discards the returned offsets. */
    @Override
    public Future<Void> commit() {
        return this.stream.commit().mapEmpty();
    }

    /** Commits the given offsets and returns the stream's result converted back to Vert.x types. */
    @Override
    public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        return this.stream.commit(Helper.to(offsets)).map(Helper::from);
    }

    /** Returns the committed offset the underlying stream reports for the given partition. */
    @Override
    public Future<OffsetAndMetadata> committed(TopicPartition topicPartition) {
        return this.stream.committed(Helper.to(topicPartition)).map(Helper::from);
    }

    /**
     * Fetches the partitions of {@code topic} through the underlying stream.
     *
     * @return a future holding the partition descriptions converted to the Vert.x
     *         {@code PartitionInfo} type
     */
    @Override
    public Future<List<io.vertx.kafka.client.common.PartitionInfo>> partitionsFor(String topic) {
        return this.stream.partitionsFor(topic).map(KafkaConsumerImpl::toVertxPartitions);
    }

    /** Closes through the {@link CloseHandler}; the returned future reports the outcome. */
    @Override
    public Future<Void> close() {
        Promise<Void> promise = Promise.promise();
        this.closeHandler.close(promise);
        return promise.future();
    }

    /** Returns the position the underlying stream reports for the given partition. */
    @Override
    public Future<Long> position(TopicPartition partition) {
        return this.stream.position(Helper.to(partition));
    }

    /**
     * Looks up the offset for a timestamp on a single partition.
     *
     * @return a future holding the matching offset/timestamp, or {@code null} when the stream's
     *         result is empty or has no entry for the partition; the future fails with a
     *         {@link VertxException} when the result holds more than one entry
     */
    @Override
    public Future<OffsetAndTimestamp> offsetsForTimes(TopicPartition topicPartition, Long timestamp) {
        Map<TopicPartition, Long> query = Collections.singletonMap(topicPartition, timestamp);
        return this.stream.offsetsForTimes(Helper.toTopicPartitionTimes(query)).map(found -> {
            if (found.isEmpty()) {
                return null;
            }
            if (found.size() != 1) {
                throw new VertxException("offsetsForTimes should return exactly one OffsetAndTimestamp", true);
            }
            // The single entry is only used when it is keyed by the partition that was asked for.
            org.apache.kafka.clients.consumer.OffsetAndTimestamp kafkaResult = found.get(Helper.to(topicPartition));
            return kafkaResult == null
                ? null
                : new OffsetAndTimestamp(kafkaResult.offset(), kafkaResult.timestamp());
        });
    }

    /** Looks up offsets for timestamps on several partitions and converts the result. */
    @Override
    public Future<Map<TopicPartition, OffsetAndTimestamp>> offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps) {
        return this.stream.offsetsForTimes(Helper.toTopicPartitionTimes(topicPartitionTimestamps)).map(Helper::fromTopicPartitionOffsetAndTimestamp);
    }

    /** Returns the beginning offsets of the given partitions, converted to Vert.x types. */
    @Override
    public Future<Map<TopicPartition, Long>> beginningOffsets(Set<TopicPartition> topicPartitions) {
        return this.stream.beginningOffsets(Helper.to(topicPartitions)).map(Helper::fromTopicPartitionOffsets);
    }

    /**
     * Returns the beginning offset of a single partition: the first value of the map the
     * stream returns. The future fails if that map is empty.
     */
    @Override
    public Future<Long> beginningOffsets(TopicPartition topicPartition) {
        return this.stream.beginningOffsets(Helper.to(Collections.singleton(topicPartition))).map(KafkaConsumerImpl.firstValue());
    }

    /** Returns the end offsets of the given partitions, converted to Vert.x types. */
    @Override
    public Future<Map<TopicPartition, Long>> endOffsets(Set<TopicPartition> topicPartitions) {
        return this.stream.endOffsets(Helper.to(topicPartitions)).map(Helper::fromTopicPartitionOffsets);
    }

    /**
     * Returns the end offset of a single partition: the first value of the map the stream
     * returns. The future fails if that map is empty.
     */
    @Override
    public Future<Long> endOffsets(TopicPartition topicPartition) {
        return this.stream.endOffsets(Helper.to(Collections.singleton(topicPartition))).map(KafkaConsumerImpl.firstValue());
    }

    /** Returns the wrapped stream. */
    @Override
    public KafkaReadStream<K, V> asStream() {
        return this.stream;
    }

    /** Returns the native consumer exposed by the wrapped stream. */
    @Override
    public Consumer<K, V> unwrap() {
        return this.stream.unwrap();
    }

    /**
     * Sets the batch handler. Each native batch is wrapped in a {@link KafkaConsumerRecordsImpl}
     * before being handed over. As with {@link #handler(Handler)}, a {@code null} handler is
     * forwarded as {@code null} instead of installing a wrapper that would fail with a
     * {@link NullPointerException} once a batch arrives.
     */
    @Override
    public KafkaConsumer<K, V> batchHandler(Handler<KafkaConsumerRecords<K, V>> handler) {
        if (handler == null) {
            this.stream.batchHandler(null);
        } else {
            this.stream.batchHandler(records -> handler.handle(new KafkaConsumerRecordsImpl<>(records)));
        }
        return this;
    }

    /** Sets the poll timeout on the underlying stream. */
    @Override
    public KafkaConsumer<K, V> pollTimeout(Duration timeout) {
        this.stream.pollTimeout(timeout);
        return this;
    }

    /** Polls the underlying stream and wraps the result in a {@link KafkaConsumerRecordsImpl}. */
    @Override
    public Future<KafkaConsumerRecords<K, V>> poll(Duration timeout) {
        return this.stream.poll(timeout).map(records -> new KafkaConsumerRecordsImpl<>(records));
    }

    /**
     * Builds a function that extracts the first value of a map.
     *
     * @return a function that throws {@link java.util.NoSuchElementException} on an empty map
     */
    private static <A, B> Function<Map<A, B>, B> firstValue() {
        return result -> result.values().stream()
            .findFirst()
            .orElseThrow(() -> new java.util.NoSuchElementException("expected one value but the result map is empty"));
    }

    /** Converts native Kafka partition descriptions into their Vert.x counterparts, in order. */
    private static List<io.vertx.kafka.client.common.PartitionInfo> toVertxPartitions(List<PartitionInfo> kafkaPartitions) {
        List<io.vertx.kafka.client.common.PartitionInfo> partitions = new ArrayList<>();
        for (PartitionInfo kafkaPartition : kafkaPartitions) {
            io.vertx.kafka.client.common.PartitionInfo partition = new io.vertx.kafka.client.common.PartitionInfo();
            partition
                .setInSyncReplicas(Stream.of(kafkaPartition.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
                .setLeader(Helper.from(kafkaPartition.leader()))
                .setPartition(kafkaPartition.partition())
                .setReplicas(Stream.of(kafkaPartition.replicas()).map(Helper::from).collect(Collectors.toList()))
                .setTopic(kafkaPartition.topic());
            partitions.add(partition);
        }
        return partitions;
    }
}

