package adams.flow.control;

import adams.core.MessageCollection;
import adams.core.QuickInfoHelper;
import adams.core.Utils;
import adams.core.net.rabbitmq.receive.AbstractConverter;
import adams.core.net.rabbitmq.receive.BinaryConverter;
import adams.flow.container.EncapsulatedActorsContainer;
import adams.flow.core.Actor;
import adams.flow.core.ActorExecution;
import adams.flow.core.ActorHandlerInfo;
import adams.flow.core.ActorUtils;
import adams.flow.core.EncapsulateActors;
import adams.flow.core.InputConsumer;
import adams.flow.core.OutputProducer;
import adams.flow.core.Token;
import adams.flow.core.Unknown;
import com.rabbitmq.client.DeliverCallback;

/* loaded from: input_file:adams/flow/control/RabbitMQRemoteSubProcess.class */
/**
 * Control actor that wraps its sub-actors in a {@link SubProcess}, packages
 * that sub-process together with the current input payload into an
 * {@link EncapsulatedActorsContainer} and, once a reply is delivered, turns the
 * reply's "Output" value into this actor's output token.
 *
 * <p>The actual sending of the container and the waiting for the reply are not
 * part of this class; they are presumably handled by
 * {@link AbstractRabbitMQControlActor} (not visible here).
 */
public class RabbitMQRemoteSubProcess extends AbstractRabbitMQControlActor implements InputConsumer, OutputProducer {
    private static final long serialVersionUID = 5816569944356142679L;

    /** The converter used for turning the received raw bytes back into an object. */
    protected AbstractConverter m_ReceiveConverter;

    /**
     * Returns a short description of this actor.
     *
     * @return the description text
     */
    public String globalInfo() {
        return "Encapsulates a sequence of actors to be executed remotely. The first actor must accept input and the last one must produce output.";
    }

    /**
     * Registers the options of this actor in addition to the ones of the superclass.
     */
    @Override
    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("receive-converter", "receiveConverter", new BinaryConverter());
    }

    /**
     * Sets the converter for received data and resets the actor.
     *
     * @param abstractConverter the converter to use
     */
    public void setReceiveConverter(AbstractConverter abstractConverter) {
        this.m_ReceiveConverter = abstractConverter;
        reset();
    }

    /**
     * Returns the converter for received data.
     *
     * @return the converter in use
     */
    public AbstractConverter getReceiveConverter() {
        return this.m_ReceiveConverter;
    }

    /**
     * Returns the help text for the receive-converter option.
     *
     * @return the help text
     */
    public String receiveConverterTipText() {
        return "The converter to use for receiving data.";
    }

    /**
     * Returns the quick info of the superclass with the receive converter appended.
     *
     * @return the quick info string
     */
    @Override
    public String getQuickInfo() {
        return super.getQuickInfo() + QuickInfoHelper.toString(this, "receiveConverter", this.m_ReceiveConverter, ", receive: ");
    }

    /**
     * Checks a single sub-actor; source actors are rejected.
     *
     * @param i     the index of the sub-actor (not used by this check)
     * @param actor the sub-actor to check
     * @return {@code null} if the actor is acceptable, otherwise an error message
     */
    @Override
    protected String checkSubActor(int i, Actor actor) {
        if (ActorUtils.isSource(actor)) {
            return "You cannot add a source actor ('" + actor.getName() + "'/" + actor.getClass().getName() + ")!";
        }
        return null;
    }

    /**
     * Checks all sub-actors: every actor that is not skipped has to be an
     * {@link InputConsumer}.
     *
     * @param actorArr the sub-actors to check
     * @return {@code null} if all actors are acceptable (or there are none),
     *         otherwise an error message naming the first offending actor
     */
    @Override
    protected String checkSubActors(Actor[] actorArr) {
        for (Actor candidate : actorArr) {
            if (!candidate.getSkip() && !(candidate instanceof InputConsumer)) {
                return "You need to provide an actor that processes input, '" + candidate.getName() + "'/" + candidate.getClass().getName() + " doesn't!";
            }
        }
        return null;
    }

    /**
     * Describes how this handler treats its sub-actors: no standalones, no
     * sources, sequential execution, input gets forwarded.
     *
     * @return the handler info
     */
    @Override
    public ActorHandlerInfo getActorHandlerInfo() {
        return new ActorHandlerInfo().allowStandalones(false).allowSource(false).actorExecution(ActorExecution.SEQUENTIAL).forwardsInput(true);
    }

    /**
     * Returns the classes accepted as input: those of the first active
     * sub-actor, or {@link Unknown} if there is no active sub-actor.
     *
     * @return the accepted classes
     */
    public Class[] accepts() {
        return active() > 0 ? firstActive().accepts() : new Class[]{Unknown.class};
    }

    /**
     * Returns the classes generated as output: those of the last active
     * sub-actor, or {@link Unknown} if there is no active sub-actor.
     *
     * @return the generated classes
     */
    public Class[] generates() {
        return active() > 0 ? lastActive().generates() : new Class[]{Unknown.class};
    }

    /**
     * Sets up the actor and verifies that the first active sub-actor accepts
     * input and the last active sub-actor generates output.
     *
     * @return {@code null} if set-up succeeded, otherwise an error message
     */
    @Override
    public String setUp() {
        String up = super.setUp();
        if (up != null) {
            return up;
        }
        // Same guard as accepts()/generates(): without any active sub-actor the
        // firstActive()/lastActive() results must not be dereferenced below.
        if (active() <= 0) {
            return "No active actors available: the first actor must accept input and the last one must produce output!";
        }
        if (!(firstActive() instanceof InputConsumer)) {
            return "First actor ('" + firstActive().getName() + "') does not accept input!";
        }
        if (!(lastActive() instanceof OutputProducer)) {
            return "Last actor ('" + lastActive().getName() + "') does not generate output!";
        }
        return null;
    }

    /**
     * Copies the sub-actors into a fresh {@link SubProcess} and wraps it,
     * together with the variable names, storage names and the payload of the
     * current input token, into a container.
     *
     * @return the container holding the sub-process and the input
     */
    @Override
    protected EncapsulatedActorsContainer encapsulate() {
        SubProcess subProcess = new SubProcess();
        subProcess.setName("source: " + getFullName());
        for (int i = 0; i < size(); i++) {
            subProcess.add(get(i).shallowCopy(false));
        }
        subProcess.setParent(getParent());
        subProcess.setVariables(getVariables());
        return EncapsulateActors.wrap(subProcess, this.m_VariableNames, this.m_StorageNames, this.m_InputToken.getPayload());
    }

    /**
     * Creates the callback that converts a delivered message and stores its
     * "Output" value as output token. {@code m_Processing} is reset to
     * {@code null} once the callback finishes, whether it succeeded or not.
     *
     * @return the callback
     */
    @Override
    protected DeliverCallback generateDeliverCallback() {
        this.m_ReceiveConverter.setFlowContext(this);
        return (consumerTag, delivery) -> {
            try {
                MessageCollection errors = new MessageCollection();
                Object received = this.m_ReceiveConverter.convert(delivery.getBody(), errors);
                // Report what the converter collected instead of dropping it;
                // otherwise a failed conversion (null result) leaves no trace.
                if (!errors.isEmpty()) {
                    getLogger().severe("Failed to convert received data:\n" + errors);
                }
                if (received != null) {
                    processReceived(received);
                }
            } catch (Exception e) {
                handleException("Failed to process received data!", e);
            } finally {
                this.m_Processing = null;
            }
        };
    }

    /**
     * Extracts the "Output" value from a received container and stores it as
     * output token; logs a message if the object is not a container or the
     * container holds no output.
     *
     * @param received the converted, non-null object that was received
     */
    private void processReceived(Object received) {
        if (!(received instanceof EncapsulatedActorsContainer)) {
            getLogger().severe("Expected " + Utils.classToString(EncapsulatedActorsContainer.class) + " but received " + Utils.classToString(received) + " back!");
            return;
        }
        EncapsulatedActorsContainer container = (EncapsulatedActorsContainer) received;
        if (!container.hasValue("Output")) {
            getLogger().warning("Did not receive any generated output!");
            return;
        }
        Object value = container.getValue("Output");
        if (isLoggingEnabled()) {
            getLogger().info("Received: " + value);
        }
        this.m_OutputToken = new Token(value);
    }
}
