Aug 19 2020

Solving data ingestion at scale using Argo Workflows!

Apoorv Bhawsar

Principal Engineer


Challenges with data ingestion

At Unbxd, we process a massive volume of ecommerce catalog data for multiple sites to serve search results, with product counts ranging from 5k to 50M. This multi-tenant architecture involves periodic refreshes of the complete catalog and incremental updates on fields like price, inventory, etc. The frequency of such uploads varies from client to client and spreads across multiple regions. The objective is to treat a site with a 5k-product catalog and hourly updates the same way as a site with a 10M-product catalog and one update a day, honoring the predefined SLAs in both cases while keeping the usage cost to a minimum.

A generic data ingestion of any search engine involves the following operations.

  1. A periodic full refresh of the catalog
  2. Frequent incremental updates on a set of fields, where field size may vary significantly on every update

For an eCommerce search engine, where a product, once added, is either removed or has very minor updates, operations can be:

  1. A periodic full refresh of the catalog where the change in products is usually less than 10% of the whole catalog size
  2. Frequent incremental updates on a fixed set of fields like inventory size, price of products, etc.
  3. Deletion of an out-of-stock product

Given the very specific requirement set and unpredictable frequency of updates, the frequently chosen pipeline of Kafka -> Storm -> SolrCloud doesn’t work very well. Moreover, distinguishing between clients of varying sizes becomes tough, and the pipeline can increase overall infrastructure costs by leaving a lot of capacity unused during off-peak hours.

Transforming an ingestion request into a workflow

We decided to treat every catalog ingestion request as a workflow. This gives us two significant advantages. First, every request is independent of the others. Second, resources are used only when there is an upload event.

Of the various workflow management platforms out there, Argo checked all the boxes for us. We chose it over other popular options like Airflow because Argo is a container-native workflow engine for Kubernetes. Since we already run most of our services on Kubernetes, maintenance was not an issue.

On top of this, we take advantage of AWS spot instances, which ensures we have enough capacity to launch any number of workflows simultaneously.

Here's a fundamental view of our request flow


Challenges

1. Storage

Even though Kubernetes has revolutionized the way applications are deployed and maintained, it falls short on out-of-the-box storage solutions. If you’re on AWS (Amazon Web Services) like us, you get EBS and EFS as your two options. Our use case requires us to mount storage on multiple pods of a workflow simultaneously, and the number of parallel mounts jumps when the catalog file is enormous and has to be split into multiple files for faster processing. EBS, as per Amazon, has an upper limit on allowed parallel mounts. When that limit is hit, a pod requesting a mount gets stuck indefinitely until other pods release the resource.

Thankfully, CSI (Container Storage Interface) is a standard specification adopted by Kubernetes, with many driver plugins available. We use Amazon FSx for Lustre, which doesn’t have any limits on parallel mounts and provides fast storage.

2. Scaling Solr indexing

While Solr provides many search features out of the box, indexing still requires extensive configuration tweaks to ensure faster indexing while keeping search traffic unaffected.

The default setup works well for incremental or in-place updates and deletes. But when a vast catalog has to be indexed into the SolrCloud cluster, indexing can significantly hit search traffic. On top of that, the resource requirements of indexing can exceed those of searching, even though indexing is usually a one-time or periodic job.

To solve this, we chose not to index into the production Solr cluster. Instead, our workflow step responsible for indexing spawns an embedded Solr server, indexes the catalog locally, and imports the resulting index into the prod cluster. This offers us the following benefits.

  1. The only interaction with the production cluster is done when the output index is to be imported, which is just a directory copy call.
  2. Given the requirement, a pod of any size appropriate for indexing can be launched temporarily.
  3. Parallel indexing into separate pods can maximize performance, the output of which can be later merged into one index.
  4. It allows us to tweak Lucene, which works underneath Solr to index documents.

Following are the Lucene properties we tweak, but YMMV.

  1. Index Merge Policy: Every batch of documents in Lucene is written as a segment, and segments are periodically merged by a scheduler that tries to merge similar-sized segments. The objective is to keep the segment count as low as possible, because a search has to iterate over every segment to find matching documents, so more segments mean slower searches.
    This is unnecessary while indexing, so we use NoMergePolicyFactory during indexing and merge segments later in the workflow.
  2. RAM_BUFFER_SIZE: As the name suggests, you can opt for a larger memory buffer so that docs are written to the file system in suitably sized chunks. We found that a high value doesn’t always help, and you’ll have to arrive at a sweet spot by trial. Turning on the info stream is recommended while tweaking, as it gives detailed logs of what is happening behind the scenes while Lucene indexes documents.
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]:new segment has 0 deleted docs
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: new segment has no vectors; norms; docValues; prox; freqs
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flushedFiles=[_3.fdx, _3.nvd, _3_Lucene50_0.pos, _3_Lucene54_0.dvd, _3_Lucene50_0.doc, _3_Lucene50_0.tim, _3.nvm, _3.fnm, _3.fdt, _3_Lucene50_0.tip, _3_Lucene54_0.dvm]
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flushed codec=Lucene62
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flushed: segment=_3 ramUsed=53.245 MB newFlushedSize=19.837 MB docs/MB=504.115
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DWPT][main]: flush time 1508.68918 msec
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [DW][main]: publishFlushedSegment seg-private updates=null
    [main] org.apache.solr.update.LoggingInfoStream.message(LoggingInfoStream.java:34) [IW][main]: publishFlushedSegment

    <indexConfig>
      <ramBufferSizeMB>${ram.buffer.size}</ramBufferSizeMB>
      <maxBufferedDocs>${max.buffer.docs}</maxBufferedDocs>
      <mergePolicyFactory class="${merge.policy}"/>
      <infoStream>${infostream.enabled}</infoStream>
    </indexConfig>
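When tuning the RAM buffer by trial, it can help to pull the flush figures out of the info stream instead of reading them by eye. The following is a minimal sketch that assumes the log lines look like the sample above (the `flushed: segment=...` and `flush time ... msec` messages); the function name `summarize_flushes` and the record layout are hypothetical, not part of Solr or Lucene.

```python
import re
from typing import Dict, List

# Matches flush summary lines such as:
#   "... flushed: segment=_3 ramUsed=53.245 MB newFlushedSize=19.837 MB docs/MB=504.115"
FLUSH_RE = re.compile(
    r"flushed: segment=(?P<segment>\S+) "
    r"ramUsed=(?P<ram_mb>[\d.]+) MB "
    r"newFlushedSize=(?P<flushed_mb>[\d.]+) MB "
    r"docs/MB=(?P<docs_per_mb>[\d.]+)"
)
# Matches the "flush time 1508.68918 msec" line that follows a flush summary.
FLUSH_TIME_RE = re.compile(r"flush time (?P<msec>[\d.]+) msec")


def summarize_flushes(log_lines: List[str]) -> List[Dict[str, object]]:
    """Return one record per flushed segment, with the flush time that follows it."""
    flushes: List[Dict[str, object]] = []
    for line in log_lines:
        flushed = FLUSH_RE.search(line)
        if flushed:
            flushes.append({
                "segment": flushed.group("segment"),
                "ram_mb": float(flushed.group("ram_mb")),
                "flushed_mb": float(flushed.group("flushed_mb")),
                "docs_per_mb": float(flushed.group("docs_per_mb")),
                "flush_msec": None,  # filled in by the next "flush time" line
            })
            continue
        timing = FLUSH_TIME_RE.search(line)
        if timing and flushes and flushes[-1]["flush_msec"] is None:
            flushes[-1]["flush_msec"] = float(timing.group("msec"))
    return flushes
```

Comparing `ram_mb`, `flushed_mb`, and `flush_msec` across runs with different `ram.buffer.size` values is one way to look for the sweet spot mentioned above.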

3. Scaling Solr product retrieval

While Solr offers both indexing and storing docs, retrieving documents after a search can take a toll on performance, which is why we chose to only index docs in Solr and retrieve just the unique IDs on search. In addition, we have our own implementation of a product store, which works with Aerospike and graph underneath. The set of unique IDs retrieved from Solr is then stitched with docs from the product store.

Another big problem the product store solves is identifying the number of products that actually require changes. Typically, when a complete catalog is sent for ingestion, the change in product data is no more than 10% of the total catalog size, and once identified, that delta is the only change we push to Solr. Full catalog indexing is only done when a configuration changes.
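One way to picture the change detection is a fingerprint comparison between what the product store already holds and what the incoming catalog contains. This is a minimal sketch under stated assumptions, not the actual product store logic: products are plain dictionaries keyed by a unique ID, and the names `fingerprint` and `diff_catalog` are hypothetical.

```python
import hashlib
import json
from typing import Dict, List, Tuple

Product = Dict[str, object]


def fingerprint(product: Product) -> str:
    """Stable hash of a product; keys are sorted so field order does not matter."""
    canonical = json.dumps(product, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def diff_catalog(
    stored_fingerprints: Dict[str, str],
    incoming: Dict[str, Product],
) -> Tuple[List[str], List[str], List[str]]:
    """Return (added, changed, deleted) product IDs for a full catalog upload."""
    added: List[str] = []
    changed: List[str] = []
    for product_id, product in incoming.items():
        old = stored_fingerprints.get(product_id)
        if old is None:
            added.append(product_id)
        elif old != fingerprint(product):
            changed.append(product_id)
    # Anything stored but missing from a full upload is treated as deleted.
    deleted = [pid for pid in stored_fingerprints if pid not in incoming]
    return added, changed, deleted
```

Only the IDs in the three returned lists would need to be pushed to the search index; unchanged products are skipped entirely.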

Stitching it together!

Now that we’ve ironed out all the complications, the most crucial part is stitching it together.

Argo Workflows works with two resource kinds: WorkflowTemplate and Workflow.

 

A complete functional workflow specification looks like this

#### Workflow Definition
apiVersion: argoproj.io/v1alpha1
kind: Workflow
spec:
  entrypoint: steps ### --1--
  onExit: cleanup
  parallelism: 15 ### --2--
  volumes:
    - name: fsx-claim
      persistentVolumeClaim:
        claimName: fsx-claim
  templates:
    - name: cleanup
      steps:
        - - name: cleanup
            templateRef:
              name: cleanup
              template: cleanup-template
            arguments:
              parameters:
                - name: name
                  value: cleanup
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9

    - name: steps
      steps:
        - - name: step1
            templateRef:
              name: step-1
              template: step-1-template
            arguments:
              parameters:
                - name: name
                  value: step-1
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9
            withItems: ### --6--
              - file1.json
              - file2.json
        - - name: step-2
            templateRef:
              name: step-2
              template: step-2-template
            arguments:
              parameters:
                - name: name
                  value: step-2
                - name: id
                  value: 6131a465-318e-4ed4-825f-add951db6bc9
---
#### Workflow Template Definition
### --3--
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: cleanup
spec:
  templates:
    - name: cleanup-template
      retryStrategy: ### --5--
        limit: 3
      inputs:
        parameters:
          - name: name
          - name: id
      activeDeadlineSeconds: 1800 ### --8--
      container:
        image: <repo>/<image-name>:<image-tag>
        imagePullPolicy: IfNotPresent
        # CPU and memory requests belong under the container's resources block
        resources:
          requests: ### --7--
            memory: 400Mi
            cpu: 500m
        env:
          - name: env-1
            value: value-1
        command: ["./cleanup"]
        args:
          - "--name={{inputs.parameters.name}}"
          - "--id={{inputs.parameters.id}}"
        volumeMounts:
          - name: fsx-claim
            mountPath: /mnt/vol/
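Since every ingestion request becomes its own workflow, the manifest has to be generated per request. Below is a rough sketch of how such a manifest could be assembled in code, mirroring the specification above. The helper names `build_step` and `build_workflow`, the `generateName` value, and the convention that each step references a WorkflowTemplate named after it are assumptions for illustration only.

```python
import json
from typing import Dict, List, Optional


def build_step(name: str, request_id: str, items: Optional[List[str]] = None) -> Dict:
    """One workflow step that references a WorkflowTemplate named after the step."""
    step: Dict = {
        "name": name,
        "templateRef": {"name": name, "template": f"{name}-template"},
        "arguments": {"parameters": [
            {"name": "name", "value": name},
            {"name": "id", "value": request_id},
        ]},
    }
    if items:
        step["withItems"] = list(items)  # expand the step once per item
    return step


def build_workflow(request_id: str, files: List[str], parallelism: int = 15) -> Dict:
    """Assemble a Workflow manifest for one ingestion request."""
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        # generateName is an assumption; the specification above omits metadata.
        "metadata": {"generateName": "ingestion-"},
        "spec": {
            "entrypoint": "steps",
            "onExit": "cleanup",
            "parallelism": parallelism,
            "volumes": [{
                "name": "fsx-claim",
                "persistentVolumeClaim": {"claimName": "fsx-claim"},
            }],
            "templates": [
                {"name": "cleanup", "steps": [[build_step("cleanup", request_id)]]},
                {"name": "steps", "steps": [
                    [build_step("step-1", request_id, files)],  # one run per file
                    [build_step("step-2", request_id)],
                ]},
            ],
        },
    }


if __name__ == "__main__":
    manifest = build_workflow(
        "6131a465-318e-4ed4-825f-add951db6bc9", ["file1.json", "file2.json"]
    )
    print(json.dumps(manifest, indent=2))
```

The dictionary is serialized as JSON here, which Kubernetes tooling accepts in place of YAML; how the manifest is actually submitted is outside the scope of this sketch.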

For an intelligent workflow, it is essential to split your process into components that are independent of each other and that can, in case of failure, be retried internally from the last known save point.

We chose to split ours into the following steps.

  1. Analyzer: Runs through the product catalog, checks for any format errors, and emits metadata, like total product count and per-product size, that the next steps use to scale accordingly.
  2. Enricher: Enriches the fields of the catalog by calling the microservices built for that purpose. More details about existing fields are added here to enhance the search experience.
  3. Indexer: Indexes data to the target stores. In our case, we have Solr and the product store, which function as a search index and a product retrieval service, respectively.
  4. Resource Manager: Based on traffic stats and off-peak hours, decides how many resources to allocate to a tenant.
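To make the Analyzer step more concrete, here is a minimal sketch of the kind of pass it describes. It assumes a JSON Lines catalog (one product per line) and a required `uniqueId` field; both are assumptions for illustration, as are the function name `analyze_catalog` and the shape of the returned metadata.

```python
import json
from typing import Dict, Iterable, List


def analyze_catalog(lines: Iterable[str], id_field: str = "uniqueId") -> Dict[str, object]:
    """Validate each product line and emit metadata for later steps to scale on."""
    product_count = 0
    total_bytes = 0
    errors: List[Dict[str, object]] = []
    for line_no, raw in enumerate(lines, start=1):
        raw = raw.strip()
        if not raw:
            continue  # ignore blank lines
        try:
            product = json.loads(raw)
        except json.JSONDecodeError as exc:
            errors.append({"line": line_no, "error": f"invalid JSON: {exc.msg}"})
            continue
        if not isinstance(product, dict) or id_field not in product:
            errors.append({"line": line_no, "error": f"missing {id_field}"})
            continue
        product_count += 1
        total_bytes += len(raw.encode("utf-8"))
    return {
        "product_count": product_count,
        "avg_product_bytes": total_bytes / product_count if product_count else 0.0,
        "errors": errors,
    }
```

A later step could read `product_count` and `avg_product_bytes` to decide, for example, how many files to split the catalog into.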

Argo Syntax

  1. onExit: https://github.com/argoproj/argo/blob/master/examples/template-on-exit.yaml
    lets us configure a mandatory set of steps that are executed on workflow completion, in cases of both success and failure
  2. parallelism: https://github.com/argoproj/argo/blob/master/examples/parallelism-limit.yaml
    lets us configure how many pods can run in parallel within a workflow; this upper limit ensures other workflow jobs do not get affected
  3. WorkflowTemplate: https://github.com/argoproj/argo/tree/master/examples/workflow-template
    The examples above explain in detail how to configure them. For frequently used templates that take part in multiple workflows, you create a library that can be referenced from the original workflow
  4. retryStrategy: https://github.com/argoproj/argo/blob/master/examples/retry-on-error.yaml
    lets you configure a retry strategy for each step, defining the retry action on a failure/error
  5. withItems: https://github.com/argoproj/argo/blob/master/examples/loops-dag.yaml
    lets a step run once per item in a list; in the specification above, step1 runs once per input file
  6. requests: lets you configure pod-level restrictions on the CPU and memory to be allocated
  7. activeDeadlineSeconds: https://github.com/argoproj/argo/blob/master/examples/timeouts-workflow.yaml
    when specified, a pod times out after the given number of seconds, and the onExit steps are executed

Conclusion

We are actively moving towards an architecture in which any write operation on a live site has minimal effect on production traffic and is isolated from other active sites. As a result, we see a 3x improvement in indexing time.

Ecommerce is an ecosystem where an immediate reflection of any data change is as important as ensuring 100% site availability and fast response times. A workflow-driven architecture helps achieve this while making scaling up inherently feasible.

