/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.redis.client.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Completable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisClusterConnectOptions;
import io.vertx.redis.client.RedisClusterTransactions;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisReplicas;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.CommandImpl;
import io.vertx.redis.client.impl.PooledRedisConnection;
import io.vertx.redis.client.impl.RedisConnectionManager;
import io.vertx.redis.client.impl.RedisURI;
import io.vertx.redis.client.impl.RequestImpl;
import io.vertx.redis.client.impl.SharedSlots;
import io.vertx.redis.client.impl.Slots;
import io.vertx.redis.client.impl.ZModem;
import io.vertx.redis.client.impl.types.ErrorType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;

public class RedisClusterConnection
implements RedisConnection {
    private static final Logger LOG = LoggerFactory.getLogger(RedisClusterConnection.class);
    private static final Random RANDOM = new Random();
    static final int RETRIES = 16;
    private static final Map<Command, Function<List<Response>, Response>> REDUCERS = new HashMap<Command, Function<List<Response>, Response>>();
    private static final List<Command> MASTER_ONLY_COMMANDS = new ArrayList<Command>();
    final VertxInternal vertx;
    private final RedisConnectionManager connectionManager;
    private final RedisClusterConnectOptions connectOptions;
    final SharedSlots sharedSlots;
    private final Map<String, PooledRedisConnection> connections;
    private boolean deferredMulti = false;
    private String boundToEndpoint = null;

    public static void addReducer(Command command, Function<List<Response>, Response> fn) {
        REDUCERS.put(command, fn);
    }

    public static void addMasterOnlyCommand(Command command) {
        MASTER_ONLY_COMMANDS.add(command);
    }

    RedisClusterConnection(Vertx vertx, RedisConnectionManager connectionManager, RedisClusterConnectOptions connectOptions, SharedSlots sharedSlots, Map<String, PooledRedisConnection> connections) {
        this.vertx = (VertxInternal)vertx;
        this.connectionManager = connectionManager;
        this.connectOptions = connectOptions;
        this.sharedSlots = sharedSlots;
        this.connections = connections;
    }

    @Override
    public RedisConnection exceptionHandler(Handler<Throwable> handler) {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            redisConnection.exceptionHandler((Handler)handler);
        }
        return this;
    }

    @Override
    public RedisConnection handler(Handler<Response> handler) {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            redisConnection.handler((Handler)handler);
        }
        return this;
    }

    @Override
    public RedisConnection pause() {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            redisConnection.pause();
        }
        return this;
    }

    @Override
    public RedisConnection resume() {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            redisConnection.resume();
        }
        return this;
    }

    @Override
    public RedisConnection fetch(long amount) {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            redisConnection.fetch(amount);
        }
        return this;
    }

    @Override
    public RedisConnection endHandler(@Nullable Handler<Void> handler) {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            redisConnection.endHandler((Handler)handler);
        }
        return this;
    }

    @Override
    public Future<Response> send(Request request) {
        Future future = this.sharedSlots.get().compose(slots -> this.send(request, (Slots)slots));
        if (this.connectOptions.getClusterTransactions() == RedisClusterTransactions.SINGLE_NODE) {
            return future.andThen(ignored -> {
                String cmdName = request.command().toString();
                if ("exec".equals(cmdName) || "discard".equals(cmdName)) {
                    this.deferredMulti = false;
                    this.boundToEndpoint = null;
                }
            });
        }
        return future;
    }

    private Future<Response> send(Request request, Slots slots) {
        String[] endpoints;
        PromiseInternal promise = this.vertx.promise();
        RequestImpl req = (RequestImpl)request;
        CommandImpl cmd = (CommandImpl)req.command();
        List<byte[]> args = req.getArgs();
        List<byte[]> keys = req.keys();
        if (cmd.isTransactional()) {
            RedisClusterTransactions txMode = this.connectOptions.getClusterTransactions();
            if (txMode == RedisClusterTransactions.DISABLED) {
                promise.fail("Transactions in Redis cluster disabled");
                return promise.future();
            }
            if (txMode == RedisClusterTransactions.SINGLE_NODE && this.boundToEndpoint == null) {
                String cmdName = cmd.toString();
                if ("multi".equals(cmdName)) {
                    this.deferredMulti = true;
                    return Future.succeededFuture((Object)SimpleStringType.OK);
                }
                if ("watch".equals(cmdName)) {
                    int hashSlot = ZModem.generateMultiRaw(keys);
                    endpoints = slots.endpointsForKey(hashSlot);
                    this.boundToEndpoint = endpoints[0];
                }
            }
        }
        if (cmd.needsGetKeys()) {
            this.send(this.selectEndpoint(slots, -1, cmd.isReadOnly(args), true), 16, req, (Completable<Response>)promise);
            return promise.future();
        }
        boolean forceMasterEndpoint = MASTER_ONLY_COMMANDS.contains(cmd) || this.deferredMulti;
        switch (keys.size()) {
            case 0: {
                if (REDUCERS.containsKey(cmd)) {
                    ArrayList<Future> responses = new ArrayList<Future>(slots.size());
                    for (int i = 0; i < slots.size(); ++i) {
                        endpoints = slots.endpointsForSlot(i);
                        PromiseInternal p = this.vertx.promise();
                        this.send(this.selectMasterOrReplicaEndpoint(cmd.isReadOnly(args), endpoints, forceMasterEndpoint), 16, req, (Completable<Response>)p);
                        responses.add(p.future());
                    }
                    Future.all(responses).onComplete(arg_0 -> RedisClusterConnection.lambda$send$2((Promise)promise, cmd, arg_0));
                } else {
                    this.send(this.selectEndpoint(slots, -1, cmd.isReadOnly(args), forceMasterEndpoint), 16, req, (Completable<Response>)promise);
                }
                return promise.future();
            }
            case 1: {
                this.send(this.selectEndpoint(slots, ZModem.generate(keys.get(0)), cmd.isReadOnly(args), forceMasterEndpoint), 16, req, (Completable<Response>)promise);
                return promise.future();
            }
        }
        int hashSlot = ZModem.generateMultiRaw(keys);
        if (hashSlot == -1) {
            if (!REDUCERS.containsKey(cmd)) {
                promise.fail(this.buildCrossslotFailureMsg(req));
                return promise.future();
            }
            Map<Integer, Request> requests = this.splitRequest(cmd, args);
            if (requests.isEmpty()) {
                promise.fail(this.buildCrossslotFailureMsg(req));
                return promise.future();
            }
            ArrayList<Future> responses = new ArrayList<Future>(requests.size());
            for (Map.Entry<Integer, Request> kv : requests.entrySet()) {
                PromiseInternal p = this.vertx.promise();
                this.send(this.selectEndpoint(slots, kv.getKey(), cmd.isReadOnly(args), forceMasterEndpoint), 16, kv.getValue(), (Completable<Response>)p);
                responses.add(p.future());
            }
            Future.all(responses).onComplete(arg_0 -> RedisClusterConnection.lambda$send$3((Promise)promise, cmd, arg_0));
            return promise.future();
        }
        String[] endpoints2 = slots.endpointsForKey(hashSlot);
        this.send(this.selectMasterOrReplicaEndpoint(cmd.isReadOnly(args), endpoints2, forceMasterEndpoint), 16, req, (Completable<Response>)promise);
        return promise.future();
    }

    private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
        IdentityHashMap<Integer, Request> map = new IdentityHashMap<Integer, Request>();
        int lastKey = cmd.iterateKeys(args, (begin, keyIdx, keyStep) -> {
            int j;
            int slot = ZModem.generate((byte[])args.get(keyIdx));
            Request request = (Request)map.get(slot);
            if (request == null) {
                request = Request.cmd(cmd);
                for (j = 0; j < begin; ++j) {
                    request.arg((byte[])args.get(j));
                }
                map.put(slot, request);
            }
            request.arg((byte[])args.get(keyIdx));
            for (j = keyIdx + 1; j < keyIdx + keyStep; ++j) {
                request.arg((byte[])args.get(j));
            }
        });
        Collection col = map.values();
        col.forEach(req -> {
            for (int j = lastKey; j < args.size(); ++j) {
                req.arg((byte[])args.get(j));
            }
        });
        return map;
    }

    void send(String selectedEndpoint, int retries, Request command, Completable<Response> handler) {
        String endpoint;
        String string = endpoint = this.boundToEndpoint != null ? this.boundToEndpoint : selectedEndpoint;
        if (this.deferredMulti) {
            this.deferredMulti = false;
            this.boundToEndpoint = endpoint;
            this.send(endpoint, retries, Request.cmd(Command.MULTI), (Completable<Response>)((Completable)(result, failure) -> {
                if (failure == null) {
                    this.send(endpoint, retries, command, handler);
                } else {
                    handler.fail(failure);
                }
            }));
            return;
        }
        PooledRedisConnection connection = this.connections.get(endpoint);
        if (connection == null) {
            this.connectionManager.getConnection(endpoint, RedisReplicas.NEVER != this.connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null).onSuccess(conn -> {
                Map<String, PooledRedisConnection> map = this.connections;
                synchronized (map) {
                    if (this.connections.containsKey(endpoint)) {
                        conn.close().onFailure(t -> LOG.warn((Object)("Failed closing connection: " + t)));
                    } else {
                        this.connections.put(endpoint, (PooledRedisConnection)conn);
                    }
                }
                this.send(endpoint, retries, command, handler);
            }).onFailure(t -> {
                if (retries > 0) {
                    this.send(endpoint, retries - 1, command, handler);
                } else {
                    handler.fail("Failed obtaining connection to: " + endpoint);
                }
            });
            return;
        }
        connection.send(command).onComplete(send -> {
            if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) {
                ErrorType cause = (ErrorType)send.cause();
                boolean ask = cause.is("ASK");
                boolean moved = cause.is("MOVED");
                if (ask || moved) {
                    Object addr;
                    if (moved) {
                        this.sharedSlots.invalidate();
                    }
                    if ((addr = cause.slice(' ', 2)) == null) {
                        handler.fail("Cannot find endpoint:port in redirection: " + cause);
                        return;
                    }
                    RedisURI uri = new RedisURI(endpoint);
                    if (((String)addr).startsWith(":")) {
                        addr = uri.socketAddress().host() + (String)addr;
                    }
                    String newEndpoint = uri.protocol() + "://" + uri.userinfo() + (String)addr;
                    if (this.boundToEndpoint != null && !this.boundToEndpoint.equalsIgnoreCase(newEndpoint)) {
                        handler.fail("Redirect inside a transaction: " + cause);
                        return;
                    }
                    if (ask) {
                        this.send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), (Completable<Response>)((Completable)(resp, err) -> {
                            if (err != null) {
                                handler.fail("Failed ASKING: " + err + ", caused by " + cause);
                            } else {
                                this.send(newEndpoint, retries - 1, command, handler);
                            }
                        }));
                    } else {
                        this.send(newEndpoint, retries - 1, command, handler);
                    }
                    return;
                }
                if (cause.is("TRYAGAIN") || cause.is("CLUSTERDOWN")) {
                    long backoff = (long)(Math.pow(2.0, 16 - Math.max(retries, 9)) * 10.0);
                    this.vertx.setTimer(backoff, t -> this.send(endpoint, retries - 1, command, handler));
                    return;
                }
                if (cause.is("NOAUTH") && this.connectOptions.getPassword() != null) {
                    connection.send(Request.cmd(Command.AUTH).arg(this.connectOptions.getPassword())).onFailure(arg_0 -> ((Completable)handler).fail(arg_0)).onSuccess(auth -> this.send(endpoint, retries - 1, command, handler));
                    return;
                }
            }
            try {
                handler.complete((Object)((Response)send.result()), send.cause());
            }
            catch (RuntimeException e) {
                LOG.error((Object)"Handler failure", (Throwable)e);
            }
        });
    }

    @Override
    public Future<List<Response>> batch(List<Request> requests) {
        return this.sharedSlots.get().compose(slots -> this.batch(requests, (Slots)slots));
    }

    private Future<List<Response>> batch(List<Request> requests, Slots slots) {
        PromiseInternal promise = this.vertx.promise();
        if (requests.isEmpty()) {
            LOG.debug((Object)"Empty batch");
            promise.succeed(Collections.emptyList());
        } else {
            int correctSlot = -1;
            String currentEndpoint = null;
            boolean readOnly = false;
            boolean forceMasterEndpoint = false;
            block4: for (Request request : requests) {
                String endpoint;
                int slot;
                RequestImpl req = (RequestImpl)request;
                CommandImpl cmd = (CommandImpl)req.command();
                List<byte[]> args = req.getArgs();
                readOnly |= cmd.isReadOnly(args);
                if (cmd.needsGetKeys()) {
                    forceMasterEndpoint = true;
                    continue;
                }
                List<byte[]> keys = req.keys();
                forceMasterEndpoint |= MASTER_ONLY_COMMANDS.contains(cmd) || cmd.isTransactional();
                switch (keys.size()) {
                    case 0: {
                        continue block4;
                    }
                    case 1: {
                        slot = ZModem.generate(keys.get(0));
                        endpoint = slots.endpointsForKey(slot)[0];
                        if (currentEndpoint == null) {
                            currentEndpoint = endpoint;
                            correctSlot = slot;
                            continue block4;
                        }
                        if (currentEndpoint.equals(endpoint)) continue block4;
                        promise.fail(this.buildCrossslotFailureMsg(req));
                        return promise.future();
                    }
                }
                Iterator<byte[]> iterator = keys.iterator();
                if (!iterator.hasNext()) continue;
                byte[] key = iterator.next();
                slot = ZModem.generate(key);
                endpoint = slots.endpointsForKey(slot)[0];
                if (currentEndpoint == null) {
                    correctSlot = slot;
                    currentEndpoint = endpoint;
                    continue;
                }
                if (currentEndpoint.equals(endpoint)) continue;
                promise.fail(this.buildCrossslotFailureMsg(req));
                return promise.future();
            }
            this.batch(this.selectEndpoint(slots, correctSlot, readOnly, forceMasterEndpoint), 16, requests, (Completable<List<Response>>)promise);
        }
        return promise.future();
    }

    private void batch(String endpoint, int retries, List<Request> commands, Completable<List<Response>> handler) {
        RedisConnection connection = this.connections.get(endpoint);
        if (connection == null) {
            this.connectionManager.getConnection(endpoint, RedisReplicas.NEVER != this.connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null).onSuccess(conn -> {
                Map<String, PooledRedisConnection> map = this.connections;
                synchronized (map) {
                    if (this.connections.containsKey(endpoint)) {
                        conn.close().onFailure(t -> LOG.warn((Object)("Failed closing connection: " + t)));
                    } else {
                        this.connections.put(endpoint, (PooledRedisConnection)conn);
                    }
                }
                this.batch(endpoint, retries, commands, handler);
            }).onFailure(t -> {
                if (retries > 0) {
                    this.batch(endpoint, retries - 1, commands, handler);
                } else {
                    handler.fail("Failed obtaining connection to: " + endpoint);
                }
            });
            return;
        }
        connection.batch(commands).onComplete(send -> {
            if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) {
                ErrorType cause = (ErrorType)send.cause();
                boolean ask = cause.is("ASK");
                boolean moved = cause.is("MOVED");
                if (ask || moved) {
                    Object addr;
                    if (moved) {
                        this.sharedSlots.invalidate();
                    }
                    if ((addr = cause.slice(' ', 2)) == null) {
                        handler.fail("Cannot find endpoint:port in redirection: " + cause);
                        return;
                    }
                    RedisURI uri = new RedisURI(endpoint);
                    if (((String)addr).startsWith(":")) {
                        addr = uri.socketAddress().host() + (String)addr;
                    }
                    String newEndpoint = uri.protocol() + "://" + uri.userinfo() + (String)addr;
                    if (ask) {
                        this.batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), (Completable<List<Response>>)((Completable)(resp, err) -> {
                            if (err != null) {
                                handler.fail("Failed ASKING: " + err + ", caused by " + cause);
                            } else {
                                this.batch(newEndpoint, retries - 1, commands, handler);
                            }
                        }));
                    } else {
                        this.batch(newEndpoint, retries - 1, commands, handler);
                    }
                    return;
                }
                if (cause.is("TRYAGAIN") || cause.is("CLUSTERDOWN")) {
                    long backoff = (long)(Math.pow(2.0, 16 - Math.max(retries, 9)) * 10.0);
                    this.vertx.setTimer(backoff, t -> this.batch(endpoint, retries - 1, commands, handler));
                    return;
                }
                if (cause.is("NOAUTH") && this.connectOptions.getPassword() != null) {
                    connection.send(Request.cmd(Command.AUTH).arg(this.connectOptions.getPassword())).onFailure(arg_0 -> ((Completable)handler).fail(arg_0)).onSuccess(auth -> this.batch(endpoint, retries - 1, commands, handler));
                    return;
                }
            }
            try {
                handler.complete((Object)((List)send.result()), send.cause());
            }
            catch (RuntimeException e) {
                LOG.error((Object)"Handler failure", (Throwable)e);
            }
        });
    }

    @Override
    public Future<Void> close() {
        this.deferredMulti = false;
        this.boundToEndpoint = null;
        ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null) continue;
            futures.add(redisConnection.close());
        }
        return Future.all(futures).mapEmpty();
    }

    @Override
    public boolean pendingQueueFull() {
        for (RedisConnection redisConnection : this.connections.values()) {
            if (redisConnection == null || !redisConnection.pendingQueueFull()) continue;
            return true;
        }
        return false;
    }

    private String selectEndpoint(Slots slots, int keySlot, boolean readOnly, boolean forceMasterEndpoint) {
        if (keySlot == -1) {
            return slots.randomEndPoint(forceMasterEndpoint);
        }
        String[] endpoints = slots.endpointsForKey(keySlot);
        if (endpoints == null || endpoints.length == 0) {
            RedisURI uri = new RedisURI(this.connectOptions.getEndpoint());
            return uri.protocol() + "://" + uri.userinfo() + uri.socketAddress();
        }
        return this.selectMasterOrReplicaEndpoint(readOnly, endpoints, forceMasterEndpoint);
    }

    private String selectMasterOrReplicaEndpoint(boolean readOnly, String[] endpoints, boolean forceMasterEndpoint) {
        if (forceMasterEndpoint) {
            return endpoints[0];
        }
        RedisReplicas useReplicas = this.connectOptions.getUseReplicas();
        if (readOnly && useReplicas != RedisReplicas.NEVER && endpoints.length > 1) {
            switch (useReplicas) {
                case ALWAYS: {
                    return endpoints[1 + RANDOM.nextInt(endpoints.length - 1)];
                }
                case SHARE: {
                    return endpoints[RANDOM.nextInt(endpoints.length)];
                }
            }
        }
        return endpoints[0];
    }

    String buildCrossslotFailureMsg(RequestImpl req) {
        return "Keys of command or batch: \"" + req.toString() + "\" targets not all in the same hash slot (CROSSSLOT) and client side resharding is not supported";
    }

    private static /* synthetic */ void lambda$send$3(Promise promise, CommandImpl cmd, AsyncResult composite) {
        if (composite.failed()) {
            promise.fail(composite.cause());
        } else {
            promise.succeed((Object)REDUCERS.get(cmd).apply(((CompositeFuture)composite.result()).list()));
        }
    }

    private static /* synthetic */ void lambda$send$2(Promise promise, CommandImpl cmd, AsyncResult composite) {
        if (composite.failed()) {
            promise.fail(composite.cause());
        } else {
            promise.succeed((Object)REDUCERS.get(cmd).apply(((CompositeFuture)composite.result()).list()));
        }
    }
}

