/*
 * Decompiled with CFR 0.152.
 */
package adams.flow.source;

import adams.core.License;
import adams.core.QuickInfoHelper;
import adams.core.annotation.MixedCopyright;
import adams.core.option.OptionHandler;
import adams.core.option.OptionUtils;
import adams.flow.core.Actor;
import adams.flow.core.Token;
import adams.flow.source.AbstractSource;
import adams.flow.source.twitterlistener.AbstractListener;
import adams.flow.source.twitterlistener.SampleListener;
import twitter4j.Status;

@MixedCopyright(author="Yusuke Yamamoto", copyright="2007-2010 Yusuke Yamamoto", license=License.APACHE2, url="http://twitter4j.org/en/code-examples.html")
public class TwitterListener
extends AbstractSource {
    private static final long serialVersionUID = -7777610085728160967L;
    protected AbstractListener m_Listener;
    protected AbstractListener m_ActualListener;

    public String globalInfo() {
        return "Uses the Twitter streaming API (and the specified listener) to retrieve tweets.";
    }

    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("listener", "listener", (Object)new SampleListener());
    }

    public String getQuickInfo() {
        return QuickInfoHelper.toString((OptionHandler)this, (String)"listener", (Object)((Object)this.m_Listener), (String)"listener: ");
    }

    public Class[] generates() {
        return new Class[]{Status.class};
    }

    public void setListener(AbstractListener value) {
        this.m_Listener = value;
        this.reset();
    }

    public AbstractListener getListener() {
        return this.m_Listener;
    }

    public String listenerTipText() {
        return "The listener to use for generating the status objects.";
    }

    public String setUp() {
        String result = super.setUp();
        if (result == null) {
            if (this.m_ActualListener != null) {
                this.m_ActualListener.stopExecution();
            }
            this.m_ActualListener = (AbstractListener)((Object)OptionUtils.shallowCopy((Object)((Object)this.m_Listener)));
            this.m_ActualListener.setFlowContext((Actor)this);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected String doExecute() {
        String result = null;
        try {
            this.m_ActualListener.startExecution();
        }
        catch (IllegalThreadStateException illegalThreadStateException) {
        }
        catch (Exception e) {
            result = this.handleException("Failed to start listener thread!", e);
        }
        int count = 0;
        while (!this.m_ActualListener.isListening()) {
            ++count;
            try {
                TwitterListener twitterListener = this;
                synchronized (twitterListener) {
                    ((Object)((Object)this)).wait(50L);
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (count != 100) continue;
            result = "Thread timed out??";
            break;
        }
        return result;
    }

    protected void stopListening() {
        if (this.m_ActualListener != null) {
            this.m_ActualListener.stopExecution();
        }
    }

    public void stopExecution() {
        this.stopListening();
        super.stopExecution();
    }

    public Token output() {
        Token result = null;
        Status status = this.m_ActualListener.next();
        if (status != null) {
            result = new Token((Object)status);
        }
        return result;
    }

    public boolean hasPendingOutput() {
        return this.m_Executed && this.m_ActualListener != null && this.m_ActualListener.hasNext();
    }

    public void wrapUp() {
        super.wrapUp();
        this.m_ActualListener = null;
    }
}

