Class ProcessingContainer_Impl

All Implemented Interfaces:
Runnable, CasProcessorController, RunnableContainer, ConfigurableResource, Resource

public class ProcessingContainer_Impl extends ProcessingContainer implements RunnableContainer
Manages a pool of CasProcessor instances. Provides access to CasProcessor instance to Processing Thread. Processing threads check out an instance of Cas Processor and when done invoking its process() method return it back to pool. The container aggregates counts and totals on behalf of all instances of Cas Processor. It also manages error and restart thresholds for Cas Processors as a group. Errors are aggregated for all instances of Cas Processor as a group NOT individually. The container takes appropriate actions when threshold are exceeded. What action is taken depends on declaritive specification in the cpe descriptor.
  • Field Details

    • casProcessorPool

      public ServiceProxyPool casProcessorPool
      The cas processor pool.
    • failedCasProcessorList

      public LinkedList failedCasProcessorList
      The failed cas processor list.
  • Constructor Details

  • Method Details

    • getMetadata

      public ProcessingResourceMetaData getMetadata()
      Returns component's input/output capabilities.
      Returns:
      the metadata
    • setMetadata

      public void setMetadata(ProcessingResourceMetaData aMetadata)
      Sets component's input/output capabilities.
      Specified by:
      setMetadata in class ProcessingContainer
      Parameters:
      aMetadata - component capabilities
    • setCasProcessorDeployer

      public void setCasProcessorDeployer(CasProcessorDeployer aDeployer)
      Plug in deployer object used to launch/deploy the CasProcessor instance. Used for restarts.
      Specified by:
      setCasProcessorDeployer in class ProcessingContainer
      Parameters:
      aDeployer - - object responsible for deploying/launching CasProcessor
    • getDeployer

      public CasProcessorDeployer getDeployer()
      Returns deployer object used to launch the CasProcessor.
      Specified by:
      getDeployer in class ProcessingContainer
      Returns:
      - CasProcessorDeployer - deployer object
    • logAbortedCases

      public void logAbortedCases(Object[] abortedCasList)
      Logs Cas'es that could not be processed.
      Specified by:
      logAbortedCases in class ProcessingContainer
      Parameters:
      abortedCasList - - an arrar of Cas'es that could not be processed by this CasProcessor
    • getBytesIn

      public long getBytesIn()
      Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.
      Specified by:
      getBytesIn in class ProcessingContainer
      Returns:
      - bytes processed
    • addBytesIn

      public void addBytesIn(long aBytesIn)
      Aggregate total bytes ingested by the CasProcessor.
      Specified by:
      addBytesIn in class ProcessingContainer
      Parameters:
      aBytesIn - - number of ingested bytes
    • getBytesOut

      public long getBytesOut()
      Returns total number of bytes processed so far by all CasProcessor instances managed by this container.
      Specified by:
      getBytesOut in class ProcessingContainer
      Returns:
      - bytes processed
    • addBytesOut

      public void addBytesOut(long aBytesOut)
      Aggregate total bytes processed by this CasProcessor.
      Specified by:
      addBytesOut in class ProcessingContainer
      Parameters:
      aBytesOut - the a bytes out
    • incrementRestartCount

      public void incrementRestartCount(int aCount)
      Increment number of times the casProcessor was restarted due to failures.
      Specified by:
      incrementRestartCount in class ProcessingContainer
      Parameters:
      aCount - - restart count
    • getRestartCount

      public int getRestartCount()
      Returns total number of all CasProcessor restarts.
      Specified by:
      getRestartCount in class ProcessingContainer
      Returns:
      number of restarts
    • incrementRetryCount

      public void incrementRetryCount(int aCount)
      Increments number of times CasProceesor failed analyzing Cas'es due to timeout or some other problems.
      Specified by:
      incrementRetryCount in class ProcessingContainer
      Parameters:
      aCount - - failure count
    • getRetryCount

      public int getRetryCount()
      Return the up todate number of retries recorded by the container.
      Specified by:
      getRetryCount in class ProcessingContainer
      Returns:
      - retry count
    • incrementAbortCount

      public void incrementAbortCount(int aCount)
      Increment number of aborted Cas'es due to inability to process the Cas.
      Specified by:
      incrementAbortCount in class ProcessingContainer
      Parameters:
      aCount - - number of aborts while processing Cas'es
    • getAbortCount

      public int getAbortCount()
      Return the up todate number of aborts recorded by the container.
      Specified by:
      getAbortCount in class ProcessingContainer
      Returns:
      - number of failed attempts to analyze CAS'es
    • incrementFilteredCount

      public void incrementFilteredCount(int aCount)
      Increments number of CAS'es filtered by the CasProcessor. Filtered CAS'es dont contain required features. Features that are required by the Cas Processor to perform analysis. Dependant feateurs are defined in the filter expression in the CPE descriptor
      Specified by:
      incrementFilteredCount in class ProcessingContainer
      Parameters:
      aCount - - number of filtered Cas'es
    • getFilteredCount

      public int getFilteredCount()
      Returns number of filtered Cas'es.
      Specified by:
      getFilteredCount in class ProcessingContainer
      Returns:
      # of filtered Cas'es
    • getRemaining

      public long getRemaining()
      Returns number of entities still to be processed by the CasProcessor It is a delta of total number of entities to be processed by the CPE minus number of entities processed so far.
      Specified by:
      getRemaining in class ProcessingContainer
      Returns:
      Number of entities yet to be processed
    • setRemaining

      public void setRemaining(long aRemainingCount)
      Copies number of entities the CasProcessor has yet to process.
      Specified by:
      setRemaining in class ProcessingContainer
      Parameters:
      aRemainingCount - - number of entities to process
    • setLastProcessedEntityId

      public void setLastProcessedEntityId(String aEntityId)
      Copies id of the last entity processed by the CasProcessor.
      Specified by:
      setLastProcessedEntityId in class ProcessingContainer
      Parameters:
      aEntityId - - id of the entity
    • getLastProcessedEntityId

      public String getLastProcessedEntityId()
      Returns id of the last entity processed by the CasProcessor.
      Specified by:
      getLastProcessedEntityId in class ProcessingContainer
      Returns:
      - id of entity
    • setLastCas

      @Deprecated public void setLastCas(Object aCasObject)
      Deprecated.
      Copies the last Cas Processed.
      Specified by:
      setLastCas in class ProcessingContainer
      Parameters:
      aCasObject - the new last cas
    • getLastCas

      @Deprecated public Object getLastCas()
      Deprecated.
      Returns the last Cas processed.
      Specified by:
      getLastCas in class ProcessingContainer
      Returns:
      the last cas
    • incrementProcessed

      public void incrementProcessed(int aIncrement)
      Increment processed.
      Parameters:
      aIncrement - the a increment
    • setProcessed

      public void setProcessed(long aProcessedCount)
      Used when recovering from checkpoint, sets the total number of entities before CPE stopped.
      Specified by:
      setProcessed in class ProcessingContainer
      Parameters:
      aProcessedCount - - number of entities processed before CPE stopped
    • getProcessed

      public long getProcessed()
      Returns number of entities processed so far.
      Specified by:
      getProcessed in class ProcessingContainer
      Returns:
      - processed - number of entities processed
    • resetRestartCount

      public void resetRestartCount()
      Specified by:
      resetRestartCount in class ProcessingContainer
    • incrementTotalTime

      public void incrementTotalTime(long aTime)
      Increments total time spend in the process() method of the CasProcessor.
      Specified by:
      incrementTotalTime in class ProcessingContainer
      Parameters:
      aTime - - total time in process()
    • getTotalTime

      public long getTotalTime()
      Returns total time spent in process().
      Specified by:
      getTotalTime in class ProcessingContainer
      Returns:
      - number of millis spent in process()
    • abortCPMOnError

      public boolean abortCPMOnError()
      Returns true if maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown. It looks at the value of the action attribute of the <errorRateThreshold> element in the cpe descriptor.
      Specified by:
      abortCPMOnError in class ProcessingContainer
      Returns:
      - true if the CPE should stop processing, false otherwise
    • incrementCasProcessorErrors

      public void incrementCasProcessorErrors(Throwable aThrowable) throws Exception
      This routine determines what to do with an exception thrown during the CasProcessor processing. It interprets given exception and throws a new one according to configuration specified in the CPE descriptor. It examines provided thresholds and determines if the CPE should continue to run, if it should disable the CasProcessor (and all its instances), or disregard the error and continue.
      Specified by:
      incrementCasProcessorErrors in class ProcessingContainer
      Parameters:
      aThrowable - - exception to examine
      Throws:
      Exception - the exception
    • isEndOfBatch

      public boolean isEndOfBatch(CasProcessor aCasProcessor, int aProcessedSize) throws ResourceProcessException, IOException
      Specified by:
      isEndOfBatch in class ProcessingContainer
      Throws:
      ResourceProcessException
      IOException
    • processCas

      public boolean processCas(Object[] aCasList)
      Returns true if the Cas bundles should be processed by the CasProcessor. This routine checks for existance of dependent featues defined in the filter expression defined for the CasProcessor in the cpe descriptor. Currently this is done on per bundle basis. Meaning that all Cas'es must contain required features. If even one Cas does not have them, the entire bundle is skipped.
      Specified by:
      processCas in class ProcessingContainer
      Parameters:
      aCasList - - bundle containing instances of CAS
      Returns:
      true, if successful
    • getCasProcessorConfiguration

      public CasProcessorConfiguration getCasProcessorConfiguration()
      Returns CasProcessor configuration object. This object represents xml configuration defined in the <casProcessor> section of the cpe descriptor.
      Specified by:
      getCasProcessorConfiguration in class ProcessingContainer
      Returns:
      CasProcessorConfiguration instance
    • start

      @Deprecated public void start()
      Deprecated.
      Start.
      Specified by:
      start in interface RunnableContainer
    • stop

      @Deprecated public void stop()
      Deprecated.
      Stop.
      Specified by:
      stop in interface RunnableContainer
    • getCasProcessor

      public CasProcessor getCasProcessor()
      Returns available instance of the CasProcessor from the instance pool. It will wait indefinitely until an instance is available.
      Specified by:
      getCasProcessor in interface CasProcessorController
      Returns:
      the cas processor
    • releaseCasProcessor

      public void releaseCasProcessor(CasProcessor aCasProcessor)
      Returns a given casProcessor instance back to the pool.
      Specified by:
      releaseCasProcessor in class ProcessingContainer
      Parameters:
      aCasProcessor - - an instance of CasProcessor to return back to the pool
      See Also:
    • getStatus

      public int getStatus()
      Returns the current status of the CasProcessor.
      Specified by:
      getStatus in interface CasProcessorController
      Returns:
      the status
    • setStatus

      public void setStatus(int aStatus)
      Changes the status of the CasProcessor as a group.
      Specified by:
      setStatus in interface CasProcessorController
      Parameters:
      aStatus - - new status
    • isLocal

      @Deprecated public boolean isLocal()
      Deprecated.
      Checks if is local.
      Specified by:
      isLocal in interface CasProcessorController
      Returns:
      true, if is local
    • isRemote

      @Deprecated public boolean isRemote()
      Deprecated.
      Checks if is remote.
      Specified by:
      isRemote in interface CasProcessorController
      Returns:
      true, if is remote
    • isIntegrated

      @Deprecated public boolean isIntegrated()
      Deprecated.
      Checks if is integrated.
      Specified by:
      isIntegrated in interface CasProcessorController
      Returns:
      true, if is integrated
    • isAbortable

      public boolean isAbortable()
      Determines if instances of CasProcessor managed by this container are abortable. Abortable CasProcessor's action attribute in the <errorRateThreshold> element has a value of 'disable'.
      Specified by:
      isAbortable in interface CasProcessorController
      Returns:
      true if CasProcessor can be disabled
    • initialize

      public boolean initialize(ResourceSpecifier aSpecifier, Map aAdditionalParams) throws ResourceInitializationException
      Description copied from interface: Resource
      Initializes this Resource from a ResourceSpecifier. Applications do not need to call this method. It is called automatically by the ResourceFactory and cannot be called a second time.
      Specified by:
      initialize in interface Resource
      Overrides:
      initialize in class Resource_ImplBase
      Parameters:
      aSpecifier - specifies how to create a resource or locate an existing resource service.
      aAdditionalParams - a Map containing additional parameters. May be null if there are no parameters. Each class that implements this interface can decide what additional parameters it supports.
      Returns:
      true if and only if initialization completed successfully. Returns false if the given ResourceSpecifier is not of an appropriate type for this Resource. If the ResourceSpecifier is of an appropriate type but is invalid or if some other failure occurs, an exception should be thrown.
      Throws:
      ResourceInitializationException - if a failure occurs during initialization.
    • destroy

      public void destroy()
      Destroy instances of CasProcessors managed by this container. Before destroying the instance, this method notifies it with CollectionProcessComplete so that the component finalizes its logic and does appropriate cleanup before shutdown.
      Specified by:
      destroy in interface Resource
      Overrides:
      destroy in class Resource_ImplBase
    • run

      public void run()
      Specified by:
      run in interface Runnable
    • getConfigParameterValue

      public Object getConfigParameterValue(String aParamName)
      Description copied from interface: ConfigurableResource
      Looks up the value of a configuration parameter. This method will only return the value of a parameter that is not defined in any group.

      This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

      Specified by:
      getConfigParameterValue in interface ConfigurableResource
      Parameters:
      aParamName - the name of a parameter that is not in any group
      Returns:
      the value of the parameter with name aParamName, null is either the parameter does not exist or it has not been assigned a value.
    • getConfigParameterValue

      public Object getConfigParameterValue(String aGroupName, String aParamName)
      Description copied from interface: ConfigurableResource
      Looks up the value of a configuration parameter in a group. If the parameter has no value assigned within the group, fallback strategies will be followed.

      This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

      Specified by:
      getConfigParameterValue in interface ConfigurableResource
      Parameters:
      aGroupName - the name of a configuration group. If the group name is null, this method will return the same value as getParameterValue(String).
      aParamName - the name of a parameter in the group
      Returns:
      the value of the parameter in group aGroupName with name aParamName,,null is either the parameter does not exist or it has not been assigned a value.
    • setConfigParameterValue

      public void setConfigParameterValue(String aParamName, Object aValue)
      Description copied from interface: ConfigurableResource
      Sets the value of a configuration parameter. This only works for a parameter that is not defined in any group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
      Specified by:
      setConfigParameterValue in interface ConfigurableResource
      Parameters:
      aParamName - the name of a parameter that is not in any group
      aValue - the value to assign to the parameter
    • setConfigParameterValue

      public void setConfigParameterValue(String aGroupName, String aParamName, Object aValue)
      Description copied from interface: ConfigurableResource
      Sets the value of a configuration parameter in a group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
      Specified by:
      setConfigParameterValue in interface ConfigurableResource
      Parameters:
      aGroupName - the name of a configuration group. If this parameter is null, this method will have the same effect as setParameterValue(String,Object).
      aParamName - the name of a parameter in the group
      aValue - the value to assign to the parameter.
    • reconfigure

      public void reconfigure() throws ResourceConfigurationException
      Description copied from interface: ConfigurableResource
      Instructs this Resource to re-read its configuration parameter settings.
      Specified by:
      reconfigure in interface ConfigurableResource
      Throws:
      ResourceConfigurationException - if the configuration is not valid
    • getName

      public String getName()
      Returns the name of this container. It is the name of the Cas Processor.
      Specified by:
      getName in class ProcessingContainer
      Returns:
      the name
    • getMetaData

      public ResourceMetaData getMetaData()
      Description copied from interface: Resource
      Gets the metadata that describes this Resource.
      Specified by:
      getMetaData in interface Resource
      Overrides:
      getMetaData in class Resource_ImplBase
      Returns:
      an object containing all metadata for this resource.
    • incrementStat

      public void incrementStat(String aStatName, Integer aStat)
      Increment a value of a given stat.
      Specified by:
      incrementStat in class ProcessingContainer
      Parameters:
      aStatName - the a stat name
      aStat - the a stat
    • addStat

      public void addStat(String aStatName, Object aStat)
      Add an arbitrary object and bind it to a given name.
      Specified by:
      addStat in class ProcessingContainer
      Parameters:
      aStatName - the a stat name
      aStat - the a stat
    • getStat

      public Object getStat(String aStatName)
      Return an abject identified with a given name.
      Specified by:
      getStat in class ProcessingContainer
      Parameters:
      aStatName - the a stat name
      Returns:
      the stat
    • getAllStats

      public HashMap getAllStats()
      Returns all stats aggregate during the CPM run.
      Specified by:
      getAllStats in class ProcessingContainer
      Returns:
      a map of all stats aggregated during the CPM run
    • pause

      public void pause()
      Pauses the container until resumed. The CPM will pause to the Container while it is trying to re-connect to a shared remote service. While the Container is paused getCasProcessor() will not be allowed to return a new CasProcessor. All other methods are accessible and will function fine.
      Specified by:
      pause in class ProcessingContainer
    • resume

      public void resume()
      Specified by:
      resume in class ProcessingContainer
    • isPaused

      public boolean isPaused()
      Specified by:
      isPaused in class ProcessingContainer
    • getPool

      public ServiceProxyPool getPool()
      Specified by:
      getPool in class ProcessingContainer
    • setSingleFencedService

      public void setSingleFencedService(boolean aSingleFencedInstance)
      Specified by:
      setSingleFencedService in class ProcessingContainer
    • isSingleFencedService

      public boolean isSingleFencedService()
      Specified by:
      isSingleFencedService in class ProcessingContainer
    • getFetchTime

      public long getFetchTime()
      Gets the fetch time.
      Returns:
      the fetch time