/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.streams.ReadStream;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.impl.RabbitMQClientImpl;

public class RabbitMQConfirmListenerImpl
implements ReadStream<RabbitMQConfirmation> {
    private final RabbitMQClientImpl client;
    private final ConfirmationQueue pending;
    private final int maxQueueSize;
    private Handler<Throwable> exceptionHandler;

    public RabbitMQConfirmListenerImpl(RabbitMQClientImpl client, Context context, int maxQueueSize) {
        this.client = client;
        this.maxQueueSize = maxQueueSize;
        this.pending = new ConfirmationQueue((ContextInternal)context);
    }

    void handleAck(long deliveryTag, boolean multiple, boolean succeeded) {
        this.pending.handleAck(deliveryTag, multiple, succeeded);
    }

    public RabbitMQConfirmListenerImpl exceptionHandler(Handler<Throwable> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    public RabbitMQConfirmListenerImpl handler(Handler<RabbitMQConfirmation> handler) {
        this.pending.handler = handler;
        return this;
    }

    private void handleException(Throwable exception) {
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle((Object)exception);
        }
    }

    public RabbitMQConfirmListenerImpl pause() {
        this.pending.pause();
        return this;
    }

    public RabbitMQConfirmListenerImpl resume() {
        this.pending.fetch(Long.MAX_VALUE);
        return this;
    }

    public RabbitMQConfirmListenerImpl fetch(long amount) {
        this.pending.fetch(amount);
        return this;
    }

    public RabbitMQConfirmListenerImpl endHandler(Handler<Void> hndlr) {
        return this;
    }

    class ConfirmationQueue
    extends InboundMessageQueue<RabbitMQConfirmation> {
        private final ContextInternal context;
        private Handler<RabbitMQConfirmation> handler;
        private int size;

        public ConfirmationQueue(ContextInternal context) {
            super(context.executor(), context.executor());
            this.context = context;
        }

        protected void handleMessage(RabbitMQConfirmation msg) {
            --this.size;
            this.context.dispatch((Object)msg, m -> {
                Handler<RabbitMQConfirmation> h = this.handler;
                if (h != null) {
                    try {
                        h.handle(m);
                    }
                    catch (Exception e) {
                        RabbitMQConfirmListenerImpl.this.handleException(e);
                    }
                }
            });
        }

        void handleAck(long deliveryTag, boolean multiple, boolean succeeded) {
            this.context.execute((Object)new RabbitMQConfirmation(RabbitMQConfirmListenerImpl.this.client.getChannelInstance(), deliveryTag, multiple, succeeded), m -> {
                if (this.size++ < RabbitMQConfirmListenerImpl.this.maxQueueSize) {
                    this.write(m);
                }
            });
        }
    }
}

