Tech stories
Abstracting ML pipelines at Adyen
By Joshua Chacksfield, Machine Learning Engineer
This article will explain how we alleviated some of the issues encountered while building ML pipelines, enabling our users to scale up training to hundreds of models per week, resulting in faster iteration and experimentation.
Introduction
At Adyen, ML practitioners design complex Python pipelines to transform datasets, train models, and monitor their performance. Pipelines run on your typical big data setup using HDFS and PySpark to transform petabytes of data into models and insights. While powerful, distributed data processing tools can be unreliable, and ML pipelines take days to complete. Writing one script and hoping it succeeds from data wrangling through to model registration is not feasible. Not only does it not scale horizontally, but reliability and maintainability suffer. A failed pipeline can delay model deployments and potentially result in large amounts of lost revenue for Adyen and its customers. Models trained per day per team:
Pre-solution
The Data Solution at Adyen operates a single highly available Airflow deployment per environment (staging, prod, etc.), which schedules jobs from multiple Python packages; internally, we refer to these packages as applications. To read more about our HA setup, check out this other blog post: Apache Airflow at Adyen
These applications should not have to share requirements; different teams should be able to use different versions of the same library if they want. This results in every team’s packages and applications being installed into the different virtual environments separate from the one that Airflow is running in. Typical advice is to use the BashOperator or VirtualEnvironmentOperator to execute tasks in different virtual environments. This is similar to the original solution at Adyen, which was to specify the path to the function you wished to invoke and dynamically resolve which virtual environment and function to execute. See below for an example:
While operators provide access to a large ecosystem of open-source airflow extensions and offer an easily extensible interface, there are a few problems with using them:
Everything is manual: you look up the path to the function you want to reference, remember the inputs and outputs, and wire up data flows between operators.
There is no IDE support, type checking, or validation of the inputs for your string-referenced function.
Understanding what code is being executed by a DAG requires looking up the string reference from each operator in your codebase.
Passing (large) data artifacts between tasks is not available by default and requires using XCom and your serialization.
There is a lot of code duplication. Since it's hard to discover where a function is used, most DAGs redefine every branch of their pipeline.
Tight coupling with Airflow; the DAGs can't be run elsewhere.
End-to-end pipeline testing requires Airflow to be installed or run locally while developing or in CI.
Too much coupling exists between the configuration and the pipeline’s code, opening up the opportunity for bugs and adding friction to the maintenance of the pipelines, especially when these pipelines grow in size. Below is a small-ish pipeline for training a model.
The requirements were pretty clear; we needed a way to declaratively specify pipelines from different applications and load the pipeline simultaneously in our Airflow instance to construct a DAG, while still being testable and easy to use.
The Solution
Enter lib_workflow, a pipelining library that allows the declaration of Task’s representing python jobs that can be tied together inside a Workflow. The Workflow automatically tracks the jobs’ data dependencies while at runtime checkpointing the outputs to a distributed store, such as HDFS or S3.
Separating the pipelines into separate jobs allows quicker iteration, easily swapping out different components and functions while encouraging more efficient resource usage. The automatic checkpointing of data enables efficient reuse of the output of jobs, retrying of flaky jobs, quicker bug fixing, and adhoc ML tasks.
Below is an example pipeline of training a classifier on the iris dataset using sklearn.
A function that is decorated with the @task decorator creates a TaskFunction object, which can be executed like a normal function. The behavior of a TaskFunction changes when the object is called within a WorkflowContext. Instead of being executed, a Task object is created, which stores the function reference, its inputs, outputs and extra runtime metadata. The WorkflowContext is “opened” when calling the functions inside a @workflow decorated function or a Workflow context manager.
So, what can you do with this training_workflow function? Once this function is called, the metadata mentioned above will be collected and stored into a workflow object. This object can be executed locally for debugging and testing, running the tasks sequentially in the same process and saving the outputs to a dictionary.
It is also possible to submit it to an orchestrator to be executed in parallel with persistent storage.
Passing Data
Airflow does not have much support (apart from XCom) for passing data between different tasks, which can lead to a lot of boilerplate saving/loading data to/from disk between each function. We opted to abstract away the passing of data between tasks. As mentioned above, when a task is called within the context of a workflow, the code does not run, but a Task object is constructed, and an Artifact is created for each output of the function. These Artifacts are returned in place of the real data, and downstream functions can be called with these Artifacts as inputs, which will set up the data flows inside the workflow.
When using an orchestrator to execute a workflow, individual tasks may run on different machines; each task needs to know where the data it's consuming is saved and be able to load that data from disk. To achieve this, there is the concept of an Artifactory, a key-value store pointing from an Artifact to the data on disk. When a task is executed, some wrapper code loads the Artifacts consumed by the Task from disk and pushes them in as arguments. After the function is executed, the wrapper receives the outputs of the function and writes them back into the store.
Different Orchestrators
Instead of writing an orchestrator and dealing with access management, secrets, allocating resources, scheduling and retrying tasks, we decided to support two schedulers/orchestrators: Airflow and ArgoWorkflows. Airflow is ideal for statically defined jobs that do not change often and need to run on a schedule, and ArgoWorkflows is more suited to workflows that change at a higher rate, and are submitted on an ad-hoc basis. The workflow object has helpers to convert the workflow into a format consumable by a specific orchestrator.
The ability to convert the workflow to JSON plays a large role in enabling the speed up for our ML teams. To speed up the release of new ideas, we opted to ship the serialized workflows outside of the normal release process. Since these serialized workflows are effectively the configuration of a pipeline with no source code, it was deemed acceptable to allow them to be released directly to a specified cluster during a post-merge pipeline. A manifest for each workflow in the codebase is created and uploaded to an object store. These manifests are cached locally to Airflow and, during the DAG parsing, deserialized and converted into a DAG object.
The support for conversion to an ArgoWorkflows manifest also allows ML practitioners to submit their workflow to our Kubernetes clusters from JupyterLab or some other workspace, allowing for quicker iteration than waiting for branches to be merged and deployed from the deployment pipelines.
You might wonder what can be changed via a workflow if the source code is not being updated during a config push. Since every input argument that is not an Artifact (i.e., an output from a previous task), is serialized as part of the config, a significant amount of configuration of a Workflow can be pushed as a config change, for example hyperparameters of a model or data sampling technique.
Execution Engines
Not all tasks are built the same; for example, Spark or Dask jobs need to configure a cluster for the job to run on or maybe a Task should be mapped to use a custom entry point that boots a Kubernetes Job. To facilitate this, Tasks expose an extensible execution_engine which collects the configuration for a specific runtime. The execution_engine maps to and configures the relevant “runner” in the orchestrator; for example, the Spark execution_engine maps to the SparkSubmitOperator when running on Airflow or to a SparkApplication resource when running on Kubernetes with ArgoWorkflows.
Execution engines offer an easy way to extend the library, adding entry points to customise the Task’s wrapper and runner configuration. For example, an execution engine could request a GPU on Kubernetes, a whole spark cluster, or additional libraries to be installed for that particular task.
Conclusion
Building and adopting an in-house pipelining library has some negative aspects. There are requests for new features, supporting multiple orchestrators, and juggling bare-metal and Kubernetes deployments, all of which require a significant amount of attention. Any addition to the framework inevitably adds more complexity to the API design, which complicates the aim of trying to keep it extendable, maintainable, and easy to adopt by new teams.
There is also a fairly steep learning curve for people new to the framework, as it is required to keep the delayed execution of the pipeline in mind at all times. It feels vaguely comparable to learning early versions of TensorFlow, which had a similar mental overhead of constructing a pipeline in code that will run at a later date on different hardware, restricting how much code can be dynamic (for-loops, branching, etc.), and limiting the use of global variables and other captured state like lambdas. There is also the added consideration that the code constructing the pipeline will often run in CICD pipelines rather than on the production cluster, adding another layer of complexity.
However, by taking this approach, Adyen drastically increased the readability and maintainability of ML pipelines while adding many features, such as caching task executions and automatic resource tracking. We could transition to using the same codebase on ArgoWorkflows and Airflow, increasing the speed of deployment from one week to a few minutes for experimentation workloads. We believe having an abstract workflow representation that can span multiple orchestrators offers much value to the developer experience while avoiding “vendor lock-in”. The benefit of serializing the pipeline and referencing a container (docker image) is that it allows us to run any workflow anywhere.
Fresh insights, straight to your inbox
By submitting your information you confirm that you have read Adyen's Privacy Policy and agree to the use of your data in all Adyen communications.