/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.impl.QueueConsumerHandler;
import java.io.IOException;

public class RabbitMQConsumerImpl
implements RabbitMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumerImpl.class);
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private String queueName;
    private final QueueConsumerHandler consumerHandler;
    private final PendingQueue pending;
    private final int maxQueueSize;
    private volatile boolean cancelled;

    RabbitMQConsumerImpl(Context context, QueueConsumerHandler consumerHandler, QueueOptions options, String queueName) {
        PendingQueue pending = new PendingQueue((ContextInternal)context);
        pending.pause();
        this.consumerHandler = consumerHandler;
        this.maxQueueSize = options.maxInternalQueueSize();
        this.pending = pending;
        this.queueName = queueName;
    }

    @Override
    public String queueName() {
        return this.queueName;
    }

    @Override
    public RabbitMQConsumer setQueueName(String name) {
        this.queueName = name;
        return this;
    }

    @Override
    public RabbitMQConsumer exceptionHandler(Handler<Throwable> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    @Override
    public RabbitMQConsumer handler(Handler<RabbitMQMessage> handler) {
        this.pending.handler = handler;
        return this;
    }

    @Override
    public RabbitMQConsumer pause() {
        this.pending.pause();
        return this;
    }

    @Override
    public RabbitMQConsumer resume() {
        this.pending.fetch(Long.MAX_VALUE);
        return this;
    }

    @Override
    public RabbitMQConsumer fetch(long amount) {
        this.pending.fetch(amount);
        return this;
    }

    @Override
    public RabbitMQConsumer endHandler(Handler<Void> endHandler) {
        this.endHandler = endHandler;
        return this;
    }

    @Override
    public String consumerTag() {
        return this.consumerHandler.getConsumerTag();
    }

    @Override
    public Future<Void> cancel() {
        Future operationResult;
        try {
            log.debug((Object)("Cancelling " + this.consumerTag()));
            this.cancelled = true;
            this.consumerHandler.getChannel().basicCancel(this.consumerTag());
            operationResult = Future.succeededFuture();
        }
        catch (IOException e) {
            operationResult = Future.failedFuture((Throwable)e);
        }
        this.handleEnd();
        return operationResult;
    }

    @Override
    public boolean isCancelled() {
        return this.cancelled;
    }

    @Override
    public boolean isPaused() {
        return false;
    }

    void handleMessage(RabbitMQMessage message) {
        this.pending.enqueue(message);
    }

    private void handleException(Throwable exception) {
        Handler<Throwable> h = this.exceptionHandler;
        if (h != null) {
            h.handle((Object)exception);
        }
    }

    void handleEnd() {
        Handler<Void> h = this.endHandler;
        if (h != null) {
            h.handle(null);
        }
    }

    class PendingQueue
    extends InboundMessageQueue<RabbitMQMessage> {
        private final ContextInternal context;
        private Handler<RabbitMQMessage> handler;
        private int size;

        public PendingQueue(ContextInternal context) {
            super(context.executor(), context.executor());
            this.context = context;
        }

        protected void handleMessage(RabbitMQMessage msg) {
            --this.size;
            Handler<RabbitMQMessage> h = this.handler;
            if (h != null) {
                this.context.dispatch((Object)msg, m -> {
                    try {
                        h.handle(m);
                    }
                    catch (Exception e) {
                        RabbitMQConsumerImpl.this.handleException(e);
                    }
                });
            }
        }

        void enqueue(RabbitMQMessage message) {
            this.context.execute((Object)message, m -> {
                if (this.size++ < RabbitMQConsumerImpl.this.maxQueueSize) {
                    this.write(m);
                }
            });
        }
    }
}

