/*
 * Decompiled with CFR 0.152.
 */
package adams.flow.transformer.redisaction;

import adams.core.MessageCollection;
import adams.core.QuickInfoHelper;
import adams.core.Utils;
import adams.core.logging.LoggingSupporter;
import adams.core.option.OptionHandler;
import adams.data.redis.RedisDataType;
import adams.flow.standalone.RedisConnection;
import adams.flow.transformer.redisaction.AbstractRedisAction;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

public class BroadcastAndListen
extends AbstractRedisAction {
    private static final long serialVersionUID = -6976434112891561358L;
    protected String m_ChannelOut;
    protected RedisDataType m_TypeOut;
    protected String m_ChannelIn;
    protected RedisDataType m_TypeIn;
    protected int m_TimeOut;
    protected transient StatefulRedisPubSubConnection m_PubSubConnection;
    protected transient RedisPubSubListener m_PubSubListener;
    protected transient Object m_Data;

    public String globalInfo() {
        return "Broadcasts the incoming data to the specified out channel and listens for data to come through on the in channel.";
    }

    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("channel-out", "channelOut", (Object)"");
        this.m_OptionManager.add("type-out", "typeOut", (Object)RedisDataType.STRING);
        this.m_OptionManager.add("channel-in", "channelIn", (Object)"");
        this.m_OptionManager.add("type-in", "typeIn", (Object)RedisDataType.STRING);
        this.m_OptionManager.add("time-out", "timeOut", (Object)1000, (Number)1, null);
    }

    public void setChannelOut(String value) {
        this.m_ChannelOut = value;
        this.reset();
    }

    public String getChannelOut() {
        return this.m_ChannelOut;
    }

    public String channelOutTipText() {
        return "The channel to send data to.";
    }

    public void setTypeOut(RedisDataType value) {
        this.m_TypeOut = value;
        this.reset();
    }

    public RedisDataType getTypeOut() {
        return this.m_TypeOut;
    }

    public String typeOutTipText() {
        return "The type of the data for the outgoing data.";
    }

    public void setChannelIn(String value) {
        this.m_ChannelIn = value;
        this.reset();
    }

    public String getChannelIn() {
        return this.m_ChannelIn;
    }

    public String channelInTipText() {
        return "The channel to receive data from.";
    }

    public void setTypeIn(RedisDataType value) {
        this.m_TypeIn = value;
        this.reset();
    }

    public RedisDataType getTypeIn() {
        return this.m_TypeIn;
    }

    public String typeInTipText() {
        return "The type of the data for the incoming data.";
    }

    public void setTimeOut(int value) {
        this.m_TimeOut = value;
        this.reset();
    }

    public int getTimeOut() {
        return this.m_TimeOut;
    }

    public String timeOutTipText() {
        return "The timeout in milli-second for waiting on a response.";
    }

    @Override
    public String getQuickInfo() {
        Object result = QuickInfoHelper.toString((OptionHandler)this, (String)"channelOut", (Object)this.m_ChannelOut, (String)"out: ");
        result = (String)result + QuickInfoHelper.toString((OptionHandler)this, (String)"typeOut", (Object)((Object)this.m_TypeOut), (String)"/");
        result = (String)result + QuickInfoHelper.toString((OptionHandler)this, (String)"channelIn", (Object)this.m_ChannelIn, (String)", in: ");
        result = (String)result + QuickInfoHelper.toString((OptionHandler)this, (String)"typeIn", (Object)((Object)this.m_TypeIn), (String)"/");
        result = (String)result + QuickInfoHelper.toString((OptionHandler)this, (String)"timeOut", (Object)this.m_TimeOut, (String)", timeout: ");
        return result;
    }

    @Override
    public Class[] accepts() {
        return new Class[]{this.m_TypeOut.getDataClass()};
    }

    @Override
    public Class generates() {
        return this.m_TypeIn.getDataClass();
    }

    protected RedisPubSubListener<String, String> newStringListener() {
        return new RedisPubSubListener<String, String>(){

            public void message(String channel, String message) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Message on channel '" + channel + "': " + message);
                }
                BroadcastAndListen.this.m_Data = message;
                BroadcastAndListen.this.m_PubSubConnection.removeListener(BroadcastAndListen.this.m_PubSubListener);
                BroadcastAndListen.this.m_PubSubConnection.async().unsubscribe(new Object[]{BroadcastAndListen.this.m_ChannelIn});
                BroadcastAndListen.this.m_PubSubConnection = null;
                BroadcastAndListen.this.m_PubSubListener = null;
            }

            public void message(String pattern, String channel, String message) {
                this.message(channel, message);
            }

            public void subscribed(String channel, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to channel: " + channel);
                }
            }

            public void psubscribed(String pattern, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to pattern: " + pattern);
                }
            }

            public void unsubscribed(String channel, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from channel: " + channel);
                }
            }

            public void punsubscribed(String pattern, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from pattern: " + pattern);
                }
            }
        };
    }

    protected RedisPubSubListener<byte[], byte[]> newBytesListener() {
        return new RedisPubSubListener<byte[], byte[]>(){

            public void message(byte[] channel, byte[] message) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Message on channel '" + new String(channel) + "': " + new String(message));
                }
                BroadcastAndListen.this.m_Data = message;
                BroadcastAndListen.this.m_PubSubConnection.removeListener(BroadcastAndListen.this.m_PubSubListener);
                BroadcastAndListen.this.m_PubSubConnection.async().unsubscribe(new Object[]{BroadcastAndListen.this.m_ChannelIn.getBytes()});
                BroadcastAndListen.this.m_PubSubConnection = null;
                BroadcastAndListen.this.m_PubSubListener = null;
            }

            public void message(byte[] pattern, byte[] channel, byte[] message) {
                this.message(channel, message);
            }

            public void subscribed(byte[] channel, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to channel: " + new String(channel));
                }
            }

            public void psubscribed(byte[] pattern, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to pattern: " + new String(pattern));
                }
            }

            public void unsubscribed(byte[] channel, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from channel: " + new String(channel));
                }
            }

            public void punsubscribed(byte[] pattern, long count) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from pattern: " + new String(pattern));
                }
            }
        };
    }

    @Override
    protected Object doExecute(RedisConnection connection, Object o, MessageCollection errors) {
        this.m_Data = null;
        long start = System.currentTimeMillis();
        switch (this.m_TypeOut) {
            case STRING: {
                this.m_PubSubListener = this.newStringListener();
                this.m_PubSubConnection = connection.getClient().connectPubSub((RedisCodec)StringCodec.UTF8);
                this.m_PubSubConnection.addListener(this.m_PubSubListener);
                this.m_PubSubConnection.async().subscribe(new Object[]{this.m_ChannelIn});
                connection.getConnection(this.m_TypeOut.getCodecClass()).async().publish((Object)this.m_ChannelOut, o);
                break;
            }
            case BYTE_ARRAY: {
                this.m_PubSubListener = this.newBytesListener();
                this.m_PubSubConnection = connection.getClient().connectPubSub((RedisCodec)new ByteArrayCodec());
                this.m_PubSubConnection.addListener(this.m_PubSubListener);
                this.m_PubSubConnection.async().subscribe(new Object[]{this.m_ChannelIn.getBytes()});
                connection.getConnection(this.m_TypeOut.getCodecClass()).async().publish((Object)this.m_ChannelOut.getBytes(), o);
                break;
            }
            default: {
                errors.add("Unhandled redis data type (setting up pub/sub): " + this.m_TypeOut);
                return null;
            }
        }
        while (this.m_Data == null && !this.isStopped() && System.currentTimeMillis() - start < (long)this.m_TimeOut) {
            Utils.wait((LoggingSupporter)this, (int)100, (int)100);
        }
        if (this.m_PubSubConnection != null) {
            if (this.m_PubSubListener != null) {
                this.m_PubSubConnection.removeListener(this.m_PubSubListener);
            }
            switch (this.m_TypeOut) {
                case STRING: {
                    this.m_PubSubConnection.async().unsubscribe(new Object[]{this.m_ChannelIn});
                    break;
                }
                case BYTE_ARRAY: {
                    this.m_PubSubConnection.async().unsubscribe(new Object[]{this.m_ChannelIn.getBytes()});
                    break;
                }
                default: {
                    errors.add("Unhandled redis data type (unsubscribing): " + this.m_TypeOut);
                    return null;
                }
            }
        }
        this.m_PubSubConnection = null;
        this.m_PubSubListener = null;
        return this.m_Data;
    }
}

