Class AbstractQueueLoader<FROM,TO>

java.lang.Object
com.broadleafcommerce.search.index.core.queue.AbstractQueueLoader<FROM,TO>
Type Parameters:
FROM - The type that is read from a data source and optionally converted to TO.
TO - The type that will be put on the ReindexQueue.
All Implemented Interfaces:
IndexableTypeAware, QueueLoader<TO>
Direct Known Subclasses:
AbstractBatchIdQueueLoader

public abstract class AbstractQueueLoader<FROM,TO> extends Object implements QueueLoader<TO>, IndexableTypeAware
Abstract component to allow reading data in batch from a data source or web service, converting those values, and putting them on a ReindexQueue.
Author:
Kelly Tisdell (ktisdell)
  • Constructor Details

    • AbstractQueueLoader

      public AbstractQueueLoader(long waitTime, int maxTries)
  • Method Details

    • populateQueue

      public final long populateQueue(ReindexQueue<TO> queue, ProcessStateHolder processStateHolder) throws com.broadleafcommerce.search.api.SearchIndexException
      Description copied from interface: QueueLoader
      Reads T from a source (e.g. REST service, DB, etc.) and puts them on a queue for processing by another thread.
      Specified by:
      populateQueue in interface QueueLoader<TO>
      Parameters:
      queue - the queue to populate
      processStateHolder - the state holder tracking this process
      Returns:
      the number of IDs that were read and put on the queue.
      Throws:
      com.broadleafcommerce.search.api.SearchIndexException - if the queue was unable to be populated
    • createExecutor

      protected Executor createExecutor()
      Creates an Executor so that the queue can be populated on threads other than the one reading the data. This is especially helpful when processing the data, aggregating raw results, etc. The default returns an unbounded ThreadPoolTaskExecutor. If you override this method to provide anything other than a ThreadPoolTaskExecutor, you should also override the destroyExecutor(java.util.concurrent.Executor) method, which expects an Executor and will, by default, shut it down only if it is a ThreadPoolTaskExecutor.
      Returns:
      new ThreadPoolTaskExecutor for populating the queue
    • destroyExecutor

      protected void destroyExecutor(Executor executor)
      Provides a mechanism to shut down the Executor. By default this assumes that the executor is a ThreadPoolTaskExecutor.
      Parameters:
      executor - the executor to shut down
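      The pairing of createExecutor and destroyExecutor can be illustrated with a minimal sketch. It uses only java.util.concurrent types as a stand-in for Spring's ThreadPoolTaskExecutor, and the class name ExecutorLifecycle and the boolean return value are assumptions made for illustration, not part of this class.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration only; not the Broadleaf implementation. */
final class ExecutorLifecycle {
    private ExecutorLifecycle() {}

    /** Stand-in for createExecutor(): returns an unbounded, JDK-provided thread pool. */
    static Executor createExecutor() {
        return Executors.newCachedThreadPool();
    }

    /**
     * Stand-in for destroyExecutor(Executor): shuts the executor down only when
     * its type is known to support shutdown. Returns true if a shutdown was requested.
     */
    static boolean destroyExecutor(Executor executor) {
        if (executor instanceof ExecutorService) {
            ((ExecutorService) executor).shutdown();
            return true;
        }
        // Unknown executor type: an override would need to handle its own cleanup.
        return false;
    }
}
```

      The type check is the reason both methods are usually overridden together: an executor of a type that the destroy step does not recognize would otherwise be left running.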
    • initialize

      protected abstract void initialize()
      This method is called exactly once before the first call to readPage. It may do nothing, but implementors may use it to reset state such as a cursor. It should leave this object in a state where the next call to readPage begins reading batches at the beginning of the list.
    • readPage

      @Deprecated(since="2.1.2-GA") protected abstract List<FROM> readPage() throws com.broadleafcommerce.search.api.SearchIndexException
      Deprecated.
      Use readPage(ProcessStateHolder) instead, which can make use of additional process information.
      Throws:
      com.broadleafcommerce.search.api.SearchIndexException
    • readPage

      protected abstract List<FROM> readPage(ProcessStateHolder processStateHolder) throws com.broadleafcommerce.search.api.SearchIndexException
      Reads a page of data to be optionally transformed and put on a BlockingQueue. This could be a list of IDs, Indexable objects, or any number of things.

      Implementors MUST return an empty list when there is no more data to return. Implementors must also keep track of their own pagination state. The initialize method will be called prior to the first call to this method.

      The results of this call will be passed to the transform method and then put on a queue for more intensive processing, transformation, persistence, etc.

      Most typically, this will read a rather large List of IDs. Additionally this sends process state information for potential filters.

      Parameters:
      processStateHolder - the state holder containing process information for the index thread
      Returns:
      a paged list of FROM items
      Throws:
      com.broadleafcommerce.search.api.SearchIndexException - if an error occurs reading paged data
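      The paging contract described above (initialize resets state, each read returns the next page, and an empty list signals the end) might be sketched as follows. PagedIdSource is a hypothetical stand-in that pages over an in-memory list. It does not extend AbstractQueueLoader, and it omits the ProcessStateHolder parameter and the SearchIndexException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in, not part of the Broadleaf API: pages over an in-memory list of IDs. */
final class PagedIdSource {
    private final List<Long> allIds;
    private final int pageSize;
    private int cursor; // pagination state owned by the reader itself

    PagedIdSource(List<Long> allIds, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.allIds = new ArrayList<>(allIds);
        this.pageSize = pageSize;
    }

    /** Plays the role of initialize(): rewinds so the next read starts at the beginning. */
    void initialize() {
        cursor = 0;
    }

    /** Plays the role of readPage(...): returns the next page, or an empty list when exhausted. */
    List<Long> readPage() {
        if (cursor >= allIds.size()) {
            return Collections.emptyList();
        }
        int end = Math.min(cursor + pageSize, allIds.size());
        List<Long> page = new ArrayList<>(allIds.subList(cursor, end));
        cursor = end;
        return page;
    }
}
```

      A caller would invoke initialize() once and then call readPage() repeatedly until it returns an empty list.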
    • transform

      protected abstract List<TO> transform(List<FROM> from) throws com.broadleafcommerce.search.api.SearchIndexException
      Provides an optional transformation step. This should be lightweight and is typically for aggregating data, splitting it into chunks, etc. This can be as simple as casting to the output type or, if FROM and TO are the same, a pass-through. The result of this call is what will go onto the queue.

      Think of this as early, lightweight processing to provide enough data for another thread at the other end of the queue to do further, more intensive queries and processing.

      If the readPage method typically returns a large list of IDs, this method would break things into chunks.

      Parameters:
      from - the read data page to transform
      Returns:
      the transformed data
      Throws:
      com.broadleafcommerce.search.api.SearchIndexException - if an error occurs transforming the data
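      As a rough illustration of the chunking case, the sketch below splits one large page into fixed-size batches. IdChunker and its chunk method are hypothetical names; an actual transform implementation would return whatever TO type the loader declares.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing the kind of lightweight work a transform step might do. */
final class IdChunker {
    private IdChunker() {}

    /** Splits items into consecutive chunks of at most chunkSize elements, preserving order. */
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the sub-list so each chunk is independent of the source page.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

      With this shape, each chunk becomes one queue entry, so a worker at the other end of the queue receives a manageable batch rather than the whole page.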
    • countIndexablesToProcess

      protected abstract int countIndexablesToProcess(List<TO> queueEntries)
      Determines how many Indexables are being processed. Since TO (the queue entry type) is generic and could be a Collection, a BatchIdHolder, or some other type, this method lets implementors return a count of the Indexable items that are expected to be processed by an AbstractWorker at the other end of the queue.

      Return 0 if there is no way to know the number of indexables (e.g. if the queueEntry is an ID range).

      Parameters:
      queueEntries - the entries on the queue for which to count expected indexables
      Returns:
      the number of expected indexables to index for the queueEntries
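      For the case where each queue entry is itself a Collection of IDs, the count is simply the sum of the entry sizes. The sketch below assumes that shape; IndexableCounter is a hypothetical name, and entries of other types (for example an ID range) would need their own logic or a return value of 0.

```java
import java.util.Collection;
import java.util.List;

/** Hypothetical helper: counts indexables when every queue entry is a collection of IDs. */
final class IndexableCounter {
    private IndexableCounter() {}

    /** Sums the sizes of all entries; an empty list of entries yields 0. */
    static int countIndexables(List<? extends Collection<?>> queueEntries) {
        int total = 0;
        for (Collection<?> entry : queueEntries) {
            total += entry.size();
        }
        return total;
    }
}
```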
    • getWaitTime

      public long getWaitTime()
      Time in milliseconds to wait when putting something on a bounded BlockingQueue. Keep this rather small. We want to wait and try again so that we don't block indefinitely and so that we can check whether a failure occurred while we were waiting.
      Returns:
      time to wait to put into a bounded BlockingQueue
    • getMaxTries

      public int getMaxTries()
      Defines the number of times that we'll try to put something on a bounded BlockingQueue with the given wait time before giving up, for example when nothing is reading from the queue.
      Returns:
      number of times to try putting into a bounded BlockingQueue
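      How the wait time and the maximum number of tries interact can be sketched with the standard BlockingQueue.offer(E, long, TimeUnit) call. This is an assumption about the general technique rather than a description of how this class is implemented; BoundedOffer and offerWithRetries are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating a bounded, retrying put onto a BlockingQueue. */
final class BoundedOffer {
    private BoundedOffer() {}

    /**
     * Tries up to maxTries times to offer the item, waiting waitTimeMillis on each attempt.
     * Returns true once the item is accepted, false if every attempt times out
     * or the thread is interrupted.
     */
    static <T> boolean offerWithRetries(BlockingQueue<T> queue, T item,
                                        long waitTimeMillis, int maxTries) {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            try {
                if (queue.offer(item, waitTimeMillis, TimeUnit.MILLISECONDS)) {
                    return true;
                }
                // Between attempts a loader could check whether the process has failed.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return false;
    }
}
```

      The worst-case blocking time is roughly waitTimeMillis multiplied by maxTries, which is why a small wait time keeps the loader responsive to failures elsewhere.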
    • isRunning

      public boolean isRunning()
      Whether the loader is currently loading values into the queue.
      Returns:
      whether the loader is currently loading values into the queue