Kronos - A Generalized Data Orchestration Engine
Context
At Wayfair from 2015-2020, I spent a significant amount of time leading teams that built and leveraged Kronos, a generalized orchestration engine for data warehouses. The engine ended up seeing quite significant adoption, though it was too tightly coupled to Wayfair technology to be a viable open-source option. After Wayfair, I realized that while I had a good sense of how Kronos fit into the broader landscape of data orchestration tools - Oozie, Airflow, Luigi, SSIS, and more at the time, and later tools like dbt - many people who worked extensively with the platform might not, and could benefit from knowing how what they learned fits into that landscape.
This blog is intended for three audiences: those who used Kronos in the past, to reassure you that you did in fact learn generalized concepts you can reuse; those who see Kronos mentioned in some other blog and wonder what an HR company has to do with data processing at Wayfair; and those who work with Kronos today, if it still exists - I don't know what situation you currently have to deal with, but here's how it got that way.
A note on the latter - I'll use the present tense throughout this document, though Kronos may be long gone.
What is it?
Kronos was a tool for orchestrating DAGs - Directed Acyclic Graphs - like most core ETL tools.
Kronos had 3 core principles that made it somewhat unique:
Automatic Dependency Resolution
Inclusion in a schedule - a DAG with a single ID - was always explicit, but the order of operations within a DAG was always implicit. Users never manually ordered dependencies globally. Concepts like "listeners" or "waitfors", used for polling in other tools, were not required for an asset managed directly by Kronos; these existed only as interfaces to external systems.
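The effect of this rule can be sketched in a few lines of Python (asset names and structure hypothetical): given only what each managed asset reads, a valid execution order within the DAG falls out of a topological sort, with no user-specified ordering anywhere.

```python
from graphlib import TopologicalSorter

# Hypothetical illustration: each managed asset declares only its inputs.
# The run order within the DAG is derived, never written by hand.
asset_inputs = {
    "raw_orders": [],
    "raw_customers": [],
    "orders_clean": ["raw_orders"],
    "customer_orders": ["orders_clean", "raw_customers"],
}

# TopologicalSorter takes {node: predecessors} and yields a valid run order:
# every asset appears after all of the assets it reads from.
order = list(TopologicalSorter(asset_inputs).static_order())
print(order)
```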
Global SLA-Based Prioritization
Tasks were prioritized globally, across all DAGs. Kronos would attempt to optimize landing times and resources with full knowledge of available resources and all work to be done.
Dynamic DAGs as a Core Feature
DAGs were dynamic. It was extremely common for a DAG node to spawn additional tasks within the same DAG, which could lead to some very dynamic execution patterns.
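A minimal sketch of that pattern (all names and structures hypothetical, not Kronos APIs): a running node can return additional tasks, and the executor folds them into the same DAG before continuing. This is the shape of the per-partition expansion described later for SSAS cubes.

```python
from collections import deque

def partition_fact_table(task):
    # A node discovers work at run time - e.g. one task per partition -
    # and returns the new tasks as children within the same DAG.
    return [{"id": f"{task['id']}:partition-{p}", "spawn": None}
            for p in range(3)]

def run_dag(root_tasks):
    """Tiny executor: any task may spawn more tasks into the queue."""
    queue, done = deque(root_tasks), []
    while queue:
        task = queue.popleft()
        done.append(task["id"])
        spawner = task.get("spawn")
        if spawner:
            queue.extend(spawner(task))  # dynamic DAG expansion
    return done

executed = run_dag([{"id": "process_cube", "spawn": partition_fact_table}])
print(executed)
```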
And many that were not unique, but were important to its success:
- Ran out of source control (SVN, then Git), with native support for reading from many repos that could be owned/managed by other teams
- Serialized tasks (the "compilation process") into an internal database, which avoided expensive run-time evaluation and supported static analysis/resolution of immediate dependencies
- Robust templating support
Why was it?
These principles flowed from the problems Kronos was trying to solve. Kronos was created by a team trying to get rid of SSIS, with the following guiding principles:
- Every task should be in source control, and a full software development life cycle should be possible
- The barrier to entry should be extremely low - a user should not need to know Kronos-specific syntax for any common tasks
- All kinds of jobs should be supported - from many flavors of SQL, to Python, to R, to arbitrary Docker containers
- Explicit reuse of common data assets should be simple, easy, and popular
And even more specifically, Kronos was built to process large SSAS cubes very fast. At the time of its creation, Wayfair had multiple terabyte-scale SSAS cubes. Kronos would inspect these cubes to infer dependencies automatically, dynamically create per-partition XMLA processing jobs, and optimistically schedule partition updates as soon as the underlying fact was available. Optimizing the SQL that fed the cubes was a nice side effect that later became the main goal.
Scalability
In my time, Kronos processed millions of tasks each day, in a mix of daily batch DAGs, microbatch DAGs every 15, 30, or 60 minutes, and occasional one-off tasks. These tasks were coordinated by a set of active/active schedulers (up to an arbitrary count) that prioritized a global queue with a sub-minute SLA, and a larger set of workers that were assigned specific queues to process and might further federate that work to remote systems like databases.
The prioritization algorithm used an initial eager scheduling heuristic based on the sum of downstream execution time within a dag, supplemented by a slower asynchronous prioritization algorithm that optimized for landing times.
This enabled optimally structured SQL processing: every query that created a table was a subnode that could be eagerly scheduled as soon as its inputs were available.
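The eager heuristic can be sketched as follows (graph shape and runtimes hypothetical): a node's priority is its own estimated runtime plus everything downstream of it, so tasks that unblock expensive downstream work float to the top of the global queue.

```python
from functools import lru_cache

# Hypothetical DAG: children[x] are the nodes that depend on x;
# runtime[x] is a historical execution-time estimate in minutes.
children = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
runtime = {"a": 5, "b": 30, "c": 2, "d": 10}

@lru_cache(maxsize=None)
def priority(node):
    # Own runtime plus the sum of downstream runtimes. Note that a shared
    # descendant ("d") is counted once per path in this naive sketch;
    # a real implementation would deduplicate the downstream set.
    return runtime[node] + sum(priority(child) for child in children[node])

# "a" is cheap itself (5 min) but gates 40+ minutes of downstream work,
# so the eager heuristic runs it first.
ranked = sorted(runtime, key=priority, reverse=True)
print(ranked)
```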
How this looked in practice
In one period, Kronos was responsible for orchestrating a global codependency graph across 5 distinct Vertica clusters (+ many other systems). Each day, Kronos would execute a dynamic graph that would build tables on some clusters, copy them to others, derive new assets, and potentially even copy back to the first clusters for more processing. This was in service of daily updates to a large set of dashboards, OLAP cubes, and other reporting.
A new data engineer or analyst creating a new table would fire up their IDE and write code to create it.
Once they had this working, they would edit this script to template in any variables they wanted - the most common by far being the "date" the table was scheduled on, or a range - start_date to end_date - associated with the schedule, and commit this to a branch in source control.
At this point, the script could look something like this (templating syntax illustrative):

```sql
CREATE OR REPLACE TABLE my_fun_table AS
SELECT
    my_id,
    my_value
FROM my_table
JOIN my_other_table
    ON my_table.id = my_other_table.id
WHERE my_table.date BETWEEN '{{start_date}}' AND '{{end_date}}'
```
At this point, they could immediately execute the code in Kronos from the branch. Kronos would detect if it needed to recompile the script (such as if it was new), pull it from source control, parse out the dependencies, and run it. Kronos would know that this table, in a given schedule, would need to run after it had updated "my_table" and "my_other_table".
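A toy version of that dependency extraction (a naive regex sketch, not the real parser - as noted below, SQL parsing has hard edge cases like subqueries, CTEs, and quoting):

```python
import re

SQL = """
CREATE OR REPLACE TABLE my_fun_table AS
SELECT my_id, my_value
FROM my_table
JOIN my_other_table ON my_table.id = my_other_table.id
"""

def extract_dependencies(sql):
    # Naive sketch: every table named after FROM or JOIN is an input.
    # A production parser must handle subqueries, CTEs, aliases, quoting, etc.
    return sorted(set(re.findall(r"\b(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE)))

def extract_output(sql):
    # The created table is the output asset this script "owns".
    match = re.search(r"CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+([\w.]+)", sql, re.IGNORECASE)
    return match.group(1) if match else None

print(extract_output(SQL), "<-", extract_dependencies(SQL))
```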
To get it scheduled, the user would typically have to merge it into their "main" branch, which was one of a configurable set of schedulable branches. As soon as their pull request was approved, it would be pulled into any schedule assigned to that folder in Git and run automatically.
What worked well?
There was very little Kronos-specific syntax required - the user committed exactly what they could run in an IDE or DB tool, with some light variables that the IDE may support as well.
The user did not have to worry about the upstream tables, when they built, or what their schedule was.
What were the challenges?
The strict coupling to Git caused some challenges for the UX and for adoption by analysts.
SQL parsing is hard! Edge cases could cause challenges.
We initially coupled the execution environment for Python code with the service's own code and execution environment - we had to teach people not to call exit()
Security! We built a very trusting model to start; this definitely had both security + performance implications and had to be refined over time as we scaled to a large user base
And much, much moreā¦
Comparison to Other Tools
Airflow
We used Airflow extensively, especially for ML cases. Sometimes we would have composite pipelines that would span both platforms. Airflow was the default option for self-contained ML workflows, while Kronos was used more heavily for the larger warehouse graph.
Key differences:
DAG Structure. Kronos used a global DAG of dynamically assembled task nodes, grouped by a common derived DAG ID, rather than having a DAG be explicitly assigned.
Dependencies. Kronos automatically constructed the dependency graph from input/output annotations on each operator, expressed as URIs, rather than assigning dependencies between operators directly. These input/output annotations were inferred automatically for common patterns like SQL.
Resource Management: Kronos included a global resource management and optimization framework, based on an estimated processing time for task nodes given historical values and queue resource modeling.
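The difference in the dependency model can be sketched like this (operator shape and URIs hypothetical, not the Kronos or Airflow APIs): instead of wiring edges by hand, as in Airflow's `a >> b`, each operator declares the URIs it reads and writes, and the edges are derived by matching them.

```python
# Hypothetical operators annotated with input/output URIs, in the spirit
# of the Kronos model; in Airflow these edges would be assigned directly.
operators = [
    {"id": "load_orders", "inputs": [], "outputs": ["db://dw/orders"]},
    {"id": "load_users",  "inputs": [], "outputs": ["db://dw/users"]},
    {"id": "user_orders",
     "inputs": ["db://dw/orders", "db://dw/users"],
     "outputs": ["db://dw/user_orders"]},
]

def derive_edges(ops):
    # Map each URI to its producer, then link every consumer to the
    # producer of each URI it reads.
    producer = {uri: op["id"] for op in ops for uri in op["outputs"]}
    return sorted({(producer[uri], op["id"])
                   for op in ops for uri in op["inputs"] if uri in producer})

print(derive_edges(operators))
```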