package adams.flow.transformer.redisaction;

import adams.core.MessageCollection;
import adams.core.QuickInfoHelper;
import adams.core.Utils;
import adams.data.redis.RedisDataType;
import adams.flow.standalone.RedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

/* loaded from: input_file:adams/flow/transformer/redisaction/BroadcastAndListen.class */
public class BroadcastAndListen extends AbstractRedisAction {
    private static final long serialVersionUID = -6976434112891561358L;
    protected String m_ChannelOut;
    protected RedisDataType m_TypeOut;
    protected String m_ChannelIn;
    protected RedisDataType m_TypeIn;
    protected int m_TimeOut;
    protected transient StatefulRedisPubSubConnection m_PubSubConnection;
    protected transient RedisPubSubListener m_PubSubListener;
    protected transient Object m_Data;

    public String globalInfo() {
        return "Broadcasts the incoming data to the specified out channel and listens for data to come through on the in channel.";
    }

    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("channel-out", "channelOut", "");
        this.m_OptionManager.add("type-out", "typeOut", RedisDataType.STRING);
        this.m_OptionManager.add("channel-in", "channelIn", "");
        this.m_OptionManager.add("type-in", "typeIn", RedisDataType.STRING);
        this.m_OptionManager.add("time-out", "timeOut", 1000, 1, (Number) null);
    }

    public void setChannelOut(String str) {
        this.m_ChannelOut = str;
        reset();
    }

    public String getChannelOut() {
        return this.m_ChannelOut;
    }

    public String channelOutTipText() {
        return "The channel to send data to.";
    }

    public void setTypeOut(RedisDataType redisDataType) {
        this.m_TypeOut = redisDataType;
        reset();
    }

    public RedisDataType getTypeOut() {
        return this.m_TypeOut;
    }

    public String typeOutTipText() {
        return "The type of the data for the outgoing data.";
    }

    public void setChannelIn(String str) {
        this.m_ChannelIn = str;
        reset();
    }

    public String getChannelIn() {
        return this.m_ChannelIn;
    }

    public String channelInTipText() {
        return "The channel to receive data from.";
    }

    public void setTypeIn(RedisDataType redisDataType) {
        this.m_TypeIn = redisDataType;
        reset();
    }

    public RedisDataType getTypeIn() {
        return this.m_TypeIn;
    }

    public String typeInTipText() {
        return "The type of the data for the incoming data.";
    }

    public void setTimeOut(int i) {
        this.m_TimeOut = i;
        reset();
    }

    public int getTimeOut() {
        return this.m_TimeOut;
    }

    public String timeOutTipText() {
        return "The timeout in milli-second for waiting on a response.";
    }

    @Override // adams.flow.transformer.redisaction.AbstractRedisAction
    public String getQuickInfo() {
        return (((QuickInfoHelper.toString(this, "channelOut", this.m_ChannelOut, "out: ") + QuickInfoHelper.toString(this, "typeOut", this.m_TypeOut, "/")) + QuickInfoHelper.toString(this, "channelIn", this.m_ChannelIn, ", in: ")) + QuickInfoHelper.toString(this, "typeIn", this.m_TypeIn, "/")) + QuickInfoHelper.toString(this, "timeOut", Integer.valueOf(this.m_TimeOut), ", timeout: ");
    }

    @Override // adams.flow.transformer.redisaction.AbstractRedisAction
    public Class[] accepts() {
        return new Class[]{this.m_TypeOut.getDataClass()};
    }

    @Override // adams.flow.transformer.redisaction.AbstractRedisAction
    public Class generates() {
        return this.m_TypeIn.getDataClass();
    }

    protected RedisPubSubListener<String, String> newStringListener() {
        return new RedisPubSubListener<String, String>() { // from class: adams.flow.transformer.redisaction.BroadcastAndListen.1
            public void message(String str, String str2) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Message on channel '" + str + "': " + str2);
                }
                BroadcastAndListen.this.m_Data = str2;
                BroadcastAndListen.this.m_PubSubConnection.removeListener(BroadcastAndListen.this.m_PubSubListener);
                BroadcastAndListen.this.m_PubSubConnection.async().unsubscribe(new Object[]{BroadcastAndListen.this.m_ChannelIn});
                BroadcastAndListen.this.m_PubSubConnection = null;
                BroadcastAndListen.this.m_PubSubListener = null;
            }

            public void message(String str, String str2, String str3) {
                message(str2, str3);
            }

            public void subscribed(String str, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to channel: " + str);
                }
            }

            public void psubscribed(String str, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to pattern: " + str);
                }
            }

            public void unsubscribed(String str, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from channel: " + str);
                }
            }

            public void punsubscribed(String str, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from pattern: " + str);
                }
            }
        };
    }

    protected RedisPubSubListener<byte[], byte[]> newBytesListener() {
        return new RedisPubSubListener<byte[], byte[]>() { // from class: adams.flow.transformer.redisaction.BroadcastAndListen.2
            public void message(byte[] bArr, byte[] bArr2) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Message on channel '" + new String(bArr) + "': " + new String(bArr2));
                }
                BroadcastAndListen.this.m_Data = bArr2;
                BroadcastAndListen.this.m_PubSubConnection.removeListener(BroadcastAndListen.this.m_PubSubListener);
                BroadcastAndListen.this.m_PubSubConnection.async().unsubscribe(new Object[]{BroadcastAndListen.this.m_ChannelIn.getBytes()});
                BroadcastAndListen.this.m_PubSubConnection = null;
                BroadcastAndListen.this.m_PubSubListener = null;
            }

            public void message(byte[] bArr, byte[] bArr2, byte[] bArr3) {
                message(bArr2, bArr3);
            }

            public void subscribed(byte[] bArr, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to channel: " + new String(bArr));
                }
            }

            public void psubscribed(byte[] bArr, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Subscribed to pattern: " + new String(bArr));
                }
            }

            public void unsubscribed(byte[] bArr, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from channel: " + new String(bArr));
                }
            }

            public void punsubscribed(byte[] bArr, long j) {
                if (BroadcastAndListen.this.isLoggingEnabled()) {
                    BroadcastAndListen.this.getLogger().info("Unsubscribed from pattern: " + new String(bArr));
                }
            }
        };
    }

    @Override // adams.flow.transformer.redisaction.AbstractRedisAction
    protected Object doExecute(RedisConnection redisConnection, Object obj, MessageCollection messageCollection) {
        this.m_Data = null;
        long currentTimeMillis = System.currentTimeMillis();
        switch (this.m_TypeOut) {
            case STRING:
                this.m_PubSubListener = newStringListener();
                this.m_PubSubConnection = redisConnection.getClient().connectPubSub(StringCodec.UTF8);
                this.m_PubSubConnection.addListener(this.m_PubSubListener);
                this.m_PubSubConnection.async().subscribe(new Object[]{this.m_ChannelIn});
                redisConnection.getConnection(this.m_TypeOut.getCodecClass()).async().publish(this.m_ChannelOut, obj);
                break;
            case BYTE_ARRAY:
                this.m_PubSubListener = newBytesListener();
                this.m_PubSubConnection = redisConnection.getClient().connectPubSub(new ByteArrayCodec());
                this.m_PubSubConnection.addListener(this.m_PubSubListener);
                this.m_PubSubConnection.async().subscribe(new Object[]{this.m_ChannelIn.getBytes()});
                redisConnection.getConnection(this.m_TypeOut.getCodecClass()).async().publish(this.m_ChannelOut.getBytes(), obj);
                break;
            default:
                messageCollection.add("Unhandled redis data type (setting up pub/sub): " + this.m_TypeOut);
                return null;
        }
        while (this.m_Data == null && !isStopped() && System.currentTimeMillis() - currentTimeMillis < this.m_TimeOut) {
            Utils.wait(this, 100, 100);
        }
        if (this.m_PubSubConnection != null) {
            if (this.m_PubSubListener != null) {
                this.m_PubSubConnection.removeListener(this.m_PubSubListener);
            }
            switch (this.m_TypeOut) {
                case STRING:
                    this.m_PubSubConnection.async().unsubscribe(new Object[]{this.m_ChannelIn});
                    break;
                case BYTE_ARRAY:
                    this.m_PubSubConnection.async().unsubscribe(new Object[]{this.m_ChannelIn.getBytes()});
                    break;
                default:
                    messageCollection.add("Unhandled redis data type (unsubscribing): " + this.m_TypeOut);
                    return null;
            }
        }
        this.m_PubSubConnection = null;
        this.m_PubSubListener = null;
        return this.m_Data;
    }
}
