Package adams.flow.control
Class LoadBalancer
-
- All Implemented Interfaces:
AdditionalInformationHandler, CleanUpHandler, DeepCopyOperator, Destroyable, GlobalInfoSupporter, LoggingLevelHandler, LoggingSupporter, OptionHandler, Pausable, QuickInfoSupporter, ShallowCopySupporter<Actor>, SizeOfHandler, Stoppable, StoppableWithFeedback, VariablesInspectionHandler, FlowPauseStateListener, VariableChangeListener, Actor, ActorHandler, ControlActor, ErrorHandler, Flushable, InputConsumer, MutableActorHandler, SubFlowWrapUp, Serializable, Comparable
public class LoadBalancer extends AbstractControlActor implements InputConsumer, MutableActorHandler, DeepCopyOperator
Runs the specified 'load actor' in as many separate threads as specified with the 'num-threads' parameter.
Always uses a copy of the variables.
NB: no callable transformer or sink allowed.
Input/output:
- accepts:
adams.flow.core.Unknown
-logging-level <OFF|SEVERE|WARNING|INFO|CONFIG|FINE|FINER|FINEST> (property: loggingLevel) The logging level for outputting errors and debugging output. default: WARNING
-name <java.lang.String> (property: name) The name of the actor. default: LoadBalancer
-annotation <adams.core.base.BaseAnnotation> (property: annotations) The annotations to attach to this actor. default:
-skip <boolean> (property: skip) If set to true, transformation is skipped and the input token is just forwarded as it is. default: false
-stop-flow-on-error <boolean> (property: stopFlowOnError) If set to true, the flow execution at this level gets stopped in case this actor encounters an error; the error gets propagated; useful for critical actors. default: false
-silent <boolean> (property: silent) If enabled, then no errors are output in the console; Note: the enclosing actor handler must have this enabled as well. default: false
-load <adams.flow.core.Actor> [-load ...] (property: loadActors) The actors to 'load-balance'. default: adams.flow.sink.Null
-num-threads <int> (property: numThreads) The number of threads to use for load-balancing (-1 means one for each core/CPU). default: 0
-use-local-storage <boolean> (property: useLocalStorage) If enabled, then each thread will restrict the scope of storage to be local; initially, a shallow copy of the storage is taken at the thread's time of creation. default: false
-deep-copy <boolean> (property: deepCopy) If enabled, the local storage gets copied using a deep copy. default: false
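The difference between the shallow copy taken for 'use-local-storage' and the deep copy enabled by 'deep-copy' can be illustrated with a minimal, self-contained sketch. It does not use the ADAMS storage classes: the class StorageCopySketch, its methods, and the map-of-lists stand-in for storage are all hypothetical and only show the general copy semantics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for flow storage: names mapped to mutable values.
final class StorageCopySketch {

  /** Shallow copy: a new map, but the stored value objects are shared. */
  static Map<String, List<Integer>> shallowCopy(Map<String, List<Integer>> storage) {
    return new HashMap<>(storage);
  }

  /** Deep copy: a new map whose values are copied as well. */
  static Map<String, List<Integer>> deepCopy(Map<String, List<Integer>> storage) {
    Map<String, List<Integer>> result = new HashMap<>();
    for (Map.Entry<String, List<Integer>> entry : storage.entrySet())
      result.put(entry.getKey(), new ArrayList<>(entry.getValue()));
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> storage = new HashMap<>();
    storage.put("counts", new ArrayList<>(Arrays.asList(1, 2)));

    Map<String, List<Integer>> shallow = shallowCopy(storage);
    Map<String, List<Integer>> deep = deepCopy(storage);

    shallow.get("counts").add(3);            // also visible through 'storage'
    deep.get("counts").add(4);               // only visible in the deep copy
    shallow.put("extra", new ArrayList<>()); // new keys stay local to the copy

    System.out.println("original: " + storage);
    System.out.println("deep:     " + deep);
  }
}
```

With a shallow copy, a thread that mutates a stored object in place still affects the other threads; a deep copy avoids this at the cost of duplicating every stored value.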
- Author:
- fracpete (fracpete at waikato dot ac dot nz)
- See Also:
- Serialized Form
-
-
Field Summary
Fields (modifier and type, field, description):
static String BACKUP_CURRENT - the key for storing the current token in the backup.
protected Sequence m_Actors - the actors to "balance".
protected int m_ActualNumThreads - the actual number of threads to use.
protected Token m_CurrentToken - the input token.
protected boolean m_DeepCopy - whether to perform a deep copy of the storage.
protected PausableFixedThreadPoolExecutor m_Executor - the executor service to use for parallel execution.
protected int m_NumThreads - the number of threads to use for parallel execution.
protected int m_ThreadsSpawned - the count of threads spawned so far.
protected List<Actor> m_ToCleanUp - the actors to clean up in the end.
protected boolean m_UseLocalStorage - whether to use local storage.
-
Fields inherited from class adams.flow.control.AbstractControlActor
m_Compatibility, m_PauseStateManager, m_SetUpSubActors
-
Fields inherited from class adams.flow.core.AbstractActor
m_Annotations, m_BackupState, m_DetectedObjectVariables, m_DetectedVariables, m_ErrorHandler, m_Executed, m_Executing, m_ExecutionListeningSupporter, m_FullName, m_LoggingPrefix, m_Name, m_Parent, m_ScopeHandler, m_Self, m_Silent, m_Skip, m_StopFlowOnError, m_StopMessage, m_Stopped, m_StorageHandler, m_VariablesUpdated
-
Fields inherited from class adams.core.option.AbstractOptionHandler
m_OptionManager
-
Fields inherited from class adams.core.logging.LoggingObject
m_Logger, m_LoggingIsEnabled, m_LoggingLevel
-
Fields inherited from interface adams.flow.core.Actor
FILE_EXTENSION, FILE_EXTENSION_GZ
-
-
Constructor Summary
Constructors (constructor, description):
LoadBalancer()
-
Method Summary
Methods (modifier and type, method, description):
Class[] accepts() - Returns the class that the consumer accepts.
String add(int index, Actor actor) - Inserts the actor at the given position.
String add(Actor actor) - Inserts the actor at the end.
protected Hashtable<String,Object> backupState() - Backs up the current state of the actor before updating the variables.
void cleanUp() - Cleans up after the execution has finished.
Token currentInput() - Returns the current input token, if any.
String deepCopyTipText() - Returns the tip text for this property.
void defineOptions() - Adds options to the internal list of options.
protected String doExecute() - Executes the flow item.
void flushExecution() - Stops the processing of tokens without stopping the flow.
protected void forceVariables(Variables value) - Updates the Variables instance in use.
Actor get(int index) - Returns the actor at the given position.
ActorHandlerInfo getActorHandlerInfo() - Returns some information about the actor handler, e.g., whether it can contain standalones and the actor execution.
boolean getDeepCopy() - Returns whether to perform a deep copy for the local storage.
Actor[] getLoadActors() - Returns the load actors.
int getNumThreads() - Returns the number of threads in use.
String getQuickInfo() - Returns a quick info about the actor, which will be displayed in the GUI.
boolean getUseLocalStorage() - Returns whether to use local storage scope.
String globalInfo() - Returns a string describing the object.
boolean hasInput() - Returns whether an input token is currently present.
int indexOf(String actor) - Returns the index of the actor.
protected void initialize() - Initializes the members.
void input(Token token) - The method that accepts the input token and then processes it.
String loadActorsTipText() - Returns the tip text for this property.
String numThreadsTipText() - Returns the tip text for this property.
Actor remove(int index) - Removes the actor at the given position and returns the removed object.
void removeAll() - Removes all actors.
protected void restoreState(Hashtable<String,Object> state) - Restores the state of the actor before the variables got updated.
String set(int index, Actor actor) - Sets the actor at the given position.
void setDeepCopy(boolean value) - Sets whether to perform a deep copy for the local storage.
void setLoadActors(Actor[] value) - Sets the load actors.
void setLoggingLevel(LoggingLevel value) - Sets the logging level.
void setNumThreads(int value) - Sets the number of threads to use.
void setSkip(boolean value) - Sets whether the transformation is skipped or not.
String setUp() - Initializes the sub-actors for flow execution.
protected String setUpLoadActors() - Gets called in the setUp() method.
protected String setUpSubActors() - Performs the setUp of the sub-actors.
void setUseLocalStorage(boolean value) - Sets whether to use local storage scope.
int size() - Returns the size of the group.
void stopExecution() - Stops the execution.
protected void updateParent() - Updates the parent of all actors in this group.
String useLocalStorageTipText() - Returns the tip text for this property.
void wrapUp() - Finishes up the execution.
-
Methods inherited from class adams.flow.control.AbstractControlActor
active, canInspectOptions, check, destroy, firstActive, firstInputConsumer, flowPauseStateChanged, isPaused, isSubFlowWrappedUp, lastActive, pauseExecution, preExecute, reset, resumeExecution, setParent, wrapUpSubFlow
-
Methods inherited from class adams.flow.core.AbstractActor
annotationsTipText, canPerformSetUpCheck, compareTo, configureLogger, equals, execute, finalUpdateVariables, findVariables, findVariables, forCommandLine, forName, forName, getAdditionalInformation, getAnnotations, getDefaultName, getDetectedVariables, getErrorHandler, getFlowActors, getFlowExecutionListeningSupporter, getFullName, getName, getNextSibling, getParent, getParentComponent, getPreviousSibling, getRoot, getScopeHandler, getSilent, getSkip, getStopFlowOnError, getStopMessage, getStorageHandler, getVariables, handleError, handleException, hasErrorHandler, hasStopMessage, index, isBackedUp, isExecuted, isExecuting, isFinished, isHeadless, isStopped, nameTipText, performSetUpChecks, performVariableChecks, postExecute, pruneBackup, pruneBackup, setAnnotations, setErrorHandler, setName, setSilent, setStopFlowOnError, setVariables, shallowCopy, shallowCopy, silentTipText, sizeOf, skipTipText, stopExecution, stopFlowOnErrorTipText, updateDetectedVariables, updatePrefix, updateVariables, variableChanged
-
Methods inherited from class adams.core.option.AbstractOptionHandler
cleanUpOptions, finishInit, getDefaultLoggingLevel, getOptionManager, loggingLevelTipText, newOptionManager, toCommandLine, toString
-
Methods inherited from class adams.core.logging.LoggingObject
getLogger, getLoggingLevel, initializeLogging, isLoggingEnabled
-
Methods inherited from class java.lang.Object
clone, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface adams.flow.core.Actor
compareTo, destroy, equals, execute, findVariables, getAnnotations, getDefaultName, getDetectedVariables, getErrorHandler, getFlowExecutionListeningSupporter, getFullName, getName, getNextSibling, getParent, getParentComponent, getPreviousSibling, getRoot, getScopeHandler, getSilent, getSkip, getStopFlowOnError, getStopMessage, getStorageHandler, getVariables, handleError, hasErrorHandler, hasStopMessage, index, isExecuted, isFinished, isHeadless, isStopped, setAnnotations, setErrorHandler, setName, setParent, setSilent, setStopFlowOnError, setVariables, shallowCopy, shallowCopy, sizeOf, stopExecution, toCommandLine, variableChanged
-
Methods inherited from interface adams.flow.core.ActorHandler
check, firstActive, lastActive
-
Methods inherited from interface adams.core.AdditionalInformationHandler
getAdditionalInformation
-
Methods inherited from interface adams.core.logging.LoggingLevelHandler
getLoggingLevel
-
Methods inherited from interface adams.core.logging.LoggingSupporter
getLogger, isLoggingEnabled
-
Methods inherited from interface adams.core.option.OptionHandler
cleanUpOptions, getOptionManager
-
Methods inherited from interface adams.core.VariablesInspectionHandler
canInspectOptions
-
Field Detail
-
BACKUP_CURRENT
public static final String BACKUP_CURRENT
the key for storing the current token in the backup.
- See Also:
- Constant Field Values
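How a backup key such as BACKUP_CURRENT might be used together with backupState() and restoreState(Hashtable) can be sketched as follows. This is a simplified illustration, not the ADAMS code: the class BackupSketch, the key value "current", and the getter/setter pair are assumptions made for the example.

```java
import java.util.Hashtable;

// Hypothetical, simplified actor that only keeps track of one token.
final class BackupSketch {

  // Assumed key value; the real value is listed under "Constant Field Values".
  static final String BACKUP_CURRENT = "current";

  private Object currentToken;

  void setCurrentToken(Object token) { currentToken = token; }
  Object getCurrentToken() { return currentToken; }

  /** Stores the current token (if any) under the backup key. */
  Hashtable<String, Object> backupState() {
    Hashtable<String, Object> result = new Hashtable<>();
    if (currentToken != null) // Hashtable does not accept null values
      result.put(BACKUP_CURRENT, currentToken);
    return result;
  }

  /** Restores the token from the backup and removes the entry again. */
  void restoreState(Hashtable<String, Object> state) {
    if (state.containsKey(BACKUP_CURRENT)) {
      currentToken = state.get(BACKUP_CURRENT);
      state.remove(BACKUP_CURRENT);
    }
  }

  public static void main(String[] args) {
    BackupSketch actor = new BackupSketch();
    actor.setCurrentToken("token-1");
    Hashtable<String, Object> backup = actor.backupState();
    actor.setCurrentToken(null);   // state gets lost, e.g., during a re-initialization
    actor.restoreState(backup);
    System.out.println(actor.getCurrentToken());
  }
}
```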
-
m_Actors
protected Sequence m_Actors
the actors to "balance".
-
m_CurrentToken
protected transient Token m_CurrentToken
the input token.
-
m_NumThreads
protected int m_NumThreads
the number of threads to use for parallel execution.
-
m_ActualNumThreads
protected int m_ActualNumThreads
the actual number of threads to use.
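The requested thread count (m_NumThreads) and the actual one (m_ActualNumThreads) can differ, because the option description states that -1 stands for one thread per core/CPU. The sketch below shows one plausible way of resolving the value. The class ThreadCountSketch and its resolve method are hypothetical, and the treatment of 0 and of values below -1 is purely an assumption of this example; the documentation above does not specify it.

```java
// Hypothetical helper, not part of ADAMS: resolves a requested thread count.
final class ThreadCountSketch {

  /**
   * Resolves the requested number of threads against the available cores.
   * Only the "-1 means one per core" rule comes from the option description;
   * the handling of all other values is an assumption of this sketch.
   */
  static int resolve(int requested, int cores) {
    if (requested == -1)
      return cores;
    if (requested >= 1)
      return requested;
    return 1; // assumption: 0 and other negative values fall back to one thread
  }

  public static void main(String[] args) {
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("requested -1 -> " + resolve(-1, cores));
    System.out.println("requested  4 -> " + resolve(4, cores));
  }
}
```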
-
m_Executor
protected PausableFixedThreadPoolExecutor m_Executor
the executor service to use for parallel execution.
-
m_ThreadsSpawned
protected int m_ThreadsSpawned
the count of threads spawned so far.
-
m_UseLocalStorage
protected boolean m_UseLocalStorage
whether to use local storage.
-
m_DeepCopy
protected boolean m_DeepCopy
whether to perform a deep copy of the storage.
-
-
Method Detail
-
globalInfo
public String globalInfo()
Returns a string describing the object.
- Specified by:
globalInfo in interface GlobalInfoSupporter
- Specified by:
globalInfo in class AbstractOptionHandler
- Returns:
- a description suitable for displaying in the gui
-
defineOptions
public void defineOptions()
Adds options to the internal list of options.
- Specified by:
defineOptions in interface OptionHandler
- Overrides:
defineOptions in class AbstractActor
-
initialize
protected void initialize()
Initializes the members.
- Overrides:
initialize in class AbstractControlActor
-
updateParent
protected void updateParent()
Updates the parent of all actors in this group.
- Overrides:
updateParent in class AbstractControlActor
-
setLoggingLevel
public void setLoggingLevel(LoggingLevel value)
Sets the logging level.
- Specified by:
setLoggingLevel in interface LoggingLevelHandler
- Overrides:
setLoggingLevel in class AbstractOptionHandler
- Parameters:
value - the level
-
setSkip
public void setSkip(boolean value)
Sets whether the transformation is skipped or not.
- Specified by:
setSkip in interface Actor
- Overrides:
setSkip in class AbstractActor
- Parameters:
value - true if transformation is to be skipped
-
setLoadActors
public void setLoadActors(Actor[] value)
Sets the load actors.
- Parameters:
value - the actors
-
getLoadActors
public Actor[] getLoadActors()
Returns the load actors.
- Returns:
- the actors
-
loadActorsTipText
public String loadActorsTipText()
Returns the tip text for this property.
- Returns:
- tip text for this property suitable for displaying in the GUI or for listing the options.
-
setNumThreads
public void setNumThreads(int value)
Sets the number of threads to use.
- Parameters:
value - the number of threads
-
getNumThreads
public int getNumThreads()
Returns the number of threads in use.
- Returns:
- the number of threads
-
numThreadsTipText
public String numThreadsTipText()
Returns the tip text for this property.
- Returns:
- tip text for this property suitable for displaying in the GUI or for listing the options.
-
setUseLocalStorage
public void setUseLocalStorage(boolean value)
Sets whether to use local storage scope.
- Parameters:
value - if true local storage scope will be used
-
getUseLocalStorage
public boolean getUseLocalStorage()
Returns whether to use local storage scope.
- Returns:
- true if local storage scope enabled
-
useLocalStorageTipText
public String useLocalStorageTipText()
Returns the tip text for this property.
- Returns:
- tip text for this property suitable for displaying in the GUI or for listing the options.
-
setDeepCopy
public void setDeepCopy(boolean value)
Sets whether to perform a deep copy for the local storage.
- Specified by:
setDeepCopy in interface DeepCopyOperator
- Parameters:
value - if true a deep copy for the local storage will be performed
-
getDeepCopy
public boolean getDeepCopy()
Returns whether to perform a deep copy for the local storage.
- Specified by:
getDeepCopy in interface DeepCopyOperator
- Returns:
- true if a deep copy is performed
-
deepCopyTipText
public String deepCopyTipText()
Returns the tip text for this property.
- Specified by:
deepCopyTipText in interface DeepCopyOperator
- Returns:
- tip text for this property suitable for displaying in the GUI or for listing the options.
-
getQuickInfo
public String getQuickInfo()
Returns a quick info about the actor, which will be displayed in the GUI.
- Specified by:
getQuickInfo in interface Actor
- Specified by:
getQuickInfo in interface QuickInfoSupporter
- Overrides:
getQuickInfo in class AbstractActor
- Returns:
- null if no info available, otherwise short string
-
forceVariables
protected void forceVariables(Variables value)
Updates the Variables instance in use.
Use with caution!
- Overrides:
forceVariables in class AbstractControlActor
- Parameters:
value - the instance to use
-
size
public int size()
Returns the size of the group.
- Specified by:
size in interface ActorHandler
- Specified by:
size in class AbstractControlActor
- Returns:
- the number of actors
-
get
public Actor get(int index)
Returns the actor at the given position.
- Specified by:
get in interface ActorHandler
- Specified by:
get in class AbstractControlActor
- Parameters:
index - the position
- Returns:
- the actor
-
set
public String set(int index, Actor actor)
Sets the actor at the given position.
- Specified by:
set in interface ActorHandler
- Specified by:
set in class AbstractControlActor
- Parameters:
index - the position
actor - the actor to set at this position
- Returns:
- null if successful, otherwise error message
-
indexOf
public int indexOf(String actor)
Returns the index of the actor.
- Specified by:
indexOf in interface ActorHandler
- Specified by:
indexOf in class AbstractControlActor
- Parameters:
actor - the name of the actor to look for
- Returns:
- the index, or -1 if not found
-
add
public String add(Actor actor)
Inserts the actor at the end.
- Specified by:
add in interface MutableActorHandler
- Parameters:
actor - the actor to insert
- Returns:
- null if successful, otherwise error message
-
add
public String add(int index, Actor actor)
Inserts the actor at the given position.
- Specified by:
add in interface MutableActorHandler
- Parameters:
index - the position
actor - the actor to insert
- Returns:
- null if successful, otherwise error message
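Several methods of this class (set, add, setUp, doExecute) report failure through their String return value: null means success, anything else is an error message. The following sketch shows how a caller might work with that convention. The class ActorListSketch is hypothetical, actors are represented by plain names, and the concrete checks and messages are assumptions rather than the checks ADAMS performs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified handler: actors are represented by their names.
final class ActorListSketch {

  private final List<String> actors = new ArrayList<>();

  /** Appends the actor; returns null on success, otherwise an error message. */
  String add(String actor) {
    return add(actors.size(), actor);
  }

  /** Inserts the actor at the index; returns null on success, otherwise an error message. */
  String add(int index, String actor) {
    if (actor == null)
      return "No actor provided!";            // assumed check
    if (index < 0 || index > actors.size())
      return "Index out of range: " + index;  // assumed check
    actors.add(index, actor);
    return null;
  }

  int size() {
    return actors.size();
  }

  public static void main(String[] args) {
    ActorListSketch handler = new ActorListSketch();
    String msg = handler.add(5, "Worker");
    if (msg != null)
      System.err.println("Failed to add actor: " + msg);
  }
}
```

Callers should always compare the result against null instead of relying on exceptions.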
-
remove
public Actor remove(int index)
Removes the actor at the given position and returns the removed object.
- Specified by:
remove in interface MutableActorHandler
- Parameters:
index - the position
- Returns:
- the removed actor
-
removeAll
public void removeAll()
Removes all actors.
- Specified by:
removeAll in interface MutableActorHandler
-
getActorHandlerInfo
public ActorHandlerInfo getActorHandlerInfo()
Returns some information about the actor handler, e.g., whether it can contain standalones and the actor execution.
- Specified by:
getActorHandlerInfo in interface ActorHandler
- Specified by:
getActorHandlerInfo in class AbstractControlActor
- Returns:
- the info
-
backupState
protected Hashtable<String,Object> backupState()
Backs up the current state of the actor before updating the variables.
- Overrides:
backupState in class AbstractActor
- Returns:
- the backup
- See Also:
AbstractActor.updateVariables(), AbstractActor.restoreState(Hashtable)
-
restoreState
protected void restoreState(Hashtable<String,Object> state)
Restores the state of the actor before the variables got updated.
- Overrides:
restoreState in class AbstractActor
- Parameters:
state - the backup of the state to restore from
- See Also:
AbstractActor.updateVariables(), AbstractActor.backupState()
-
accepts
public Class[] accepts()
Returns the class that the consumer accepts.
- Specified by:
accepts in interface InputConsumer
- Returns:
- adams.flow.core.Unknown.class
-
setUpLoadActors
protected String setUpLoadActors()
Gets called in the setUp() method. Returns null if load-actors are fine, otherwise error message.
- Returns:
- null if everything OK, otherwise error message
-
setUpSubActors
protected String setUpSubActors()
Performs the setUp of the sub-actors.
- Overrides:
setUpSubActors in class AbstractControlActor
- Returns:
- null if everything is fine, otherwise error message
-
setUp
public String setUp()
Initializes the sub-actors for flow execution.
- Specified by:
setUp in interface Actor
- Overrides:
setUp in class AbstractControlActor
- Returns:
- null if everything is fine, otherwise error message
- See Also:
AbstractActor.reset()
-
input
public void input(Token token)
The method that accepts the input token and then processes it.
- Specified by:
input in interface InputConsumer
- Parameters:
token - the token to accept and process
-
hasInput
public boolean hasInput()
Returns whether an input token is currently present.
- Specified by:
hasInput in interface InputConsumer
- Returns:
- true if input token present
-
currentInput
public Token currentInput()
Returns the current input token, if any.
- Specified by:
currentInput in interface InputConsumer
- Returns:
- the input token, null if none present
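The three InputConsumer methods input, hasInput and currentInput describe a single-slot hand-over of tokens. A minimal sketch of that contract is shown below. The class InputSlotSketch is hypothetical, tokens are plain Objects instead of Token instances, and clearing the slot after execution is an assumption of this example.

```java
// Hypothetical, simplified consumer; tokens are plain Objects here.
final class InputSlotSketch {

  private Object current; // the pending input token, null if none

  /** Accepts the token for processing. */
  void input(Object token) {
    current = token;
  }

  /** Returns whether a token is currently present. */
  boolean hasInput() {
    return current != null;
  }

  /** Returns the pending token, null if none is present. */
  Object currentInput() {
    return current;
  }

  /** Processes the pending token; returns null if fine, otherwise an error message. */
  String execute() {
    if (!hasInput())
      return "No input token available!";
    System.out.println("Processing: " + current);
    current = null; // assumption: the slot is cleared once the token is consumed
    return null;
  }

  public static void main(String[] args) {
    InputSlotSketch consumer = new InputSlotSketch();
    consumer.input("token-1");
    if (consumer.hasInput())
      consumer.execute();
  }
}
```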
-
doExecute
protected String doExecute()
Executes the flow item.
- Specified by:
doExecute in class AbstractActor
- Returns:
- null if everything is fine, otherwise error message
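The class description states that the load actors run in separate threads and always use a copy of the variables. The sketch below illustrates that idea with a plain java.util.concurrent fixed thread pool. It is not the ADAMS implementation: the class LoadBalanceSketch, the string tokens, the "prefix" and "current" variable names, and the collection of results are assumptions made for demonstration (the documented actor only consumes tokens and uses a PausableFixedThreadPoolExecutor).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not the ADAMS implementation: tokens are plain strings.
final class LoadBalanceSketch {

  /** Runs one job per token on a fixed pool; each job works on its own copy of the variables. */
  static List<String> process(List<String> tokens, Map<String, String> variables, int numThreads) {
    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String token : tokens) {
        Map<String, String> localVars = new HashMap<>(variables); // per-job copy
        futures.add(executor.submit(() -> {
          localVars.put("current", token); // never leaks into 'variables'
          return localVars.get("prefix") + token;
        }));
      }
      List<String> results = new ArrayList<>();
      for (Future<String> future : futures)
        results.add(future.get()); // waits for the job, keeps submission order
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for jobs", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Job failed", e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    Map<String, String> variables = new HashMap<>();
    variables.put("prefix", "job-");
    System.out.println(process(Arrays.asList("a", "b", "c"), variables, 2));
    System.out.println(variables); // unchanged by the jobs
  }
}
```

Copying the variables per job keeps the threads from overwriting each other's values, which is the same motivation the class description gives for always using a copy.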
-
wrapUp
public void wrapUp()
Finishes up the execution.
- Specified by:
wrapUp in interface Actor
- Overrides:
wrapUp in class AbstractControlActor
-
flushExecution
public void flushExecution()
Stops the processing of tokens without stopping the flow.
- Specified by:
flushExecution in interface ActorHandler
- Specified by:
flushExecution in interface Flushable
- Overrides:
flushExecution in class AbstractControlActor
-
stopExecution
public void stopExecution()
Stops the execution. No message set.
- Specified by:
stopExecution in interface Actor
- Specified by:
stopExecution in interface Stoppable
- Overrides:
stopExecution in class AbstractControlActor
-
cleanUp
public void cleanUp()
Cleans up after the execution has finished. Also removes graphical components.
- Specified by:
cleanUp in interface Actor
- Specified by:
cleanUp in interface CleanUpHandler
- Overrides:
cleanUp in class AbstractControlActor
-
-