Class AbstractJsonArrayDataFeedCollector<ID,E extends CollectedEntity<ID>>

java.lang.Object
com.broadleafcommerce.datafeed.service.collector.AbstractJsonArrayDataFeedCollector<ID,E>
Type Parameters:
ID - the type of the ID of the entities being collected
E - the type of CollectedEntity being collected
All Implemented Interfaces:
DataFeedCollector
Direct Known Subclasses:
ProductDataFeedCollector

public abstract class AbstractJsonArrayDataFeedCollector<ID,E extends CollectedEntity<ID>> extends Object implements DataFeedCollector
Abstract component that handles the complexity of creating a File, collecting data, and writing DataFeedEntities to the file as an array of JSON objects for future processing. Because fetching data can be time-consuming, especially with additional decorations such as pricing and inventory, this component uses a ThreadPoolTaskExecutor to attempt to multi-thread this process, assuming the executor is configured for multiple threads.
Author:
Kelly Tisdell (ktisdell)
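
A concrete subclass mainly has to answer three questions: which IDs belong in a full feed, which IDs are implied by incremental update records, and how a batch of IDs becomes hydrated entities; the batching, threading, and JSON-array writing are handled here. The outline below is only a sketch: ExampleEntity (a hypothetical CollectedEntity<String> implementation), ExampleGateway, and its methods are illustrative stand-ins rather than Broadleaf APIs, imports for the data feed request/record types are omitted because their packages are not shown on this page, and any superclass constructor arguments are elided.

  import java.util.List;
  import java.util.Map;

  import com.broadleafcommerce.data.tracking.core.context.ContextInfo;
  import com.broadleafcommerce.datafeed.service.collector.AbstractJsonArrayDataFeedCollector;

  public class ExampleDataFeedCollector
          extends AbstractJsonArrayDataFeedCollector<String, ExampleEntity> {

      private final ExampleGateway exampleGateway; // hypothetical data-access component

      public ExampleDataFeedCollector(ExampleGateway exampleGateway) {
          // superclass constructor arguments omitted for brevity
          this.exampleGateway = exampleGateway;
      }

      @Override
      protected List<String> readIdBatchForFullFeed(DataFeedCollectionRequest request,
              Map<String, Object> collectorProcessState, int batchSize, ContextInfo readContextInfo) {
          // Delegate paging to the hypothetical gateway; explicit cursor handling is
          // sketched under readIdBatchForFullFeed below.
          return exampleGateway.readNextIdBatch(collectorProcessState, batchSize, readContextInfo);
      }

      @Override
      protected List<String> readIdBatchForPartialFeed(DataFeedCollectionRequest collectionRequest,
              List<IncrementalDataFeedEntityUpdateRecord> updateRecords,
              Map<String, Object> sharedProcessState, ContextInfo readContextInfo) {
          // Resolve each record's handle (ID, SKU, externalId, pricingKey) to the entity's ID.
          return exampleGateway.resolveEntityIds(updateRecords, readContextInfo);
      }

      @Override
      protected List<ExampleEntity> readDataFeedRecords(DataFeedCollectionRequest request,
              List<String> idBatch, Map<String, Object> collectorProcessState,
              ContextInfo readContextInfo) {
          // Hydrate the full entities for this batch of IDs.
          return exampleGateway.readByIds(idBatch, readContextInfo);
      }
  }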
  • Constructor Details

  • Method Details

    • handle

      Description copied from interface: DataFeedCollector
      This method handles collecting and transforming data of a certain type into a JSON file that will be used to transform and publish to syndicates such as Google or Facebook. Implementors must consume the DataFeedCollectionRequest and use it to fetch data (e.g. from the CatalogService) and then write that data to a file system for future processing or publication.
      Specified by:
      handle in interface DataFeedCollector
      Returns:
      the DataFeedCollectionResponse containing details about the data collection step
    • createEmptyLocalCollectedDataFile

      protected Path createEmptyLocalCollectedDataFile(DataFeedCollectionRequest request) throws Exception
      Create the local output file that will ultimately be populated with collected data.

      In almost all cases, this should point to a temporary file (e.g. Files.createTempFile(String, String, FileAttribute...))

      Note that this method is intentionally simple and is not intended to be a place where file contents are initialized.

      Parameters:
      request - the collection request details
      Returns:
      a reference to the created file
      Throws:
      Exception - if something went wrong
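
      Since this method only needs to hand back an empty file reference, a typical override is a one-liner. A minimal sketch, written as a method on a subclass like the one in the class description; the "collected-" prefix and ".json" suffix are arbitrary choices, not prescribed by the API:

        @Override
        protected Path createEmptyLocalCollectedDataFile(DataFeedCollectionRequest request) throws Exception {
            // A temp file is enough; the collected JSON array is written to it later in the flow.
            return Files.createTempFile("collected-", ".json");
        }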
    • buildContextInfo

      protected com.broadleafcommerce.data.tracking.core.context.ContextInfo buildContextInfo(DataFeedCollectionRequest request, com.broadleafcommerce.data.tracking.core.type.OperationType operationType)
    • getIdBatchSize

      protected int getIdBatchSize()
      We want to read the IDs in larger batches so that we can delegate sub-batches to multiple threads for parallel processing.

      Default is batchSize * workerThreads * 2

      The multiplier of 2 means that each worker thread will execute the flow at most twice before control returns to the caller.

      Returns:
      the number of IDs to fetch per read, to be divided into sub-batches across the worker threads
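
      Shown only to make the arithmetic concrete (the actual implementation may differ), the default described above is equivalent to:

        protected int getIdBatchSize() {
            // e.g. batchSize = 100, workerThreads = 4  ->  100 * 4 * 2 = 800 IDs per read
            return getBatchSize() * getWorkerThreads() * 2;
        }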
    • readIncrementalUpdateRecords

      protected Stream<IncrementalDataFeedEntityUpdateRecord> readIncrementalUpdateRecords(DataFeedCollectionRequest request, com.broadleafcommerce.data.tracking.core.context.ContextInfo contextInfo)
      Returns a Stream containing all records that indicate which entities need to be collected for the given dataFeedProcessExecutionId.
      Parameters:
      request - the collection request details
      contextInfo - the ContextInfo for reading data
      Returns:
      a Stream of IncrementalDataFeedEntityUpdateRecords for the given data feed process execution
    • isIdDuplicateIncrementalId

      protected boolean isIdDuplicateIncrementalId(@NonNull ID id, @NonNull Map<String,Object> collectorProcessState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
      This provides a mechanism to ensure that we're not including the same ID twice during incremental updates. Duplicates can happen when multiple IncrementalDataFeedEntityUpdateRecords refer to the same CollectedEntity via different handles (e.g. a Product can be referenced by SKU, pricingKey, ID, or externalId), and we don't want to collect it more than once.
      Parameters:
      id - the ID to check
      collectorProcessState - Thread-safe Map of shared state for this process
      readContextInfo - the ContextInfo for reading data
      Returns:
      true if we've seen this ID before for this particular incremental Process Execution
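
      The contract can be met with a "seen IDs" set kept in the thread-safe collectorProcessState. The sketch below (a method on a subclass like the one in the class description) is illustrative only; the "seenIncrementalIds" state key is an assumption, not a documented constant:

        @Override
        @SuppressWarnings("unchecked")
        protected boolean isIdDuplicateIncrementalId(ID id, Map<String, Object> collectorProcessState,
                ContextInfo readContextInfo) {
            // Lazily create one concurrent set shared by all worker threads for this execution.
            Set<ID> seen = (Set<ID>) collectorProcessState
                    .computeIfAbsent("seenIncrementalIds", k -> ConcurrentHashMap.newKeySet());
            // Set.add returns false when the ID was already present, i.e. this is a duplicate.
            return !seen.add(id);
        }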
    • executeContributors

      protected void executeContributors(List<E> records, @NonNull DataFeedCollectionRequest request, @NonNull Map<String,Object> processState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
    • readDataFeedRecords

      protected abstract List<E> readDataFeedRecords(@NonNull DataFeedCollectionRequest request, @NonNull List<ID> idBatch, @NonNull Map<String,Object> collectorProcessState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
      Reads and fully hydrates the CollectedEntities for the given batch of IDs.
      Parameters:
      request - the original request to collect data
      idBatch - a batch of unique IDs
      collectorProcessState - Map to optionally hold additional state across multiple method invocations - this is thread-safe and guaranteed to be the same Map instance on each invocation
      readContextInfo - ContextInfo to read data from the DB or from an API
      Returns:
      the List of collected entities for the given ID batch
    • readIdBatchForFullFeed

      protected abstract List<ID> readIdBatchForFullFeed(@NonNull DataFeedCollectionRequest request, @NonNull Map<String,Object> collectorProcessState, int batchSize, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
      Returns a List that provides a batch of IDs, sorted in ascending order. Implementations must keep track of their own cursor (starting point) using the collectorProcessState, which is guaranteed to be the same Map instance between invocations.
      Parameters:
      request - the original request to collect data
      collectorProcessState - Map to optionally hold additional state across multiple method invocations - this is thread-safe and guaranteed to be the same Map instance on each invocation
      batchSize - the size of the ID batch to fetch, usually larger than the normal batch size
      readContextInfo - ContextInfo to read data from the DB or from an API
      Returns:
      a batch of IDs, sorted in ascending order
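
      Since the cursor lives in collectorProcessState, a typical implementation remembers the last ID it handed out and asks for IDs greater than it. The sketch below (a method on a subclass like the one in the class description) is illustrative only; the "fullFeedCursor" key and exampleGateway.readIdsAfter(...) are hypothetical, not Broadleaf APIs:

        @Override
        protected List<String> readIdBatchForFullFeed(DataFeedCollectionRequest request,
                Map<String, Object> collectorProcessState, int batchSize, ContextInfo readContextInfo) {
            // The cursor (last ID returned) survives between invocations in the shared state map.
            String lastId = (String) collectorProcessState.get("fullFeedCursor");
            // Hypothetical query: IDs strictly greater than lastId, ascending, limited to batchSize.
            List<String> ids = exampleGateway.readIdsAfter(lastId, batchSize, readContextInfo);
            if (!ids.isEmpty()) {
                collectorProcessState.put("fullFeedCursor", ids.get(ids.size() - 1));
            }
            return ids;
        }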
    • readIdBatchForPartialFeed

      protected abstract List<ID> readIdBatchForPartialFeed(DataFeedCollectionRequest collectionRequest, @NonNull List<IncrementalDataFeedEntityUpdateRecord> updateRecords, @NonNull Map<String,Object> sharedProcessState, @Nullable com.broadleafcommerce.data.tracking.core.context.ContextInfo readContextInfo)
      IncrementalDataFeedEntityUpdateRecord can have multiple types of pointers to an entity (e.g. ID, SKU, externalId, and pricingKey can all point to the same Product). We want to use these to fetch the IDs of the given CollectedEntities.
      Parameters:
      collectionRequest - the original request to collect data
      updateRecords - the incremental update records referencing the entities to collect
      sharedProcessState - Thread-safe Map of shared state for this process
      readContextInfo - ContextInfo to read data from the DB or from an API
      Returns:
      the IDs of the CollectedEntities referenced by the given update records
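
      Because several handles may resolve to the same entity, implementations generally map each update record to a canonical ID and de-duplicate the result. A hedged sketch, again on a hypothetical subclass; exampleGateway.resolveEntityId(...) is an assumed lookup, not a Broadleaf API:

        @Override
        protected List<String> readIdBatchForPartialFeed(DataFeedCollectionRequest collectionRequest,
                List<IncrementalDataFeedEntityUpdateRecord> updateRecords,
                Map<String, Object> sharedProcessState, ContextInfo readContextInfo) {
            // Resolve whatever handle each record carries (ID, SKU, externalId, pricingKey) to the
            // entity's ID, dropping unresolvable records and duplicates.
            return updateRecords.stream()
                    .map(record -> exampleGateway.resolveEntityId(record, readContextInfo))
                    .filter(Objects::nonNull)
                    .distinct()
                    .collect(Collectors.toList());
        }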
    • getRsqlParser

      protected cz.jirutka.rsql.parser.RSQLParser getRsqlParser()
    • getTypeFactory

      protected com.broadleafcommerce.common.extension.TypeFactory getTypeFactory()
    • getObjectMapper

      protected com.fasterxml.jackson.databind.ObjectMapper getObjectMapper()
    • getIncrementalDataFeedEntityUpdateRecordService

      protected IncrementalDataFeedEntityUpdateRecordService<IncrementalDataFeedEntityUpdateRecord> getIncrementalDataFeedEntityUpdateRecordService()
    • getBatchSize

      protected int getBatchSize()
    • getWorkerThreads

      protected int getWorkerThreads()
    • getCollectedEntityContributors

      protected List<CollectedEntityContributor> getCollectedEntityContributors()