/*
 * Decompiled with CFR 0.152.
 */
package adams.flow.source;

import adams.core.annotation.MixedCopyright;
import adams.flow.core.AbstractActor;
import adams.flow.core.Token;
import adams.flow.core.TwitterUtils;
import adams.flow.source.AbstractSource;
import java.io.IOException;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.StatusStream;
import twitter4j.TwitterException;
import twitter4j.TwitterStream;

@MixedCopyright
public class TwitterListener
extends AbstractSource {
    private static final long serialVersionUID = -7777610085728160967L;
    protected int m_MaxStatusUpdates;
    protected transient Listener m_Listener;

    public String globalInfo() {
        return "Uses the Twitter streaming API to retrieve tweets.";
    }

    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("max-updates", "maxStatusUpdates", (Object)100, (Number)-1, null);
    }

    public String getQuickInfo() {
        String variable = this.getOptionManager().getVariableForProperty("maxStatusUpdates");
        if (variable != null) {
            return variable;
        }
        return (this.m_MaxStatusUpdates <= 0 ? "unlimited " : "" + this.m_MaxStatusUpdates) + " status updates";
    }

    public Class[] generates() {
        return new Class[]{Status.class};
    }

    public void setMaxStatusUpdates(int value) {
        this.m_MaxStatusUpdates = value;
    }

    public int getMaxStatusUpdates() {
        return this.m_MaxStatusUpdates;
    }

    public String maxStatusUpdatesTipText() {
        return "The maximum number of status updates to output; use <=0 for unlimited.";
    }

    public String setUp() {
        String result = super.setUp();
        if (result == null) {
            if (this.m_Listener != null) {
                this.m_Listener.stopListening();
            }
            this.m_Listener = new Listener(this);
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected String doExecute() {
        String result = null;
        this.m_Listener.start();
        int count = 0;
        while (!this.m_Listener.isListening()) {
            ++count;
            try {
                TwitterListener twitterListener = this;
                synchronized (twitterListener) {
                    ((Object)((Object)this)).wait(50L);
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (count != 100) continue;
            result = "Thread timed out??";
            break;
        }
        return result;
    }

    protected void stopListening() {
        if (this.m_Listener != null) {
            this.m_Listener.stopListening();
        }
    }

    public void stopExecution() {
        this.stopListening();
        super.stopExecution();
    }

    public Token output() {
        Token result = null;
        Status status = this.m_Listener.next();
        if (status != null) {
            result = new Token((Object)status);
        }
        return result;
    }

    public boolean hasPendingOutput() {
        return this.m_Executed && this.m_Listener != null && this.m_Listener.hasNext();
    }

    public void wrapUp() {
        super.wrapUp();
        this.m_Listener = null;
    }

    public static class Listener
    extends Thread
    implements StatusListener {
        public static final int TCP_ERROR_INITIAL_WAIT = 250;
        public static final int TCP_ERROR_WAIT_CAP = 16000;
        public static final int HTTP_ERROR_INITIAL_WAIT = 10000;
        public static final int HTTP_ERROR_WAIT_CAP = 240000;
        protected TwitterListener m_Owner;
        protected TwitterStream m_Twitter;
        protected StatusListener m_TwitterListener;
        protected boolean m_Listening;
        protected int m_Count;
        protected StatusStream m_Stream;
        protected int m_TimeToSleep;
        protected Status m_Next;

        public Listener(TwitterListener owner) {
            this.m_Owner = owner;
            this.m_Next = null;
        }

        public TwitterListener getOwner() {
            return this.m_Owner;
        }

        public void run() {
            this.m_Listening = true;
            this.m_Count = 0;
            this.m_Twitter = TwitterUtils.getTwitterStreamConnection((AbstractActor)this.getOwner());
            while (this.m_Listening) {
                try {
                    if (!this.m_Listening || this.m_Stream != null) continue;
                    this.m_Stream = this.m_Twitter.getSampleStream();
                    this.m_TimeToSleep = 0;
                    while (this.m_Listening) {
                        this.m_Stream.next((StatusListener)this);
                    }
                }
                catch (TwitterException te) {
                    if (!this.m_Listening) continue;
                    this.m_TimeToSleep = 0 == this.m_TimeToSleep && te.getStatusCode() > 200 ? 10000 : 250;
                    if (this.m_Listening) {
                        try {
                            Thread.sleep(this.m_TimeToSleep);
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                        this.m_TimeToSleep = Math.min(this.m_TimeToSleep * 2, te.getStatusCode() > 200 ? 240000 : 16000);
                    }
                    this.m_Stream = null;
                    this.onException((Exception)((Object)te));
                }
            }
            try {
                this.m_Stream.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            this.getOwner().stopListening();
        }

        public boolean isListening() {
            return this.m_Listening;
        }

        public void stopListening() {
            this.m_Listening = false;
        }

        public void onDeletionNotice(StatusDeletionNotice arg0) {
        }

        public void onException(Exception e) {
            this.getOwner().getSystemErr().printStackTrace((Throwable)e);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onStatus(Status status) {
            if (this.getOwner().getMaxStatusUpdates() > 0 && this.m_Count >= this.getOwner().getMaxStatusUpdates()) {
                this.m_Listening = false;
            } else {
                this.m_Next = status;
            }
            Listener listener = this;
            synchronized (listener) {
                this.notifyAll();
            }
        }

        public void onTrackLimitationNotice(int arg0) {
        }

        public void onScrubGeo(int arg0, long arg1) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Status next() {
            Listener listener;
            Status result = null;
            int count = 0;
            while (result == null) {
                result = this.m_Next;
                ++count;
                if (result == null) {
                    if (!this.m_Listening) break;
                    try {
                        listener = this;
                        synchronized (listener) {
                            this.wait(50L);
                        }
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
                if (count != 100) continue;
            }
            if (result != null) {
                ++this.m_Count;
                if (this.getOwner().isDebugOn() && this.m_Count % 50 == 0) {
                    this.getOwner().debug("status updates: " + this.m_Count);
                }
            }
            this.m_Next = null;
            listener = this;
            synchronized (listener) {
                this.notifyAll();
            }
            return result;
        }

        public boolean hasNext() {
            return this.m_Listening || this.m_Next != null;
        }
    }
}

