/*
 * Decompiled with CFR 0.152.
 */
package adams.flow.control;

import adams.core.MessageCollection;
import adams.core.QuickInfoHelper;
import adams.core.Utils;
import adams.core.VariableName;
import adams.core.net.rabbitmq.receive.AbstractConverter;
import adams.core.net.rabbitmq.receive.BinaryConverter;
import adams.core.option.OptionHandler;
import adams.flow.container.EncapsulatedActorsContainer;
import adams.flow.control.AbstractRabbitMQControlActor;
import adams.flow.control.StorageName;
import adams.flow.control.SubProcess;
import adams.flow.core.Actor;
import adams.flow.core.ActorExecution;
import adams.flow.core.ActorHandlerInfo;
import adams.flow.core.ActorUtils;
import adams.flow.core.EncapsulateActors;
import adams.flow.core.InputConsumer;
import adams.flow.core.OutputProducer;
import adams.flow.core.Token;
import adams.flow.core.Unknown;
import com.rabbitmq.client.DeliverCallback;

public class RabbitMQRemoteSubProcess
extends AbstractRabbitMQControlActor
implements InputConsumer,
OutputProducer {
    private static final long serialVersionUID = 5816569944356142679L;
    protected AbstractConverter m_ReceiveConverter;

    public String globalInfo() {
        return "Encapsulates a sequence of actors to be executed remotely. The first actor must accept input and the last one must produce output.";
    }

    @Override
    public void defineOptions() {
        super.defineOptions();
        this.m_OptionManager.add("receive-converter", "receiveConverter", (Object)new BinaryConverter());
    }

    public void setReceiveConverter(AbstractConverter value) {
        this.m_ReceiveConverter = value;
        this.reset();
    }

    public AbstractConverter getReceiveConverter() {
        return this.m_ReceiveConverter;
    }

    public String receiveConverterTipText() {
        return "The converter to use for receiving data.";
    }

    @Override
    public String getQuickInfo() {
        String result = super.getQuickInfo();
        result = result + QuickInfoHelper.toString((OptionHandler)this, (String)"receiveConverter", (Object)((Object)this.m_ReceiveConverter), (String)", receive: ");
        return result;
    }

    @Override
    protected String checkSubActor(int index, Actor actor) {
        if (ActorUtils.isSource((Actor)actor)) {
            return "You cannot add a source actor ('" + actor.getName() + "'/" + actor.getClass().getName() + ")!";
        }
        return null;
    }

    @Override
    protected String checkSubActors(Actor[] actors) {
        if (actors.length > 0) {
            for (int i = 0; i < actors.length; ++i) {
                if (actors[i].getSkip() || actors[i] instanceof InputConsumer) continue;
                return "You need to provide an actor that processes input, '" + actors[i].getName() + "'/" + actors[i].getClass().getName() + " doesn't!";
            }
        }
        return null;
    }

    @Override
    public ActorHandlerInfo getActorHandlerInfo() {
        return new ActorHandlerInfo().allowStandalones(false).allowSource(false).actorExecution(ActorExecution.SEQUENTIAL).forwardsInput(true);
    }

    public Class[] accepts() {
        if (this.active() > 0) {
            return ((InputConsumer)this.firstActive()).accepts();
        }
        return new Class[]{Unknown.class};
    }

    public Class[] generates() {
        if (this.active() > 0) {
            return ((OutputProducer)this.lastActive()).generates();
        }
        return new Class[]{Unknown.class};
    }

    @Override
    public String setUp() {
        String result = super.setUp();
        if (result == null) {
            if (!(this.firstActive() instanceof InputConsumer)) {
                result = "First actor ('" + this.firstActive().getName() + "') does not accept input!";
            } else if (!(this.lastActive() instanceof OutputProducer)) {
                result = "Last actor ('" + this.lastActive().getName() + "') does not generate output!";
            }
        }
        return result;
    }

    @Override
    protected EncapsulatedActorsContainer encapsulate() {
        SubProcess sub = new SubProcess();
        sub.setName("source: " + this.getFullName());
        for (int i = 0; i < this.size(); ++i) {
            sub.add(this.get(i).shallowCopy(false));
        }
        sub.setParent(this.getParent());
        sub.setVariables(this.getVariables());
        return EncapsulateActors.wrap((Actor)sub, (VariableName[])this.m_VariableNames, (StorageName[])this.m_StorageNames, (Object)this.m_InputToken.getPayload());
    }

    @Override
    protected DeliverCallback generateDeliverCallback() {
        this.m_ReceiveConverter.setFlowContext((Actor)this);
        DeliverCallback result = (consumerTag, delivery) -> {
            try {
                byte[] dataRec = delivery.getBody();
                MessageCollection errorsRec = new MessageCollection();
                Object objRec = this.m_ReceiveConverter.convert(dataRec, errorsRec);
                if (objRec != null) {
                    if (objRec instanceof EncapsulatedActorsContainer) {
                        EncapsulatedActorsContainer contRec = (EncapsulatedActorsContainer)objRec;
                        if (contRec.hasValue("Output")) {
                            Object generated = contRec.getValue("Output");
                            if (this.isLoggingEnabled()) {
                                this.getLogger().info("Received: " + generated);
                            }
                            this.m_OutputToken = new Token(generated);
                        } else {
                            this.getLogger().warning("Did not receive any generated output!");
                        }
                    } else {
                        this.getLogger().severe("Expected " + Utils.classToString(EncapsulatedActorsContainer.class) + " but received " + Utils.classToString((Object)objRec) + " back!");
                    }
                }
            }
            catch (Exception e) {
                this.handleException("Failed to process received data!", e);
            }
            finally {
                this.m_Processing = null;
            }
        };
        return result;
    }
}

