/*
 * Decompiled with CFR 0.152.
 */
package adams.flow.source;

import adams.core.License;
import adams.core.Pausable;
import adams.core.QuickInfoHelper;
import adams.core.Stoppable;
import adams.core.annotation.MixedCopyright;
import adams.core.net.TwitterHelper;
import adams.core.option.OptionHandler;
import adams.flow.core.Actor;
import adams.flow.core.Token;
import adams.flow.source.AbstractSource;
import java.io.Serializable;
import java.util.logging.Level;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.StreamListener;
import twitter4j.TwitterStream;

/**
 * Source actor that uses the Twitter streaming API to retrieve tweets and
 * outputs each received {@link Status} wrapped in a {@link Token}.
 * <p>
 * The actual stream handling is delegated to the nested {@link Listener},
 * which is created in {@link #setUp()}, started in {@link #doExecute()} and
 * stopped via {@link #stopExecution()} or {@link #wrapUp()}.
 */
@MixedCopyright(author="Yusuke Yamamoto", copyright="2007-2010 Yusuke Yamamoto", license=License.APACHE2, url="http://twitter4j.org/en/code-examples.html")
public class TwitterListener
extends AbstractSource {
    private static final long serialVersionUID = -7777610085728160967L;

    /** Interval in milliseconds between two checks whether the listener has started. */
    private static final long START_POLL_INTERVAL_MS = 50L;

    /** Maximum number of checks before the start-up is considered to have timed out. */
    private static final int MAX_START_POLLS = 100;

    /** The maximum number of status updates to output; {@code <= 0} means unlimited. */
    protected int m_MaxStatusUpdates;

    /** The listener feeding this actor; only available between setUp and wrapUp. */
    protected transient Listener m_Listener;

    /**
     * Returns a short description of this actor.
     *
     * @return the description text
     */
    public String globalInfo() {
        return "Uses the Twitter streaming API to retrieve tweets.";
    }

    /**
     * Registers the options of this actor with the option manager: the
     * "max-updates" option bound to the {@code maxStatusUpdates} property,
     * with a default of 100 (the remaining arguments -1 and {@code null} are
     * presumably the lower and upper bound -- confirm against the option
     * manager's API).
     */
    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("max-updates", "maxStatusUpdates", (Object)100, (Number)-1, null);
    }

    /**
     * Returns a one-line summary of the current setup, i.e., how many status
     * updates will be output.
     *
     * @return the summary as produced by {@link QuickInfoHelper}
     */
    public String getQuickInfo() {
        // no trailing blank after "unlimited": the suffix already starts with one
        String amount = this.m_MaxStatusUpdates <= 0 ? "unlimited" : Integer.toString(this.m_MaxStatusUpdates);
        return QuickInfoHelper.toString((OptionHandler)this, (String)"maxStatusUpdates", (Object)(amount + " status updates"));
    }

    /**
     * Returns the classes of the objects that this actor generates.
     *
     * @return an array containing only {@code Status.class}
     */
    public Class[] generates() {
        return new Class[]{Status.class};
    }

    /**
     * Sets the maximum number of status updates to output and resets the actor.
     *
     * @param value the maximum; use {@code <= 0} for unlimited
     */
    public void setMaxStatusUpdates(int value) {
        this.m_MaxStatusUpdates = value;
        this.reset();
    }

    /**
     * Returns the maximum number of status updates to output.
     *
     * @return the maximum; {@code <= 0} means unlimited
     */
    public int getMaxStatusUpdates() {
        return this.m_MaxStatusUpdates;
    }

    /**
     * Returns the help text for the {@code maxStatusUpdates} property.
     *
     * @return the help text
     */
    public String maxStatusUpdatesTipText() {
        return "The maximum number of status updates to output; use <=0 for unlimited.";
    }

    /**
     * Initializes the actor. If the super class reports no error, any
     * previous listener is stopped and a fresh one is created.
     *
     * @return {@code null} if successful, otherwise the error message of the
     *         super class
     */
    public String setUp() {
        String result = super.setUp();
        if (result == null) {
            this.stopListening();
            this.m_Listener = new Listener(this);
        }
        return result;
    }

    /**
     * Starts the listener and waits (at most {@code MAX_START_POLLS} times
     * {@code START_POLL_INTERVAL_MS} milliseconds) until it reports that it
     * is listening.
     *
     * @return {@code null} if the listener is listening, otherwise an error
     *         message
     */
    protected String doExecute() {
        Listener listener = this.m_Listener;
        if (listener == null) {
            return "Listener not initialized!";
        }

        try {
            listener.startExecution();
        }
        catch (IllegalThreadStateException ignored) {
            // deliberately ignored, the subsequent polling detects a listener that did not start
        }
        catch (Exception e) {
            // starting failed for good: report this error instead of polling
            // and replacing it with a misleading timeout message
            return this.handleException("Failed to start listener thread!", e);
        }

        for (int polls = 0; !listener.isListening(); polls++) {
            if (polls == MAX_START_POLLS) {
                return "Thread timed out??";
            }
            try {
                synchronized (this) {
                    this.wait(START_POLL_INTERVAL_MS);
                }
            }
            catch (InterruptedException e) {
                // preserve the interrupt status for the caller and give up waiting
                Thread.currentThread().interrupt();
                return "Interrupted while waiting for listener to start!";
            }
        }
        return null;
    }

    /**
     * Stops the listener, if there is one.
     */
    protected void stopListening() {
        if (this.m_Listener != null) {
            this.m_Listener.stopExecution();
        }
    }

    /**
     * Stops the listener and then the execution of this actor.
     */
    public void stopExecution() {
        this.stopListening();
        super.stopExecution();
    }

    /**
     * Returns the next status update wrapped in a token.
     *
     * @return the token, or {@code null} if there is no listener or the
     *         listener did not supply a status
     */
    public Token output() {
        Listener listener = this.m_Listener;
        if (listener == null) {
            return null;
        }
        Status status = listener.next();
        if (status == null) {
            return null;
        }
        return new Token(status);
    }

    /**
     * Checks whether more output can be expected.
     *
     * @return {@code true} if the actor has been executed, a listener exists
     *         and that listener reports further data
     */
    public boolean hasPendingOutput() {
        return this.m_Executed && this.m_Listener != null && this.m_Listener.hasNext();
    }

    /**
     * Cleans up after the execution has finished. A listener that is still
     * listening gets stopped before the reference to it is dropped, so that
     * the underlying stream is not left open.
     */
    public void wrapUp() {
        super.wrapUp();
        Listener listener = this.m_Listener;
        if (listener != null && listener.isListening()) {
            listener.stopExecution();
        }
        this.m_Listener = null;
    }

    /**
     * Receives the status updates from the Twitter stream and hands them,
     * one at a time, to the owning actor via {@link #next()}.
     * <p>
     * Only a single pending status is kept: a newly arriving status replaces
     * one that has not been consumed yet.
     * <p>
     * The state shared between the callbacks and the consuming code is
     * declared {@code volatile} and the hand-over of the pending status is
     * guarded by this object's monitor (assumption: twitter4j invokes the
     * callbacks on a different thread than the one calling {@link #next()}).
     */
    public static class Listener
    implements Serializable,
    Pausable,
    Stoppable,
    StatusListener {
        private static final long serialVersionUID = 5406360301457780558L;

        /** Interval in milliseconds between two checks for a new status in {@link #next()}. */
        private static final long NEXT_POLL_INTERVAL_MS = 50L;

        /** The actor this listener belongs to. */
        protected TwitterListener m_Owner;

        /** The stream connection in use. */
        protected TwitterStream m_Twitter;

        /** The number of status updates handed out by {@link #next()} so far. */
        protected volatile int m_Count;

        /** The pending status update, {@code null} if none. */
        protected volatile Status m_Next;

        /** Whether incoming status updates are currently being ignored. */
        protected volatile boolean m_Paused;

        /** Whether the listener is currently attached to the stream. */
        protected volatile boolean m_Listening;

        /**
         * Initializes the listener and obtains the stream connection for the owner.
         *
         * @param owner the actor this listener belongs to
         * @throws IllegalArgumentException if the owner is {@code null}
         */
        public Listener(TwitterListener owner) {
            if (owner == null) {
                throw new IllegalArgumentException("Owner cannot be null!");
            }
            this.m_Owner = owner;
            this.m_Count = 0;
            // use the parameter directly rather than an overridable method from within the constructor
            this.m_Twitter = TwitterHelper.getTwitterStreamConnection((Actor)owner);
        }

        /**
         * Returns the actor this listener belongs to.
         *
         * @return the owner
         */
        public TwitterListener getOwner() {
            return this.m_Owner;
        }

        /**
         * Registers this listener with the stream and starts sampling. On
         * failure the listener is deregistered again and the error is logged;
         * {@link #isListening()} then remains {@code false}.
         */
        public void startExecution() {
            try {
                this.m_Twitter.addListener((StreamListener)this);
                this.m_Twitter.sample();
                this.m_Listening = true;
            }
            catch (Exception e) {
                this.m_Twitter.removeListener((StreamListener)this);
                this.getOwner().getLogger().log(Level.SEVERE, "Failed to start listener!", e);
            }
        }

        /**
         * Returns whether the listener is currently listening.
         *
         * @return {@code true} if listening
         */
        public boolean isListening() {
            return this.m_Listening;
        }

        /**
         * Pauses the processing of status updates; has no effect if the
         * listener is not listening.
         */
        public void pauseExecution() {
            if (this.m_Listening) {
                this.m_Paused = true;
            }
        }

        /**
         * Returns whether the listener is paused.
         *
         * @return {@code true} if paused
         */
        public boolean isPaused() {
            return this.m_Paused;
        }

        /**
         * Resumes the processing of status updates.
         */
        public void resumeExecution() {
            this.m_Paused = false;
        }

        /**
         * Stores the status as the pending one, unless the listener is not
         * listening or is paused. Once the owner's maximum number of updates
         * has been handed out, the listener stops itself instead.
         *
         * @param status the status that was received
         */
        public void onStatus(Status status) {
            if (!this.m_Listening || this.m_Paused) {
                return;
            }
            int max = this.getOwner().getMaxStatusUpdates();
            if (max > 0 && this.m_Count >= max) {
                this.stopExecution();
                return;
            }
            synchronized (this) {
                this.m_Next = status;
                // wake up a consumer waiting in next()
                this.notifyAll();
            }
        }

        /**
         * Ignored.
         *
         * @param statusDeletionNotice the notice, unused
         */
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
        }

        /**
         * Ignored.
         *
         * @param i the value supplied with the notice, unused
         */
        public void onTrackLimitationNotice(int i) {
        }

        /**
         * Ignored.
         *
         * @param l the first value supplied by the stream, unused
         * @param l1 the second value supplied by the stream, unused
         */
        public void onScrubGeo(long l, long l1) {
        }

        /**
         * Logs the stall warning with the owner's logger.
         *
         * @param stallWarning the warning to log
         */
        public void onStallWarning(StallWarning stallWarning) {
            this.getOwner().getLogger().warning(stallWarning.toString());
        }

        /**
         * Logs the exception with the owner's logger.
         *
         * @param e the exception that was encountered
         */
        public void onException(Exception e) {
            this.getOwner().getLogger().log(Level.SEVERE, "Exception encountered while listening!", e);
        }

        /**
         * Stops listening: resets the listening/paused state, wakes up a
         * consumer waiting in {@link #next()}, deregisters this listener and
         * shuts down and cleans up the stream.
         */
        public void stopExecution() {
            this.m_Listening = false;
            this.m_Paused = false;
            synchronized (this) {
                // a consumer blocked in next() should notice the stop right away
                this.notifyAll();
            }
            this.m_Twitter.removeListener((StreamListener)this);
            try {
                this.m_Twitter.shutdown();
            }
            catch (Exception ignored) {
                // best effort: a failing shutdown must not prevent the clean-up below
            }
            this.m_Twitter.cleanUp();
        }

        /**
         * Returns whether further status updates can be expected.
         *
         * @return {@code true} if still listening or a status is pending
         */
        public boolean hasNext() {
            return this.m_Listening || this.m_Next != null;
        }

        /**
         * Returns the pending status update, waiting for one to arrive for as
         * long as the listener is listening. The pending status is taken and
         * cleared under this object's monitor, so a status arriving at the
         * same time is not lost.
         *
         * @return the next status, or {@code null} if the listener stopped (or
         *         the waiting thread got interrupted) without a status pending
         */
        public Status next() {
            Status result = null;
            synchronized (this) {
                while (true) {
                    result = this.m_Next;
                    if (result != null) {
                        this.m_Next = null;
                        break;
                    }
                    if (!this.m_Listening) {
                        break;
                    }
                    try {
                        // timed wait: wakes up on notifyAll() or after the interval at the latest
                        this.wait(NEXT_POLL_INTERVAL_MS);
                    }
                    catch (InterruptedException e) {
                        // preserve the interrupt status for the caller and stop waiting
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            if (result != null) {
                // this method is the only place that modifies the counter
                int count = this.m_Count + 1;
                this.m_Count = count;
                if (this.getOwner().isLoggingEnabled() && count % 100 == 0) {
                    this.getOwner().getLogger().info("status updates: " + count);
                }
            }
            return result;
        }
    }
}

