Class AbstractQueueLoader<FROM,TO>
- java.lang.Object
-
- com.broadleafcommerce.search.index.core.queue.AbstractQueueLoader<FROM,TO>
-
- Type Parameters:
FROM
- The type that is being read from a data source and optionally converted to. TO
- The type that will be put on theReindexQueue
.
- All Implemented Interfaces:
IndexableTypeAware
,QueueLoader<TO>
- Direct Known Subclasses:
AbstractBatchIdQueueLoader
public abstract class AbstractQueueLoader<FROM,TO> extends Object implements QueueLoader<TO>, IndexableTypeAware
Abstract component to allow reading data in batch from a data source or web service, converting those values, and putting them on aReindexQueue
.- Author:
- Kelly Tisdell (ktisdell)
-
-
Constructor Summary
Constructors Constructor Description AbstractQueueLoader(long waitTime, int maxTries)
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description protected abstract int
countIndexablesToProcess(List<TO> queueEntries)
Determine how manyIndexable
s are being processed.protected Executor
createExecutor()
Creates an Executor to allow us to populate the queue on different threads than what is reading the data.protected void
destroyExecutor(Executor executor)
Provides a mechanism to shutdown the Executor.int
getMaxTries()
Defines the number of times that we'll try to put something from a bounded BlockingQueue with the given wait time before giving up, for example when nothing is reading from the Queue.long
getWaitTime()
Time in milliseconds to wait put something on a bounded BlockingQueue.protected abstract void
initialize()
This will be called exactly once prior to a call to readPage.boolean
isRunning()
Whether the loader is currently loading values into the queue.long
populateQueue(ReindexQueue<TO> queue, ProcessStateHolder processStateHolder)
Reads T from a source (e.g.protected abstract List<FROM>
readPage()
Reads paged data to, optionally, transform and put on aBlockingQueue
.protected abstract List<TO>
transform(List<FROM> from)
Provides an optional transformation step.-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.broadleafcommerce.search.index.core.IndexableTypeAware
getIndexableType
-
-
-
-
Method Detail
-
populateQueue
public final long populateQueue(ReindexQueue<TO> queue, ProcessStateHolder processStateHolder) throws com.broadleafcommerce.search.api.SearchIndexException
Description copied from interface:QueueLoader
Reads T from a source (e.g. REST service, DB, etc.) and puts them on a queue for processing by another thread.- Specified by:
populateQueue
in interfaceQueueLoader<FROM>
- Parameters:
queue
- the queue to populateprocessStateHolder
- the state holder tracking this process- Returns:
- the number of IDs that were read and put on the queue.
- Throws:
com.broadleafcommerce.search.api.SearchIndexException
- if the queue was unable to be populated
-
createExecutor
protected Executor createExecutor()
Creates an Executor to allow us to populate the queue on different threads than what is reading the data. This is especially helpful when processing the data, aggregating raw results, etc. The default returns an unboundedThreadPoolTaskExecutor
. If you override this method to provide anything except aThreadPoolTaskExecutor
then you should override thedestroyExecutor(java.util.concurrent.Executor)
method as well, which expects anExecutor
and will, by default, shut down the Executor if it is a ThreadPoolTaskExecutor.- Returns:
- new
ThreadPoolTaskExecutor
for populating the queue
-
destroyExecutor
protected void destroyExecutor(Executor executor)
Provides a mechanism to shutdown the Executor. By default this assumes that the executor is aThreadPoolTaskExecutor
.- Parameters:
executor
- the executor to shut down
-
initialize
protected abstract void initialize()
This will be called exactly once prior to a call to readPage. This method may do nothing, but implementors may use it to reset state such as a cursor. This method should set the state of this object so that the next call to readNextIds will begin reading batches of IDs at the beginning of the list.
-
readPage
protected abstract List<FROM> readPage() throws com.broadleafcommerce.search.api.SearchIndexException
Reads paged data to, optionally, transform and put on aBlockingQueue
. This could be a list of IDs,Indexable
objects, or any number of things.Implementors MUST return an empty list when there is no more data to return. Implementors must also keep track of their own pagination state. The initialize method will be called prior to the first call to this method.
The results of this call will be passed to the transform method and then put on a queue for more intensive processing, transformation, persistence, etc.
Most typically, this will read a rather large List of IDs.
- Returns:
- a paged list of FROM items
- Throws:
com.broadleafcommerce.search.api.SearchIndexException
- if an error occurs reading paged data
-
transform
protected abstract List<TO> transform(List<FROM> from) throws com.broadleafcommerce.search.api.SearchIndexException
Provides an optional transformation step. This should be lightweight and is typically for aggregating data, spitting it into chunks, etc. This can be as simple as casting to the output, or ifand are the same, this can be a pass through. The result of this call is what will go onto the queue. Think of this as early, lightweight processing to provide enough data for another thread at the other end of the queue to do further, more intensive queries and processing.
If the readPage method typically returns a large list of IDs, this method would break things into chunks.
- Parameters:
from
- the read data page to transform- Returns:
- the transformed data
- Throws:
com.broadleafcommerce.search.api.SearchIndexException
- if an error occurs transforming the data
-
countIndexablesToProcess
protected abstract int countIndexablesToProcess(List<TO> queueEntries)
Determine how manyIndexable
s are being processed. Since(queueEntry) is generic and could be a Collection
, aBatchIdHolder
, or some other type, we provide this method so that implementors can return a count of theIndexable
items that are expected to be processed by anAbstractWorker
at the other end of the Queue.Return 0 if there is no way to know the number of indexables (e.g. if the queueEntry is an ID range).
- Parameters:
queueEntries
- the entries on the queue for which to count expected indexables- Returns:
- the number of expected indexables to index for the queueEntries
-
getWaitTime
public long getWaitTime()
Time in milliseconds to wait put something on a bounded BlockingQueue. Keep this rather small. We want to wait and try again to ensure that we don't block indefinitely and to check that there was not a failure while we began waiting.- Returns:
- time to wait to put into a bounded BlockingQueue
-
getMaxTries
public int getMaxTries()
Defines the number of times that we'll try to put something from a bounded BlockingQueue with the given wait time before giving up, for example when nothing is reading from the Queue.- Returns:
- number of times to try putting into a bounded BlockingQueue
-
isRunning
public boolean isRunning()
Whether the loader is currently loading values into the queue.- Returns:
- whether the loader is currently loading values into the queue
-
-