Class ProcessingUnit

java.lang.Object
org.apache.uima.collection.impl.cpm.engine.ProcessingUnit
All Implemented Interfaces:
Runnable

public class ProcessingUnit extends Object implements Runnable
This component executes the processing pipeline. Running in a separate thread, it continuously reads bundles of CASes from the Work Queue filled by the ArtifactProducer and sends them through the configured CasProcessors. CasProcessors are invoked in the order in which they are listed in the CPE descriptor. The analysis results produced by the CAS Processors are enqueued onto an output queue that is shared with the CAS Consumers.
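The read-process-enqueue flow described above can be sketched with plain java.util.concurrent types. This is a minimal, hypothetical illustration, not the CPM implementation: PipelineSketch is an invented name, a bundle is modeled as a List<String> in place of CASes, and each CAS Processor is modeled as a UnaryOperator<String>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Hypothetical sketch: a "bundle" is a List<String> standing in for CASes,
// and each processor is a UnaryOperator<String> standing in for a CAS Processor.
class PipelineSketch {
    private final List<UnaryOperator<String>> processors = new ArrayList<>();
    private final BlockingQueue<List<String>> outputQueue;

    PipelineSketch(BlockingQueue<List<String>> outputQueue) {
        this.outputQueue = outputQueue;
    }

    // Processors are invoked in insertion order, mirroring the descriptor order.
    void addProcessor(UnaryOperator<String> processor) {
        processors.add(processor);
    }

    // Runs every item of the bundle through all processors, then enqueues the results.
    void processBundle(List<String> bundle) throws InterruptedException {
        List<String> results = new ArrayList<>(bundle.size());
        for (String item : bundle) {
            String current = item;
            for (UnaryOperator<String> processor : processors) {
                current = processor.apply(current);
            }
            results.add(current);
        }
        outputQueue.put(results); // blocks while the bounded output queue is full
    }
}
```

Using a bounded queue (for example an ArrayBlockingQueue) for the output side means a slow consumer applies back-pressure to the pipeline instead of letting results pile up without limit.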
  • Field Details

    • threadState

      public int threadState
      The thread state.
    • casPool

      protected CPECasPool casPool
      The CAS pool.
    • releaseCAS

      protected boolean releaseCAS
      Flag indicating whether this thread releases the CAS at the end of processing.
    • cpm

      protected CPMEngine cpm
      The CPM engine.
    • workQueue

      protected BoundedWorkQueue workQueue
      The work queue.
    • outputQueue

      protected BoundedWorkQueue outputQueue
      The output queue.
    • mConverter

      protected CasConverter mConverter
      The converter used to translate between CasData and CAS representations.
    • processingUnitProcessTrace

      protected ProcessTrace processingUnitProcessTrace
      The ProcessTrace used to collect statistics for this processing unit.
    • processContainers

      protected LinkedList processContainers
      The list of CAS Processor containers.
    • numToProcess

      protected long numToProcess
      The number of entities to process.
    • casList

      protected CAS[] casList
      The CAS list.
    • statusCbL

      protected ArrayList statusCbL
      The list of status callback listeners.
    • notifyListeners

      protected boolean notifyListeners
      Flag indicating whether listeners should be notified.
    • conversionCas

      protected CAS conversionCas
      The conversion CAS.
    • artifact

      protected Object[] artifact
      The artifact.
    • conversionCasArray

      protected CAS[] conversionCasArray
      The conversion CAS array.
    • timer

      protected UimaTimer timer
      The timer.
    • threadId

      protected String threadId
      The thread id.
    • cpeConfiguration

      protected CpeConfiguration cpeConfiguration
      The CPE configuration.
    • timer01

      public long timer01
      The timer 01.
    • timer02

      public long timer02
      The timer 02.
    • timer03

      public long timer03
      The timer 03.
    • timer04

      public long timer04
      The timer 04.
    • timer05

      public long timer05
      The timer 05.
    • timer06

      public long timer06
      The timer 06.
  • Constructor Details

    • ProcessingUnit

      public ProcessingUnit()
      Instantiates a new processing unit.
    • ProcessingUnit

      public ProcessingUnit(CPMEngine acpm, BoundedWorkQueue aInputQueue, BoundedWorkQueue aOutputQueue)
      Initializes the PU.
      Parameters:
      acpm - component managing the life cycle of the CPE
      aInputQueue - queue to read from
      aOutputQueue - queue to write to
    • ProcessingUnit

      public ProcessingUnit(CPMEngine acpm)
      Instantiates a new processing unit.
      Parameters:
      acpm - component managing the life cycle of the CPE
  • Method Details

    • setName

      public void setName(String aName)
    • getName

      public String getName()
    • isRunning

      public boolean isRunning()
      Returns true if this component is in running state.
      Returns:
      true if running, false otherwise
    • setCasConsumerPipelineIdentity

      public void setCasConsumerPipelineIdentity()
      Defines a CAS Consumer pipeline identity for this instance.
    • isCasConsumerPipeline

      public boolean isCasConsumerPipeline()
      Checks whether this is a CAS Consumer pipeline.
      Returns:
      true if this is a CAS Consumer pipeline
    • setInputQueue

      public void setInputQueue(BoundedWorkQueue aInputQueue)
      Alternative method of providing the queue from which this PU reads bundles of CASes.
      Parameters:
      aInputQueue - queue to read from
    • setOutputQueue

      public void setOutputQueue(BoundedWorkQueue aOutputQueue)
      Alternative method of providing the queue where this PU deposits the results of analysis.
      Parameters:
      aOutputQueue - queue to write to
    • setCPMEngine

      public void setCPMEngine(CPMEngine acpm)
      Alternative method of providing the reference to the component managing the life cycle of the CPE.
      Parameters:
      acpm - reference to the controlling engine
    • cleanup

      public void cleanup()
      Null out fields of this object. Call this only when this object is no longer needed.
    • setNotifyListeners

      public void setNotifyListeners(boolean aDoNotify)
      Sets a flag indicating whether notifications should be sent to the configured listeners.
      Parameters:
      aDoNotify - true if notification is required, false otherwise
    • addStatusCallbackListener

      public void addStatusCallbackListener(BaseStatusCallbackListener aListener)
      Plugs in a listener object used for notifications.
      Parameters:
      aListener - BaseStatusCallbackListener instance
    • getCallbackListeners

      public ArrayList getCallbackListeners()
      Returns the list of listeners used by this PU for callbacks.
      Returns:
      list of BaseStatusCallbackListener instances
    • removeStatusCallbackListener

      public void removeStatusCallbackListener(BaseStatusCallbackListener aListener)
      Removes the given listener from the list of listeners.
      Parameters:
      aListener - object to remove from the list
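The listener methods above (setNotifyListeners, addStatusCallbackListener, getCallbackListeners, removeStatusCallbackListener) follow a common registry pattern. The following is a minimal, hypothetical sketch of that pattern; StatusListener and ListenerRegistry are invented names standing in for BaseStatusCallbackListener and the PU's own bookkeeping, not the UIMA API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener type standing in for BaseStatusCallbackListener.
interface StatusListener {
    void entityProcessComplete(String entityId, String status);
}

// Hypothetical registry: listeners are kept in a list and notified only while the flag is set.
class ListenerRegistry {
    private final List<StatusListener> listeners = new ArrayList<>();
    private boolean notifyListeners = true;

    void setNotifyListeners(boolean doNotify) {
        notifyListeners = doNotify;
    }

    void addListener(StatusListener listener) {
        listeners.add(listener);
    }

    void removeListener(StatusListener listener) {
        listeners.remove(listener);
    }

    // Returns a copy so callers cannot modify the registry's internal list.
    List<StatusListener> getListeners() {
        return new ArrayList<>(listeners);
    }

    // Delivers the event to every registered listener unless notifications are disabled.
    void fireEntityProcessComplete(String entityId, String status) {
        if (!notifyListeners) {
            return;
        }
        for (StatusListener listener : listeners) {
            listener.entityProcessComplete(entityId, status);
        }
    }
}
```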
    • setProcessingUnitProcessTrace

      public void setProcessingUnitProcessTrace(ProcessTrace aProcessingUnitProcessTrace)
      Plugs in the ProcessTrace object used to collect statistics.
      Parameters:
      aProcessingUnitProcessTrace - object used to compile stats
    • setUimaTimer

      public void setUimaTimer(UimaTimer aTimer)
      Plugs in a custom timer used by the PU to obtain the time.
      Parameters:
      aTimer - custom timer to use
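Injecting the timer, as setUimaTimer does, lets callers swap the clock, for example for a fake one in tests. A minimal sketch of the idea, assuming a LongSupplier in place of UimaTimer; TimedUnit and its methods are hypothetical names.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: the unit reads time from an injected clock instead of calling System directly.
class TimedUnit {
    private LongSupplier timer = System::currentTimeMillis; // default wall clock
    private long totalTime;

    void setTimer(LongSupplier timer) {
        this.timer = timer;
    }

    // Measures the elapsed time of the work using whichever clock is plugged in.
    void timed(Runnable work) {
        long start = timer.getAsLong();
        work.run();
        totalTime += timer.getAsLong() - start;
    }

    long getTotalTime() {
        return totalTime;
    }
}
```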
    • setContainers

      public void setContainers(LinkedList processorList)
      Plugs in a list of CAS Processor containers. During processing, the CAS Processors in this list are called sequentially. Each CAS Processor is wrapped in a container that manages errors, counts, totals, and restarts.
      Parameters:
      processorList - list of CAS Processor containers that make up the processing pipeline
    • disableCasProcessor

      public void disableCasProcessor(int aCasProcessorIndex)
      Disables a CAS Processor in the processing pipeline, locating it by the given index. The disabled CAS Processor remains in the processing pipeline but is not used during processing.
      Parameters:
      aCasProcessorIndex - position in the pipeline of the CAS Processor to disable
    • disableCasProcessor

      public void disableCasProcessor(String aCasProcessorName)
      Alternative method of disabling a CAS Processor, using its name to locate it.
      Parameters:
      aCasProcessorName - name of the CAS Processor to disable
    • enableCasProcessor

      public void enableCasProcessor(String aCasProcessorName)
      Enables the CAS Processor with the given name. An enabled CAS Processor immediately begins to receive bundles of CASes.
      Parameters:
      aCasProcessorName - name of the CAS Processor to enable
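The disable/enable semantics above (a disabled processor stays in the list but is skipped) can be sketched as follows. ProcessorContainer and ToggleablePipeline are hypothetical names, and a UnaryOperator<String> stands in for a CAS Processor; the enabled flag is volatile so that a toggle made from another thread becomes visible to the processing thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical container: wraps a processor with a name and an enabled flag.
class ProcessorContainer {
    final String name;
    final UnaryOperator<String> processor;
    volatile boolean enabled = true; // visible across threads without locking

    ProcessorContainer(String name, UnaryOperator<String> processor) {
        this.name = name;
        this.processor = processor;
    }
}

class ToggleablePipeline {
    private final List<ProcessorContainer> containers = new ArrayList<>();

    void add(ProcessorContainer container) {
        containers.add(container);
    }

    void disable(int index) {
        containers.get(index).enabled = false;
    }

    void disable(String name) {
        setEnabled(name, false);
    }

    void enable(String name) {
        setEnabled(name, true);
    }

    private void setEnabled(String name, boolean value) {
        for (ProcessorContainer c : containers) {
            if (c.name.equals(name)) {
                c.enabled = value;
            }
        }
    }

    // Disabled containers stay in the list but are skipped during processing.
    String process(String input) {
        String current = input;
        for (ProcessorContainer c : containers) {
            if (c.enabled) {
                current = c.processor.apply(current);
            }
        }
        return current;
    }

    int size() {
        return containers.size();
    }
}
```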
    • run

      public void run()
      Starts the processing pipeline thread. This thread waits for an artifact to arrive on the configured Work Queue. Once a CAS arrives, it is removed from the queue and sent through the analysis pipeline.
      Specified by:
      run in interface Runnable
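A thread body that blocks on a work queue, as run() is described to do, is commonly written as a take() loop that exits on a sentinel value. This sketch assumes the producer signals completion with such a marker; WorkerLoop and END_OF_WORK are hypothetical names, and upper-casing stands in for the analysis pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical worker that blocks on a work queue until an end-of-work marker arrives.
class WorkerLoop implements Runnable {
    static final String END_OF_WORK = "__END_OF_WORK__"; // hypothetical sentinel
    private final BlockingQueue<String> workQueue;
    private final List<String> processed = new ArrayList<>();

    WorkerLoop(BlockingQueue<String> workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String item = workQueue.take(); // waits for the next artifact
                if (END_OF_WORK.equals(item)) {
                    break; // producer has finished
                }
                processed.add(item.toUpperCase()); // stand-in for the analysis pipeline
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status and exit
        }
    }

    // Safe to read after the worker thread has been joined.
    List<String> getProcessed() {
        return processed;
    }
}
```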
    • consumeQueue

      public boolean consumeQueue()
      Consumes the input queue to make sure all bundles still in it are processed before the CPE terminates.
      Returns:
      true if successful
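Draining whatever is still queued before shutdown, as consumeQueue is described to do, can be done without blocking via BlockingQueue.drainTo. A hypothetical sketch: QueueDrainer is an invented name and appending "#done" stands in for running the pipeline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: process everything still queued before shutdown, without waiting.
class QueueDrainer {
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> remaining = new ArrayList<>();
        queue.drainTo(remaining); // moves all currently available elements; never blocks
        List<String> results = new ArrayList<>(remaining.size());
        for (String item : remaining) {
            results.add(item + "#done"); // stand-in for running the pipeline
        }
        return results;
    }
}
```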
    • processNext

      protected boolean processNext(Object[] aCasObjectList, ProcessTrace pTrTemp) throws ResourceProcessException, IOException, CollectionException, AbortCPMException, KillPipelineException
      Executes the processing pipeline. The given bundle of CAS instances is processed by each CAS Processor in the pipeline. Conversions between the representations required by different types of CAS Processors are done on the fly. Two types of CAS Processors are currently supported:
      • CasDataProcessor
      • CasObjectProcessor
      The former operates on instances of CasData, the latter on instances of CAS. The results produced by the CAS Processors are added to the output queue.
      Parameters:
      aCasObjectList - bundle of CASes to analyze
      pTrTemp - object used to aggregate stats
      Returns:
      true if successful
      Throws:
      ResourceProcessException - the resource process exception
      IOException - Signals that an I/O exception has occurred.
      CollectionException - the collection exception
      AbortCPMException - the abort CPM exception
      KillPipelineException - the kill pipeline exception
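On-the-fly conversion between two CAS representations, as described for processNext, amounts to checking which form the next processor expects and converting only when the current form differs. This is a hypothetical sketch: DataForm and ObjectForm merely stand in for CasData and CAS, and Step and ConvertingPipeline are invented names, not UIMA classes.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the CasData form: an immutable text snapshot.
final class DataForm {
    final String text;
    DataForm(String text) { this.text = text; }
}

// Hypothetical stand-in for the CAS form: a mutable object.
final class ObjectForm {
    final StringBuilder text;
    ObjectForm(String text) { this.text = new StringBuilder(text); }
}

// A step declares which form it consumes; fn receives the CAS in that form.
final class Step {
    final boolean wantsDataForm;
    final UnaryOperator<Object> fn;

    Step(boolean wantsDataForm, UnaryOperator<Object> fn) {
        this.wantsDataForm = wantsDataForm;
        this.fn = fn;
    }
}

class ConvertingPipeline {
    // Converts only when the CAS is currently in the other form.
    static Object toDataForm(Object cas) {
        return (cas instanceof ObjectForm) ? new DataForm(((ObjectForm) cas).text.toString()) : cas;
    }

    static Object toObjectForm(Object cas) {
        return (cas instanceof DataForm) ? new ObjectForm(((DataForm) cas).text) : cas;
    }

    // Runs the steps in order, converting to whatever form the next step expects.
    static Object run(Object cas, List<Step> steps) {
        Object current = cas;
        for (Step step : steps) {
            current = step.wantsDataForm ? toDataForm(current) : toObjectForm(current);
            current = step.fn.apply(current);
        }
        return current;
    }
}
```

Converting lazily, only at a boundary between the two processor types, avoids paying the conversion cost when consecutive processors share a representation.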
    • notifyListeners

      protected void notifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
      Notifies listeners that the pipeline has finished processing the current set of CASes.
      Parameters:
      aCas - object containing either an array of CASes or a single CAS instance
      isCasObject - true if the CAS instance is of type CAS, false otherwise
      aEntityProcStatus - status object that may contain exceptions and trace
    • doNotifyListeners

      protected void doNotifyListeners(Object aCas, boolean isCasObject, EntityProcessStatus aEntityProcStatus)
      Notifies all configured listeners, making sure that the appropriate type of CAS is sent to each listener. Conversions take place as needed to ensure compatibility.
      Parameters:
      aCas - CAS to pass to the listener
      isCasObject - true if the CAS is of type CAS
      aEntityProcStatus - status object containing exceptions and trace info
    • setReleaseCASFlag

      public void setReleaseCASFlag(boolean aFlag)
      Called by the CPMEngine during setup to indicate that this thread is supposed to release a CAS at the end of processing. This is typically done by the CAS Consumer thread, but in configurations that do not use CAS Consumers, the processing pipeline may also release the CAS.
      Parameters:
      aFlag - true if this thread should release a CAS when analysis is complete
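The release flag exists so that exactly one stage returns each CAS to the pool. A minimal, hypothetical sketch of that arrangement: SimplePool stands in for the CAS pool (a StringBuilder stands in for a CAS) and Stage for a processing thread; neither is a UIMA class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded pool standing in for a CAS pool.
class SimplePool {
    private final BlockingQueue<StringBuilder> free;

    SimplePool(int size) {
        free = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            free.add(new StringBuilder());
        }
    }

    // Blocks until an instance is available.
    StringBuilder checkout() throws InterruptedException {
        return free.take();
    }

    void release(StringBuilder cas) {
        cas.setLength(0); // reset before reuse
        free.add(cas);
    }

    int available() {
        return free.size();
    }
}

// Only the stage configured as the releaser hands the CAS back to the pool.
class Stage {
    private final SimplePool pool;
    private final boolean releaseCas;

    Stage(SimplePool pool, boolean releaseCas) {
        this.pool = pool;
        this.releaseCas = releaseCas;
    }

    void finish(StringBuilder cas) {
        if (releaseCas) {
            pool.release(cas);
        }
    }
}
```

If two stages both released the same instance, the pool would hand it out twice; making release a per-stage flag keeps that responsibility in one place.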
    • stopCasProcessors

      public void stopCasProcessors(boolean kill)
      Stops all CAS Processors that are part of this PU.
      Parameters:
      kill - true if the CPE was stopped externally before it finished processing
    • endOfProcessingReached

      protected boolean endOfProcessingReached(long aCount)
      Returns true if the CPM has finished analyzing the collection.
      Parameters:
      aCount - running total of documents processed so far
      Returns:
      true if the CPM has processed all documents, false otherwise
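An end-of-collection check of this kind typically compares the running count against the configured limit. The sketch below is hypothetical: EndCheck is an invented name, and it assumes a negative numToProcess means "process the whole collection" and that a separate collectionExhausted flag reports when the reader has no more entities.

```java
// Hypothetical sketch of an end-of-collection check.
// Assumption: a negative numToProcess means "no limit, process the whole collection".
class EndCheck {
    static boolean endOfProcessingReached(long processedSoFar, long numToProcess, boolean collectionExhausted) {
        if (numToProcess < 0) {
            return collectionExhausted; // no limit: stop only when the collection runs out
        }
        return processedSoFar >= numToProcess || collectionExhausted;
    }
}
```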
    • process

      protected void process(Object anArtifact)
      Processes the given artifact.
      Parameters:
      anArtifact - the artifact to process
    • showMetadata

      protected void showMetadata(Object[] aCasList)
      Shows the metadata of the given CASes.
      Parameters:
      aCasList - the list of CASes
    • isProcessorReady

      protected boolean isProcessorReady(int aStatus)
      Checks whether a CAS Processor with the given status is available for processing.
      Parameters:
      aStatus - the status of the CAS Processor
      Returns:
      true if the processor is ready
    • getBytes

      protected long getBytes(Object aCas)
      Returns the size of the CAS object. Currently only CasData is supported.
      Parameters:
      aCas - CAS to get the size for
      Returns:
      the size of the CAS object
    • setCasPool

      public void setCasPool(CPECasPool aPool)
      Sets the CAS pool.
      Parameters:
      aPool - the new CAS pool
    • analyze

      protected boolean analyze(Object[] aCasObjectList, ProcessTrace pTrTemp) throws Exception
      An alternative processing loop designed for the single-threaded CPM.
      Parameters:
      aCasObjectList - list of CASes to analyze
      pTrTemp - process trace to which statistics are added during analysis
      Returns:
      true if successful
      Throws:
      Exception - the exception