/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.netty.handler.ssl.JdkSslContext;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.VertxSslContext;
import io.vertx.core.internal.tls.SslContextManager;
import io.vertx.core.internal.tls.SslContextProvider;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.ClientSSLOptions;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.SSLEngineOptions;
import io.vertx.core.net.SSLOptions;
import io.vertx.core.streams.ReadStream;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.impl.ChannelConfirmHandler;
import io.vertx.rabbitmq.impl.QueueConsumerHandler;
import io.vertx.rabbitmq.impl.RabbitMQMessageImpl;
import io.vertx.rabbitmq.impl.Utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class RabbitMQClientImpl
implements RabbitMQClient,
ShutdownListener {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQClientImpl.class);
    private static final JsonObject emptyConfig = new JsonObject();
    private final Vertx vertx;
    private final RabbitMQOptions config;
    private final int retries;
    private Connection connection;
    private Channel channel;
    private long channelInstance;
    private boolean channelConfirms = false;
    private boolean hasConnected = false;
    private AtomicBoolean isReconnecting = new AtomicBoolean(false);
    private List<Handler<Promise<Void>>> connectionEstablishedCallbacks = new ArrayList<Handler<Promise<Void>>>();

    public RabbitMQClientImpl(Vertx vertx, RabbitMQOptions config) {
        this.vertx = vertx;
        this.config = config;
        this.retries = config.getReconnectAttempts();
    }

    public long getChannelInstance() {
        return this.channelInstance;
    }

    @Override
    public void addConnectionEstablishedCallback(Handler<Promise<Void>> connectionEstablishedCallback) {
        this.connectionEstablishedCallbacks.add(connectionEstablishedCallback);
    }

    private static Connection newConnection(Vertx vertx, RabbitMQOptions config) throws IOException, TimeoutException {
        ConnectionFactory cf = new ConnectionFactory();
        String uri = config.getUri();
        List<Address> addresses = null;
        if (uri != null) {
            try {
                cf.setUri(uri);
                log.info((Object)("Connecting to " + cf.getHost()));
            }
            catch (Exception e) {
                throw new IllegalArgumentException("Invalid rabbitmq connection uri ", e);
            }
        } else {
            cf.setUsername(config.getUser());
            cf.setPassword(config.getPassword());
            addresses = config.getAddresses().isEmpty() ? Collections.singletonList(new Address(config.getHost(), config.getPort())) : config.getAddresses();
            cf.setVirtualHost(config.getVirtualHost());
            log.info((Object)("Connecting to " + String.valueOf(addresses)));
        }
        cf.setConnectionTimeout(config.getConnectionTimeout());
        cf.setRequestedHeartbeat(config.getRequestedHeartbeat());
        cf.setHandshakeTimeout(config.getHandshakeTimeout());
        cf.setRequestedChannelMax(config.getRequestedChannelMax());
        cf.setNetworkRecoveryInterval(config.getNetworkRecoveryInterval());
        cf.setAutomaticRecoveryEnabled(config.isAutomaticRecoveryEnabled());
        if (config.isSsl()) {
            SslContextProvider provider;
            config.setSslEngineOptions((SSLEngineOptions)new JdkSSLEngineOptions());
            try {
                SslContextManager sslHelper = new SslContextManager(SslContextManager.resolveEngineOptions((SSLEngineOptions)config.getSslEngineOptions(), (boolean)config.isUseAlpn()));
                ClientSSLOptions options = config.getSslOptions().copy();
                provider = (SslContextProvider)sslHelper.resolveSslContextProvider((SSLOptions)options, config.getHostnameVerificationAlgorithm(), null, null, ((VertxInternal)vertx).createEventLoopContext()).toCompletionStage().toCompletableFuture().get(1L, TimeUnit.MINUTES);
            }
            catch (InterruptedException e) {
                throw new VertxException((Throwable)e);
            }
            catch (ExecutionException e) {
                throw new VertxException(e.getCause());
            }
            VertxSslContext ctx = provider.createContext(false, null, null, null, false);
            cf.useSslProtocol(((JdkSslContext)ctx.unwrap()).context());
        }
        if (config.isNioEnabled()) {
            cf.useNio();
        }
        if (config.getSaslConfig() != null) {
            cf.setSaslConfig((SaslConfig)config.getSaslConfig());
        }
        if (config.getCredentialsProvider() != null) {
            cf.setCredentialsProvider(config.getCredentialsProvider());
        }
        if (config.getCredentialsRefreshService() != null) {
            cf.setCredentialsRefreshService(config.getCredentialsRefreshService());
        }
        if (config.getMetricsCollector() != null) {
            cf.setMetricsCollector(config.getMetricsCollector());
        }
        return addresses == null ? cf.newConnection(config.getConnectionName()) : cf.newConnection(addresses, config.getConnectionName());
    }

    @Override
    public boolean isConnected() {
        boolean connected = false;
        if (this.connection != null && this.connection.isOpen()) {
            connected = true;
        }
        return connected;
    }

    @Override
    public boolean isOpenChannel() {
        return this.channel != null && this.channel.isOpen();
    }

    @Override
    public Future<Void> basicAck(long deliveryTag, boolean multiple) {
        return this.forChannel(channel -> {
            channel.basicAck(deliveryTag, multiple);
            return null;
        });
    }

    @Override
    public Future<Void> basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        return this.forChannel(channel -> {
            channel.basicNack(deliveryTag, multiple, requeue);
            return null;
        });
    }

    private void restartConsumer(int attempts, QueueConsumerHandler handler, QueueOptions options) {
        if (handler.queue().isCancelled()) {
            return;
        }
        this.restartConnect(0, (Completable<Void>)((Completable)(res, err) -> this.forChannel(chan -> {
            RabbitMQConsumer q = handler.queue();
            chan.basicConsume(q.queueName(), options.isAutoAck(), options.getConsumerTag(), options.isNoLocal(), options.isConsumerExclusive(), options.getConsumerArguments(), (Consumer)handler);
            log.info((Object)("Reconsume queue: " + q.queueName() + " success"));
            return q.resume();
        }).onComplete(arChan -> {
            if (arChan.failed()) {
                log.error((Object)"Failed to restart consumer: ", arChan.cause());
                long delay = this.config.getReconnectInterval();
                this.vertx.setTimer(delay, id -> this.restartConsumer(attempts + 1, handler, options));
            }
        })));
    }

    @Override
    public Future<Void> restartConnect(int attempts) {
        return Future.future(h -> this.restartConnect(attempts, (Completable<Void>)h));
    }

    private void restartConnect(int attempts, Completable<Void> resultHandler) {
        if (this.retries == 0) {
            log.error((Object)"Retries disabled. Will not attempt to restart");
            resultHandler.fail("Retries disabled. Will not attempt to restart");
            return;
        }
        if (this.isReconnecting.compareAndSet(false, true)) {
            if (this.channel != null && this.channel.isOpen()) {
                log.debug((Object)"Other consumers or producers reconnect successfully. Reuse their channel");
                resultHandler.succeed();
                this.isReconnecting.set(false);
                return;
            }
            log.debug((Object)"Start to reconnect...");
            this.execRestart(attempts, resultHandler);
            return;
        }
        log.debug((Object)"Other consumers or producers are reconnecting. Continue to wait for reconnection");
        this.vertx.setTimer(this.config.getReconnectInterval(), id -> this.restartConnect(attempts, resultHandler));
    }

    private void execRestart(int attempts, Completable<Void> resultHandler) {
        this.stop().onComplete(ar -> {
            if (ar.succeeded()) {
                if (attempts >= this.retries) {
                    log.error((Object)("Max number of consumer restart attempts (" + this.retries + ") reached. Will not attempt to restart again"));
                } else {
                    this.start().onComplete(arStart -> {
                        if (arStart.succeeded()) {
                            if (this.channelConfirms) {
                                this.confirmSelect();
                            }
                            log.info((Object)"Successed to restart client. ");
                            this.isReconnecting.set(false);
                            resultHandler.succeed();
                        } else {
                            log.error((Object)"Failed to restart client: ", arStart.cause());
                            long delay = this.config.getReconnectInterval();
                            this.vertx.setTimer(delay, id -> this.execRestart(attempts + 1, resultHandler));
                        }
                    });
                }
            } else {
                log.error((Object)"Failed to stop client, will attempt to restart: ", ar.cause());
                this.vertx.setTimer(this.config.getReconnectInterval(), id -> this.execRestart(attempts + 1, resultHandler));
            }
        });
    }

    @Override
    public Future<RabbitMQConsumer> basicConsumer(String queue, QueueOptions options) {
        return this.forChannel(channel -> {
            log.debug((Object)"Created new QueueConsumer");
            QueueConsumerHandler handler = new QueueConsumerHandler(this.vertx, channel, options, queue);
            if (this.retries > 0) {
                handler.setShutdownHandler((Handler<ShutdownSignalException>)((Handler)sig -> this.restartConsumer(0, handler, options)));
            }
            try {
                channel.basicConsume(queue, options.isAutoAck(), options.getConsumerTag(), options.isNoLocal(), options.isConsumerExclusive(), options.getConsumerArguments(), (Consumer)handler);
            }
            catch (Throwable ex) {
                log.warn((Object)"Failed to consume: ", ex);
                this.restartConsumer(0, handler, options);
            }
            return handler;
        }).map(res -> {
            RabbitMQConsumer q = res.queue();
            q.resume();
            return q;
        });
    }

    @Override
    public Future<RabbitMQMessage> basicGet(String queue, boolean autoAck) {
        return this.forChannel(channel -> {
            GetResponse response = channel.basicGet(queue, autoAck);
            if (response == null) {
                return null;
            }
            return new RabbitMQMessageImpl(response.getBody(), null, response.getEnvelope(), response.getProps(), response.getMessageCount());
        });
    }

    @Override
    public Future<Void> basicPublish(String exchange, String routingKey, Buffer body) {
        return this.basicPublishWithDeliveryTag(exchange, routingKey, (BasicProperties)new AMQP.BasicProperties(), body, null);
    }

    @Override
    public Future<Void> basicPublish(String exchange, String routingKey, BasicProperties properties, Buffer body) {
        return this.basicPublishWithDeliveryTag(exchange, routingKey, properties, body, null);
    }

    @Override
    public Future<Void> basicPublishWithDeliveryTag(String exchange, String routingKey, BasicProperties properties, Buffer body, @Nullable Handler<Long> deliveryTagHandler) {
        return this.forChannel(channel -> {
            if (deliveryTagHandler != null) {
                long deliveryTag = channel.getNextPublishSeqNo();
                deliveryTagHandler.handle((Object)deliveryTag);
            }
            channel.basicPublish(exchange, routingKey, (AMQP.BasicProperties)properties, body.getBytes());
            return null;
        });
    }

    @Override
    public Future<ReadStream<RabbitMQConfirmation>> addConfirmListener(int maxQueueSize) {
        return this.forChannel(channel -> {
            ChannelConfirmHandler handler = new ChannelConfirmHandler(this.vertx, this, maxQueueSize);
            channel.addConfirmListener((ConfirmListener)handler);
            channel.confirmSelect();
            this.channelConfirms = true;
            return handler.getListener();
        });
    }

    @Override
    public Future<Void> confirmSelect() {
        return this.forChannel(channel -> {
            channel.confirmSelect();
            this.channelConfirms = true;
            return null;
        });
    }

    @Override
    public Future<Void> waitForConfirms() {
        return this.forChannel(channel -> {
            channel.waitForConfirmsOrDie();
            return null;
        });
    }

    @Override
    public Future<Void> waitForConfirms(long timeout) {
        return this.forChannel(channel -> {
            channel.waitForConfirmsOrDie(timeout);
            return null;
        });
    }

    @Override
    public Future<Void> basicQos(int prefetchSize, int prefetchCount, boolean global) {
        return this.forChannel(channel -> {
            channel.basicQos(prefetchSize, prefetchCount, global);
            return null;
        });
    }

    @Override
    public Future<Void> exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete) {
        return this.exchangeDeclare(exchange, type, durable, autoDelete, emptyConfig);
    }

    @Override
    public Future<Void> exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, JsonObject config) {
        return this.forChannel(channel -> {
            channel.exchangeDeclare(exchange, type, durable, autoDelete, new LinkedHashMap(config.getMap()));
            return null;
        });
    }

    @Override
    public Future<Void> exchangeDelete(String exchange) {
        return this.forChannel(channel -> {
            channel.exchangeDelete(exchange);
            return null;
        });
    }

    @Override
    public Future<Void> exchangeBind(String destination, String source, String routingKey) {
        return this.forChannel(channel -> {
            channel.exchangeBind(destination, source, routingKey);
            return null;
        });
    }

    @Override
    public Future<Void> exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) {
        return this.forChannel(channel -> {
            channel.exchangeBind(destination, source, routingKey, arguments);
            return null;
        });
    }

    @Override
    public Future<Void> exchangeUnbind(String destination, String source, String routingKey) {
        return this.forChannel(channel -> {
            channel.exchangeUnbind(destination, source, routingKey);
            return null;
        });
    }

    @Override
    public Future<Void> exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) {
        return this.forChannel(channel -> {
            channel.exchangeUnbind(destination, source, routingKey, arguments);
            return null;
        });
    }

    @Override
    public Future<JsonObject> queueDeclareAuto() {
        return this.forChannel(channel -> {
            AMQP.Queue.DeclareOk result = channel.queueDeclare();
            return Utils.toJson(result);
        });
    }

    @Override
    public Future<AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete) {
        return this.queueDeclare(queue, durable, exclusive, autoDelete, emptyConfig);
    }

    @Override
    public Future<AMQP.Queue.DeclareOk> queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, JsonObject config) {
        return this.forChannel(channel -> channel.queueDeclare(queue, durable, exclusive, autoDelete, new LinkedHashMap(config.getMap())));
    }

    @Override
    public Future<AMQP.Queue.DeleteOk> queueDelete(String queue) {
        return this.forChannel(channel -> channel.queueDelete(queue));
    }

    @Override
    public Future<AMQP.Queue.DeleteOk> queueDeleteIf(String queue, boolean ifUnused, boolean ifEmpty) {
        return this.forChannel(channel -> channel.queueDelete(queue, ifUnused, ifEmpty));
    }

    @Override
    public Future<Void> queueBind(String queue, String exchange, String routingKey) {
        return this.forChannel(channel -> {
            channel.queueBind(queue, exchange, routingKey);
            return null;
        });
    }

    @Override
    public Future<Void> queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) {
        return this.forChannel(channel -> {
            channel.queueBind(queue, exchange, routingKey, arguments);
            return null;
        });
    }

    @Override
    public Future<Void> queueUnbind(String queue, String exchange, String routingKey) {
        return this.forChannel(channel -> {
            channel.queueUnbind(queue, exchange, routingKey);
            return null;
        });
    }

    @Override
    public Future<Void> queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) {
        return this.forChannel(channel -> {
            channel.queueUnbind(queue, exchange, routingKey, arguments);
            return null;
        });
    }

    @Override
    public Future<Long> messageCount(String queue) {
        return this.forChannel(channel -> channel.messageCount(queue));
    }

    @Override
    public Future<Void> start() {
        log.info((Object)"Starting rabbitmq client");
        return this.start((ContextInternal)this.vertx.getOrCreateContext(), 0);
    }

    private Future<Void> start(ContextInternal ctx, int attempts) {
        Promise promise = Promise.promise();
        this.tryConnect(ctx, attempts, (Promise<Void>)promise);
        return promise.future();
    }

    public void tryConnect(ContextInternal ctx, int attempts, Promise<Void> promise) {
        ctx.executeBlocking(() -> {
            try {
                this.connect();
                return null;
            }
            catch (IOException | TimeoutException e) {
                log.error((Object)"Could not connect to rabbitmq", (Throwable)e);
                throw e;
            }
        }).onSuccess(h -> promise.complete()).onFailure(err -> {
            if (this.retries == 0 || !this.hasConnected && !this.config.isAutomaticRecoveryOnInitialConnection()) {
                log.error((Object)"Retries disabled. Will not attempt to restart");
                promise.fail(err);
            } else if (attempts >= this.retries) {
                log.info((Object)("Max number of connect attempts (" + this.retries + ") reached. Will not attempt to connect again"));
                promise.fail(err);
            } else {
                long delay = this.config.getReconnectInterval();
                log.info((Object)"Attempting to reconnect to rabbitmq...");
                this.vertx.setTimer(delay, id -> {
                    log.debug((Object)("Reconnect attempt # " + attempts));
                    this.tryConnect(ctx, attempts + 1, promise);
                });
            }
        });
    }

    @Override
    public Future<Void> stop() {
        log.info((Object)"Stopping rabbitmq client");
        return this.vertx.executeBlocking(() -> {
            this.disconnect();
            return null;
        });
    }

    private <T> Future<T> forChannel(ChannelHandler<T> channelHandler) {
        ContextInternal ctx = (ContextInternal)this.vertx.getOrCreateContext();
        if (this.connection == null || this.channel == null) {
            return ctx.failedFuture("Not connected");
        }
        if (!this.channel.isOpen()) {
            try {
                log.debug((Object)"channel is close, try create Channel");
                ++this.channelInstance;
                this.channel = this.connection.createChannel();
                if (this.channelConfirms) {
                    this.channel.confirmSelect();
                }
            }
            catch (IOException e) {
                log.debug((Object)"create channel error");
                return ctx.failedFuture((Throwable)e);
            }
        }
        return this.vertx.executeBlocking(() -> channelHandler.handle(this.channel));
    }

    private Future<Void> connect() throws IOException, TimeoutException {
        log.debug((Object)"Connecting to rabbitmq...");
        this.connection = RabbitMQClientImpl.newConnection(this.vertx, this.config);
        this.connection.addShutdownListener((ShutdownListener)this);
        ++this.channelInstance;
        this.channel = this.connection.createChannel();
        Promise promise = Promise.promise();
        if (this.connectionEstablishedCallbacks.isEmpty()) {
            promise.complete();
        } else {
            Iterator<Handler<Promise<Void>>> iter = this.connectionEstablishedCallbacks.iterator();
            this.connectCallbackHandler((AsyncResult<Void>)Future.succeededFuture(), iter, (Promise<Void>)promise);
        }
        log.debug((Object)"Connected to rabbitmq !");
        this.hasConnected = true;
        return promise.future();
    }

    private void connectCallbackHandler(AsyncResult<Void> prevResult, Iterator<Handler<Promise<Void>>> iter, Promise<Void> connectPromise) {
        try {
            if (prevResult.failed()) {
                connectPromise.fail(prevResult.cause());
            } else if (iter.hasNext()) {
                Handler<Promise<Void>> next = iter.next();
                Promise callbackPromise = Promise.promise();
                next.handle((Object)callbackPromise);
                callbackPromise.future().onComplete(result -> this.connectCallbackHandler((AsyncResult<Void>)result, iter, connectPromise));
            } else {
                connectPromise.complete();
            }
        }
        catch (Throwable ex) {
            log.error((Object)"Exception whilst running connection stablished callback: ", ex);
            connectPromise.fail(ex);
        }
    }

    private void disconnect() throws IOException {
        try {
            log.debug((Object)"Disconnecting from rabbitmq...");
            if (this.connection != null) {
                this.connection.close();
            }
            log.debug((Object)"Disconnected from rabbitmq !");
        }
        catch (AlreadyClosedException ex) {
            log.debug((Object)"Already disconnected from rabbitmq !");
        }
        finally {
            this.connection = null;
            this.channel = null;
        }
    }

    public void shutdownCompleted(ShutdownSignalException cause) {
        if (cause.isInitiatedByApplication()) {
            return;
        }
        log.info((Object)"RabbitMQ connection shutdown! The client will attempt to reconnect automatically", (Throwable)cause);
        this.restartConnect(0, (Completable<Void>)((Completable)(err, res) -> log.info((Object)"reconnect success")));
    }

    private static interface ChannelHandler<T> {
        public T handle(Channel var1) throws Exception;
    }
}

