Class ProcessingContainer_Impl
java.lang.Object
org.apache.uima.resource.Resource_ImplBase
org.apache.uima.collection.impl.base_cpm.container.ProcessingContainer
org.apache.uima.collection.impl.cpm.container.ProcessingContainer_Impl
- All Implemented Interfaces:
Runnable, CasProcessorController, RunnableContainer, ConfigurableResource, Resource
Manages a pool of CasProcessor instances and gives processing threads access to them.
A processing thread checks out a CasProcessor instance, invokes its process() method, and
returns the instance to the pool when done. The container aggregates counts and totals on behalf
of all CasProcessor instances. It also manages error and restart thresholds for the CasProcessors
as a group: errors are aggregated across all instances, NOT tracked individually.
The container takes the appropriate action when a threshold is exceeded. Which action is taken
depends on the declarative specification in the CPE descriptor.
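The check-out and return cycle described above can be sketched with a blocking queue. This is only an illustration under stated assumptions, not the UIMA implementation: PoolSketch and its method names are hypothetical, and any object type can stand in for a CasProcessor instance.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for the container's instance pool (not a UIMA class).
class PoolSketch<T> {
    private final BlockingQueue<T> idle;

    PoolSketch(List<T> instances) {
        // Capacity must be at least 1 and at least the number of instances.
        idle = new ArrayBlockingQueue<>(Math.max(1, instances.size()), false, instances);
    }

    // Blocks until an instance is free, then hands it to the calling thread.
    T checkOut() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for an instance", e);
        }
    }

    // Returns an instance to the pool so another thread can use it.
    void release(T instance) {
        idle.add(instance);
    }

    // Number of instances currently available for check-out.
    int available() {
        return idle.size();
    }
}
```

A processing thread would call checkOut(), do its work, and call release() in a finally block so that an instance is never lost when processing fails.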
-
Field Summary
Modifier and Type, Field: Description
casProcessorPool: The cas processor pool.
failedCasProcessorList: The failed cas processor list.
Fields inherited from interface org.apache.uima.collection.impl.base_cpm.container.CasProcessorController
DISABLED, INITIALIZED, KILLED, NOTINITIALIZED, RUNNING
Fields inherited from interface org.apache.uima.resource.Resource
PARAM_AGGREGATE_SOFA_MAPPINGS, PARAM_CONFIG_MANAGER, PARAM_CONFIG_PARAM_SETTINGS, PARAM_EXTERNAL_OVERRIDE_SETTINGS, PARAM_PERFORMANCE_TUNING_SETTINGS, PARAM_RESOURCE_MANAGER, PARAM_UIMA_CONTEXT
-
Constructor Summary
Constructor: Description
ProcessingContainer_Impl(CasProcessorConfiguration aCasProcessorConfig, ProcessingResourceMetaData aMetaData, ServiceProxyPool aCasProcessorPool): Initializes the container with a CasProcessor configuration and a pool of CasProcessor instances.
-
Method Summary
Modifier and Type, Method: Description
boolean abortCPMOnError(): Returns true if the maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown.
void addBytesIn(long aBytesIn): Aggregate total bytes ingested by the CasProcessor.
void addBytesOut(long aBytesOut): Aggregate total bytes processed by this CasProcessor.
void addStat(aStatName, aStat): Add an arbitrary object and bind it to a given name.
void destroy(): Destroy instances of CasProcessors managed by this container.
int getAbortCount(): Returns the up-to-date number of aborts recorded by the container.
getAllStats(): Returns all stats aggregated during the CPM run.
long getBytesIn(): Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.
long getBytesOut(): Returns total number of bytes processed so far by all CasProcessor instances managed by this container.
getCasProcessor(): Returns an available instance of the CasProcessor from the instance pool.
getCasProcessorConfiguration(): Returns CasProcessor configuration object.
getConfigParameterValue(String aParamName): Looks up the value of a configuration parameter.
getConfigParameterValue(String aGroupName, String aParamName): Looks up the value of a configuration parameter in a group.
getDeployer(): Returns deployer object used to launch the CasProcessor.
long getFetchTime(): Gets the fetch time.
int getFilteredCount(): Returns number of filtered CASes.
getLastCas(): Deprecated.
getLastProcessedEntityId(): Returns id of the last entity processed by the CasProcessor.
getMetadata(): Returns component's input/output capabilities.
getMetaData(): Gets the metadata that describes this Resource.
getName(): Returns the name of this container.
getPool()
long getProcessed(): Returns number of entities processed so far.
long getRemaining(): Returns number of entities still to be processed by the CasProcessor. It is the total number of entities to be processed by the CPE minus the number of entities processed so far.
int getRestartCount(): Returns total number of all CasProcessor restarts.
int getRetryCount(): Returns the up-to-date number of retries recorded by the container.
getStat(aStatName): Returns an object identified with a given name.
int getStatus(): Returns the current status of the CasProcessor.
long getTotalTime(): Returns total time spent in process().
void incrementAbortCount(int aCount): Increment number of aborted CASes due to inability to process the CAS.
void incrementCasProcessorErrors(Throwable aThrowable): This routine determines what to do with an exception thrown during the CasProcessor processing.
void incrementFilteredCount(int aCount): Increments number of CASes filtered by the CasProcessor.
void incrementProcessed(int aIncrement): Increment processed.
void incrementRestartCount(int aCount): Increment number of times the CasProcessor was restarted due to failures.
void incrementRetryCount(int aCount): Increments number of times the CasProcessor failed analyzing CASes due to a timeout or some other problem.
void incrementStat(String aStatName, Integer aStat): Increment a value of a given stat.
void incrementTotalTime(long aTime): Increments total time spent in the process() method of the CasProcessor.
boolean initialize(ResourceSpecifier aSpecifier, Map aAdditionalParams): Initializes this Resource from a ResourceSpecifier.
boolean isAbortable(): Determines if instances of CasProcessor managed by this container are abortable.
boolean isEndOfBatch(CasProcessor aCasProcessor, int aProcessedSize)
boolean isIntegrated(): Deprecated.
boolean isLocal(): Deprecated.
boolean isPaused()
boolean isRemote(): Deprecated.
boolean isSingleFencedService()
void logAbortedCases(Object[] abortedCasList): Logs CASes that could not be processed.
void pause(): Pauses the container until resumed.
boolean processCas(Object[] aCasList): Returns true if the CAS bundle should be processed by the CasProcessor.
void reconfigure(): Instructs this Resource to re-read its configuration parameter settings.
void releaseCasProcessor(CasProcessor aCasProcessor): Returns a given CasProcessor instance back to the pool.
void resetRestartCount()
void resume()
void run()
void setCasProcessorDeployer(CasProcessorDeployer aDeployer): Plug in deployer object used to launch/deploy the CasProcessor instance.
void setConfigParameterValue(String aParamName, Object aValue): Sets the value of a configuration parameter.
void setConfigParameterValue(String aGroupName, String aParamName, Object aValue): Sets the value of a configuration parameter in a group.
void setLastCas(Object aCasObject): Deprecated.
void setLastProcessedEntityId(String aEntityId): Copies id of the last entity processed by the CasProcessor.
void setMetadata(ProcessingResourceMetaData aMetadata): Sets component's input/output capabilities.
void setProcessed(long aProcessedCount): Used when recovering from checkpoint, sets the total number of entities before CPE stopped.
void setRemaining(long aRemainingCount): Copies number of entities the CasProcessor has yet to process.
void setSingleFencedService(boolean aSingleFencedInstance)
void setStatus(int aStatus): Changes the status of the CasProcessor as a group.
void start(): Deprecated.
void stop(): Deprecated.
Methods inherited from class org.apache.uima.resource.Resource_ImplBase
getCasManager, getLogger, getRelativePathResolver, getResourceManager, getUimaContext, getUimaContextAdmin, loadUserClass, loadUserClassOrThrow, setContextHolder, setContextHolderX, setLogger, setMetaData, withContextHolder
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.uima.resource.Resource
getLogger, getResourceManager, getUimaContext, getUimaContextAdmin, setLogger
-
Field Details
-
casProcessorPool
The cas processor pool.
-
failedCasProcessorList
The failed cas processor list.
-
-
Constructor Details
-
ProcessingContainer_Impl
public ProcessingContainer_Impl(CasProcessorConfiguration aCasProcessorConfig, ProcessingResourceMetaData aMetaData, ServiceProxyPool aCasProcessorPool) throws ResourceConfigurationException
Initializes the container with a CasProcessor configuration and a pool of CasProcessor instances.
- Parameters:
aCasProcessorConfig
- CasProcessor configuration as defined in the CPE descriptor
aMetaData
- the metadata
aCasProcessorPool
- pool of CasProcessor instances
- Throws:
ResourceConfigurationException
- the resource configuration exception
-
-
Method Details
-
getMetadata
Returns component's input/output capabilities.- Returns:
- the metadata
-
setMetadata
Sets component's input/output capabilities.- Specified by:
setMetadata
in classProcessingContainer
- Parameters:
aMetadata
- component capabilities
-
setCasProcessorDeployer
Plug in deployer object used to launch/deploy the CasProcessor instance. Used for restarts.- Specified by:
setCasProcessorDeployer
in classProcessingContainer
- Parameters:
aDeployer
- - object responsible for deploying/launching CasProcessor
-
getDeployer
Returns deployer object used to launch the CasProcessor.- Specified by:
getDeployer
in classProcessingContainer
- Returns:
- - CasProcessorDeployer - deployer object
-
logAbortedCases
Logs Cas'es that could not be processed.- Specified by:
logAbortedCases
in classProcessingContainer
- Parameters:
abortedCasList
- an array of CASes that could not be processed by this CasProcessor
-
getBytesIn
public long getBytesIn()Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.- Specified by:
getBytesIn
in classProcessingContainer
- Returns:
- number of bytes ingested
-
addBytesIn
public void addBytesIn(long aBytesIn) Aggregate total bytes ingested by the CasProcessor.- Specified by:
addBytesIn
in classProcessingContainer
- Parameters:
aBytesIn
- - number of ingested bytes
-
getBytesOut
public long getBytesOut()Returns total number of bytes processed so far by all CasProcessor instances managed by this container.- Specified by:
getBytesOut
in classProcessingContainer
- Returns:
- - bytes processed
-
addBytesOut
public void addBytesOut(long aBytesOut) Aggregate total bytes processed by this CasProcessor.- Specified by:
addBytesOut
in classProcessingContainer
- Parameters:
aBytesOut
- number of bytes processed
-
incrementRestartCount
public void incrementRestartCount(int aCount) Increment number of times the casProcessor was restarted due to failures.- Specified by:
incrementRestartCount
in classProcessingContainer
- Parameters:
aCount
- - restart count
-
getRestartCount
public int getRestartCount()Returns total number of all CasProcessor restarts.- Specified by:
getRestartCount
in classProcessingContainer
- Returns:
- number of restarts
-
incrementRetryCount
public void incrementRetryCount(int aCount)
Increments the number of times the CasProcessor failed to analyze CASes due to a timeout or some other problem.
- Specified by:
incrementRetryCount
in classProcessingContainer
- Parameters:
aCount
- - failure count
-
getRetryCount
public int getRetryCount()
Returns the up-to-date number of retries recorded by the container.
- Specified by:
getRetryCount
in classProcessingContainer
- Returns:
- - retry count
-
incrementAbortCount
public void incrementAbortCount(int aCount) Increment number of aborted Cas'es due to inability to process the Cas.- Specified by:
incrementAbortCount
in classProcessingContainer
- Parameters:
aCount
- - number of aborts while processing Cas'es
-
getAbortCount
public int getAbortCount()
Returns the up-to-date number of aborts recorded by the container.
- Specified by:
getAbortCount
in classProcessingContainer
- Returns:
- - number of failed attempts to analyze CAS'es
-
incrementFilteredCount
public void incrementFilteredCount(int aCount)
Increments the number of CASes filtered by the CasProcessor. Filtered CASes do not contain the features that the CasProcessor requires to perform analysis. The dependent features are defined in the filter expression in the CPE descriptor.
- Specified by:
incrementFilteredCount
in classProcessingContainer
- Parameters:
aCount
- - number of filtered Cas'es
-
getFilteredCount
public int getFilteredCount()Returns number of filtered Cas'es.- Specified by:
getFilteredCount
in classProcessingContainer
- Returns:
- # of filtered Cas'es
-
getRemaining
public long getRemaining()
Returns the number of entities still to be processed by the CasProcessor. It is the total number of entities to be processed by the CPE minus the number of entities processed so far.
- Specified by:
getRemaining
in classProcessingContainer
- Returns:
- Number of entities yet to be processed
-
setRemaining
public void setRemaining(long aRemainingCount) Copies number of entities the CasProcessor has yet to process.- Specified by:
setRemaining
in classProcessingContainer
- Parameters:
aRemainingCount
- - number of entities to process
-
setLastProcessedEntityId
Copies id of the last entity processed by the CasProcessor.- Specified by:
setLastProcessedEntityId
in classProcessingContainer
- Parameters:
aEntityId
- - id of the entity
-
getLastProcessedEntityId
Returns id of the last entity processed by the CasProcessor.- Specified by:
getLastProcessedEntityId
in classProcessingContainer
- Returns:
- - id of entity
-
setLastCas
Deprecated.Copies the last Cas Processed.- Specified by:
setLastCas
in classProcessingContainer
- Parameters:
aCasObject
- the new last cas
-
getLastCas
Deprecated.Returns the last Cas processed.- Specified by:
getLastCas
in classProcessingContainer
- Returns:
- the last cas
-
incrementProcessed
public void incrementProcessed(int aIncrement)
Increments the number of entities processed so far.
- Parameters:
aIncrement
- the amount to add to the processed count
-
setProcessed
public void setProcessed(long aProcessedCount) Used when recovering from checkpoint, sets the total number of entities before CPE stopped.- Specified by:
setProcessed
in classProcessingContainer
- Parameters:
aProcessedCount
- - number of entities processed before CPE stopped
-
getProcessed
public long getProcessed()Returns number of entities processed so far.- Specified by:
getProcessed
in classProcessingContainer
- Returns:
- number of entities processed
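The relationship between incrementProcessed, setProcessed, and getProcessed, including the checkpoint-recovery case, can be sketched as follows. ProgressSketch is a hypothetical class, and the remaining(long) helper is an assumption added only to show the "total minus processed" arithmetic described for getRemaining.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical progress counter (not a UIMA class).
class ProgressSketch {
    private final AtomicLong processed = new AtomicLong();

    // Restores the count saved in a checkpoint before processing resumes.
    void setProcessed(long checkpointCount) {
        processed.set(checkpointCount);
    }

    // Adds the number of entities that have just been processed.
    void incrementProcessed(int increment) {
        processed.addAndGet(increment);
    }

    long getProcessed() {
        return processed.get();
    }

    // Remaining work: the caller-supplied total minus what has been processed.
    long remaining(long total) {
        return total - processed.get();
    }
}
```

After a restart, a caller would first restore the checkpointed value with setProcessed and then continue incrementing from there.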
-
resetRestartCount
public void resetRestartCount()- Specified by:
resetRestartCount
in classProcessingContainer
-
incrementTotalTime
public void incrementTotalTime(long aTime)
Increments the total time spent in the process() method of the CasProcessor.
- Specified by:
incrementTotalTime
in classProcessingContainer
- Parameters:
aTime
- - total time in process()
-
getTotalTime
public long getTotalTime()Returns total time spent in process().- Specified by:
getTotalTime
in classProcessingContainer
- Returns:
- - number of millis spent in process()
-
abortCPMOnError
public boolean abortCPMOnError()Returns true if maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown. It looks at the value of the action attribute of the <errorRateThreshold> element in the cpe descriptor.- Specified by:
abortCPMOnError
in classProcessingContainer
- Returns:
- - true if the CPE should stop processing, false otherwise
-
incrementCasProcessorErrors
This routine determines what to do with an exception thrown during the CasProcessor processing. It interprets given exception and throws a new one according to configuration specified in the CPE descriptor. It examines provided thresholds and determines if the CPE should continue to run, if it should disable the CasProcessor (and all its instances), or disregard the error and continue.- Specified by:
incrementCasProcessorErrors
in classProcessingContainer
- Parameters:
aThrowable
- - exception to examine- Throws:
Exception
- the exception
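The group-level decision described above can be sketched as a single shared counter plus a configured action. This is a simplified model under stated assumptions: ErrorThresholdSketch and its enums are hypothetical, the threshold is a plain error count rather than whatever form the CPE descriptor uses, and the TERMINATE and CONTINUE labels are assumed (this page names only 'disable' explicitly).

```java
// Hypothetical model of group-level error handling (not a UIMA class).
class ErrorThresholdSketch {
    // Assumed action labels; this page only names 'disable' explicitly.
    enum Action { TERMINATE, DISABLE, CONTINUE }

    enum Decision { KEEP_RUNNING, DISABLE_PROCESSOR, STOP_CPE }

    private final int maxErrors;
    private final Action action;
    private int errorCount; // one counter shared by every instance in the group

    ErrorThresholdSketch(int maxErrors, Action action) {
        this.maxErrors = maxErrors;
        this.action = action;
    }

    // Records one error and reports what the caller should do next.
    synchronized Decision recordError() {
        errorCount++;
        if (errorCount <= maxErrors) {
            return Decision.KEEP_RUNNING; // threshold not yet exceeded
        }
        switch (action) {
            case TERMINATE:
                return Decision.STOP_CPE;
            case DISABLE:
                return Decision.DISABLE_PROCESSOR;
            default:
                return Decision.KEEP_RUNNING; // CONTINUE: disregard the error
        }
    }
}
```

Because the counter belongs to the group, an error raised by any instance moves every instance closer to the threshold.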
-
isEndOfBatch
public boolean isEndOfBatch(CasProcessor aCasProcessor, int aProcessedSize) throws ResourceProcessException, IOException - Specified by:
isEndOfBatch
in classProcessingContainer
- Throws:
ResourceProcessException
IOException
-
processCas
Returns true if the CAS bundle should be processed by the CasProcessor. This routine checks for the existence of the dependent features defined in the filter expression for the CasProcessor in the CPE descriptor. Currently this is done on a per-bundle basis, meaning that all CASes must contain the required features. If even one CAS does not have them, the entire bundle is skipped.
- Specified by:
processCas
in classProcessingContainer
- Parameters:
aCasList
- - bundle containing instances of CAS- Returns:
- true, if successful
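The all-or-nothing bundle check described above can be sketched as follows. The model is deliberately simplified and hypothetical: each CAS is represented as the set of feature names it contains, and BundleFilterSketch is not a UIMA class.

```java
import java.util.List;
import java.util.Set;

// Hypothetical per-bundle filter check (not a UIMA class).
class BundleFilterSketch {
    // Returns true only if every CAS in the bundle contains all required features.
    static boolean shouldProcess(List<Set<String>> bundle, Set<String> requiredFeatures) {
        for (Set<String> casFeatures : bundle) {
            if (!casFeatures.containsAll(requiredFeatures)) {
                return false; // one CAS lacking a feature causes the whole bundle to be skipped
            }
        }
        return true;
    }
}
```

A caller that receives false would skip the bundle and count its CASes as filtered instead of passing them to the CasProcessor.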
-
getCasProcessorConfiguration
Returns CasProcessor configuration object. This object represents xml configuration defined in the <casProcessor> section of the cpe descriptor.- Specified by:
getCasProcessorConfiguration
in classProcessingContainer
- Returns:
CasProcessorConfiguration
instance
-
start
Deprecated.Start.- Specified by:
start
in interfaceRunnableContainer
-
stop
Deprecated.Stop.- Specified by:
stop
in interfaceRunnableContainer
-
getCasProcessor
Returns available instance of the CasProcessor from the instance pool. It will wait indefinitely until an instance is available.- Specified by:
getCasProcessor
in interfaceCasProcessorController
- Returns:
- the cas processor
-
releaseCasProcessor
Returns a given casProcessor instance back to the pool.- Specified by:
releaseCasProcessor
in classProcessingContainer
- Parameters:
aCasProcessor
- an instance of CasProcessor to return to the pool
-
getStatus
public int getStatus()Returns the current status of the CasProcessor.- Specified by:
getStatus
in interfaceCasProcessorController
- Returns:
- the status
-
setStatus
public void setStatus(int aStatus) Changes the status of the CasProcessor as a group.- Specified by:
setStatus
in interfaceCasProcessorController
- Parameters:
aStatus
- - new status
-
isLocal
Deprecated.Checks if is local.- Specified by:
isLocal
in interfaceCasProcessorController
- Returns:
- true, if is local
-
isRemote
Deprecated.Checks if is remote.- Specified by:
isRemote
in interfaceCasProcessorController
- Returns:
- true, if is remote
-
isIntegrated
Deprecated.Checks if is integrated.- Specified by:
isIntegrated
in interfaceCasProcessorController
- Returns:
- true, if is integrated
-
isAbortable
public boolean isAbortable()Determines if instances of CasProcessor managed by this container are abortable. Abortable CasProcessor's action attribute in the <errorRateThreshold> element has a value of 'disable'.- Specified by:
isAbortable
in interfaceCasProcessorController
- Returns:
- true if CasProcessor can be disabled
-
initialize
public boolean initialize(ResourceSpecifier aSpecifier, Map aAdditionalParams) throws ResourceInitializationException
Description copied from interface: Resource
Initializes this Resource from a ResourceSpecifier. Applications do not need to call this method. It is called automatically by the ResourceFactory and cannot be called a second time.
- Specified by:
initialize
in interface Resource
- Overrides:
initialize
in class Resource_ImplBase
- Parameters:
aSpecifier
- specifies how to create a resource or locate an existing resource service.
aAdditionalParams
- a Map containing additional parameters. May be null if there are no parameters. Each class that implements this interface can decide what additional parameters it supports.
- Returns:
- true if and only if initialization completed successfully. Returns false if the given ResourceSpecifier is not of an appropriate type for this Resource. If the ResourceSpecifier is of an appropriate type but is invalid or if some other failure occurs, an exception should be thrown.
- Throws:
ResourceInitializationException
- if a failure occurs during initialization.
-
destroy
public void destroy()Destroy instances of CasProcessors managed by this container. Before destroying the instance, this method notifies it with CollectionProcessComplete so that the component finalizes its logic and does appropriate cleanup before shutdown.- Specified by:
destroy
in interfaceResource
- Overrides:
destroy
in classResource_ImplBase
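The shutdown order described for destroy (notify first, then destroy) can be sketched as follows. ManagedProcessorSketch is a hypothetical two-method interface rather than the UIMA CasProcessor interface, and destroying an instance even when the notification throws is an assumption of this sketch, not documented behavior.

```java
import java.util.List;

// Hypothetical minimal view of a managed component (not the UIMA CasProcessor interface).
interface ManagedProcessorSketch {
    void collectionProcessComplete();
    void destroy();
}

class DestroySketch {
    // Notifies every instance that processing is complete, then destroys it.
    static void destroyAll(List<? extends ManagedProcessorSketch> instances) {
        for (ManagedProcessorSketch processor : instances) {
            try {
                processor.collectionProcessComplete(); // let the component finalize its work
            } finally {
                processor.destroy(); // assumption: destroy even if the notification throws
            }
        }
    }
}
```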
-
run
public void run() -
getConfigParameterValue
Description copied from interface: ConfigurableResource
Looks up the value of a configuration parameter. This method will only return the value of a parameter that is not defined in any group.
This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.
- Specified by:
getConfigParameterValue
in interface ConfigurableResource
- Parameters:
aParamName
- the name of a parameter that is not in any group
- Returns:
- the value of the parameter with name aParamName, or null if either the parameter does not exist or it has not been assigned a value.
-
getConfigParameterValue
Description copied from interface: ConfigurableResource
Looks up the value of a configuration parameter in a group. If the parameter has no value assigned within the group, fallback strategies will be followed.
This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.
- Specified by:
getConfigParameterValue
in interface ConfigurableResource
- Parameters:
aGroupName
- the name of a configuration group. If the group name is null, this method will return the same value as getParameterValue(String).
aParamName
- the name of a parameter in the group
- Returns:
- the value of the parameter in group aGroupName with name aParamName, or null if either the parameter does not exist or it has not been assigned a value.
-
setConfigParameterValue
Description copied from interface: ConfigurableResource
Sets the value of a configuration parameter. This only works for a parameter that is not defined in any group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
- Specified by:
setConfigParameterValue
in interface ConfigurableResource
- Parameters:
aParamName
- the name of a parameter that is not in any group
aValue
- the value to assign to the parameter
-
setConfigParameterValue
Description copied from interface: ConfigurableResource
Sets the value of a configuration parameter in a group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
- Specified by:
setConfigParameterValue
in interface ConfigurableResource
- Parameters:
aGroupName
- the name of a configuration group. If this parameter is null, this method will have the same effect as setParameterValue(String,Object).
aParamName
- the name of a parameter in the group
aValue
- the value to assign to the parameter.
-
reconfigure
Description copied from interface:ConfigurableResource
Instructs this Resource to re-read its configuration parameter settings.- Specified by:
reconfigure
in interfaceConfigurableResource
- Throws:
ResourceConfigurationException
- if the configuration is not valid
-
getName
Returns the name of this container. It is the name of the Cas Processor.- Specified by:
getName
in classProcessingContainer
- Returns:
- the name
-
getMetaData
Description copied from interface: Resource
Gets the metadata that describes this Resource.
- Specified by:
getMetaData
in interfaceResource
- Overrides:
getMetaData
in classResource_ImplBase
- Returns:
- an object containing all metadata for this resource.
-
incrementStat
Increment a value of a given stat.- Specified by:
incrementStat
in classProcessingContainer
- Parameters:
aStatName
- the name of the stat
aStat
- the amount to add to the stat
-
addStat
Add an arbitrary object and bind it to a given name.- Specified by:
addStat
in classProcessingContainer
- Parameters:
aStatName
- the name to bind the object to
aStat
- the object to store
-
getStat
Returns an object identified with a given name.
- Specified by:
getStat
in classProcessingContainer
- Parameters:
aStatName
- the name of the stat
- Returns:
- the stat
-
getAllStats
Returns all stats aggregated during the CPM run.
- Specified by:
getAllStats
in classProcessingContainer
- Returns:
- a map of all stats aggregated during the CPM run
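The four stat methods (addStat, incrementStat, getStat, getAllStats) behave like operations on a name-to-object map. The sketch below illustrates that idea under stated assumptions: StatsSketch is hypothetical, and starting a missing or non-Integer stat from zero is a choice made here, not documented behavior of the container.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical name-to-object stat store (not a UIMA class).
class StatsSketch {
    private final Map<String, Object> stats = new HashMap<>();

    // Binds an arbitrary object to a name, replacing any previous value.
    synchronized void addStat(String name, Object value) {
        stats.put(name, value);
    }

    // Adds delta to an Integer stat; a missing or non-Integer stat starts from 0 (assumption).
    synchronized void incrementStat(String name, Integer delta) {
        Object current = stats.get(name);
        int base = (current instanceof Integer) ? (Integer) current : 0;
        stats.put(name, base + delta);
    }

    // Returns the object bound to the name, or null if there is none.
    synchronized Object getStat(String name) {
        return stats.get(name);
    }

    // Returns a copy so callers cannot modify the internal map.
    synchronized Map<String, Object> getAllStats() {
        return new HashMap<>(stats);
    }
}
```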
-
pause
public void pause()
Pauses the container until resumed. The CPM pauses the container while it is trying to reconnect to a shared remote service. While the container is paused, getCasProcessor() will not be allowed to return a new CasProcessor. All other methods remain accessible and function normally.
- Specified by:
pause
in classProcessingContainer
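The pause, resume, and isPaused trio can be pictured as a gate in front of instance check-out. The sketch below is one possible shape, not the actual implementation: PauseGateSketch and awaitNotPaused are hypothetical names, and blocking the caller while paused is an assumption about how check-out is held back.

```java
// Hypothetical pause gate (not a UIMA class).
class PauseGateSketch {
    private boolean paused;

    synchronized void pause() {
        paused = true;
    }

    synchronized void resume() {
        paused = false;
        notifyAll(); // wake every thread waiting at the gate
    }

    synchronized boolean isPaused() {
        return paused;
    }

    // Blocks the caller while the gate is paused; returns at once otherwise.
    synchronized void awaitNotPaused() {
        boolean interrupted = false;
        while (paused) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A check-out method would call awaitNotPaused() before taking an instance from the pool, while counters and other accessors would skip the gate entirely.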
-
resume
public void resume()- Specified by:
resume
in classProcessingContainer
-
isPaused
public boolean isPaused()- Specified by:
isPaused
in classProcessingContainer
-
getPool
- Specified by:
getPool
in classProcessingContainer
-
setSingleFencedService
public void setSingleFencedService(boolean aSingleFencedInstance) - Specified by:
setSingleFencedService
in classProcessingContainer
-
isSingleFencedService
public boolean isSingleFencedService()- Specified by:
isSingleFencedService
in classProcessingContainer
-
getFetchTime
public long getFetchTime()Gets the fetch time.- Returns:
- the fetch time
-