/*
 * Decompiled with CFR 0.152.
 */
package adams.multiprocess;

import adams.core.MessageCollection;
import adams.core.QuickInfoHelper;
import adams.core.logging.LoggingHelper;
import adams.core.logging.LoggingSupporter;
import adams.core.net.rabbitmq.RabbitMQHelper;
import adams.core.net.rabbitmq.connection.AbstractConnectionFactory;
import adams.core.net.rabbitmq.connection.GuestConnectionFactory;
import adams.core.net.rabbitmq.receive.BinaryConverter;
import adams.core.net.rabbitmq.send.AbstractConverter;
import adams.core.option.OptionHandler;
import adams.event.JobCompleteEvent;
import adams.event.JobCompleteListener;
import adams.multiprocess.AbstractJobRunner;
import adams.multiprocess.Job;
import adams.multiprocess.JobList;
import adams.multiprocess.JobResult;
import adams.multiprocess.LocalJobRunner;
import com.github.fracpete.javautils.Enumerate;
import com.github.fracpete.javautils.enumerate.Enumerated;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * JobRunner distributing jobs via a RabbitMQ broker.
 * <p>
 * On start, the queued jobs are wrapped in {@link LocalJobRunner} instances
 * (one per job if {@link #getDistributeJobs()} is enabled, otherwise a single
 * one holding all jobs), serialized with the send converter and published to
 * the configured queue. Each message carries a server-named callback queue
 * as its reply-to property. On stop, replies are consumed from that callback
 * queue and the returned jobs replace the local ones.
 *
 * @param <T> the type of job to handle
 */
public class RabbitMQJobRunner<T extends Job>
extends AbstractJobRunner<T> {
    private static final long serialVersionUID = 8430171807757802783L;

    /** Meta-data key under which the index of a dispatched runner is stored. */
    private static final String KEY_INDEX = "index";

    /** Pause (msec) between checks while {@link #doStop()} waits for replies. */
    private static final long RESULT_POLL_INTERVAL = 50L;

    /** The connection factory used for connecting to the broker. */
    protected AbstractConnectionFactory m_ConnectionFactory;

    /** The prefetch count applied to the channel via {@code basicQos}. */
    protected int m_PrefetchCount;

    /** The name of the queue the jobs get published to. */
    protected String m_Queue;

    /** The converter turning the outgoing job runners into bytes. */
    protected AbstractConverter m_SendConverter;

    /** The converter turning the received bytes back into job runners. */
    protected adams.core.net.rabbitmq.receive.AbstractConverter m_ReceiveConverter;

    /** Whether each job is sent in its own message. */
    protected boolean m_DistributeJobs;

    /** The broker connection (only set between preStart and close). */
    protected transient Connection m_Connection;

    /** The channel on the connection (only set between preStart and close). */
    protected transient Channel m_Channel;

    /** The registered listeners; transient, hence created lazily. */
    protected transient HashSet<JobCompleteListener> m_JobCompleteListeners;

    /** The name of the callback queue declared in {@link #doStart()}. */
    protected String m_CallbackQueue;

    /** The jobs to execute. */
    protected List<T> m_Jobs;

    /** The indices of the dispatched runners that have not replied yet. */
    protected Set<Integer> m_Processing;

    /**
     * Returns a short description of this job runner.
     *
     * @return the description
     */
    public String globalInfo() {
        return "JobRunner distributing jobs via a RabbitMQ broker.";
    }

    /**
     * Registers the options of this job runner with the option manager.
     */
    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("connection-factory", "connectionFactory", new GuestConnectionFactory());
        this.m_OptionManager.add("prefetch-count", "prefetchCount", 1, 0, null);
        this.m_OptionManager.add("queue", "queue", "");
        this.m_OptionManager.add("send-converter", "sendConverter", new adams.core.net.rabbitmq.send.BinaryConverter());
        this.m_OptionManager.add("receive-converter", "receiveConverter", new BinaryConverter());
        this.m_OptionManager.add("distribute-jobs", "distributeJobs", false);
    }

    /**
     * Initializes the members: the job list, the set of pending indices and
     * the listener set.
     */
    protected void initialize() {
        super.initialize();
        this.m_Jobs = new ArrayList<>();
        // created up-front so that stopping/terminating before a start is safe
        this.m_Processing = newProcessingSet();
        this.m_JobCompleteListeners = new HashSet<>();
    }

    /**
     * Creates an empty set for tracking pending indices. The set is
     * synchronized because it gets modified by the consumer callback while
     * another thread polls it in {@link #doStop()}.
     *
     * @return the new, empty set
     */
    private static Set<Integer> newProcessingSet() {
        return Collections.synchronizedSet(new HashSet<Integer>());
    }

    /**
     * Sets the connection factory to use.
     *
     * @param value the factory
     */
    public void setConnectionFactory(AbstractConnectionFactory value) {
        this.m_ConnectionFactory = value;
        this.reset();
    }

    /**
     * Returns the connection factory in use.
     *
     * @return the factory
     */
    public AbstractConnectionFactory getConnectionFactory() {
        return this.m_ConnectionFactory;
    }

    /**
     * Returns the tip text for the connection factory option.
     *
     * @return the tip text
     */
    public String connectionFactoryTipText() {
        return "The connection factory to use for connecting to the broker.";
    }

    /**
     * Sets the prefetch count.
     *
     * @param value the count, 0 = unlimited
     */
    public void setPrefetchCount(int value) {
        this.m_PrefetchCount = value;
        this.reset();
    }

    /**
     * Returns the prefetch count.
     *
     * @return the count, 0 = unlimited
     */
    public int getPrefetchCount() {
        return this.m_PrefetchCount;
    }

    /**
     * Returns the tip text for the prefetch count option.
     *
     * @return the tip text
     */
    public String prefetchCountTipText() {
        return "The number of un-acked jobs a client can pull off a queue; 0 = unlimited, 1 = fair.";
    }

    /**
     * Sets the name of the queue to publish the jobs to.
     *
     * @param value the queue name
     */
    public void setQueue(String value) {
        this.m_Queue = value;
        this.reset();
    }

    /**
     * Returns the name of the queue the jobs get published to.
     *
     * @return the queue name
     */
    public String getQueue() {
        return this.m_Queue;
    }

    /**
     * Returns the tip text for the queue option.
     *
     * @return the tip text
     */
    public String queueTipText() {
        return "The name of the queue.";
    }

    /**
     * Sets the converter used for sending.
     *
     * @param value the converter
     */
    public void setSendConverter(AbstractConverter value) {
        this.m_SendConverter = value;
        this.reset();
    }

    /**
     * Returns the converter used for sending.
     *
     * @return the converter
     */
    public AbstractConverter getSendConverter() {
        return this.m_SendConverter;
    }

    /**
     * Returns the tip text for the send converter option.
     *
     * @return the tip text
     */
    public String sendConverterTipText() {
        return "The converter to use for sending.";
    }

    /**
     * Sets the converter used for receiving.
     *
     * @param value the converter
     */
    public void setReceiveConverter(adams.core.net.rabbitmq.receive.AbstractConverter value) {
        this.m_ReceiveConverter = value;
        this.reset();
    }

    /**
     * Returns the converter used for receiving.
     *
     * @return the converter
     */
    public adams.core.net.rabbitmq.receive.AbstractConverter getReceiveConverter() {
        return this.m_ReceiveConverter;
    }

    /**
     * Returns the tip text for the receive converter option.
     *
     * @return the tip text
     */
    public String receiveConverterTipText() {
        return "The converter to use for receiving data.";
    }

    /**
     * Sets whether each job is sent in a separate message.
     *
     * @param value true if to send one message per job
     */
    public void setDistributeJobs(boolean value) {
        this.m_DistributeJobs = value;
        this.reset();
    }

    /**
     * Returns whether each job is sent in a separate message.
     *
     * @return true if one message per job is sent
     */
    public boolean getDistributeJobs() {
        return this.m_DistributeJobs;
    }

    /**
     * Returns the tip text for the distribute jobs option.
     *
     * @return the tip text
     */
    public String distributeJobsTipText() {
        return "If enabled, the jobs get distributed via separate messages.";
    }

    /**
     * Assembles a one-line summary of the current option values.
     *
     * @return the summary
     */
    public String getQuickInfo() {
        String prefetch = (this.m_PrefetchCount == 0) ? "unlimited" : "" + this.m_PrefetchCount;
        StringBuilder info = new StringBuilder();
        info.append(QuickInfoHelper.toString(this, "connectionFactory", this.m_ConnectionFactory, "connection: "));
        info.append(QuickInfoHelper.toString(this, "prefetchCount", prefetch, ", prefetch: "));
        info.append(QuickInfoHelper.toString(this, "queue", this.m_Queue, ", queue: "));
        info.append(QuickInfoHelper.toString(this, "sendConverter", this.m_SendConverter, ", send: "));
        info.append(QuickInfoHelper.toString(this, "receiveConverter", this.m_ReceiveConverter, ", receive: "));
        info.append(QuickInfoHelper.toString(this, "distributeJobs", this.m_DistributeJobs, "distribute jobs", ", "));
        return info.toString();
    }

    /**
     * Creates a new connection to the broker using the configured factory.
     * Any problem is logged rather than silently dropped.
     *
     * @return the connection, null if the factory could not be generated or
     *         connecting failed
     */
    protected Connection retrieveConnection() {
        if (this.m_ConnectionFactory == null) {
            this.getLogger().warning("No connection factory configured!");
            return null;
        }
        MessageCollection errors = new MessageCollection();
        ConnectionFactory factory = this.m_ConnectionFactory.generate(errors);
        if (!errors.isEmpty() || factory == null) {
            this.getLogger().warning(
                "Failed to generate connection factory (" + this.m_ConnectionFactory + ")"
                    + (errors.isEmpty() ? "!" : ": " + errors));
            return null;
        }
        try {
            return factory.newConnection();
        }
        catch (Exception e) {
            LoggingHelper.handleException(this, "Failed to connect to broker (" + this.m_ConnectionFactory + ")!", e);
            return null;
        }
    }

    /**
     * Connects to the broker and opens a channel with the configured prefetch
     * count. If anything fails, whatever was opened gets closed again.
     *
     * @return null if successful, otherwise the error message
     */
    protected String preStart() {
        String result = super.preStart();
        if (result != null) {
            return result;
        }
        this.m_Connection = this.retrieveConnection();
        if (this.m_Connection == null) {
            return "Failed to connect to broker (" + this.m_ConnectionFactory + ")!";
        }
        try {
            this.m_Channel = this.m_Connection.createChannel();
            if (this.m_Channel == null) {
                result = "Failed to create a channel!";
            } else {
                this.m_Channel.basicQos(this.m_PrefetchCount);
            }
        }
        catch (Exception e) {
            result = LoggingHelper.handleException(this, "Failed to create channel!", e);
        }
        if (result != null) {
            // do not leave a half-initialized connection/channel behind
            this.close();
        }
        return result;
    }

    /**
     * Declares the callback queue, serializes the jobs and publishes them to
     * the configured queue.
     *
     * @return null if successful, otherwise the error message
     */
    protected String doStart() {
        String result = null;
        this.m_SendConverter.setFlowContext(this.getFlowContext());
        this.m_ReceiveConverter.setFlowContext(this.getFlowContext());

        this.m_CallbackQueue = null;
        try {
            this.m_CallbackQueue = this.m_Channel.queueDeclare().getQueue();
        }
        catch (Exception e) {
            result = LoggingHelper.handleException(this, "Failed to create queue!", e);
        }

        this.m_Processing = newProcessingSet();
        List<byte[]> payloads = new ArrayList<>();
        if (result == null) {
            MessageCollection errors = new MessageCollection();
            payloads = this.serializeJobs(errors);
            if (!errors.isEmpty()) {
                result = errors.toString();
            }
        }

        if (result == null) {
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .replyTo(this.m_CallbackQueue)
                .build();
            for (byte[] payload : payloads) {
                try {
                    this.m_Channel.basicPublish("", this.m_Queue, props, payload);
                }
                catch (Exception e) {
                    // stop at the first message that cannot be published
                    result = LoggingHelper.handleException(this, "Failed to publish job (queue=" + this.m_Queue + ")!", e);
                    break;
                }
            }
        }
        return result;
    }

    /**
     * Wraps the jobs in {@link LocalJobRunner} instances, records their
     * indices as pending and serializes them with the send converter.
     *
     * @param errors for collecting conversion errors
     * @return the serialized runners (failed conversions are left out)
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private List<byte[]> serializeJobs(MessageCollection errors) {
        List<byte[]> payloads = new ArrayList<>();
        List<T> jobs = this.getJobs();
        if (this.m_DistributeJobs) {
            // one runner (= one message) per job, tagged with the job's position
            for (int i = 0; i < jobs.size(); i++) {
                Job job = jobs.get(i);
                LocalJobRunner runner = new LocalJobRunner();
                runner.getMetaData().put(KEY_INDEX, Integer.valueOf(i));
                runner.add(job);
                this.m_Processing.add(i);
                byte[] data = this.m_SendConverter.convert(runner, errors);
                if (data != null) {
                    payloads.add(data);
                }
            }
        } else {
            // a single runner carrying all the jobs
            LocalJobRunner runner = new LocalJobRunner();
            for (Job job : jobs) {
                runner.add(job);
            }
            runner.getMetaData().put(KEY_INDEX, Integer.valueOf(0));
            this.m_Processing.add(0);
            byte[] data = this.m_SendConverter.convert(runner, errors);
            if (data != null) {
                payloads.add(data);
            }
        }
        return payloads;
    }

    /**
     * Purges the configured job queue, if a channel is available.
     * NOTE(review): this removes whatever is in the configured queue; if the
     * queue is shared with other producers their messages would presumably
     * be dropped as well - confirm this is intended.
     *
     * @return null if successful, otherwise the error message
     */
    protected String purgeQueue() {
        if (this.m_Channel == null) {
            return null;
        }
        try {
            this.m_Channel.queuePurge(this.m_Queue);
            return null;
        }
        catch (Exception e) {
            return LoggingHelper.handleException(this, "Failed to purge queue: " + this.m_Queue, e);
        }
    }

    /**
     * Deletes the callback queue, if a channel is available and a callback
     * queue has been declared.
     *
     * @return null if successful or nothing to do, otherwise the error message
     */
    protected String deleteCallbackQueue() {
        if (this.m_Channel == null || this.m_CallbackQueue == null) {
            return null;
        }
        try {
            this.m_Channel.queueDelete(this.m_CallbackQueue);
            return null;
        }
        catch (Exception e) {
            return LoggingHelper.handleException(this, "Failed to delete queue: " + this.m_CallbackQueue, e);
        }
    }

    /**
     * Closes channel and connection and resets the members.
     */
    protected void close() {
        RabbitMQHelper.closeQuietly(this.m_Channel);
        RabbitMQHelper.closeQuietly(this.m_Connection);
        this.m_Channel = null;
        this.m_Connection = null;
    }

    /**
     * Waits for the replies of all dispatched runners and then cleans up via
     * {@link #doTerminate(boolean)}. A single consumer is registered on the
     * callback queue; the method then polls until no index is pending, a
     * reply could not be processed, the channel gets closed or the thread is
     * interrupted.
     *
     * @return null if successful, otherwise the error message(s)
     */
    protected String doStop() {
        String result = null;
        // work on snapshots, as terminating nulls the channel
        final Set<Integer> pending = this.m_Processing;
        final Channel channel = this.m_Channel;

        if (pending != null && !pending.isEmpty()) {
            final List<String> failures = Collections.synchronizedList(new ArrayList<String>());
            try {
                if (channel == null) {
                    throw new IllegalStateException("No channel available!");
                }
                DeliverCallback deliverCallback =
                    (consumerTag, delivery) -> this.processReply(delivery.getBody(), pending, failures);
                // register the consumer once; replies arrive asynchronously
                channel.basicConsume(this.m_CallbackQueue, true, deliverCallback, consumerTag -> {});
                while (!pending.isEmpty() && failures.isEmpty()) {
                    if (!channel.isOpen()) {
                        result = "Channel closed while still waiting for " + pending.size() + " job result(s)!";
                        this.getLogger().warning(result);
                        break;
                    }
                    Thread.sleep(RESULT_POLL_INTERVAL);
                }
                if (result == null && !failures.isEmpty()) {
                    synchronized (failures) {
                        result = String.join("\n", failures);
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result = LoggingHelper.handleException(this, "Interrupted while waiting for job results!", e);
            }
            catch (Exception e) {
                result = LoggingHelper.handleException(this, "Failed to receive data!", e);
            }
        }

        String msg = this.doTerminate(false);
        if (msg != null) {
            result = (result == null) ? msg : result + "\n" + msg;
        }
        return result;
    }

    /**
     * Processes a single reply: converts the data back into a job runner,
     * stores the returned job(s) in the local job list and marks the
     * runner's index as no longer pending. A reply that cannot be processed
     * is recorded as failure, since it can never be matched to a pending
     * index.
     *
     * @param data the raw message body
     * @param pending the indices still waited for
     * @param failures for recording replies that could not be processed
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void processReply(byte[] data, Set<Integer> pending, List<String> failures) {
        try {
            MessageCollection errors = new MessageCollection();
            Object received = this.m_ReceiveConverter.convert(data, errors);
            if (!(received instanceof LocalJobRunner)) {
                this.recordFailure(failures,
                    "Failed to convert received data into a job runner" + (errors.isEmpty() ? "!" : ": " + errors));
                return;
            }
            LocalJobRunner jobrunner = (LocalJobRunner) received;
            Integer index = (Integer) jobrunner.getMetaData().get(KEY_INDEX);
            if (index == null) {
                this.recordFailure(failures, "No job index stored in meta-data of jobrunner?");
                return;
            }
            if (this.isLoggingEnabled()) {
                this.getLogger().info("Job #" + index + " received");
            }

            List<?> returned = jobrunner.getJobs();
            if (this.m_DistributeJobs) {
                if (returned.isEmpty() || index < 0 || index >= this.m_Jobs.size()) {
                    this.recordFailure(failures,
                        "Job #" + index + ": reply contains no job or index is out of range (jobs: " + this.m_Jobs.size() + ")!");
                    return;
                }
                this.m_Jobs.set(index, (T) returned.get(0));
            } else {
                if (returned.size() != this.m_Jobs.size()) {
                    this.getLogger().warning(
                        "Number of returned jobs differs from local jobs: " + returned.size() + " != " + this.m_Jobs.size());
                }
                int count = Math.min(returned.size(), this.m_Jobs.size());
                for (int i = 0; i < count; i++) {
                    this.m_Jobs.set(i, (T) returned.get(i));
                }
            }
            // only mark as done once the job list has been updated
            pending.remove(index);
        }
        catch (Exception e) {
            failures.add(LoggingHelper.handleException(this, "Failed to process received data!", e));
        }
    }

    /**
     * Logs the message as warning and records it as failure.
     *
     * @param failures the list to add the message to
     * @param msg the message
     */
    private void recordFailure(List<String> failures, String msg) {
        this.getLogger().warning(msg);
        failures.add(msg);
    }

    /**
     * Forgets all pending indices, purges the job queue, deletes the callback
     * queue and closes channel and connection.
     *
     * @param wait ignored
     * @return null if successful, otherwise the error message(s)
     */
    protected String doTerminate(boolean wait) {
        if (this.m_Processing != null) {
            this.m_Processing.clear();
        }
        String result = this.purgeQueue();
        String msg = this.deleteCallbackQueue();
        if (msg != null) {
            result = (result == null) ? msg : result + "\n" + msg;
        }
        this.close();
        return result;
    }

    /**
     * Returns the listener set, creating it if necessary (the field is
     * transient and therefore may be null).
     *
     * @return the listener set, never null
     */
    private synchronized HashSet<JobCompleteListener> jobCompleteListeners() {
        if (this.m_JobCompleteListeners == null) {
            this.m_JobCompleteListeners = new HashSet<>();
        }
        return this.m_JobCompleteListeners;
    }

    /**
     * Adds the listener.
     *
     * @param l the listener to add, ignored if null
     */
    public void addJobCompleteListener(JobCompleteListener l) {
        if (l == null) {
            return;
        }
        HashSet<JobCompleteListener> listeners = this.jobCompleteListeners();
        synchronized (listeners) {
            listeners.add(l);
        }
    }

    /**
     * Removes the listener.
     *
     * @param l the listener to remove
     */
    public void removeJobCompleteListener(JobCompleteListener l) {
        HashSet<JobCompleteListener> listeners = this.jobCompleteListeners();
        synchronized (listeners) {
            listeners.remove(l);
        }
    }

    /**
     * Notifies all registered listeners. The listeners are invoked on a
     * snapshot taken under the lock, so a listener may add or remove
     * listeners while being notified.
     *
     * @param e the event to send
     */
    protected void notifyJobCompleteListeners(JobCompleteEvent e) {
        List<JobCompleteListener> snapshot;
        HashSet<JobCompleteListener> listeners = this.jobCompleteListeners();
        synchronized (listeners) {
            snapshot = new ArrayList<>(listeners);
        }
        for (JobCompleteListener listener : snapshot) {
            listener.jobCompleted(e);
        }
    }

    /**
     * Removes all jobs.
     */
    public void clear() {
        this.m_Jobs.clear();
    }

    /**
     * Adds the job.
     *
     * @param job the job to add
     */
    public void add(T job) {
        this.m_Jobs.add(job);
    }

    /**
     * Adds all the jobs of the list.
     *
     * @param jobs the jobs to add
     */
    public void add(JobList<T> jobs) {
        this.m_Jobs.addAll((Collection<T>) jobs);
    }

    /**
     * Returns the underlying (live) list of jobs.
     *
     * @return the jobs
     */
    public List<T> getJobs() {
        return this.m_Jobs;
    }

    /**
     * Notifies the registered listeners and, if present, the job's own
     * listener that the job has completed.
     *
     * @param j the completed job
     * @param jr the result of the job
     */
    public void complete(T j, JobResult jr) {
        this.notifyJobCompleteListeners(new JobCompleteEvent(this, j, jr));
        JobCompleteListener own = j.getJobCompleteListener();
        if (own != null) {
            own.jobCompleted(new JobCompleteEvent(this, j, jr));
        }
    }
}

