package adams.multiprocess;

import adams.core.MessageCollection;
import adams.core.QuickInfoHelper;
import adams.core.logging.LoggingHelper;
import adams.core.net.rabbitmq.RabbitMQHelper;
import adams.core.net.rabbitmq.connection.AbstractConnectionFactory;
import adams.core.net.rabbitmq.connection.GuestConnectionFactory;
import adams.core.net.rabbitmq.send.AbstractConverter;
import adams.core.net.rabbitmq.send.BinaryConverter;
import adams.event.JobCompleteEvent;
import adams.event.JobCompleteListener;
import adams.multiprocess.Job;
import com.github.fracpete.javautils.Enumerate;
import com.github.fracpete.javautils.enumerate.Enumerated;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/* loaded from: input_file:adams/multiprocess/RabbitMQJobRunner.class */
public class RabbitMQJobRunner<T extends Job> extends AbstractJobRunner<T> {
    private static final long serialVersionUID = 8430171807757802783L;
    protected AbstractConnectionFactory m_ConnectionFactory;
    protected int m_PrefetchCount;
    protected String m_Queue;
    protected AbstractConverter m_SendConverter;
    protected adams.core.net.rabbitmq.receive.AbstractConverter m_ReceiveConverter;
    protected boolean m_DistributeJobs;
    protected transient Connection m_Connection;
    protected transient Channel m_Channel;
    protected transient HashSet<JobCompleteListener> m_JobCompleteListeners;
    protected String m_CallbackQueue;
    protected List<T> m_Jobs;
    protected Set<Integer> m_Processing;

    public String globalInfo() {
        return "JobRunner distributing jobs via a RabbitMQ broker.";
    }

    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("connection-factory", "connectionFactory", new GuestConnectionFactory());
        this.m_OptionManager.add("prefetch-count", "prefetchCount", 1, 0, (Number) null);
        this.m_OptionManager.add("queue", "queue", "");
        this.m_OptionManager.add("send-converter", "sendConverter", new BinaryConverter());
        this.m_OptionManager.add("receive-converter", "receiveConverter", new adams.core.net.rabbitmq.receive.BinaryConverter());
        this.m_OptionManager.add("distribute-jobs", "distributeJobs", false);
    }

    protected void initialize() {
        super.initialize();
        this.m_Jobs = new ArrayList();
    }

    public void setConnectionFactory(AbstractConnectionFactory abstractConnectionFactory) {
        this.m_ConnectionFactory = abstractConnectionFactory;
        reset();
    }

    public AbstractConnectionFactory getConnectionFactory() {
        return this.m_ConnectionFactory;
    }

    public String connectionFactoryTipText() {
        return "The base connection factory to encrypt.";
    }

    public void setPrefetchCount(int i) {
        this.m_PrefetchCount = i;
        reset();
    }

    public int getPrefetchCount() {
        return this.m_PrefetchCount;
    }

    public String prefetchCountTipText() {
        return "The number of un-acked jobs a client can pull off a queue; 0 = unlimited, 1 = fair.";
    }

    public void setQueue(String str) {
        this.m_Queue = str;
        reset();
    }

    public String getQueue() {
        return this.m_Queue;
    }

    public String queueTipText() {
        return "The name of the queue.";
    }

    public void setSendConverter(AbstractConverter abstractConverter) {
        this.m_SendConverter = abstractConverter;
        reset();
    }

    public AbstractConverter getSendConverter() {
        return this.m_SendConverter;
    }

    public String sendConverterTipText() {
        return "The converter to use for sending.";
    }

    public void setReceiveConverter(adams.core.net.rabbitmq.receive.AbstractConverter abstractConverter) {
        this.m_ReceiveConverter = abstractConverter;
        reset();
    }

    public adams.core.net.rabbitmq.receive.AbstractConverter getReceiveConverter() {
        return this.m_ReceiveConverter;
    }

    public String receiveConverterTipText() {
        return "The converter to use for receiving data.";
    }

    public void setDistributeJobs(boolean z) {
        this.m_DistributeJobs = z;
        reset();
    }

    public boolean getDistributeJobs() {
        return this.m_DistributeJobs;
    }

    public String distributeJobsTipText() {
        return "If enabled, the jobs get distributed via separate messages.";
    }

    public String getQuickInfo() {
        return ((((QuickInfoHelper.toString(this, "connectionFactory", this.m_ConnectionFactory, "connection: ") + QuickInfoHelper.toString(this, "prefetchCount", this.m_PrefetchCount == 0 ? "unlimited" : "" + this.m_PrefetchCount, ", prefetch: ")) + QuickInfoHelper.toString(this, "queue", this.m_Queue, ", queue: ")) + QuickInfoHelper.toString(this, "sendConverter", this.m_SendConverter, ", send: ")) + QuickInfoHelper.toString(this, "receiveConverter", this.m_ReceiveConverter, ", receive: ")) + QuickInfoHelper.toString(this, "distributeJobs", this.m_DistributeJobs, "distribute jobs", ", ");
    }

    protected Connection retrieveConnection() {
        MessageCollection messageCollection = new MessageCollection();
        ConnectionFactory generate = this.m_ConnectionFactory.generate(messageCollection);
        if (!messageCollection.isEmpty()) {
            return null;
        }
        try {
            return generate.newConnection();
        } catch (Exception e) {
            messageCollection.add("Failed to connect to broker (" + this.m_ConnectionFactory + ")!", e);
            LoggingHelper.handleException(this, "Failed to connect to broker (" + this.m_ConnectionFactory + ")!", e);
            return null;
        }
    }

    protected String preStart() {
        String preStart = super.preStart();
        if (preStart == null) {
            this.m_Connection = retrieveConnection();
            if (this.m_Connection == null) {
                preStart = "Failed to connect to broker (" + this.m_ConnectionFactory + ")!";
            }
        }
        if (preStart == null) {
            try {
                this.m_Channel = this.m_Connection.createChannel();
                if (this.m_Channel == null) {
                    preStart = "Failed to create a channel!";
                } else {
                    this.m_Channel.basicQos(this.m_PrefetchCount);
                }
            } catch (Exception e) {
                preStart = LoggingHelper.handleException(this, "Failed to create channel!", e);
            }
        }
        return preStart;
    }

    protected String doStart() {
        String str = null;
        this.m_SendConverter.setFlowContext(getFlowContext());
        this.m_ReceiveConverter.setFlowContext(getFlowContext());
        this.m_CallbackQueue = null;
        try {
            this.m_CallbackQueue = this.m_Channel.queueDeclare().getQueue();
        } catch (Exception e) {
            str = LoggingHelper.handleException(this, "Failed to create queue!", e);
        }
        this.m_Processing = new HashSet();
        ArrayList arrayList = new ArrayList();
        if (str == null) {
            MessageCollection messageCollection = new MessageCollection();
            if (this.m_DistributeJobs) {
                for (Enumerated enumerated : Enumerate.enumerate(getJobs())) {
                    LocalJobRunner localJobRunner = new LocalJobRunner();
                    localJobRunner.getMetaData().put("index", Integer.valueOf(enumerated.index));
                    localJobRunner.add((Job) enumerated.value);
                    this.m_Processing.add(Integer.valueOf(enumerated.index));
                    byte[] convert = this.m_SendConverter.convert(localJobRunner, messageCollection);
                    if (convert != null) {
                        arrayList.add(convert);
                    }
                }
            } else {
                LocalJobRunner localJobRunner2 = new LocalJobRunner();
                Iterator<T> it = getJobs().iterator();
                while (it.hasNext()) {
                    localJobRunner2.add(it.next());
                }
                localJobRunner2.getMetaData().put("index", 0);
                this.m_Processing.add(0);
                byte[] convert2 = this.m_SendConverter.convert(localJobRunner2, messageCollection);
                if (convert2 != null) {
                    arrayList.add(convert2);
                }
            }
            if (!messageCollection.isEmpty()) {
                str = messageCollection.toString();
            }
        }
        if (str == null) {
            AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().replyTo(this.m_CallbackQueue).build();
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                try {
                    this.m_Channel.basicPublish("", this.m_Queue, build, (byte[]) it2.next());
                } catch (Exception e2) {
                    str = LoggingHelper.handleException(this, "Failed to publish job (queue=" + this.m_Queue + ")!", e2);
                }
                if (str != null) {
                    break;
                }
            }
        }
        return str;
    }

    protected String purgeQueue() {
        String str = null;
        if (this.m_Channel != null) {
            try {
                this.m_Channel.queuePurge(this.m_Queue);
            } catch (Exception e) {
                str = LoggingHelper.handleException(this, "Failed to purge queue: " + this.m_Queue, e);
            }
        }
        return str;
    }

    protected String deleteCallbackQueue() {
        String str = null;
        if (this.m_Channel != null) {
            try {
                this.m_Channel.queueDelete(this.m_CallbackQueue);
            } catch (Exception e) {
                str = LoggingHelper.handleException(this, "Failed to delete queue: " + this.m_CallbackQueue, e);
            }
        }
        return str;
    }

    protected void close() {
        RabbitMQHelper.closeQuietly(this.m_Channel);
        RabbitMQHelper.closeQuietly(this.m_Connection);
        this.m_Channel = null;
        this.m_Connection = null;
    }

    protected String doStop() {
        String str = null;
        try {
            DeliverCallback deliverCallback = (str2, delivery) -> {
                LocalJobRunner localJobRunner = (LocalJobRunner) this.m_ReceiveConverter.convert(delivery.getBody(), new MessageCollection());
                if (localJobRunner != null) {
                    Integer num = (Integer) localJobRunner.getMetaData().get("index");
                    if (num == null) {
                        getLogger().warning("No job index stored in meta-data of jobrunner?");
                        return;
                    }
                    if (isLoggingEnabled()) {
                        getLogger().info("Job #" + num + " received");
                    }
                    if (this.m_DistributeJobs) {
                        this.m_Jobs.set(num.intValue(), (Job) localJobRunner.getJobs().get(0));
                    } else {
                        for (int i = 0; i < localJobRunner.getJobs().size(); i++) {
                            this.m_Jobs.set(i, (Job) localJobRunner.getJobs().get(i));
                        }
                    }
                    this.m_Processing.remove(num);
                }
            };
            while (this.m_Processing.size() > 0) {
                this.m_Channel.basicConsume(this.m_CallbackQueue, true, deliverCallback, str3 -> {
                });
            }
        } catch (Exception e) {
            str = LoggingHelper.handleException(this, "Failed to receive data!", e);
        }
        String doTerminate = doTerminate(false);
        if (doTerminate != null) {
            str = str == null ? doTerminate : str + "\n" + doTerminate;
        }
        return str;
    }

    protected String doTerminate(boolean z) {
        this.m_Processing.clear();
        String purgeQueue = purgeQueue();
        String deleteCallbackQueue = deleteCallbackQueue();
        if (deleteCallbackQueue != null) {
            purgeQueue = purgeQueue == null ? deleteCallbackQueue : purgeQueue + "\n" + deleteCallbackQueue;
        }
        close();
        return purgeQueue;
    }

    public void addJobCompleteListener(JobCompleteListener jobCompleteListener) {
        synchronized (this.m_JobCompleteListeners) {
            this.m_JobCompleteListeners.add(jobCompleteListener);
        }
    }

    public void removeJobCompleteListener(JobCompleteListener jobCompleteListener) {
        synchronized (this.m_JobCompleteListeners) {
            this.m_JobCompleteListeners.remove(jobCompleteListener);
        }
    }

    protected void notifyJobCompleteListeners(JobCompleteEvent jobCompleteEvent) {
        synchronized (this.m_JobCompleteListeners) {
            Iterator<JobCompleteListener> it = this.m_JobCompleteListeners.iterator();
            while (it.hasNext()) {
                it.next().jobCompleted(jobCompleteEvent);
            }
        }
    }

    public void clear() {
        this.m_Jobs.clear();
    }

    public void add(T t) {
        this.m_Jobs.add(t);
    }

    public void add(JobList<T> jobList) {
        this.m_Jobs.addAll(jobList);
    }

    public List<T> getJobs() {
        return this.m_Jobs;
    }

    public void complete(T t, JobResult jobResult) {
        notifyJobCompleteListeners(new JobCompleteEvent(this, t, jobResult));
        if (t.getJobCompleteListener() != null) {
            t.getJobCompleteListener().jobCompleted(new JobCompleteEvent(this, t, jobResult));
        }
    }
}
