/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.kafka.client.producer.impl;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.tracing.ProducerTracer;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;

public class KafkaWriteStreamImpl<K, V>
implements KafkaWriteStream<K, V> {
    private long maxSize = 0x100000L;
    private long pending;
    private final Producer<K, V> producer;
    private Handler<Void> drainHandler;
    private Handler<Throwable> exceptionHandler;
    private final VertxInternal vertx;
    private final ProducerTracer tracer;
    private final Executor workerExec;

    public KafkaWriteStreamImpl(Vertx vertx, Producer<K, V> producer, KafkaClientOptions options) {
        ContextInternal ctxInt = ((ContextInternal)vertx.getOrCreateContext()).unwrap();
        this.producer = producer;
        this.vertx = (VertxInternal)vertx;
        this.tracer = ProducerTracer.create(ctxInt.tracer(), options);
        this.workerExec = ((VertxInternal)vertx).createWorkerContext().executor();
    }

    private int len(Object value) {
        if (value instanceof byte[]) {
            return ((byte[])value).length;
        }
        if (value instanceof String) {
            return ((String)value).length();
        }
        return 1;
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        ContextInternal ctx = this.vertx.getOrCreateContext();
        ProducerTracer.StartedSpan startedSpan = this.tracer == null ? null : this.tracer.prepareSendMessage((Context)ctx, record);
        int len = this.len(record.value());
        this.pending += (long)len;
        PromiseInternal prom = ctx.promise();
        this.workerExec.execute(() -> this.lambda$send$4(record, ctx, len, startedSpan, (Promise)prom));
        return prom.future();
    }

    public Future<Void> write(ProducerRecord<K, V> record) {
        return this.send(record).mapEmpty();
    }

    @Override
    public KafkaWriteStreamImpl<K, V> setWriteQueueMaxSize(int size) {
        this.maxSize = size;
        return this;
    }

    public synchronized boolean writeQueueFull() {
        return this.pending >= this.maxSize;
    }

    @Override
    public synchronized KafkaWriteStreamImpl<K, V> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    public Future<Void> end() {
        Promise promise = Promise.promise();
        this.vertx.runOnContext(v -> promise.complete());
        return promise.future();
    }

    @Override
    public Future<Void> initTransactions() {
        return this.executeBlocking(() -> this.producer.initTransactions());
    }

    @Override
    public Future<Void> beginTransaction() {
        return this.executeBlocking(() -> this.producer.beginTransaction());
    }

    @Override
    public Future<Void> commitTransaction() {
        return this.executeBlocking(() -> this.producer.commitTransaction());
    }

    @Override
    public Future<Void> abortTransaction() {
        return this.executeBlocking(() -> this.producer.abortTransaction());
    }

    @Override
    public KafkaWriteStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override
    public Future<List<PartitionInfo>> partitionsFor(String topic) {
        return this.execute(() -> this.producer.partitionsFor(topic));
    }

    @Override
    public Future<Void> flush() {
        return this.execute(() -> {
            this.producer.flush();
            return null;
        });
    }

    @Override
    public Future<Void> close() {
        return this.close(0L);
    }

    @Override
    public Future<Void> close(long timeout) {
        return this.execute(() -> {
            if (timeout > 0L) {
                this.producer.close(Duration.ofMillis(timeout));
            } else {
                this.producer.close();
            }
            return null;
        });
    }

    private <T> Future<T> execute(Callable<T> callable) {
        ContextInternal ctx = this.vertx.getOrCreateContext();
        return ctx.future(promise -> this.workerExec.execute(() -> {
            try {
                Object res = callable.call();
                promise.complete(res);
            }
            catch (Exception e) {
                promise.fail((Throwable)e);
            }
        }));
    }

    @Override
    public Producer<K, V> unwrap() {
        return this.producer;
    }

    Future<Void> executeBlocking(BlockingStatement statement) {
        return this.execute(() -> {
            statement.execute();
            return null;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private /* synthetic */ void lambda$send$4(ProducerRecord record, ContextInternal ctx, int len, ProducerTracer.StartedSpan startedSpan, Promise prom) {
        try {
            this.producer.send(record, (metadata, err) -> {
                ctx.runOnContext(v1 -> {
                    Handler<Void> drainHandler;
                    Handler<Throwable> exceptionHandler;
                    KafkaWriteStreamImpl kafkaWriteStreamImpl = this;
                    synchronized (kafkaWriteStreamImpl) {
                        exceptionHandler = this.exceptionHandler;
                        long lowWaterMark = this.maxSize / 2L;
                        this.pending -= (long)len;
                        if (this.pending < lowWaterMark && this.drainHandler != null) {
                            drainHandler = this.drainHandler;
                            this.drainHandler = null;
                        } else {
                            drainHandler = null;
                        }
                    }
                    if (err != null) {
                        if (exceptionHandler != null) {
                            ctx.runOnContext(v2 -> exceptionHandler.handle((Object)err));
                        } else {
                            ctx.reportException((Throwable)err);
                        }
                    }
                    if (drainHandler != null) {
                        ctx.runOnContext(drainHandler);
                    }
                });
                if (err != null) {
                    if (startedSpan != null) {
                        startedSpan.fail((Context)ctx, err);
                    }
                    prom.fail((Throwable)err);
                } else {
                    if (startedSpan != null) {
                        startedSpan.finish((Context)ctx);
                    }
                    prom.complete((Object)metadata);
                }
            });
        }
        catch (Throwable e) {
            KafkaWriteStreamImpl kafkaWriteStreamImpl = this;
            synchronized (kafkaWriteStreamImpl) {
                if (this.exceptionHandler != null) {
                    Handler<Throwable> exceptionHandler = this.exceptionHandler;
                    ctx.runOnContext(v3 -> exceptionHandler.handle((Object)e));
                }
            }
            if (startedSpan != null) {
                startedSpan.fail((Context)ctx, e);
            }
            prom.fail(e);
        }
    }

    @FunctionalInterface
    private static interface BlockingStatement {
        public void execute();
    }
}

