/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.BasicProperties;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import io.vertx.rabbitmq.impl.RabbitMQClientImpl;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class RabbitMQPublisherImpl
implements RabbitMQPublisher,
ReadStream<RabbitMQPublisherConfirmation> {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisherImpl.class);
    private final RabbitMQClient client;
    private final ConfirmationQueue confirmationQueue;
    private final ContextInternal context;
    private final RabbitMQPublisherOptions options;
    private final Deque<MessageDetails> pendingAcks = new ArrayDeque<MessageDetails>();
    private final SendQueue sendQueue;
    private long lastChannelInstance = 0L;
    private volatile boolean stopped = false;

    public RabbitMQPublisherImpl(Vertx vertx, RabbitMQClient client, RabbitMQPublisherOptions options) {
        this.client = client;
        this.context = (ContextInternal)vertx.getOrCreateContext();
        this.confirmationQueue = new ConfirmationQueue(this.context);
        this.sendQueue = new SendQueue(this.context);
        this.options = options;
        this.client.addConnectionEstablishedCallback((Handler<Promise<Void>>)((Handler)p -> {
            this.addConfirmListener(client, options, (Promise<Void>)p);
            if (client instanceof RabbitMQClientImpl) {
                if (this.lastChannelInstance == 0L) {
                    this.lastChannelInstance = ((RabbitMQClientImpl)client).getChannelInstance();
                } else if (this.lastChannelInstance != ((RabbitMQClientImpl)client).getChannelInstance()) {
                    this.pendingAcks.clear();
                    this.lastChannelInstance = ((RabbitMQClientImpl)client).getChannelInstance();
                }
            }
        }));
    }

    @Override
    public Future<Void> start() {
        Promise<Void> promise = this.startForPromise();
        return promise.future();
    }

    private void stop(Promise<Void> resultHandler) {
        this.stopped = true;
        this.sendQueue.checkpoint(resultHandler);
    }

    @Override
    public Future<Void> stop() {
        Promise promise = Promise.promise();
        this.stop((Promise<Void>)promise);
        return promise.future();
    }

    @Override
    public void restart() {
        this.stopped = false;
        this.sendQueue.fetch(Long.MAX_VALUE);
    }

    private Promise<Void> startForPromise() {
        Promise promise = Promise.promise();
        this.addConfirmListener(this.client, this.options, (Promise<Void>)promise);
        return promise;
    }

    protected final void addConfirmListener(RabbitMQClient client1, RabbitMQPublisherOptions options1, Promise<Void> promise) {
        this.context.runOnContext(unused -> client1.addConfirmListener(options1.getMaxInternalQueueSize()).onComplete(ar -> {
            if (ar.succeeded()) {
                ((ReadStream)ar.result()).handler(confirmation -> this.handleConfirmation((RabbitMQConfirmation)confirmation));
                promise.complete();
            } else {
                log.error((Object)"Failed to add confirmListener: ", ar.cause());
                promise.fail(ar.cause());
            }
        }));
    }

    @Override
    public ReadStream<RabbitMQPublisherConfirmation> getConfirmationStream() {
        return this;
    }

    @Override
    public int queueSize() {
        return this.sendQueue.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleMessageSend(MessageDetails md) {
        this.sendQueue.pause();
        Deque<MessageDetails> deque = this.pendingAcks;
        synchronized (deque) {
            this.pendingAcks.add(md);
        }
        this.doSend(md);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doSend(MessageDetails md) {
        try {
            this.client.basicPublishWithDeliveryTag(md.exchange, md.routingKey, md.properties, md.message, (Handler<Long>)((Handler)dt -> md.setDeliveryTag((long)dt))).onComplete(publishResult -> {
                if (publishResult.succeeded()) {
                    if (md.publishHandler != null) {
                        try {
                            md.publishHandler.handle(publishResult);
                        }
                        catch (Throwable ex) {
                            log.warn((Object)"Failed to handle publish result", ex);
                        }
                    }
                    this.sendQueue.fetch(Long.MAX_VALUE);
                } else {
                    log.info((Object)("Failed to publish message: " + publishResult.cause().toString()));
                    Deque<MessageDetails> deque = this.pendingAcks;
                    synchronized (deque) {
                        this.pendingAcks.remove(md);
                    }
                    this.client.restartConnect(0).onComplete(rcRt -> this.doSend(md));
                }
            });
        }
        catch (Throwable ex) {
            Deque<MessageDetails> deque = this.pendingAcks;
            synchronized (deque) {
                this.pendingAcks.remove(md);
            }
            this.client.restartConnect(0).onComplete(rcRt -> this.doSend(md));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleConfirmation(RabbitMQConfirmation rawConfirmation) {
        Deque<MessageDetails> deque = this.pendingAcks;
        synchronized (deque) {
            if (rawConfirmation.isMultiple()) {
                Iterator<MessageDetails> iter = this.pendingAcks.iterator();
                while (iter.hasNext()) {
                    MessageDetails md = iter.next();
                    if (md.deliveryTag > rawConfirmation.getDeliveryTag()) break;
                    String messageId = md.properties == null ? null : md.properties.getMessageId();
                    this.confirmationQueue.enqueue(new RabbitMQPublisherConfirmation(messageId, rawConfirmation.getDeliveryTag(), rawConfirmation.isSucceeded()));
                    if (md.confirmHandler != null) {
                        try {
                            md.confirmHandler.handle((AsyncResult)(rawConfirmation.isSucceeded() ? Future.succeededFuture((Object)md.deliveryTag) : Future.failedFuture((String)"Message publish nacked by the broker")));
                        }
                        catch (Throwable ex) {
                            log.warn((Object)"Failed to handle publish confirm", ex);
                        }
                    }
                    iter.remove();
                }
            } else {
                Iterator<MessageDetails> iter = this.pendingAcks.iterator();
                while (iter.hasNext()) {
                    MessageDetails md = iter.next();
                    if (md.deliveryTag != rawConfirmation.getDeliveryTag()) continue;
                    String messageId = md.properties == null ? null : md.properties.getMessageId();
                    this.confirmationQueue.enqueue(new RabbitMQPublisherConfirmation(messageId, rawConfirmation.getDeliveryTag(), rawConfirmation.isSucceeded()));
                    if (md.confirmHandler != null) {
                        try {
                            md.confirmHandler.handle((AsyncResult)(rawConfirmation.isSucceeded() ? Future.succeededFuture((Object)md.deliveryTag) : Future.failedFuture((String)"Message publish nacked by the broker")));
                        }
                        catch (Throwable ex) {
                            log.warn((Object)"Failed to handle publish confirm", ex);
                        }
                    }
                    iter.remove();
                    break;
                }
            }
        }
    }

    @Override
    public Future<Void> publish(String exchange, String routingKey, BasicProperties properties, Buffer body) {
        Promise promise = Promise.promise();
        if (!this.stopped) {
            this.sendQueue.enqueue(new MessageDetails(exchange, routingKey, properties, body, (Promise<Void>)promise, null));
        }
        return promise.future();
    }

    @Override
    public Future<Long> publishConfirm(String exchange, String routingKey, BasicProperties properties, Buffer body) {
        Promise promise = Promise.promise();
        if (!this.stopped) {
            this.sendQueue.enqueue(new MessageDetails(exchange, routingKey, properties, body, null, (Promise<Long>)promise));
        }
        return promise.future();
    }

    public RabbitMQPublisherImpl exceptionHandler(Handler<Throwable> hndlr) {
        return this;
    }

    public RabbitMQPublisherImpl handler(Handler<RabbitMQPublisherConfirmation> hndlr) {
        this.confirmationQueue.handler = hndlr;
        return this;
    }

    public RabbitMQPublisherImpl pause() {
        this.confirmationQueue.pause();
        return this;
    }

    public RabbitMQPublisherImpl resume() {
        this.confirmationQueue.fetch(Long.MAX_VALUE);
        return this;
    }

    public RabbitMQPublisherImpl fetch(long l) {
        this.confirmationQueue.fetch(l);
        return this;
    }

    public RabbitMQPublisherImpl endHandler(Handler<Void> hndlr) {
        return this;
    }

    private static class MessageDetails {
        private final String exchange;
        private final String routingKey;
        private final BasicProperties properties;
        private final Buffer message;
        private final Promise<Void> publishHandler;
        private final Promise<Long> confirmHandler;
        private volatile long deliveryTag;

        MessageDetails(String exchange, String routingKey, BasicProperties properties, Buffer message, Promise<Void> publishHandler, Promise<Long> confirmHandler) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.properties = properties;
            this.message = message;
            this.publishHandler = publishHandler;
            this.confirmHandler = confirmHandler;
        }

        public void setDeliveryTag(long deliveryTag) {
            this.deliveryTag = deliveryTag;
        }
    }

    private class SendQueue
    extends InboundMessageQueue<Object> {
        private int size;
        private final AtomicInteger volatileSize;

        public SendQueue(ContextInternal context) {
            super(context.executor(), context.executor());
            this.volatileSize = new AtomicInteger();
        }

        protected void handleMessage(Object msg) {
            if (msg instanceof MessageDetails) {
                this.volatileSize.setRelease(--this.size);
                MessageDetails md = (MessageDetails)msg;
                RabbitMQPublisherImpl.this.handleMessageSend(md);
            } else if (msg instanceof Promise) {
                Promise promise = (Promise)msg;
                promise.complete();
            }
        }

        void enqueue(MessageDetails msg) {
            this.volatileSize.setRelease(++this.size);
            RabbitMQPublisherImpl.this.context.execute((Object)msg, arg_0 -> ((SendQueue)this).write(arg_0));
        }

        int size() {
            return this.volatileSize.get();
        }

        public void checkpoint(Promise<Void> promise) {
            RabbitMQPublisherImpl.this.context.execute(promise, arg_0 -> ((SendQueue)this).write(arg_0));
        }
    }

    private class ConfirmationQueue
    extends InboundMessageQueue<RabbitMQPublisherConfirmation> {
        private Handler<RabbitMQPublisherConfirmation> handler;

        public ConfirmationQueue(ContextInternal context) {
            super(context.executor(), context.executor());
        }

        protected void handleMessage(RabbitMQPublisherConfirmation msg) {
            Handler<RabbitMQPublisherConfirmation> h = this.handler;
            if (h != null) {
                RabbitMQPublisherImpl.this.context.dispatch((Object)msg, h);
            }
        }

        void enqueue(RabbitMQPublisherConfirmation confirmation) {
            RabbitMQPublisherImpl.this.context.execute((Object)confirmation, t -> this.write(confirmation));
        }
    }
}

