Class ProcessingUnit
java.lang.Object
org.apache.uima.collection.impl.cpm.engine.ProcessingUnit
- All Implemented Interfaces:
Runnable
This component executes the processing pipeline. Running in a separate thread, it continuously
reads bundles of CASes from the Work Queue filled by ArtifactProducer and sends them through
the configured CasProcessors. The sequence in which CasProcessors are invoked is defined by the
order in which the CAS Processors are listed in the CPE descriptor. The results of analysis
produced by the CAS Processors are enqueued onto an output queue that is shared with CAS Consumers.
-
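The queue-driven loop described above can be illustrated with a minimal sketch. It uses only JDK classes: `PipelineSketch`, the `END_OF_INPUT` sentinel, and the use of plain strings in place of CAS objects are all assumptions made for illustration, and none of it is the actual UIMA implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the queue-driven pattern described above; not the UIMA code.
class PipelineSketch implements Runnable {
    // Sentinel bundle that tells the worker to stop (an assumption of this sketch).
    static final List<String> END_OF_INPUT = new ArrayList<>();

    private final BlockingQueue<List<String>> workQueue;
    private final BlockingQueue<List<String>> outputQueue;
    private final List<UnaryOperator<String>> processors; // invoked in list order

    PipelineSketch(BlockingQueue<List<String>> workQueue,
                   BlockingQueue<List<String>> outputQueue,
                   List<UnaryOperator<String>> processors) {
        this.workQueue = workQueue;
        this.outputQueue = outputQueue;
        this.processors = processors;
    }

    @Override
    public void run() {
        try {
            while (true) {
                List<String> bundle = workQueue.take(); // blocks until a bundle arrives
                if (bundle == END_OF_INPUT) {
                    return;
                }
                List<String> results = new ArrayList<>();
                for (String artifact : bundle) {
                    String current = artifact;
                    for (UnaryOperator<String> processor : processors) {
                        current = processor.apply(current);
                    }
                    results.add(current);
                }
                outputQueue.put(results); // hand results to downstream consumers
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    // Runs one bundle through a two-stage pipeline on a separate thread.
    static List<String> demo() {
        BlockingQueue<List<String>> in = new ArrayBlockingQueue<>(4);
        BlockingQueue<List<String>> out = new ArrayBlockingQueue<>(4);
        List<UnaryOperator<String>> stages = new ArrayList<>();
        stages.add(s -> s.trim());
        stages.add(s -> s.toUpperCase());
        Thread worker = new Thread(new PipelineSketch(in, out, stages));
        worker.start();
        try {
            in.put(Arrays.asList("  doc one ", "doc two"));
            in.put(END_OF_INPUT);
            List<String> result = out.take();
            worker.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The order of the `stages` list plays the role that the CPE descriptor ordering plays for real CAS Processors.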
Field Summary
Modifier and Type | Field | Description
protected Object[] | artifact | The artifact.
protected CAS[] | casList | The cas list.
protected CPECasPool | casPool | The cas pool.
protected CAS | conversionCas | The conversion cas.
protected CAS[] | conversionCasArray | The conversion cas array.
protected CpeConfiguration | cpeConfiguration | The cpe configuration.
protected CPMEngine | cpm | The cpm.
protected CasConverter | mConverter | The m converter.
protected boolean | notifyListeners | The notify listeners.
protected long | numToProcess | The num to process.
protected BoundedWorkQueue | outputQueue | The output queue.
protected LinkedList | processContainers | The process containers.
protected ProcessTrace | processingUnitProcessTrace | The processing unit process trace.
protected boolean | releaseCAS | The release CAS.
protected ArrayList | statusCbL | The status cb L.
protected String | threadId | The thread id.
int | threadState | The thread state.
protected UimaTimer | timer | The timer.
long | timer01 | The timer 01.
long | timer02 | The timer 02.
long | timer03 | The timer 03.
long | timer04 | The timer 04.
long | timer05 | The timer 05.
long | timer06 | The timer 06.
protected BoundedWorkQueue | workQueue | The work queue.
-
Constructor Summary
Constructor | Description
ProcessingUnit() | Instantiates a new processing unit.
ProcessingUnit(CPMEngine acpm) | Instantiates a new processing unit.
ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue) | Initialize the PU.
-
Method Summary
Modifier and Type | Method | Description
void | addStatusCallbackListener | Plugs in Listener object used for notifications.
protected boolean | analyze(Object[] aCasObjectList, ProcessTrace pTrTemp) | An alternate processing loop designed for the single-threaded CPM.
void | cleanup() | Null out fields of this object.
boolean | consumeQueue() | Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
void | disableCasProcessor(int aCasProcessorIndex) | Disable a CASProcessor in the processing pipeline.
void | disableCasProcessor(String aCasProcessorName) | Alternative method to disable a Cas Processor.
protected void | doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus) | Notifies all configured listeners.
void | enableCasProcessor(String aCasProcessorName) | Enables the Cas Processor with a given name.
protected boolean | endOfProcessingReached(long aCount) | Returns true if the CPM has finished analyzing the collection.
protected long | getBytes | Returns the size of the CAS object.
 | getCallbackListeners | Returns list of listeners used by this PU for callbacks.
 | getName() |
boolean | isCasConsumerPipeline() | Checks whether this instance is a CAS Consumer pipeline.
protected boolean | isProcessorReady(int aStatus) | Check if the CASProcessor status is available for processing.
boolean | isRunning() | Returns true if this component is in running state.
protected void | notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus) | Notifies Listeners that the pipeline has finished processing the current set of CASes.
protected void | process | Process.
protected boolean | processNext(Object[] aCasObjectList, ProcessTrace pTrTemp) | Executes the processing pipeline.
void | removeStatusCallbackListener | Removes given listener from the list of listeners.
void | run() | Starts the Processing Pipeline thread.
void | setCasConsumerPipelineIdentity() | Define a CasConsumer Pipeline identity for this instance.
void | setCasPool(CPECasPool aPool) | Sets the cas pool.
void | setContainers(LinkedList processorList) | Plugs in a list of Cas Processor containers.
void | setCPMEngine(CPMEngine acpm) | Alternative method of providing the reference to the component managing the lifecycle of the CPE.
void | setInputQueue(BoundedWorkQueue aInputQueue) | Alternative method of providing a queue from which this PU will read bundles of CASes.
void | setName |
void | setNotifyListeners(boolean aDoNotify) | Set a flag indicating if notifications should be made via configured Listeners.
void | setOutputQueue(BoundedWorkQueue aOutputQueue) | Alternative method of providing a queue where this PU will deposit results of analysis.
void | setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace) | Plugs in ProcessTrace object used to collect statistics.
void | setReleaseCASFlag(boolean aFlag) | Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing.
void | setUimaTimer(UimaTimer aTimer) | Plugs in custom timer used by the PU for getting time.
protected void | showMetadata(Object[] aCasList) | Show metadata.
void | stopCasProcessors(boolean kill) | Stops all Cas Processors that are part of this PU.
-
Field Details
-
threadState
public int threadState
The thread state.
-
casPool
The cas pool.
-
releaseCAS
protected boolean releaseCAS
The release CAS.
-
cpm
The cpm.
-
workQueue
The work queue.
-
outputQueue
The output queue.
-
mConverter
The m converter.
-
processingUnitProcessTrace
The processing unit process trace.
-
processContainers
The process containers.
-
numToProcess
protected long numToProcess
The num to process.
-
casList
The cas list.
-
statusCbL
The status cb L.
-
notifyListeners
protected boolean notifyListeners
The notify listeners.
-
conversionCas
The conversion cas.
-
artifact
The artifact.
-
conversionCasArray
The conversion cas array.
-
timer
The timer.
-
threadId
The thread id.
-
cpeConfiguration
The cpe configuration.
-
timer01
public long timer01
The timer 01.
-
timer02
public long timer02
The timer 02.
-
timer03
public long timer03
The timer 03.
-
timer04
public long timer04
The timer 04.
-
timer05
public long timer05
The timer 05.
-
timer06
public long timer06
The timer 06.
-
-
Constructor Details
-
ProcessingUnit
public ProcessingUnit()
Instantiates a new processing unit.
-
ProcessingUnit
Initialize the PU.
- Parameters:
acpm
- - component managing the life cycle of the CPE
aInputQueue
- - queue to read from
aOutputQueue
- - queue to write to
-
ProcessingUnit
Instantiates a new processing unit.
- Parameters:
acpm
- the component managing the life cycle of the CPE
-
-
Method Details
-
setName
-
getName
-
isRunning
public boolean isRunning()
Returns true if this component is in running state.
- Returns:
- - true if running, false otherwise
-
setCasConsumerPipelineIdentity
public void setCasConsumerPipelineIdentity()
Define a CasConsumer Pipeline identity for this instance.
-
isCasConsumerPipeline
public boolean isCasConsumerPipeline()
Checks whether this instance is a CAS Consumer pipeline.
- Returns:
- true if this instance is a CAS Consumer pipeline
-
setInputQueue
Alternative method of providing a queue from which this PU will read bundles of CASes.
- Parameters:
aInputQueue
- - read queue
-
setOutputQueue
Alternative method of providing a queue where this PU will deposit results of analysis.
- Parameters:
aOutputQueue
- - queue to write to
-
setCPMEngine
Alternative method of providing the reference to the component managing the lifecycle of the CPE.
- Parameters:
acpm
- - reference to the controlling engine
-
cleanup
public void cleanup()
Null out fields of this object. Call this only when this object is no longer needed.
-
setNotifyListeners
public void setNotifyListeners(boolean aDoNotify)
Set a flag indicating if notifications should be made via configured Listeners.
- Parameters:
aDoNotify
- - true if notification is required, false otherwise
-
addStatusCallbackListener
Plugs in Listener object used for notifications.
- Parameters:
aListener
- - BaseStatusCallbackListener instance
-
getCallbackListeners
Returns list of listeners used by this PU for callbacks.
- Returns:
- - list of BaseStatusCallbackListener instances
-
removeStatusCallbackListener
Removes given listener from the list of listeners.
- Parameters:
aListener
- - object to remove from the list
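As a rough illustration of how the listener methods and the notification flag fit together, the sketch below keeps a list of callbacks and fires them only when notifications are switched on. `ListenerRegistrySketch`, the `StatusListener` interface, and the `fire` method are hypothetical names standing in for BaseStatusCallbackListener handling; the real class may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener registry; StatusListener stands in for BaseStatusCallbackListener.
class ListenerRegistrySketch {
    interface StatusListener {
        void onProcessed(Object cas, String status);
    }

    private final List<StatusListener> listeners = new ArrayList<>();
    private boolean notifyListeners = true;

    void addStatusCallbackListener(StatusListener listener) {
        listeners.add(listener);
    }

    void removeStatusCallbackListener(StatusListener listener) {
        listeners.remove(listener);
    }

    List<StatusListener> getCallbackListeners() {
        return listeners;
    }

    void setNotifyListeners(boolean doNotify) {
        notifyListeners = doNotify;
    }

    // Calls every registered listener unless notifications are disabled.
    // Returns how many listeners were called.
    int fire(Object cas, String status) {
        if (!notifyListeners) {
            return 0;
        }
        for (StatusListener listener : listeners) {
            listener.onProcessed(cas, status);
        }
        return listeners.size();
    }
}
```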
-
setProcessingUnitProcessTrace
Plugs in ProcessTrace object used to collect statistics.
- Parameters:
aProcessingUnitProcessTrace
- - object to compile stats
-
setUimaTimer
Plugs in custom timer used by the PU for getting time.
- Parameters:
aTimer
- - custom timer to use
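Plugging in a custom timer is a form of dependency injection: the component asks the injected object for the time instead of calling the system clock directly, which makes timing deterministic in tests. The sketch below shows the idea using the JDK's `LongSupplier` as a stand-in for UimaTimer. `TimerInjectionSketch`, `setTimer`, and `timeIt` are hypothetical names, not part of the UIMA API.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of timer injection; LongSupplier stands in for UimaTimer.
class TimerInjectionSketch {
    // Default clock, used until a custom timer is plugged in.
    private LongSupplier timer = System::currentTimeMillis;

    void setTimer(LongSupplier customTimer) {
        this.timer = customTimer;
    }

    // Measures how long the given work takes according to the current timer.
    long timeIt(Runnable work) {
        long start = timer.getAsLong();
        work.run();
        return timer.getAsLong() - start;
    }
}
```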
-
setContainers
Plugs in a list of Cas Processor containers. During processing, the Cas Processors in this list are called sequentially. Each Cas Processor is wrapped in a container that manages errors, counts and totals, and restarts.
- Parameters:
processorList
- list of Cas Processor containers to be added to the processing pipeline
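To make the container idea concrete, here is a small sketch of a wrapper that counts successes and errors around a processor. The class `ContainerSketch`, the `maxErrors` threshold, and the policy of disabling the processor once the threshold is reached are assumptions chosen for illustration. The source does not state what the real containers do when errors accumulate.

```java
import java.util.function.UnaryOperator;

// Hypothetical container that tracks counts and errors for one processor.
class ContainerSketch {
    private final UnaryOperator<String> processor;
    private final int maxErrors; // assumed error threshold
    private int processedCount;
    private int errorCount;
    private boolean disabled;

    ContainerSketch(UnaryOperator<String> processor, int maxErrors) {
        this.processor = processor;
        this.maxErrors = maxErrors;
    }

    // Applies the processor; on failure, records the error and passes the input through.
    String process(String artifact) {
        if (disabled) {
            return artifact;
        }
        try {
            String result = processor.apply(artifact);
            processedCount++;
            return result;
        } catch (RuntimeException e) {
            errorCount++;
            if (errorCount >= maxErrors) {
                disabled = true; // assumed policy: stop calling a failing processor
            }
            return artifact;
        }
    }

    int getProcessedCount() { return processedCount; }

    int getErrorCount() { return errorCount; }

    boolean isDisabled() { return disabled; }
}
```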
-
disableCasProcessor
public void disableCasProcessor(int aCasProcessorIndex)
Disable a CASProcessor in the processing pipeline. Locate it by the provided index. The disabled Cas Processor remains in the Processing Pipeline; however, it is not used during processing.
- Parameters:
aCasProcessorIndex
- - location in the pipeline of the Cas Processor to disable
-
disableCasProcessor
Alternative method to disable a Cas Processor. Uses a name to locate it.
- Parameters:
aCasProcessorName
- - a name of the Cas Processor to disable
-
enableCasProcessor
Enables the Cas Processor with a given name. An enabled Cas Processor will immediately begin to receive bundles of CASes.
- Parameters:
aCasProcessorName
- - name of the Cas Processor to enable
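The point that a disabled Cas Processor stays in the pipeline but is skipped can be shown with a short sketch. `ToggleablePipelineSketch` and its nested `Container` are hypothetical, and strings stand in for CAS objects. The sketch only mirrors the behavior described above, not the UIMA source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical pipeline whose stages can be switched off without being removed.
class ToggleablePipelineSketch {
    // Pairs a named processor with an enabled flag.
    static final class Container {
        final String name;
        final UnaryOperator<String> processor;
        boolean enabled = true;

        Container(String name, UnaryOperator<String> processor) {
            this.name = name;
            this.processor = processor;
        }
    }

    private final List<Container> containers = new ArrayList<>();

    void add(String name, UnaryOperator<String> processor) {
        containers.add(new Container(name, processor));
    }

    void disableCasProcessor(int index) {
        containers.get(index).enabled = false;
    }

    void disableCasProcessor(String name) {
        setEnabled(name, false);
    }

    void enableCasProcessor(String name) {
        setEnabled(name, true);
    }

    private void setEnabled(String name, boolean enabled) {
        for (Container c : containers) {
            if (c.name.equals(name)) {
                c.enabled = enabled;
            }
        }
    }

    // Runs the artifact through every enabled stage, in list order.
    String process(String artifact) {
        String current = artifact;
        for (Container c : containers) {
            if (c.enabled) {
                current = c.processor.apply(current);
            }
        }
        return current;
    }

    int size() {
        return containers.size();
    }
}
```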
-
run
public void run()
Starts the Processing Pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once the CAS arrives, it is removed from the queue and sent through the analysis pipeline.
-
consumeQueue
public boolean consumeQueue()
Consumes the input queue to make sure all bundles still there get processed before the CPE terminates.
- Returns:
- true, if successful
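Draining a queue before shutdown can be sketched as a simple poll loop. In the example below, `DrainSketch` and its generic `consumeQueue` helper are hypothetical. Treating "successful" as "every remaining bundle was handled without an exception" is an assumption, since the source does not define the return value more precisely.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper that empties a work queue before termination.
class DrainSketch {
    // Processes every bundle still queued.
    // Returns true only if no handler call threw (assumed meaning of "successful").
    static <T> boolean consumeQueue(Queue<T> workQueue, Consumer<T> handler) {
        boolean ok = true;
        T bundle;
        while ((bundle = workQueue.poll()) != null) { // poll() returns null when empty
            try {
                handler.accept(bundle);
            } catch (RuntimeException e) {
                ok = false; // keep draining so no bundle is left behind
            }
        }
        return ok;
    }
}
```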
-
processNext
protected boolean processNext(Object[] aCasObjectList, ProcessTrace pTrTemp) throws ResourceProcessException, IOException, CollectionException, AbortCPMException, KillPipelineException
Executes the processing pipeline. The given bundle of CAS instances is processed by each CAS Processor in the pipeline. Conversions between different types of CAS Processors are done on the fly. Two types of CAS Processors are currently supported:
- CasDataProcessor
- CasObjectProcessor
- Parameters:
aCasObjectList
- - bundle of CAS to analyze
pTrTemp
- - object used to aggregate stats
- Returns:
- true, if successful
- Throws:
ResourceProcessException
- the resource process exception
IOException
- Signals that an I/O exception has occurred.
CollectionException
- the collection exception
AbortCPMException
- the abort CPM exception
KillPipelineException
- the kill pipeline exception
-
notifyListeners
protected void notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies Listeners that the pipeline has finished processing the current set of CASes.
- Parameters:
aCas
- - object containing an array of CASes or a single CAS instance
isCasObject
- - true if the CAS instance is of type CAS, false otherwise
aEntityProcStatus
- - status object that may contain exceptions and trace
-
doNotifyListeners
protected void doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
Notifies all configured listeners. Makes sure that the appropriate type of CAS is sent to the listener. Conversions take place to ensure compatibility.
- Parameters:
aCas
- - CAS to pass to listener
isCasObject
- - true if the CAS is of type CAS
aEntityProcStatus
- - status object containing exceptions and trace info
-
setReleaseCASFlag
public void setReleaseCASFlag(boolean aFlag)
Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done for the Cas Consumer thread, but in configurations not using Cas Consumers the processing pipeline may also release the CAS.
- Parameters:
aFlag
- - true if this thread should release a CAS when analysis is complete
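The release flag decides which stage returns a pooled object once work on it is finished. A minimal sketch of that ownership rule follows. `ReleaseFlagSketch`, its `StringBuilder` pool, and `processOne` are hypothetical stand-ins for the CAS pool and the processing step, written under the assumption that an unreleased object stays checked out for a later stage to return.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: a stage returns a pooled buffer only if it owns the release.
class ReleaseFlagSketch {
    private final Deque<StringBuilder> pool = new ArrayDeque<>();
    private boolean releaseCAS;

    ReleaseFlagSketch(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            pool.push(new StringBuilder());
        }
    }

    void setReleaseCASFlag(boolean flag) {
        releaseCAS = flag;
    }

    int available() {
        return pool.size();
    }

    // Checks a buffer out, "processes" it, and releases it only when the flag is set.
    StringBuilder processOne(String text) {
        StringBuilder cas = pool.pop();
        cas.append(text);
        if (releaseCAS) {
            cas.setLength(0); // reset before returning it to the pool
            pool.push(cas);
        }
        return cas;
    }
}
```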
-
stopCasProcessors
public void stopCasProcessors(boolean kill)
Stops all Cas Processors that are part of this PU.
- Parameters:
kill
- - true if the CPE was stopped externally before it finished processing
-
endOfProcessingReached
protected boolean endOfProcessingReached(long aCount)
Returns true if the CPM has finished analyzing the collection.
- Parameters:
aCount
- - running total of documents processed so far
- Returns:
- - true if CPM has processed all docs, false otherwise
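A check of this kind usually compares the running count against a configured target. The sketch below shows one plausible form. `EndOfProcessingSketch`, the explicit `numToProcess` parameter, and the convention that `-1` means "process the whole collection" are assumptions for illustration and are not confirmed by this page.

```java
// Hypothetical end-of-processing check; not the UIMA implementation.
class EndOfProcessingSketch {
    // Assumed convention: -1 means there is no fixed limit on documents.
    static final long UNLIMITED = -1;

    // Returns true once the running count has reached the configured target.
    static boolean endOfProcessingReached(long numToProcess, long count) {
        if (numToProcess == UNLIMITED) {
            return false; // no limit, so the count alone never ends processing
        }
        return count >= numToProcess;
    }
}
```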
-
process
Process.
- Parameters:
anArtifact
- the artifact
-
showMetadata
Show metadata.
- Parameters:
aCasList
- the CAS list
-
isProcessorReady
protected boolean isProcessorReady(int aStatus)
Check if the CASProcessor status is available for processing.
- Parameters:
aStatus
- the status
- Returns:
- true if the processor is ready
-
getBytes
Returns the size of the CAS object. Currently only CASData is supported.
- Parameters:
aCas
- CAS to get the size for
- Returns:
- the size of the CAS object
-
setCasPool
Sets the cas pool.
- Parameters:
aPool
- the new cas pool
-
analyze
An alternate processing loop designed for the single-threaded CPM.
- Parameters:
aCasObjectList
- - a list of CASes to analyze
pTrTemp
- - process trace where statistics are added during analysis
- Returns:
- true, if successful
- Throws:
Exception
- the exception
-