/*
 * Decompiled with CFR 0.152.
 */
package adams.flow.standalone.rats.input;

import adams.core.QuickInfoHelper;
import adams.core.option.OptionHandler;
import adams.core.option.OptionUtils;
import adams.flow.core.Actor;
import adams.flow.source.twitterlistener.AbstractListener;
import adams.flow.source.twitterlistener.SampleListener;
import adams.flow.standalone.rats.input.AbstractBufferedRatInput;
import twitter4j.Status;

public class TwitterListener
extends AbstractBufferedRatInput {
    private static final long serialVersionUID = 7627032152241150448L;
    protected AbstractListener m_Listener;
    protected AbstractListener m_ActualListener;
    protected Thread m_TransferThread;

    public String globalInfo() {
        return "Listens to the twitter stream API using the specified listener.";
    }

    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("listener", "listener", (Object)new SampleListener());
    }

    public void setListener(AbstractListener value) {
        this.m_Listener = value;
        this.reset();
    }

    public AbstractListener getListener() {
        return this.m_Listener;
    }

    public String listenerTipText() {
        return "The listener to use for generating the status objects.";
    }

    public String getQuickInfo() {
        return QuickInfoHelper.toString((OptionHandler)this, (String)"listener", (Object)this.m_Listener, (String)"listener: ");
    }

    public Class generates() {
        return Status.class;
    }

    public String initReception() {
        String result = super.initReception();
        if (result == null) {
            if (this.m_ActualListener != null) {
                this.m_ActualListener.stopExecution();
            }
            this.m_ActualListener = (AbstractListener)OptionUtils.shallowCopy((Object)this.m_Listener);
            this.m_ActualListener.setFlowContext((Actor)this.getOwner());
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected String doReceive() {
        String result = null;
        if (!this.m_ActualListener.isListening()) {
            try {
                this.m_ActualListener.startExecution();
            }
            catch (IllegalThreadStateException illegalThreadStateException) {
            }
            catch (Exception e) {
                result = this.handleException("Failed to start listener thread!", e);
            }
            int count = 0;
            while (!this.m_ActualListener.isListening()) {
                ++count;
                try {
                    TwitterListener twitterListener = this;
                    synchronized (twitterListener) {
                        ((Object)((Object)this)).wait(50L);
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
                if (count != 100) continue;
                result = "Thread timed out??";
                break;
            }
            if (result == null) {
                this.m_TransferThread = new Thread(() -> {
                    while (this.m_ActualListener.isListening()) {
                        Status status = this.m_ActualListener.next();
                        if (status == null) continue;
                        this.bufferData(status);
                        if (!this.isLoggingEnabled()) continue;
                        this.getLogger().info(status.toString());
                    }
                });
                this.m_TransferThread.start();
            }
        }
        return result;
    }

    protected void stopListening() {
        if (this.m_ActualListener != null) {
            this.m_ActualListener.stopExecution();
        }
        if (this.m_TransferThread != null) {
            this.m_TransferThread.stop();
            this.m_TransferThread = null;
        }
    }

    public void stopExecution() {
        this.stopListening();
        super.stopExecution();
    }

    public void cleanUp() {
        this.stopListening();
        super.cleanUp();
    }
}

