Class CPMEngine
java.lang.Object
org.apache.uima.collection.impl.cpm.engine.CPMEngine
- All Implemented Interfaces:
- Runnable
Responsible for creating and initializing processing threads. This instance manages the
 life-cycle of the CPE components. It exposes an API for plugging in components programmatically
 instead of declaratively. Running in its own thread, this component creates separate Processing
 Pipelines for Analysis Engines and CAS Consumers, launches configured CollectionReader and
 attaches all of those components to form a pipeline from source to sink. The Collection Reader
 feeds Processing Threads containing Analysis Engines, and Analysis Engines feed results of
 analysis to CAS Consumers.
- 
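Field Summary
Fields (modifier and type, field, description):
- casPool: The CAS pool.
- protected boolean isRunning: Flag indicating that the engine is running.
- protected boolean killed: Flag indicating that the engine has been killed.
- final Object lockForPause: The lock used for pausing.
- protected BoundedWorkQueue outputQueue: The output queue.
- protected boolean pause: Flag indicating that the engine is paused.
- protected Future<?>[] processingUnitResults
- protected ProcessingUnit[] processingUnits: The processing units.
- protected boolean stopped: Flag indicating that the engine has been stopped.
- protected BoundedWorkQueue workQueue: The work queue.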
- 
Constructor Summary
- CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData): Initializes the Collection Processing Engine.
- 
Method Summary
Methods (modifier and type, method, description):
- void addCasProcessor(CasProcessor aCasProcessor): Adds a CASProcessor to the processing pipeline.
- void addCasProcessor(CasProcessor aCasProcessor, int aIndex): Adds a CASProcessor to the processing pipeline at a given place in the processing pipeline.
- void addStatusCallbackListener: Adds the status callback listener.
- void asynchStop: Deprecated.
- static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps): Internal use only, public for cross-package access. Switches class loaders and locks the CAS.
- void cleanup(): Null out fields of this object.
- void deployCasProcessors: Starts CASProcessor containers one at a time.
- void disableCasProcessor(int aCasProcessorIndex): Disables a CASProcessor in the processing pipeline.
- void disableCasProcessor(String aCasProcessorName): Disables a CASProcessor in the processing pipeline.
- boolean dropCasOnException: Drop CAS on exception.
- void enableCasProcessor(String aCasProcessorName): Enables a CASProcessor in the processing pipeline.
- getAllProcessingContainers: Returns a list of all Processing Containers.
- getCallbackListeners: Returns a list of all callback listeners currently registered with the CPM.
- getCasProcessors: Returns all CASProcessors in the processing pipeline.
- protected CpeConfiguration getCpeConfig: Gets the CPE config.
- getLastDocRepository: Gets the last doc repository.
- getLastProcessedDocId: Returns the Id of the last document processed.
- getPerformanceTuningSettings: Gets the performance tuning settings.
- int getPoolSize: Gets the pool size.
- getProcessingContainers: Returns a list of Processing Containers for Analysis Engines.
- Progress[] getProgress: Returns CollectionReader progress.
- getStats(): Returns CPE stats.
- int getThreadCount: Returns the number of processing threads.
- void invalidateCASes(CAS[] aCASList): Invalidates CASes.
- boolean isHardKilled: Returns whether the CPE was killed hard.
- boolean isKilled(): Returns true if this engine has been killed.
- boolean isParallizable(CasProcessor aProcessor, String aCpName): Determines if a given Cas Processor is parallelizable.
- boolean isPaused(): Returns a global flag indicating if this Thread is in pause state.
- boolean isPauseOnException: Returns whether the CPM should pause when an exception occurs.
- boolean isRunning: Returns a global flag indicating if this Thread is in processing state.
- void killIt(): Kills the CPM the hard way.
- void pauseIt(): Pauses this thread.
- void pipelineKilled(String aPipelineThreadName): Callback method used to notify the engine when a processing pipeline is killed due to excessive errors.
- void redeployAnalysisEngine(ProcessingContainer aProcessingContainer): Deploys a CasProcessor and associates it with aProcessingContainer.
- void releaseCASes(CAS[] aCASList): Releases the given CASes back to the pool.
- void removeCasProcessor(int aCasProcessorIndex): Removes a CASProcessor from the processing pipeline.
- void removeStatusCallbackListener: Unregisters the given listener from the CPM.
- void resumeIt(): Resumes this thread.
- void run(): Using the given configuration, creates and starts the CPE processing pipeline.
- void runSingleThreaded: Runs the CPE in a single thread without queues.
- void setCollectionReader(BaseCollectionReader aCollectionReader): Sets the CollectionReader to use during processing.
- void setConcurrentThreadSize(int aConcurrentThreadSize): Defines the number of threads executing the processing pipeline concurrently.
- void setInputQueueSize(int aInputQueueSize): Defines the size of inputQueue.
- void setNumToProcess(long aNumToProcess): Defines the size of the batch.
- void setOutputQueueSize(int aOutputQueueSize): Defines the size of outputQueue.
- void setPauseOnException(boolean aPause): Sets a global flag to indicate to the CPM that it should pause whenever an exception occurs.
- void setPerformanceTuningSettings(Properties aPerformanceTuningSettings): Overrides the default performance tuning settings for this CPE.
- void setPoolSize(int aPoolSize): Defines the size of the Cas Pool.
- void setProcessControllerAdapter: Sets the process controller adapter.
- void setStats: Plugs in a map where the engine stores performance info at runtime.
- void stopCasProcessors(boolean kill): Stops all Cas Processors and optionally changes the status according to the kill flag.
- void stopIt(): Stops execution of the Processing Pipeline and this thread.
- 
Field Details
casPool
The CAS pool.
- 
lockForPause
The lock used for pausing.
- 
pause
protected boolean pause
Flag indicating that the engine is paused.
- 
isRunning
protected volatile boolean isRunning
Flag indicating that the engine is running.
- 
stopped
protected volatile boolean stopped
Flag indicating that the engine has been stopped.
- 
killed
protected volatile boolean killed
Flag indicating that the engine has been killed.
- 
processingUnits
The processing units.
- 
processingUnitResults
- 
outputQueue
The output queue.
- 
workQueue
The work queue.
 
- 
- 
Constructor Details
CPMEngine
public CPMEngine(CPMExecutorService aExecutorService, CPEFactory aCpeFactory, ProcessTrace aProcTr, CheckpointData aCheckpointData) throws Exception
Initializes the Collection Processing Engine. Assigns this thread and all processing threads created by this component to a common Thread Group.
- Parameters:
- aExecutorService - contains all CPM-related threads
- aCpeFactory - CPE factory object responsible for parsing the CPE descriptor and creating components
- aProcTr - instance of the ProcessTrace where the CPM accumulates stats
- aCheckpointData - checkpoint object facilitating restart from the last known point
- Throws:
- Exception - the exception
 
 
- 
- 
Method Details
getProcessingContainers
Returns a list of Processing Containers for Analysis Engines. Each CasProcessor is managed by its own container.
- Returns:
- the processing containers
 
- 
getAllProcessingContainers
Returns a list of all Processing Containers. Each CasProcessor is managed by its own container.
- Returns:
- all processing containers
 
- 
getThreadCount
Returns the number of processing threads.
- Returns:
- the number of processing threads
- Throws:
- ResourceConfigurationException
 
- 
setStats
Plugs in a map where the engine stores performance info at runtime.
- Parameters:
- aMap - map for runtime stats and totals
 
- 
getStats
Returns CPE stats.
- Returns:
- Map containing CPE stats
 
- 
setPauseOnException
public void setPauseOnException(boolean aPause)
Sets a global flag to indicate to the CPM that it should pause whenever an exception occurs.
- Parameters:
- aPause - true if pause is requested on exception, false otherwise
 
- 
isPauseOnException
public boolean isPauseOnException()
Returns whether the CPM should pause when an exception occurs.
- Returns:
- true if the CPM pauses when an exception occurs, false otherwise
 
- 
setInputQueueSize
public void setInputQueueSize(int aInputQueueSize)
Defines the size of inputQueue. The queue stores this many entities read from the CollectionReader. Every processing pipeline thread reads its entities from this input queue. The CollectionReader is decoupled from the consumers of entities and continuously replenishes the input queue.
- Parameters:
- aInputQueueSize - the size of the input queue
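The effect of a bounded input queue can be illustrated with a minimal sketch. It does not use the CPM's own BoundedWorkQueue; it assumes java.util.concurrent.ArrayBlockingQueue as a stand-in, and the class InputQueueSketch and the method fillUntilFull are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class InputQueueSketch {
    /**
     * Plays the role of a reader: offers entities until the queue refuses one.
     * Returns how many entities the queue accepted.
     */
    static int fillUntilFull(BlockingQueue<String> inputQueue, int available) {
        int accepted = 0;
        for (int i = 0; i < available; i++) {
            // offer() returns false instead of blocking when the queue is at capacity
            if (!inputQueue.offer("entity-" + i)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Capacity 3 plays the role of aInputQueueSize (assumed value)
        BlockingQueue<String> inputQueue = new ArrayBlockingQueue<>(3);
        System.out.println("accepted at first: " + fillUntilFull(inputQueue, 10));
        // A processing thread taking one entity frees one slot for the reader
        inputQueue.poll();
        System.out.println("accepted after one take: " + fillUntilFull(inputQueue, 10));
    }
}
```

In this sketch the reader can never run more than the queue capacity ahead of the processing threads, which is the decoupling the description refers to.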
 
- 
setOutputQueueSize
public void setOutputQueueSize(int aOutputQueueSize)
Defines the size of outputQueue. The queue stores this many entities enqueued by every processing pipeline thread. The results of analysis are placed into this queue for the consumer thread to consume.
- Parameters:
- aOutputQueueSize - the size of the output queue
 
- 
setPoolSize
public void setPoolSize(int aPoolSize)
Defines the size of the Cas Pool.
- Parameters:
- aPoolSize - the size of the Cas pool
 
- 
getPoolSize
public int getPoolSize()
Gets the pool size.
- Returns:
- the pool size
 
- 
setConcurrentThreadSize
public void setConcurrentThreadSize(int aConcurrentThreadSize)
Defines the number of threads executing the processing pipeline concurrently.
- Parameters:
- aConcurrentThreadSize - the number of concurrent processing threads
 
- 
addStatusCallbackListener
Adds the status callback listener.
- Parameters:
- aListener - the listener to add
 
- 
getCallbackListeners
Returns a list of all callback listeners currently registered with the CPM.
- Returns:
- the registered callback listeners
 
- 
removeStatusCallbackListener
Unregisters the given listener from the CPM.
- Parameters:
- aListener - instance of BaseStatusCallbackListener to unregister
 
- 
isKilled
public boolean isKilled()
Returns true if this engine has been killed.
- Returns:
- true if this engine has been killed
 
- 
killIt
public void killIt()
Kills the CPM the hard way. None of the entities in the queues will be processed. This method simply empties all queues and at the end adds an EOFToken to the work queue so that all threads go away.
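A minimal sketch of the hard-kill idea described above: drain every queue, then leave a single end-of-stream marker so waiting threads can exit. It is not the CPM implementation. The class HardKillSketch, the method hardKill, and the EOF_TOKEN object are hypothetical stand-ins, and java.util.concurrent queues replace the CPM's BoundedWorkQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HardKillSketch {
    // Hypothetical end-of-stream marker standing in for the EOFToken named above
    static final Object EOF_TOKEN = new Object();

    /**
     * Discards all pending entries from both queues, then adds one EOF marker
     * to the work queue. Returns the number of discarded entries.
     */
    static int hardKill(BlockingQueue<Object> workQueue, BlockingQueue<Object> outputQueue) {
        List<Object> discarded = new ArrayList<>();
        workQueue.drainTo(discarded);   // unprocessed input is thrown away
        outputQueue.drainTo(discarded); // unconsumed results are thrown away
        workQueue.add(EOF_TOKEN);       // wakes up a thread blocked on the work queue
        return discarded.size();
    }

    public static void main(String[] args) {
        BlockingQueue<Object> work = new LinkedBlockingQueue<>();
        BlockingQueue<Object> out = new LinkedBlockingQueue<>();
        work.add("cas-1");
        work.add("cas-2");
        out.add("cas-3");
        System.out.println("discarded: " + hardKill(work, out));
        System.out.println("only EOF left: " + (work.size() == 1 && work.peek() == EOF_TOKEN));
    }
}
```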
- 
isHardKilled
public boolean isHardKilled()
Returns whether the CPE was killed hard. A soft kill allows the CPE to finish processing all in-transit CASes. A hard kill causes the CPM to stop processing and to throw away all unprocessed CASes from its queues.
- Returns:
- true if the CPE was killed hard
 
- 
asynchStop
Deprecated.
Asynch stop.
- 
stopIt
public void stopIt()
Stops execution of the Processing Pipeline and this thread.
- 
isParallizable
Determines if a given Cas Processor is parallelizable. Remote Cas Processors are parallelizable by default. For integrated and managed Cas Processors, the CPM consults the Cas Processor's descriptor to determine if it is parallelizable.
- Parameters:
- aProcessor - Cas Processor being checked
- aCpName - name of the Cas Processor
- Returns:
- true if the Cas Processor is parallelizable, false otherwise
- Throws:
- Exception
 
- 
addCasProcessor
Adds a CASProcessor to the processing pipeline. If the CasProcessor already exists and its status is DISABLED, this method re-enables the CasProcessor.
- Parameters:
- aCasProcessor - CASProcessor to be added to the processing pipeline
- Throws:
- ResourceConfigurationException - the resource configuration exception
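The add-or-re-enable behaviour described above can be sketched with a small registry keyed by processor name. This is only an illustration under stated assumptions: ProcessorRegistrySketch, its Status enum, and its methods are hypothetical, and the real engine tracks CasProcessor instances in processing containers rather than names in a map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ProcessorRegistrySketch {
    enum Status { ENABLED, DISABLED }

    // Insertion order models the order of processors in the pipeline
    private final Map<String, Status> processors = new LinkedHashMap<>();

    /**
     * Adds a processor, or re-enables it if it is already registered.
     * Returns true only when the processor was newly added.
     */
    boolean add(String name) {
        Status previous = processors.put(name, Status.ENABLED);
        return previous == null;
    }

    /** Marks a registered processor as disabled; unknown names are ignored. */
    void disable(String name) {
        processors.computeIfPresent(name, (key, status) -> Status.DISABLED);
    }

    Status statusOf(String name) {
        return processors.get(name);
    }

    List<String> names() {
        return new ArrayList<>(processors.keySet());
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        registry.add("tokenizer");
        registry.add("tagger");
        registry.disable("tokenizer");
        // Adding an existing, disabled processor re-enables it in place
        boolean newlyAdded = registry.add("tokenizer");
        System.out.println(newlyAdded + " " + registry.statusOf("tokenizer") + " " + registry.names());
    }
}
```

Re-adding keeps the processor at its original position, because a LinkedHashMap does not reorder a key whose value is replaced.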
 
- 
addCasProcessor
public void addCasProcessor(CasProcessor aCasProcessor, int aIndex) throws ResourceConfigurationException
Adds a CASProcessor to the processing pipeline at a given place in the processing pipeline.
- Parameters:
- aCasProcessor - CASProcessor to be added to the processing pipeline
- aIndex - insertion point for the given CasProcessor
- Throws:
- ResourceConfigurationException - the resource configuration exception
 
- 
removeCasProcessor
public void removeCasProcessor(int aCasProcessorIndex)
Removes a CASProcessor from the processing pipeline.
- Parameters:
- aCasProcessorIndex - CasProcessor position in the processing pipeline
 
- 
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disables a CASProcessor in the processing pipeline.
- Parameters:
- aCasProcessorIndex - position in the processing pipeline of the CASProcessor to disable
 
- 
disableCasProcessor
Disables a CASProcessor in the processing pipeline.
- Parameters:
- aCasProcessorName - name of the CASProcessor to disable
 
- 
enableCasProcessor
Enables a CASProcessor in the processing pipeline.
- Parameters:
- aCasProcessorName - name of the CASProcessor to enable
 
- 
getCasProcessors
Returns all CASProcessors in the processing pipeline.
- Returns:
- the CAS processors
 
- 
redeployAnalysisEngine
Deploys a CasProcessor and associates it with aProcessingContainer.
- Parameters:
- aProcessingContainer - the processing container
- Throws:
- Exception - the exception
 
- 
deployCasProcessors
Starts CASProcessor containers one at a time. During this phase the container deploys a TAE as a local, remote, or integrated CasProcessor.
- Throws:
- AbortCPMException - the abort CPM exception
 
- 
isRunning
public boolean isRunning()
Returns a global flag indicating if this Thread is in processing state.
- Returns:
- true if this Thread is running
 
- 
isPaused
public boolean isPaused()
Returns a global flag indicating if this Thread is in pause state.
- Returns:
- true if this Thread is paused
 
- 
pauseIt
public void pauseIt()
Pauses this thread.
- 
resumeIt
public void resumeIt()
Resumes this thread.
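One common way to implement a pause/resume pair around a lock object and a boolean flag is a guarded wait. The sketch below shows that pattern. It is an assumption about the general technique, not a copy of the engine's code: the field names lockForPause and pause mirror the field list of this class, while PauseSketch, awaitIfPaused, and demo are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PauseSketch {
    private final Object lockForPause = new Object();
    private boolean pause; // guarded by lockForPause

    void pauseIt() {
        synchronized (lockForPause) {
            pause = true;
        }
    }

    void resumeIt() {
        synchronized (lockForPause) {
            pause = false;
            lockForPause.notifyAll(); // wake every thread parked in awaitIfPaused()
        }
    }

    boolean isPaused() {
        synchronized (lockForPause) {
            return pause;
        }
    }

    /** Called by a worker between units of work; blocks while the flag is set. */
    void awaitIfPaused() throws InterruptedException {
        synchronized (lockForPause) {
            while (pause) { // loop guards against spurious wakeups
                lockForPause.wait();
            }
        }
    }

    /** Runs one pause/resume cycle; true if the worker waited, then finished. */
    static boolean demo() {
        PauseSketch engine = new PauseSketch();
        CountDownLatch finished = new CountDownLatch(1);
        engine.pauseIt();
        Thread worker = new Thread(() -> {
            try {
                engine.awaitIfPaused();
                finished.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            boolean blockedWhilePaused = !finished.await(200, TimeUnit.MILLISECONDS);
            engine.resumeIt();
            boolean ranAfterResume = finished.await(5, TimeUnit.SECONDS);
            return blockedWhilePaused && ranAfterResume;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker paused, then resumed: " + demo());
    }
}
```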
- 
setCollectionReader
Sets the CollectionReader to use during processing.
- Parameters:
- aCollectionReader - the CollectionReader to use
 
- 
setNumToProcess
public void setNumToProcess(long aNumToProcess)
Defines the size of the batch.
- Parameters:
- aNumToProcess - the new number to process
 
- 
getLastProcessedDocId
Returns the Id of the last document processed.
- Returns:
- the last processed doc id
 
- 
getLastDocRepository
Gets the last doc repository.
- Returns:
- the last doc repository
 
- 
pipelineKilled
Callback method used to notify the engine when a processing pipeline is killed due to excessive errors. This method is only called if the processing pipeline is unable to acquire a connection to a remote service and the configuration indicates 'kill-pipeline' as the action to take on excessive errors. When running with multiple pipelines, this routine decrements a global pipeline counter and tests whether any pipelines are left. When all pipelines have been killed as described above, the CPM needs to terminate. Since the pipelines are killed prematurely, there are artifacts (CASes) left in the work queue. These must be removed from the work queue and released back to the CAS pool so that the Collection Reader thread exits properly.
- Parameters:
- aPipelineThreadName - name of the pipeline thread exiting from its run() method
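The bookkeeping described above (decrement a pipeline counter and, once no pipeline is left, move stranded CASes from the work queue back to the pool) can be sketched as follows. This is a simplified model, not the engine's code: PipelineKilledSketch and its fields are hypothetical, CASes are represented by plain strings, and the boolean return value is added only so the outcome is observable.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PipelineKilledSketch {
    final AtomicInteger activePipelines;
    final BlockingQueue<String> workQueue = new LinkedBlockingQueue<>();
    // Hypothetical pool of free CASes, modelled as a queue of ids
    final BlockingQueue<String> casPool = new LinkedBlockingQueue<>();

    PipelineKilledSketch(int pipelineCount) {
        this.activePipelines = new AtomicInteger(pipelineCount);
    }

    /**
     * Called by a pipeline thread that is exiting because of excessive errors.
     * Returns true when this was the last pipeline and the queue was drained.
     */
    boolean pipelineKilled(String pipelineThreadName) {
        if (activePipelines.decrementAndGet() > 0) {
            return false; // other pipelines can still consume the work queue
        }
        // No consumer is left: release every stranded CAS back to the pool
        // so that a reader waiting for a free CAS is not blocked forever.
        String cas;
        while ((cas = workQueue.poll()) != null) {
            casPool.add(cas);
        }
        return true;
    }

    public static void main(String[] args) {
        PipelineKilledSketch engine = new PipelineKilledSketch(2);
        engine.workQueue.add("cas-1");
        engine.workQueue.add("cas-2");
        System.out.println("first pipeline was last: " + engine.pipelineKilled("pipeline-1"));
        System.out.println("second pipeline was last: " + engine.pipelineKilled("pipeline-2"));
        System.out.println("CASes returned to pool: " + engine.casPool.size());
    }
}
```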
 
- 
run
public void run()
Using the given configuration, creates and starts the CPE processing pipeline. The pipeline is either single-threaded or multi-threaded; which one is used depends on the configuration defined in the CPE descriptor. In multi-threaded mode, the CPE starts a number of threads:
- ArtifactProducer Thread: a thread containing a Collection Reader. It runs asynchronously and fills a WorkQueue with CASes.
- CasConsumer Thread: an optional thread. It is only instantiated if there are Cas Consumers in the pipeline.
- Processing Threads: one or more processing threads, configured identically, that perform analysis.
How many threads are started depends on the configuration in the CPE descriptor. All threads started here are placed in a ThreadGroup. This provides a catch-all mechanism for errors that may occur in the CPM. If an error is thrown, the ThreadGroup is notified. The ThreadGroup then notifies all registered listeners to give the application a chance to report the error and do any necessary cleanup. This routine manages all the threads and makes sure that all of them are cleaned up before returning. The ThreadGroup must clean up all threads under its control, otherwise a memory leak occurs. Even threads that were never started must be cleaned up, as they end up in the ThreadGroup when instantiated. The code uses a number of state variables to make decisions during cleanup.
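The multi-threaded layout described above (one producer, one or more identical processing threads, one consumer, connected by bounded queues) can be sketched with standard library classes. This is an illustrative model under stated assumptions, not the CPM implementation: PipelineSketch, its run method, and the EOF marker are hypothetical, documents are plain strings, "analysis" is an upper-casing step, the consumer is always started, and an ExecutorService takes the place of the ThreadGroup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PipelineSketch {
    // Hypothetical end-of-stream marker; input documents must not equal it
    static final String EOF = "<EOF>";

    static List<String> run(List<String> docs, int threads) {
        BlockingQueue<String> workQueue = new ArrayBlockingQueue<>(4);
        BlockingQueue<String> outputQueue = new ArrayBlockingQueue<>(4);
        ExecutorService pool = Executors.newFixedThreadPool(threads + 2);
        try {
            // Producer: plays the Collection Reader, then sends one EOF per worker
            Future<?> producer = pool.submit(() -> {
                for (String doc : docs) workQueue.put(doc);
                for (int i = 0; i < threads; i++) workQueue.put(EOF);
                return null;
            });
            // Processing threads: identical workers doing the "analysis"
            List<Future<?>> workers = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                workers.add(pool.submit(() -> {
                    for (String d = workQueue.take(); !d.equals(EOF); d = workQueue.take()) {
                        outputQueue.put(d.toUpperCase());
                    }
                    return null;
                }));
            }
            // Consumer: collects results until it sees the EOF marker
            Future<List<String>> consumer = pool.submit(() -> {
                List<String> results = new ArrayList<>();
                for (String r = outputQueue.take(); !r.equals(EOF); r = outputQueue.take()) {
                    results.add(r);
                }
                return results;
            });
            producer.get();
            for (Future<?> worker : workers) worker.get();
            outputQueue.put(EOF); // all workers are done, so nothing more will arrive
            return consumer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("pipeline failed", e);
        } finally {
            pool.shutdownNow(); // make sure no thread outlives the run
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"), 2));
    }
}
```

With more than one processing thread the order of results is not deterministic, because workers finish independently. With a single thread the results arrive in input order.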
- 
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
- 
stopCasProcessors
Stops all Cas Processors and optionally changes the status according to the kill flag.
- Parameters:
- kill - true if the CPE has been stopped before completing normally
- Throws:
- CasProcessorDeploymentException - the cas processor deployment exception
 
- 
getProgress
Returns CollectionReader progress.
- Returns:
- the progress
 
- 
invalidateCASes
Invalidates CASes.
- Parameters:
- aCASList - the CAS list
 
- 
releaseCASes
Releases the given CASes back to the pool.
- Parameters:
- aCASList - CAS list to release
 
- 
setPerformanceTuningSettings
Overrides the default performance tuning settings for this CPE. This affects things such as CAS sizing parameters.
- Parameters:
- aPerformanceTuningSettings - the new settings
 
- 
getPerformanceTuningSettings
Gets the performance tuning settings.
- Returns:
- the PerformanceTuningSettings
 
- 
setProcessControllerAdapter
Sets the process controller adapter.
- Parameters:
- aPca - the new process controller adapter
 
- 
getCpeConfig
Gets the CPE config.
- Returns:
- the CPE config
- Throws:
- Exception - the exception
 
- 
dropCasOnException
public boolean dropCasOnException()
Drop CAS on exception.
- Returns:
- true, if successful
 
- 
runSingleThreaded
Runs the CPE in a single thread without queues.
- Throws:
- Exception
 
- 
callEntityProcessCompleteWithCAS
public static void callEntityProcessCompleteWithCAS(StatusCallbackListener statCL, CAS cas, EntityProcessStatus eps)
Internal use only, public for cross-package access. Switches class loaders and locks the CAS.
- Parameters:
- statCL - status callback listener
- cas - CAS
- eps - entity process status
 
 
-