Class AbstractJsonArrayDataFeedCollector<ID,E extends CollectedEntity<ID>>
java.lang.Object
com.broadleafcommerce.datafeed.service.collector.AbstractJsonArrayDataFeedCollector<ID,E>
- Type Parameters:
ID -
E -
- All Implemented Interfaces:
DataFeedCollector
- Direct Known Subclasses:
ProductDataFeedCollector
public abstract class AbstractJsonArrayDataFeedCollector<ID,E extends CollectedEntity<ID>>
extends Object
implements DataFeedCollector
Abstract component that handles the complexity of creating a File, collecting data, and writing DataFeedEntities to the file as an array of JSON objects for future processing.
Because fetching data can be time-consuming, especially with additional decorations such as pricing and inventory, this component uses a ThreadPoolTaskExecutor to attempt to multi-thread this process, assuming the executor is configured for multiple threads.
- Author:
- Kelly Tisdell (ktisdell)
-
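The multi-threaded collection described above can be pictured with a minimal JDK-only sketch. It assumes a plain ExecutorService in place of the ThreadPoolTaskExecutor, and the class SubBatchExample with its methods partition and processInParallel is hypothetical; the collector's actual internals are not shown in this page and may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration; not part of the documented API.
final class SubBatchExample {

    /** Splits a list of IDs into consecutive sub-batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> ids, int batchSize) {
        List<List<T>> subBatches = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += batchSize) {
            int end = Math.min(start + batchSize, ids.size());
            subBatches.add(new ArrayList<>(ids.subList(start, end)));
        }
        return subBatches;
    }

    /** Reads each sub-batch on a worker thread and returns the results in the original order. */
    static <T, R> List<R> processInParallel(List<T> ids, int batchSize, int workerThreads,
            Function<List<T>, List<R>> reader) {
        ExecutorService executor = Executors.newFixedThreadPool(workerThreads);
        try {
            List<Future<List<R>>> futures = new ArrayList<>();
            for (List<T> subBatch : partition(ids, batchSize)) {
                Callable<List<R>> task = () -> reader.apply(subBatch);
                futures.add(executor.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<List<R>> future : futures) {
                results.addAll(future.get()); // blocks until this sub-batch is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while collecting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A sub-batch failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7);
        List<String> records = processInParallel(ids, 3, 2, batch -> {
            List<String> out = new ArrayList<>();
            for (Integer id : batch) {
                out.add("record-" + id); // stand-in for fetching and decorating an entity
            }
            return out;
        });
        System.out.println(records);
    }
}
```

Collecting the futures in submission order keeps the output order stable even though sub-batches may finish at different times.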
Constructor Summary
Constructor
AbstractJsonArrayDataFeedCollector(com.broadleafcommerce.common.extension.TypeFactory typeFactory, com.fasterxml.jackson.databind.ObjectMapper objectMapper, IncrementalDataFeedEntityUpdateRecordService<IncrementalDataFeedEntityUpdateRecord> incrementalDataFeedEntityUpdateRecordService, List<CollectedEntityContributor> collectedEntityContributors, int batchSize, int workerThreads)
-
Method Summary
Modifier and Type, Method, Description
- protected com.broadleafcommerce.data.tracking.core.context.ContextInfo buildContextInfo(DataFeedCollectionRequest request, com.broadleafcommerce.data.tracking.core.type.OperationType operationType)
- protected Path createEmptyLocalCollectedDataFile(DataFeedCollectionRequest request)
Create the local output file that will ultimately be populated with collected data.
- protected void executeContributors(List<E> records, @NonNull DataFeedCollectionRequest request, @NonNull Map<String, Object> processState, com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
- protected int getBatchSize()
- protected List<CollectedEntityContributor> getCollectedEntityContributors()
- protected int getIdBatchSize()
We want to read the IDs in larger batches so that we can delegate sub-batches to multiple threads for parallel processing.
- protected IncrementalDataFeedEntityUpdateRecordService<IncrementalDataFeedEntityUpdateRecord> getIncrementalDataFeedEntityUpdateRecordService()
- protected com.fasterxml.jackson.databind.ObjectMapper getObjectMapper()
- protected cz.jirutka.rsql.parser.RSQLParser getRsqlParser()
- protected com.broadleafcommerce.common.extension.TypeFactory getTypeFactory()
- protected int getWorkerThreads()
- handle(DataFeedCollectionRequest request)
This method handles collecting and transforming data of a certain type into a JSON file that will be used to transform and publish to syndicates such as Google or Facebook.
- protected boolean isIdDuplicateIncrementalId(ID id, @NonNull Map<String, Object> collectorProcessState, com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
This provides a mechanism to ensure that we're not including the same ID twice during incremental updates.
- protected abstract List<E> readDataFeedRecords(@NonNull DataFeedCollectionRequest request, @NonNull List<ID> idBatch, @NonNull Map<String, Object> collectorProcessState, com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
- protected abstract List<ID> readIdBatchForFullFeed(@NonNull DataFeedCollectionRequest request, @NonNull Map<String, Object> collectorProcessState, int batchSize, com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
Returns a List that provides a batch of IDs, sorted in ascending order.
- protected abstract List<ID> readIdBatchForPartialFeed(DataFeedCollectionRequest collectionRequest, @NonNull List<IncrementalDataFeedEntityUpdateRecord> updateRecords, @NonNull Map<String, Object> sharedProcessState, com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
IncrementalDataFeedEntityUpdateRecord can have multiple types of pointers to an entity (e.g. ID, SKU, externalId, and pricingKey can all point to the same Product).
- protected Stream<IncrementalDataFeedEntityUpdateRecord> readIncrementalUpdateRecords(DataFeedCollectionRequest request, com.broadleafcommerce.data.tracking.core.context.ContextInfo contextInfo)
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.broadleafcommerce.datafeed.service.collector.DataFeedCollector
canHandle
-
Constructor Details
-
AbstractJsonArrayDataFeedCollector
public AbstractJsonArrayDataFeedCollector(com.broadleafcommerce.common.extension.TypeFactory typeFactory, com.fasterxml.jackson.databind.ObjectMapper objectMapper, IncrementalDataFeedEntityUpdateRecordService<IncrementalDataFeedEntityUpdateRecord> incrementalDataFeedEntityUpdateRecordService, List<CollectedEntityContributor> collectedEntityContributors, int batchSize, int workerThreads)
-
-
Method Details
-
handle
Description copied from interface: DataFeedCollector
This method handles collecting and transforming data of a certain type into a JSON file that will be used to transform and publish to syndicates such as Google or Facebook. Implementors must consume the DataFeedCollectionRequest and use it to fetch data (e.g. from the CatalogService) and then write that data to a file system for future processing or publication.
- Specified by:
handle in interface DataFeedCollector
- Returns:
- the DataFeedCollectionResponse containing details about the data collection step
-
createEmptyLocalCollectedDataFile
protected Path createEmptyLocalCollectedDataFile(DataFeedCollectionRequest request) throws Exception
Create the local output file that will ultimately be populated with collected data.
In almost all cases, this should point to a temporary file (e.g. Files.createTempFile(String, String, FileAttribute[])).
Note that this method is intentionally simple and is not intended to be a place where file contents are initialized.
- Parameters:
request - the collection request details
- Returns:
- a reference to the created file
- Throws:
Exception - if something went wrong
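As a rough illustration of the temporary-file approach recommended above, the following sketch creates an empty file and deliberately writes nothing to it. The class name, method name, the "datafeed-" prefix, and the ".json" suffix are assumptions made for this example; the sketch also wraps IOException only to keep the example compact, whereas the documented method may simply declare throws Exception.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration; names, prefix, and suffix are assumptions.
final class LocalCollectedDataFileExample {

    /** Creates an empty temporary file and returns its path without initializing content. */
    static Path createEmptyLocalFile(String prefix) {
        try {
            return Files.createTempFile(prefix, ".json");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = createEmptyLocalFile("datafeed-");
        System.out.println(file + " size=" + Files.size(file));
        Files.deleteIfExists(file); // clean up the demo file
    }
}
```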
-
buildContextInfo
protected com.broadleafcommerce.data.tracking.core.context.ContextInfo buildContextInfo(DataFeedCollectionRequest request, com.broadleafcommerce.data.tracking.core.type.OperationType operationType)
-
getIdBatchSize
protected int getIdBatchSize()
We want to read the IDs in larger batches so that we can delegate sub-batches to multiple threads for parallel processing.
Default is batchSize * workerThreads * 2. The multiplier of 2 means that each worker thread will execute the flow at most 2 times before control returns to the caller.
- Returns:
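The default can be worked through with a small sketch. The class and method names are hypothetical, and the values 100 and 4 are assumed purely for illustration.

```java
// Hypothetical illustration of the documented default.
final class IdBatchSizeExample {

    /** Mirrors the documented default: batchSize * workerThreads * 2. */
    static int defaultIdBatchSize(int batchSize, int workerThreads) {
        return batchSize * workerThreads * 2;
    }

    /** Number of sub-batches of batchSize needed to cover idBatchSize IDs (ceiling division). */
    static int subBatchCount(int idBatchSize, int batchSize) {
        return (idBatchSize + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        int batchSize = 100;   // assumed value
        int workerThreads = 4; // assumed value
        int idBatchSize = defaultIdBatchSize(batchSize, workerThreads);
        System.out.println(idBatchSize);                           // 800
        System.out.println(subBatchCount(idBatchSize, batchSize)); // 8, i.e. 2 per worker thread
    }
}
```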
-
readIncrementalUpdateRecords
protected Stream<IncrementalDataFeedEntityUpdateRecord> readIncrementalUpdateRecords(DataFeedCollectionRequest request, com.broadleafcommerce.data.tracking.core.context.ContextInfo contextInfo)
Returns a Stream containing all records referencing which Entities need to be collected for the given dataFeedProcessExecutionId.
- Parameters:
request -
contextInfo -
- Returns:
-
isIdDuplicateIncrementalId
protected boolean isIdDuplicateIncrementalId(@NonNull ID id, @NonNull Map<String, Object> collectorProcessState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
This provides a mechanism to ensure that we're not including the same ID twice during incremental updates. Duplicates can happen when multiple IncrementalDataFeedEntityUpdateRecords refer to the same CollectedEntity with different handles (e.g. a Product can be referenced by SKU, pricingKey, ID, or externalID). We don't want duplicates.
- Parameters:
id -
collectorProcessState - Thread-safe Map of shared state for this process
readContextInfo - the ContextInfo for reading data
- Returns:
- true if we've seen this ID before for this particular incremental Process Execution
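One way such a check could work is to keep a concurrent set of seen IDs inside the shared state map. This is a minimal sketch under that assumption; the class DuplicateIdExample, the method isDuplicate, and the "seenIds" key are hypothetical and not taken from the actual implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration; not the actual implementation.
final class DuplicateIdExample {

    // Assumed key under which the set of seen IDs is stored in the shared state.
    static final String SEEN_IDS_KEY = "seenIds";

    /** Returns true if the ID was already recorded in the shared state; records it otherwise. */
    @SuppressWarnings("unchecked")
    static <ID> boolean isDuplicate(ID id, Map<String, Object> processState) {
        Set<ID> seen = (Set<ID>) processState.computeIfAbsent(
                SEEN_IDS_KEY, key -> ConcurrentHashMap.newKeySet());
        return !seen.add(id); // add returns false when the ID was already present
    }

    public static void main(String[] args) {
        Map<String, Object> state = new ConcurrentHashMap<>();
        System.out.println(isDuplicate("product-1", state)); // false
        System.out.println(isDuplicate("product-1", state)); // true
    }
}
```

Because Set.add is atomic on a concurrent key set, two threads presenting the same ID cannot both be told it is new.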
-
executeContributors
-
readDataFeedRecords
protected abstract List<E> readDataFeedRecords(@NonNull DataFeedCollectionRequest request, @NonNull List<ID> idBatch, @NonNull Map<String, Object> collectorProcessState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
- Parameters:
request - the original request to collect data
idBatch - a batch of unique IDs
collectorProcessState - Map to optionally hold additional state across multiple method invocations; it is thread-safe and guaranteed to be the same Map instance on each invocation
readContextInfo - ContextInfo to read data from the DB or from an API
- Returns:
-
readIdBatchForFullFeed
protected abstract List<ID> readIdBatchForFullFeed(@NonNull DataFeedCollectionRequest request, @NonNull Map<String, Object> collectorProcessState, int batchSize, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
Returns a List that provides a batch of IDs, sorted in ascending order. Implementations must keep track of their own cursor (starting point) using the collectorProcessState, which is guaranteed to be the same Map instance between invocations.
- Parameters:
request - the original request to collect data
collectorProcessState - Map to optionally hold additional state across multiple method invocations; it is thread-safe and guaranteed to be the same Map instance on each invocation
batchSize - the size of the ID batch to fetch, usually larger than the normal batch size
readContextInfo - ContextInfo to read data from the DB or from an API
- Returns:
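The cursor requirement can be sketched as follows. This example assumes an in-memory, pre-sorted list in place of a real data store, stores the last returned ID under a hypothetical "lastId" key, and treats an empty list as the end of the data; all of these are assumptions of the sketch rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of cursor tracking in the shared state map.
final class FullFeedCursorExample {

    // Assumed key for the cursor; holds the last ID returned so far.
    static final String CURSOR_KEY = "lastId";

    // In-memory stand-in for a data store, already sorted in ascending order.
    static final List<Long> ALL_IDS = List.of(1L, 2L, 3L, 5L, 8L, 13L, 21L);

    /** Returns the next batch of IDs greater than the stored cursor, then advances the cursor. */
    static List<Long> readIdBatch(Map<String, Object> processState, int batchSize) {
        Long lastId = (Long) processState.get(CURSOR_KEY); // null on the first call
        List<Long> batch = new ArrayList<>();
        for (Long id : ALL_IDS) {
            if (batch.size() == batchSize) {
                break;
            }
            if (lastId == null || id > lastId) {
                batch.add(id);
            }
        }
        if (!batch.isEmpty()) {
            processState.put(CURSOR_KEY, batch.get(batch.size() - 1));
        }
        return batch;
    }

    public static void main(String[] args) {
        Map<String, Object> state = new ConcurrentHashMap<>();
        List<Long> batch;
        while (!(batch = readIdBatch(state, 3)).isEmpty()) {
            System.out.println(batch);
        }
    }
}
```

Keeping the cursor in the map rather than in a field is what lets the same collector instance serve several process executions at once.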
-
readIdBatchForPartialFeed
protected abstract List<ID> readIdBatchForPartialFeed(DataFeedCollectionRequest collectionRequest, @NonNull List<IncrementalDataFeedEntityUpdateRecord> updateRecords, @NonNull Map<String, Object> sharedProcessState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
IncrementalDataFeedEntityUpdateRecord can have multiple types of pointers to an entity (e.g. ID, SKU, externalId, and pricingKey can all point to the same Product). We want to use these to fetch the IDs of the given CollectedEntities.
- Parameters:
collectionRequest - the original request to collect data
updateRecords -
sharedProcessState -
readContextInfo -
- Returns:
-
getRsqlParser
protected cz.jirutka.rsql.parser.RSQLParser getRsqlParser()
-
getTypeFactory
protected com.broadleafcommerce.common.extension.TypeFactory getTypeFactory()
-
getObjectMapper
protected com.fasterxml.jackson.databind.ObjectMapper getObjectMapper()
-
getIncrementalDataFeedEntityUpdateRecordService
protected IncrementalDataFeedEntityUpdateRecordService<IncrementalDataFeedEntityUpdateRecord> getIncrementalDataFeedEntityUpdateRecordService()
-
getBatchSize
protected int getBatchSize()
-
getWorkerThreads
protected int getWorkerThreads()
-
getCollectedEntityContributors
-