package adams.flow.control;

import adams.core.Utils;
import adams.core.Variables;
import adams.core.base.BaseText;
import adams.core.management.ProcessUtils;
import adams.flow.core.AbstractActor;
import adams.flow.core.ActorExecution;
import adams.flow.core.ActorHandler;
import adams.flow.core.ActorHandlerInfo;
import adams.flow.core.ActorUtils;
import adams.flow.core.InputConsumer;
import adams.flow.core.MutableActorHandler;
import adams.flow.core.Token;
import adams.flow.core.Unknown;
import adams.flow.sink.Null;
import adams.multiprocess.PausableFixedThreadPoolExecutor;
import java.util.Hashtable;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:adams/flow/control/LoadBalancer.class */
/**
 * Control actor that hands each incoming token to a fresh copy of its load
 * actors and submits that copy as a job to a fixed-size thread pool.
 */
public class LoadBalancer extends AbstractControlActor implements InputConsumer, MutableActorHandler {
    /** Serialization version identifier. */
    private static final long serialVersionUID = -8782869993629454572L;
    /** Key name for the current token in the state backup. */
    public static final String BACKUP_CURRENT = "current";
    /** Template sequence holding the load actors; doExecute() runs copies of it. */
    protected Sequence m_Actors;
    /** The token most recently received via input(); not serialized. */
    protected transient Token m_CurrentToken;
    /** The configured number of threads (-1 means one per core/cpu). */
    protected int m_NumThreads;
    /** The number of threads actually used, determined in setUp(). */
    protected int m_ActualNumThreads;
    /** The thread pool executing the jobs; created in setUp(), released in wrapUp(). */
    protected PausableFixedThreadPoolExecutor m_Executor;
    /** Not read or written by the code visible in this class. */
    protected boolean m_HasGlobalTransformers;
    /** The thread shells created by doExecute(), cleaned up in cleanUp(). */
    protected Vector<AbstractActor> m_ToCleanUp;
    /** Counter of jobs spawned so far; reset to 0 in setUp(). */
    protected int m_ThreadsSpawned;
    /** Whether each job gets its own clone of the variables. */
    protected boolean m_UseLocalVariables;
    /** Whether each job gets its own copy of the storage. */
    protected boolean m_UseLocalStorage;
    /** Whether the local storage copy is a deep copy instead of a clone. */
    protected boolean m_DeepCopy;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:adams/flow/control/LoadBalancer$ThreadShell.class */
    /**
     * Wrapper around the actor executed by a single LoadBalancer job. It hands
     * out the variables and storage it was constructed with, which lets the
     * LoadBalancer give each job its own scope.
     */
    public static class ThreadShell extends AbstractControlActor implements InputConsumer, StorageHandler {
        private static final long serialVersionUID = -3358113395261199819L;

        /** The wrapped actor. */
        protected AbstractActor m_Actor;

        /** The storage returned by {@link #getStorage()}. */
        protected Storage m_LocalStorage;

        /** The variables returned by {@link #getVariables()}. */
        protected Variables m_LocalVariables;

        /**
         * Creates the shell.
         *
         * @param abstractActor the actor to wrap, must not be null
         * @param variables the variables to expose, must not be null
         * @param storage the storage to expose, must not be null
         * @throws IllegalArgumentException if any argument is null
         */
        public ThreadShell(AbstractActor abstractActor, Variables variables, Storage storage) {
            if (abstractActor == null) {
                throw new IllegalArgumentException("Actor cannot be null!");
            }
            if (variables == null) {
                throw new IllegalArgumentException("Variables cannot be null!");
            }
            if (storage == null) {
                throw new IllegalArgumentException("Storage cannot be null!");
            }
            m_Actor = abstractActor;
            m_LocalVariables = variables;
            m_LocalStorage = storage;
        }

        /**
         * Returns a short description of this actor.
         *
         * @return the description
         */
        @Override
        public String globalInfo() {
            return "Shell around a LoadBalancer thread, in order to provide optional local scope for variables and storage.";
        }

        /**
         * Returns the handler info of the wrapped actor if it is an
         * {@link ActorHandler}, otherwise a default info object.
         *
         * @return the handler info
         */
        @Override
        public ActorHandlerInfo getActorHandlerInfo() {
            if (m_Actor instanceof ActorHandler) {
                return ((ActorHandler) m_Actor).getActorHandlerInfo();
            }
            return new ActorHandlerInfo(false, ActorExecution.PARALLEL, true);
        }

        /**
         * Returns the number of wrapped actors.
         *
         * @return always 1
         */
        @Override
        public int size() {
            return 1;
        }

        /**
         * Returns the wrapped actor.
         *
         * @param i the index, only 0 is valid
         * @return the wrapped actor
         * @throws IllegalArgumentException if the index is not 0
         */
        @Override
        public AbstractActor get(int i) {
            if (i != 0) {
                throw new IllegalArgumentException("Illegal index: " + i);
            }
            return m_Actor;
        }

        /**
         * Replaces the wrapped actor, then calls reset() and updateParent().
         *
         * @param i the index, only 0 is valid
         * @param abstractActor the new actor
         * @throws IllegalArgumentException if the index is not 0
         */
        @Override
        public void set(int i, AbstractActor abstractActor) {
            if (i == 0) {
                m_Actor = abstractActor;
                reset();
                updateParent();
                return;
            }
            throw new IllegalArgumentException("Illegal index: " + i);
        }

        /**
         * Looks up the wrapped actor by name.
         *
         * @param str the name to look for
         * @return 0 if the wrapped actor has that name, otherwise -1
         */
        @Override
        public int indexOf(String str) {
            if (m_Actor.getName().equals(str)) {
                return 0;
            }
            return -1;
        }

        /**
         * Returns the variables this shell was constructed with.
         *
         * @return the variables
         */
        @Override
        public synchronized Variables getVariables() {
            return m_LocalVariables;
        }

        /**
         * Returns the storage this shell was constructed with.
         *
         * @return the storage
         */
        @Override
        public Storage getStorage() {
            return m_LocalStorage;
        }

        /**
         * Returns this shell as the storage handler.
         *
         * @return this object
         */
        @Override
        public StorageHandler getStorageHandler() {
            return this;
        }

        /**
         * Returns the classes accepted by the wrapped actor.
         *
         * @return the accepted classes, an empty array if the wrapped actor
         *         is not an {@link InputConsumer}
         */
        @Override
        public Class[] accepts() {
            if (m_Actor instanceof InputConsumer) {
                return ((InputConsumer) m_Actor).accepts();
            }
            return new Class[0];
        }

        /**
         * Forwards the token to the wrapped actor; does nothing if the wrapped
         * actor is not an {@link InputConsumer}.
         *
         * @param token the token to forward
         */
        @Override
        public void input(Token token) {
            if (!(m_Actor instanceof InputConsumer)) {
                return;
            }
            ((InputConsumer) m_Actor).input(token);
        }

        /**
         * Executes the wrapped actor.
         *
         * @return the result of the wrapped actor's execute() call
         */
        @Override
        protected String doExecute() {
            return m_Actor.execute();
        }
    }

    /**
     * Returns a short description of this actor.
     *
     * @return the description
     */
    @Override
    public String globalInfo() {
        return "Runs the specified 'load actor' in as many separate threads as specified with the 'num-threads' parameter.\n"
            + "NB: (changing) variables cannot be used in the load-balancer actor, as this would create unwanted side-effects.";
    }

    /**
     * Registers the options of this actor with the option manager, after the
     * options of the superclass.
     */
    @Override
    public void defineOptions() {
        super.defineOptions();

        // the actors to run in parallel
        m_OptionManager.add("load", "loadActors", new AbstractActor[]{new Null()});

        // size of the thread pool
        m_OptionManager.add("num-threads", "numThreads", -1, -1, null);

        // scoping of variables and storage
        m_OptionManager.add("use-local-vars", "useLocalVariables", false);
        m_OptionManager.add("use-local-storage", "useLocalStorage", false);
        m_OptionManager.add("deep-copy", "deepCopy", false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initializes the members: no current token, an empty clean-up list and an
     * empty template sequence that allows standalones and sources.
     */
    @Override
    public void initialize() {
        super.initialize();

        m_CurrentToken = null;
        m_ToCleanUp = new Vector<>();

        Sequence template = new Sequence();
        m_Actors = template;
        template.setAllowStandalones(true);
        template.setAllowSource(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Gives the template sequence this actor's name and parent. The parent is
     * first cleared and then set again.
     */
    @Override
    public void updateParent() {
        final Sequence template = m_Actors;
        template.setName(getName());
        template.setParent(null);
        template.setParent(getParent());
    }

    /**
     * Sets the actors to load-balance, then calls reset() and updateParent().
     *
     * @param abstractActorArr the load actors
     */
    public void setLoadActors(AbstractActor[] abstractActorArr) {
        m_Actors.setActors(abstractActorArr);
        reset();
        updateParent();
    }

    /**
     * Returns the actors to load-balance.
     *
     * @return the actors of the template sequence
     */
    public AbstractActor[] getLoadActors() {
        return m_Actors.getActors();
    }

    /**
     * Returns the tip text for the load actors property.
     *
     * @return the tip text
     */
    public String loadActorsTipText() {
        final String tip = "The actors to 'load-balance'.";
        return tip;
    }

    /**
     * Sets the number of threads to use. Values below -1 are ignored and leave
     * the current setting untouched.
     *
     * @param i the number of threads, -1 or greater
     */
    public void setNumThreads(int i) {
        if (i < -1) {
            return;
        }
        m_NumThreads = i;
        reset();
    }

    /**
     * Returns the configured number of threads.
     *
     * @return the number of threads
     */
    public int getNumThreads() {
        return m_NumThreads;
    }

    /**
     * Returns the tip text for the number of threads property.
     *
     * @return the tip text
     */
    public String numThreadsTipText() {
        return "The number of threads to use for load-balancing "
            + "(-1 means one for each core/cpu).";
    }

    /**
     * Sets whether each job uses its own copy of the variables, then calls
     * reset().
     *
     * @param z true to use local variables
     */
    public void setUseLocalVariables(boolean z) {
        m_UseLocalVariables = z;
        reset();
    }

    /**
     * Returns whether each job uses its own copy of the variables.
     *
     * @return true if local variables are used
     */
    public boolean getUseLocalVariables() {
        return m_UseLocalVariables;
    }

    /**
     * Returns the tip text for the local variables property.
     *
     * @return the tip text
     */
    public String useLocalVariablesTipText() {
        return "If enabled, then each thread will restrict the scope of variables to be local; "
            + "initially, a copy of all variables is taken at the  thread's time of creation.";
    }

    /**
     * Sets whether each job uses its own copy of the storage, then calls
     * reset().
     *
     * @param z true to use local storage
     */
    public void setUseLocalStorage(boolean z) {
        m_UseLocalStorage = z;
        reset();
    }

    /**
     * Returns whether each job uses its own copy of the storage.
     *
     * @return true if local storage is used
     */
    public boolean getUseLocalStorage() {
        return m_UseLocalStorage;
    }

    /**
     * Returns the tip text for the local storage property.
     *
     * @return the tip text
     */
    public String useLocalStorageTipText() {
        return "If enabled, then each thread will restrict the scope of storage to be local; "
            + "initially, a shallow copy of the storage is taken at the thread's time of creation.";
    }

    /**
     * Sets whether the local storage is created via a deep copy, then calls
     * reset().
     *
     * @param z true to use a deep copy
     */
    public void setDeepCopy(boolean z) {
        m_DeepCopy = z;
        reset();
    }

    /**
     * Returns whether the local storage is created via a deep copy.
     *
     * @return true if a deep copy is used
     */
    public boolean getDeepCopy() {
        return m_DeepCopy;
    }

    /**
     * Returns the tip text for the deep copy property.
     *
     * @return the tip text
     */
    public String deepCopyTipText() {
        final String tip = "If enabled, the local storage gets copied using a deep copy.";
        return tip;
    }

    /**
     * Returns a one-line summary of the current setup: the thread count (or
     * the variable attached to it) plus the local variables/storage flags.
     * <p>
     * The thread count shown mirrors what setUp() uses: only -1 means one
     * thread per core, any other value below 1 results in a single thread.
     *
     * @return the summary
     */
    @Override
    public String getQuickInfo() {
        StringBuilder info = new StringBuilder("threads: ");

        String variable = getOptionManager().getVariableForProperty("numThreads");
        if (variable != null) {
            info.append(variable);
        } else if (m_NumThreads == -1) {
            info.append("#cores");
        } else {
            // setUp() never uses fewer than one thread
            info.append(Math.max(1, m_NumThreads));
        }

        if (m_UseLocalVariables) {
            info.append(", local vars");
        }
        if (m_UseLocalStorage) {
            info.append(", local storage");
            if (m_DeepCopy) {
                info.append(" (deep copy)");
            }
        }
        return info.toString();
    }

    /**
     * Returns the number of load actors.
     *
     * @return the size of the template sequence
     */
    @Override
    public int size() {
        return m_Actors.size();
    }

    /**
     * Returns the load actor at the given position.
     *
     * @param i the index in the template sequence
     * @return the actor
     */
    @Override
    public AbstractActor get(int i) {
        return m_Actors.get(i);
    }

    /**
     * Replaces the load actor at the given position, then calls reset() and
     * updateParent().
     *
     * @param i the index in the template sequence
     * @param abstractActor the new actor
     */
    @Override
    public void set(int i, AbstractActor abstractActor) {
        m_Actors.set(i, abstractActor);
        reset();
        updateParent();
    }

    /**
     * Looks up a load actor by name in the template sequence.
     *
     * @param str the name to look for
     * @return the index as reported by the template sequence
     */
    @Override
    public int indexOf(String str) {
        return m_Actors.indexOf(str);
    }

    /**
     * Inserts a load actor at the given position, then calls reset() and
     * updateParent().
     *
     * @param i the index in the template sequence
     * @param abstractActor the actor to insert
     */
    @Override
    public void add(int i, AbstractActor abstractActor) {
        m_Actors.add(i, abstractActor);
        reset();
        updateParent();
    }

    /**
     * Removes the load actor at the given position and calls reset().
     *
     * @param i the index in the template sequence
     * @return the removed actor
     */
    @Override
    public AbstractActor remove(int i) {
        final AbstractActor removed = m_Actors.remove(i);
        reset();
        return removed;
    }

    /**
     * Removes all load actors and calls reset().
     */
    @Override
    public void removeAll() {
        m_Actors.removeAll();
        reset();
    }

    /**
     * Returns the handler info of the template sequence.
     *
     * @return the handler info
     */
    @Override
    public ActorHandlerInfo getActorHandlerInfo() {
        return m_Actors.getActorHandlerInfo();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Backs up the state of the superclass and, if present, the current token.
     *
     * @return the state table
     */
    @Override
    public Hashtable<String, Object> backupState() {
        final Hashtable<String, Object> state = super.backupState();
        final Token current = m_CurrentToken;
        if (current != null) {
            state.put(BACKUP_CURRENT, current);
        }
        return state;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Restores the current token from the state table, removing its entry,
     * and passes the remaining entries to the superclass.
     *
     * @param hashtable the state to restore
     */
    @Override
    public void restoreState(Hashtable<String, Object> hashtable) {
        // Hashtable never stores null values, so a non-null result means the key was present
        final Object current = hashtable.remove(BACKUP_CURRENT);
        if (current != null) {
            m_CurrentToken = (Token) current;
        }
        super.restoreState(hashtable);
    }

    /**
     * Returns the classes accepted as input.
     *
     * @return the classes accepted by the template sequence, or
     *         {@code Unknown.class} if the sequence is not available
     */
    @Override
    public Class[] accepts() {
        if (m_Actors == null) {
            return new Class[]{Unknown.class};
        }
        return m_Actors.accepts();
    }

    /**
     * Checks the load actors: a copy of the template sequence is set up and
     * destroyed again, then the template is searched for global transformers.
     *
     * @return null if everything is fine, otherwise an error message
     */
    protected String setUpLoadActors() {
        // trial set-up on a throw-away copy
        final Sequence trial = (Sequence) m_Actors.shallowCopy(true);
        trial.setAllowStandalones(true);
        trial.setName(getName());
        trial.setParent(null);
        trial.setParent(getParent());
        final String error = trial.setUp();
        trial.destroy();
        if (error != null) {
            return "Failed to setUp() load-actors: " + error;
        }

        final Hashtable<String, Integer> globals = ActorUtils.findGlobalTransformers(m_Actors);
        if (globals.size() > 0) {
            return "Load-actors contain global transformers, no load-balancing possible: " + globals.keySet();
        }
        return null;
    }

    /**
     * Sets up the sub-actors: fails if no load actor is active, otherwise
     * (unless this actor is skipped) updates the parent and checks the load
     * actors via setUpLoadActors().
     *
     * @return null if everything is fine, otherwise an error message
     */
    @Override
    protected String setUpSubActors() {
        if (m_Actors.active() == 0) {
            // this actor manages load actors, not tee actors
            return "No load-actors provided!";
        }
        if (getSkip()) {
            return null;
        }
        updateParent();
        return setUpLoadActors();
    }

    /**
     * Sets up the actor. If the superclass reports no error, the actual number
     * of threads is determined (-1: available processors, otherwise at least
     * one), the spawn counter is reset and the thread pool is created.
     *
     * @return null if everything is fine, otherwise the superclass' error message
     */
    @Override
    public String setUp() {
        final String error = super.setUp();
        if (error != null) {
            return error;
        }

        if (m_NumThreads == -1) {
            m_ActualNumThreads = ProcessUtils.getAvailableProcessors();
        } else {
            m_ActualNumThreads = Math.max(1, m_NumThreads);
        }
        m_ThreadsSpawned = 0;
        m_Executor = new PausableFixedThreadPoolExecutor(m_ActualNumThreads);
        return null;
    }

    /**
     * Stores the token and blocks until the executor reports fewer active
     * threads than its maximum pool size, or the actor has been stopped.
     * <p>
     * An interrupt received while waiting does not abort the wait, but the
     * thread's interrupt status is restored before returning so that callers
     * can still observe it.
     *
     * @param token the token to process next
     */
    @Override
    public void input(Token token) {
        m_CurrentToken = token;

        boolean interrupted = false;
        try {
            while (m_Executor.getActiveCount() >= m_Executor.getMaximumPoolSize() && !m_Stopped) {
                if (isDebugOn()) {
                    debug("Waiting for free thread...");
                }
                try {
                    // stopExecution() notifies on the executor, otherwise re-check every 100ms
                    synchronized (m_Executor) {
                        m_Executor.wait(100L);
                    }
                } catch (InterruptedException e) {
                    // keep waiting, but remember the interrupt for the caller
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wraps a fresh copy of the load actors in a {@link ThreadShell} and
     * submits a job to the executor that sets the shell up, feeds it the
     * current token and executes it. Depending on the options, the shell gets
     * this actor's variables/storage or copies of them.
     *
     * @return always null; the job's own result is only returned by the
     *         submitted callable
     */
    @Override
    protected String doExecute() {
        m_ThreadsSpawned++;
        final int threadNo = m_ThreadsSpawned;
        final Token token = m_CurrentToken;

        // every job works on its own copy of the load actors
        Sequence actors = (Sequence) m_Actors.shallowCopy(true);
        actors.setAllowStandalones(true);

        // shared or job-local variables
        Variables variables = getVariables();
        if (m_UseLocalVariables) {
            variables = variables.getClone();
        }

        // shared or job-local storage (clone or deep copy)
        Storage storage = getStorageHandler().getStorage();
        if (m_UseLocalStorage) {
            if (m_DeepCopy) {
                storage = (Storage) Utils.deepCopy(storage);
            } else {
                storage = storage.getClone();
            }
        }

        final ThreadShell shell = new ThreadShell(actors, variables, storage);
        shell.setName(getName());
        shell.setParent(null);
        shell.setParent(getParent());
        shell.setDebugLevel(getDebugLevel());
        shell.setAnnotations(new BaseText("Thread #" + threadNo));
        m_ToCleanUp.add(shell);

        Callable<String> job = new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    if (LoadBalancer.this.isDebugOn()) {
                        LoadBalancer.this.debug("Starting thread #" + threadNo);
                    }
                    String error = shell.setUp();
                    if (error == null) {
                        shell.input(token);
                        error = shell.execute();
                    }
                    if (LoadBalancer.this.isDebugOn()) {
                        LoadBalancer.this.debug("...finished thread #" + threadNo + (error == null ? "" : " with error"));
                    }
                    return error;
                } catch (Exception e) {
                    String prefix = "Failed to execute thread #" + threadNo + ": ";
                    System.err.println(prefix);
                    e.printStackTrace();
                    return prefix + e;
                }
            }
        };

        synchronized (m_Executor) {
            m_Executor.submit(job);
        }
        return null;
    }

    /**
     * Shuts the executor down, waits until all submitted jobs have terminated,
     * releases the executor and the current token and wraps up the superclass.
     * <p>
     * An interrupt received while waiting does not abort the wait, but the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void wrapUp() {
        boolean interrupted = false;
        try {
            if (m_Executor != null) {
                m_Executor.shutdown();
                while (!m_Executor.isTerminated()) {
                    try {
                        m_Executor.awaitTermination(100L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // keep waiting for the jobs, but remember the interrupt
                        interrupted = true;
                    }
                }
                m_Executor = null;
            }
            m_CurrentToken = null;
            super.wrapUp();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Pauses the template sequence.
     */
    @Override
    public void pauseExecution() {
        // NOTE(review): this targets the template, not the copies running in the executor - confirm intended
        m_Actors.pauseExecution();
    }

    /**
     * Returns whether the template sequence is paused.
     *
     * @return the paused state reported by the template sequence
     */
    @Override
    public boolean isPaused() {
        return m_Actors.isPaused();
    }

    /**
     * Resumes the template sequence.
     */
    @Override
    public void resumeExecution() {
        // NOTE(review): this targets the template, not the copies running in the executor - confirm intended
        m_Actors.resumeExecution();
    }

    /**
     * Stops the execution: wakes up anyone waiting on the executor, asks the
     * executor to shut down immediately and then stops the superclass.
     * Exceptions raised while stopping the executor are ignored.
     */
    @Override
    public void stopExecution() {
        if (m_Executor != null) {
            try {
                synchronized (m_Executor) {
                    // wake up input() before shutting the pool down
                    m_Executor.notifyAll();
                    m_Executor.shutdownNow();
                }
            } catch (Exception ignored) {
                // best effort only
            }
        }
        super.stopExecution();
    }

    /**
     * Releases the current token, cleans up all thread shells created by
     * doExecute(), empties the clean-up list and cleans up the superclass.
     */
    @Override
    public void cleanUp() {
        m_CurrentToken = null;
        for (AbstractActor shell : m_ToCleanUp) {
            shell.cleanUp();
        }
        m_ToCleanUp.clear();
        super.cleanUp();
    }
}
