Class AbstractQueueLoader<FROM,TO>
- Type Parameters:
FROM
- The type that is being read from a data source and optionally converted to. TO
- The type that will be put on theReindexQueue
.
- All Implemented Interfaces:
IndexableTypeAware
,QueueLoader<TO>
- Direct Known Subclasses:
AbstractBatchIdQueueLoader
ReindexQueue
.- Author:
- Kelly Tisdell (ktisdell)
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionprotected abstract int
countIndexablesToProcess
(List<TO> queueEntries) Determine how manyIndexable
s are being processed.protected Executor
Creates an Executor to allow us to populate the queue on different threads than what is reading the data.protected void
destroyExecutor
(Executor executor) Provides a mechanism to shutdown the Executor.int
Defines the number of times that we'll try to put something from a bounded BlockingQueue with the given wait time before giving up, for example when nothing is reading from the Queue.long
Time in milliseconds to wait put something on a bounded BlockingQueue.protected abstract void
This will be called exactly once prior to a call to readPage.boolean
Whether the loader is currently loading values into the queue.final long
populateQueue
(ReindexQueue<TO> queue, ProcessStateHolder processStateHolder) Reads T from a source (e.g.readPage()
Deprecated.readPage
(ProcessStateHolder processStateHolder) Reads paged data to, optionally, transform and put on aBlockingQueue
.Provides an optional transformation step.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.broadleafcommerce.search.index.core.IndexableTypeAware
getIndexableType
-
Constructor Details
-
AbstractQueueLoader
public AbstractQueueLoader(long waitTime, int maxTries)
-
-
Method Details
-
populateQueue
public final long populateQueue(ReindexQueue<TO> queue, ProcessStateHolder processStateHolder) throws com.broadleafcommerce.search.api.SearchIndexException Description copied from interface:QueueLoader
Reads T from a source (e.g. REST service, DB, etc.) and puts them on a queue for processing by another thread.- Specified by:
populateQueue
in interfaceQueueLoader<FROM>
- Parameters:
queue
- the queue to populateprocessStateHolder
- the state holder tracking this process- Returns:
- the number of IDs that were read and put on the queue.
- Throws:
com.broadleafcommerce.search.api.SearchIndexException
- if the queue was unable to be populated
-
createExecutor
Creates an Executor to allow us to populate the queue on different threads than what is reading the data. This is especially helpful when processing the data, aggregating raw results, etc. The default returns an unboundedThreadPoolTaskExecutor
. If you override this method to provide anything except aThreadPoolTaskExecutor
then you should override thedestroyExecutor(java.util.concurrent.Executor)
method as well, which expects anExecutor
and will, by default, shut down the Executor if it is a ThreadPoolTaskExecutor.- Returns:
- new
ThreadPoolTaskExecutor
for populating the queue
-
destroyExecutor
Provides a mechanism to shutdown the Executor. By default this assumes that the executor is aThreadPoolTaskExecutor
.- Parameters:
executor
- the executor to shut down
-
initialize
protected abstract void initialize()This will be called exactly once prior to a call to readPage. This method may do nothing, but implementors may use it to reset state such as a cursor. This method should set the state of this object so that the next call to readNextIds will begin reading batches of IDs at the beginning of the list. -
readPage
@Deprecated(since="2.1.2-GA") protected abstract List<FROM> readPage() throws com.broadleafcommerce.search.api.SearchIndexExceptionDeprecated.in favor ofreadPage(ProcessStateHolder)
for the potential to utilize a additional process information.- Throws:
com.broadleafcommerce.search.api.SearchIndexException
-
readPage
protected abstract List<FROM> readPage(ProcessStateHolder processStateHolder) throws com.broadleafcommerce.search.api.SearchIndexException Reads paged data to, optionally, transform and put on aBlockingQueue
. This could be a list of IDs,Indexable
objects, or any number of things.Implementors MUST return an empty list when there is no more data to return. Implementors must also keep track of their own pagination state. The initialize method will be called prior to the first call to this method.
The results of this call will be passed to the transform method and then put on a queue for more intensive processing, transformation, persistence, etc.
Most typically, this will read a rather large List of IDs. Additionally this sends process state information for potential filters.
- Parameters:
processStateHolder
- the state holder process information for the index thread- Returns:
- a paged list of FROM items
- Throws:
com.broadleafcommerce.search.api.SearchIndexException
- if an error occurs reading paged data
-
transform
protected abstract List<TO> transform(List<FROM> from) throws com.broadleafcommerce.search.api.SearchIndexException Provides an optional transformation step. This should be lightweight and is typically for aggregating data, spitting it into chunks, etc. This can be as simple as casting to the output, or ifand are the same, this can be a pass through. The result of this call is what will go onto the queue. Think of this as early, lightweight processing to provide enough data for another thread at the other end of the queue to do further, more intensive queries and processing.
If the readPage method typically returns a large list of IDs, this method would break things into chunks.
- Parameters:
from
- the read data page to transform- Returns:
- the transformed data
- Throws:
com.broadleafcommerce.search.api.SearchIndexException
- if an error occurs transforming the data
-
countIndexablesToProcess
Determine how manyIndexable
s are being processed. Since(queueEntry) is generic and could be a Collection
, aBatchIdHolder
, or some other type, we provide this method so that implementors can return a count of theIndexable
items that are expected to be processed by anAbstractWorker
at the other end of the Queue.Return 0 if there is no way to know the number of indexables (e.g. if the queueEntry is an ID range).
- Parameters:
queueEntries
- the entries on the queue for which to count expected indexables- Returns:
- the number of expected indexables to index for the queueEntries
-
getWaitTime
public long getWaitTime()Time in milliseconds to wait put something on a bounded BlockingQueue. Keep this rather small. We want to wait and try again to ensure that we don't block indefinitely and to check that there was not a failure while we began waiting.- Returns:
- time to wait to put into a bounded BlockingQueue
-
getMaxTries
public int getMaxTries()Defines the number of times that we'll try to put something from a bounded BlockingQueue with the given wait time before giving up, for example when nothing is reading from the Queue.- Returns:
- number of times to try putting into a bounded BlockingQueue
-
isRunning
public boolean isRunning()Whether the loader is currently loading values into the queue.- Returns:
- whether the loader is currently loading values into the queue
-
readPage(ProcessStateHolder)
for the potential to utilize a additional process information.