We walk through the three levels of query optimisation: optimising functions individually; optimising a full query end-to-end; and optimising across multiple queries with reactive programming, incremental computation, and differential dataflow.
Introduction
Software performance matters… is a slightly unpopular view these days:
If you care that computers don’t waste 99.9% of their cycles, you are performance-focused now, I guess.
There are plenty of reasons why you might care about performance.
If you’re an engineer, you might care intrinsically about the beauty of your code: that craftsmanlike quality which permeates every line you write is often tied to how fast your code runs. Most people, though, will care about performance insofar as it’s instrumental to their commercial goals, like serving users or winning market share.
One may be tempted to claim there’s so much software out there waiting to be built that performance doesn’t matter. That holds at best initially, until someone steals your crown with a vastly improved user experience – and there are notable cases where this has happened thanks to superior performance.
Higher throughput and lower latency both matter. The easier path to them is to throw more hardware at your problems, while building extra layers of abstraction to manage the complexity that comes from farming computations out over a network of machines. Harder, perhaps, is mastering the dark arts of code optimisation. We lose those skills at our own peril, and they will be difficult to recover. So we should pursue both avenues.
Performance definitely matters in data analytics.
The proliferation of libraries for distributed data processing speaks for itself. There’s an ever-growing demand, from all kinds of data practitioners, to scale up the work they do on their laptops to larger datasets – if need be, straight onto a cluster with as little hassle as possible. Alongside, we have an ongoing trend of rewriting libraries to be more efficient – even locally on a single computer.
Money talks, as evidenced by the VC investments pouring into this space to commercialise managed solutions on top of OSS libraries: Anyscale (Ray), Voltron (Arrow), Coiled (Dask), Ponder (Modin), and so forth.
Ergonomics are just as important, if not more… this much we can hopefully agree on!
It’s no coincidence that most analytics libraries expose their functionality through Python – the de facto language of data science. Suffice it to say, performance matters, but not at the expense of usability.
This article has a few goals:
Provide a snapshot of mainstream analytics libraries in use today.
Have a look at how various libraries achieve their performance.
Discuss a novel approach that’s quite different from what is out there.
In our overview of the landscape, we’ll “rank” libraries into three tiers:
Libraries that optimise individual functions.
Libraries that optimise queries made up of several functions.
Libraries that optimise across multiple queries (spoiler: no one is doing this seriously).
This isn’t meant as criticism; it isn’t a totem pole. It takes years to engineer new systems, and libraries that focus on performance benefit from lessons learnt at the lower rungs of this ladder. Indeed, the most mature libraries are the most widely adopted. They are the ones that have had the greatest impact and shaped entire industries – we have all benefited enormously from them.
Optimising functions
Performing analytics on a dataframe involves running a sequence of operations on it. These operations may be functions on a single column or row (e.g. mean, median, standard deviation), on a full table (e.g. group-by), or on multiple tables (e.g. merge, join, concatenate).
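To make this concrete, here’s a minimal pandas sketch (with made-up toy data) showing one operation from each category:

```python
import pandas as pd

# hypothetical example data
sales = pd.DataFrame({"region": ["EU", "US", "EU"], "amount": [10.0, 20.0, 30.0]})
regions = pd.DataFrame({"region": ["EU", "US"], "manager": ["Ada", "Bob"]})

mean_amount = sales["amount"].mean()                  # single-column function
per_region = sales.groupby("region")["amount"].sum()  # full-table function
joined = sales.merge(regions, on="region")            # multi-table function
```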
Data analysis and visualisation are often done using libraries like pandas, which favour an interactive style of programming where you call a function and then decide how to proceed based on the result.
Speeding up these workflows requires a different approach than when you are aware of all computations to be performed ahead of time. The most natural way to proceed here is to make each compute function faster.
Apache Arrow is the biggest project in this area, and it’s essentially a re-engineering of pandas. It does a lot more than just provide an in-memory data structure for analytics; one of its major contributions is to port over (an increasing number of) pandas’ compute functions from Python to C++, while introducing CPU optimisations like SIMD vectorisation.
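As a rough illustration, Arrow’s Python bindings expose these C++ kernels through the pyarrow.compute module; a minimal sketch:

```python
import pyarrow as pa
import pyarrow.compute as pc

# a columnar, Arrow-native array
amounts = pa.array([10.0, 20.0, 30.0])

# these calls dispatch to vectorised C++ kernels rather than Python loops
total = pc.sum(amounts)
average = pc.mean(amounts)
print(total.as_py(), average.as_py())  # 60.0 20.0
```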
RAPIDS, a suite of open-source software libraries by NVIDIA, includes cuDF, a library that implements a pandas-like API for executing compute functions on NVIDIA GPUs. Again, it takes the brute-force route of speeding up individual functions – either by rewriting them directly in CUDA/C++, or by leveraging the Numba compiler to translate user-defined functions written in Python. Incidentally, it builds on top of the columnar memory layout standardised by Arrow.
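A minimal sketch of what this looks like in practice – note that cuDF requires an NVIDIA GPU with RAPIDS installed, and the data here is made up:

```python
import cudf  # requires an NVIDIA GPU with RAPIDS installed

# same pandas-style API, but the group-by runs as CUDA kernels on the GPU
df = cudf.DataFrame({"region": ["EU", "US", "EU"], "amount": [10.0, 20.0, 30.0]})
per_region = df.groupby("region")["amount"].sum()
print(per_region)
```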
Optimising a full query
The next level of sophistication is to look at a query end-to-end.
A query is made up of several compute functions. It can be represented in a linear fashion when we have a series of operations on a single table. More generally, it’s represented as a DAG when there are multiple tables or results to be joined. This so-called dataflow graph describes the flow of computations end-to-end.
Making computation steps explicit shows their interdependencies, so we know which ones can be reordered or run independently. This affords us the opportunity to come up with strategies to evaluate our query more efficiently.
Compilation
Although each step may be optimised, passing data from function to function incurs a data transfer cost. This cost is especially significant on modern hardware – multi-core CPUs, GPUs, TPUs, FPGAs, etc. – and additionally so on heterogeneous systems that combine more than one kind of hardware accelerator. For data analytics, it’s therefore important to reason about the pipeline of computations in its entirety.
Weld is a rare example of a runtime for analytics that addresses this problem:
Even though individual functions can achieve high performance in isolation, the performance of the combined workflow is often an order of magnitude below hardware limits due to extensive data movement across the functions.
Weld’s take on solving this problem is to lazily build up a computation for the entire workflow, and then to optimise and evaluate it only when a result is needed.
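The following is a deliberately simplified Python sketch of that idea – not Weld’s actual IR or API – contrasting an eager pipeline that materialises intermediates with a fused single pass over the data:

```python
# Toy sketch of the lazy-build-then-fuse idea (not Weld's actual IR or API).
data = range(10_000_000)

# Eager pipeline: each step materialises an intermediate list that is
# written to and re-read from memory.
step1 = [x * 2 for x in data]
step2 = [x + 1 for x in step1]
eager_total = sum(step2)

# "Fused" pipeline: one pass over the data, intermediates never hit memory.
fused_total = sum(x * 2 + 1 for x in data)

assert eager_total == fused_total
```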
This approach has proved to be a lot more popular in deep learning.
TensorFlow’s XLA compiler is arguably the state of the art at implementing techniques such as kernel fusion – where multiple kernels, i.e. compute functions, are bundled together and their intermediate results kept within CPU/GPU/TPU registers – and it is being adopted behind the scenes by Google’s next-generation framework Jax as well.
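A minimal JAX sketch of the same principle: wrapping a function in jax.jit hands the whole computation to XLA, which is then free to fuse the elementwise steps into a single kernel:

```python
import jax
import jax.numpy as jnp

def pipeline(x):
    # two elementwise steps that XLA can fuse into a single kernel
    y = x * 2.0
    return jnp.sum(y + 1.0)

fast_pipeline = jax.jit(pipeline)  # compile the whole function with XLA
x = jnp.arange(1_000_000, dtype=jnp.float32)
print(fast_pipeline(x))
```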
Outside of machine learning, which is a particularly well-funded and popular field nowadays, these compilation techniques are yet to gain widespread adoption.
In principle, they ought to provide fruitful results in scientific computing and data analytics, at the very least. In practice, progress in these adjacent areas has been slower, and even an academic project as promising as Weld is no longer in development. The author’s doctoral thesis, though, does a fantastic job of articulating how powerful these tricks are for achieving fast analytics on modern hardware.
To be fair, there are practical reasons to eschew code generation in favour of simpler approaches when building production systems: the developer experience won’t be as great without significant investment in debug tooling; and fusing functions together blurs the abstraction boundaries between them, making it harder to observe what’s happening inside your code.1
Query optimisers
The general idea of optimising a query end-to-end, nonetheless, has borne fruit in data analytics.
It dates back to earlier work on query optimisation for relational databases.
For example, a typical SQL query optimiser will perform filter/predicate pushdown, whereby the DAG of computations is rearranged so that data is filtered as early as possible and we don’t read a full dataset into memory unnecessarily.
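Here’s a hand-rewritten illustration of the effect, sketched with pandas and made-up tables – a SQL optimiser performs the equivalent rearrangement automatically on the query plan:

```python
import pandas as pd

orders = pd.DataFrame({"user_id": [1, 2, 3], "total": [5.0, 50.0, 500.0]})
users = pd.DataFrame({"user_id": [1, 2, 3], "country": ["UK", "UK", "US"]})

# Naive plan: join everything first, then filter.
naive = orders.merge(users, on="user_id")
naive = naive[naive["country"] == "UK"]

# Pushed-down plan: filter first, so the join only touches the rows it needs.
uk_users = users[users["country"] == "UK"]
pushed = orders.merge(uk_users, on="user_id")

assert naive.shape[0] == pushed.shape[0]
```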
Parallel and distributed execution
Despite poor progress in vertical performance – we are nowhere near fully exploiting the resources that modern hardware offers – much work has been done in data analytics to achieve decent horizontal performance by scaling out workloads.
Once we have modelled our query as a DAG, we can identify independent paths to be executed in parallel or distributed across a cluster. Plus, we can simply avoid loading data and executing code paths until they are needed.
Progress is a lot more mature here, and numerous libraries have emerged from open-source data science efforts.
In the Python community, Dask is perhaps the best-established library for parallelising data analytics, whereas Ray is a distributed execution framework that started off in reinforcement learning but now covers general analytics too. Both support NVIDIA RAPIDS, with Dask providing integration and some of Ray’s machine learning libraries playing nicely with it. You can also use Dask on Ray to run Dask’s analytics collections via Ray’s task scheduler – a bottom-up scheduler rather than a centralised one.
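A minimal Dask sketch (assuming a hypothetical events.parquet dataset): the query is only described at first, building a task graph that the scheduler later executes with partitions processed in parallel:

```python
import dask.dataframe as dd

# Lazily define the query: nothing is read or computed yet,
# Dask only builds a task graph (a DAG) of partition-level operations.
events = dd.read_parquet("events.parquet")   # hypothetical dataset
per_user = events.groupby("user_id")["value"].mean()

# Only now is the DAG scheduled, with partitions processed in parallel.
result = per_user.compute()
```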
Modin is interesting in that it scales up the (exact and almost complete) pandas API to out-of-core dataframes, though rather than building up a DAG, it does so by eager evaluation (as it caters to an audience of data scientists familiar with pandas). It acts as a drop-in replacement for pandas, backed by Dask or Ray as its compute engine. The authors took a highly principled approach, by developing a theoretical foundation for dataframes and consolidating pandas’ myriad of functions into a smaller core.
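The drop-in aspect means that, in the simplest cases, swapping the import is all that changes – a sketch with a hypothetical CSV file:

```python
# The usual `import pandas as pd` becomes:
import modin.pandas as pd  # backed by Ray or Dask under the hood

df = pd.read_csv("events.csv")  # hypothetical file, read in parallel
summary = df.groupby("user_id")["value"].mean()
```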
Vaex aims to be a similar replacement for out-of-core dataframes, but it doesn’t follow the pandas API quite as closely. Like the other libraries mentioned here, it performs lazy evaluation when streaming in data and evaluating expressions. Its core is written in C++, which shows great care for vertical performance too – although more along the lines of optimising individual functions (as opposed to end-to-end optimisations, e.g. kernel fusion).
There are also a few Rust libraries, such as Polars, which is built on Arrow’s standardised columnar layout. DataFusion and Ballista tackle multi-threaded and distributed execution respectively – the latter makes use of Arrow’s protocol for serialising data over a network. Outside of the Arrow ecosystem, noteworthy mentions go to Constellation and Amadeus, experimental frameworks for distributed computing – the latter specifically for data analysis.2
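Polars is written in Rust but also ships Python bindings; a minimal sketch of its lazy API (again with a hypothetical file), where the plan is optimised by the Rust engine before collect() executes it:

```python
import polars as pl

# Build a lazy query plan; Polars optimises it (e.g. predicate pushdown)
# and runs it over Arrow memory only when collect() is called.
query = (
    pl.scan_csv("events.csv")        # hypothetical file, scanned lazily
    .filter(pl.col("value") > 0)
    .select(pl.col("value").sum())
)
result = query.collect()             # the optimised plan runs here
```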
Optimising multiple queries
In practice, a query is hardly ever run alone.
Data analysis is often carried out in an iterative fashion – we repeatedly run new queries as we explore our data. Or we might track some analytics over time, re-running the same queries whenever the underlying data gets updated.
Shouldn’t we then be optimising across multiple queries?
Reactive programming
The most common way to optimise for multiple queries is reactive programming.
Reactive programming is a widespread paradigm where computations are laid out in a DAG and the result of every compute step is cached – when we run a new query, we start from the last intermediate result that’s still valid and only need to recompute steps from there onward. It’s the same principle behind spreadsheet software, where changes to a cell propagate downstream only to those cells that depend on it.
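A toy sketch of that caching-and-invalidation idea, not tied to any particular library: each node caches its result, and a change only marks its downstream dependents as stale:

```python
# Toy reactive DAG: cache each node, recompute only what an upstream change touches.
class Node:
    def __init__(self, fn, *deps):
        self.fn, self.deps = fn, deps
        self.dependents = []
        for d in deps:
            d.dependents.append(self)
        self.cache, self.dirty = None, True

    def invalidate(self):
        # a change propagates downstream, but only to nodes that depend on it
        self.dirty = True
        for d in self.dependents:
            d.invalidate()

    def value(self):
        if self.dirty:
            self.cache = self.fn(*(d.value() for d in self.deps))
            self.dirty = False
        return self.cache

raw = Node(lambda: [1, 2, 3])
doubled = Node(lambda xs: [x * 2 for x in xs], raw)
total = Node(sum, doubled)

print(total.value())         # 12, computed once; later calls hit the cache
raw.fn = lambda: [1, 2, 3, 4]
raw.invalidate()             # only downstream dependents are marked stale
print(total.value())         # 20, recomputed from the changed input
```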
The worst-case scenario is when new data streams in – because the underlying data changes, we must recompute our entire pipeline of operations. It can be seen as a special case of modifying a query, with new steps occurring at the start of our pipeline (i.e. streaming data is equivalent to a sequence of insert-row operations on a table).
Incremental computation
The best-case scenario is an incremental computation at the end of our pipeline – this is an update to our current state, which we can do without storing any intermediate results.
Wouldn’t it be nice if we could always end up in this best-case scenario?
Ideally, we’d keep only the current state at the end of our pipeline, and whenever a new piece of data streams in or we change a step of our query, we’d run a minimal incremental computation on this latest state.
Such an approach would be incredibly powerful: we’d no longer have to store vast amounts of intermediate states in anticipation of the next query (only the current state of our table); and it would save us from re-running what can be quite a large number of expensive computation steps – we would, instead, bypass them with a single computation at the end.
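As a toy example of what such an incremental step looks like, a running mean can absorb each new row as an O(1) update to its current state (a count and a sum), with no intermediate results stored:

```python
# Toy sketch of incremental computation: keep only the current state
# (count and sum) and fold each new row into it.
class RunningMean:
    def __init__(self):
        self.count = 0
        self.total = 0.0

    def insert(self, value):
        # O(1) incremental update; nothing upstream is re-read or re-stored
        self.count += 1
        self.total += value

    def value(self):
        return self.total / self.count if self.count else None

mean = RunningMean()
for row in [10.0, 20.0, 30.0]:   # data streaming in
    mean.insert(row)
print(mean.value())              # 20.0, without re-scanning earlier rows
```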
Differential dataflow
This is known as differential dataflow – and, as it turns out, there’s a way to achieve it.
At Tably, we are developing differential dataflow based on Operational Transformation (OT).
This is a technique from real-time collaborative editing. It’s normally used in online text editors like Google Docs, where local copies of a document can be seen as computation pipelines, and each step as a character insert/delete operation. However, there’s no reason why an OT algorithm can’t work with operations on forms of data other than text.
OT works by transforming an operation so that it is applied last chronologically, yet has the same logical effect as if it had occurred at its original position in the pipeline. This involves transforming it against all the other operations that would have occurred after it.
Similarly to Git, OT allows us to rebase operations to the tip of a branch – i.e. our pipeline – although without having to manually resolve conflicts. An appropriately designed OT algorithm will leave our existing history intact, so rebased operations get transformed into the minimal incremental changes we require on the current state.
Think of your table as sitting at the end of a history of operations that’s been materialised; a new query is some alternate history that branches off before this materialised view. With OT, you rebase each operation from the new branch on top of your existing branch, then you process these minimal incremental changes to update your table efficiently.
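To give a flavour of the rebasing step, here is a deliberately simplified toy – insert-row operations addressed by position, and a transform function – which is emphatically not Tably’s actual algorithm:

```python
# Deliberately simplified toy (not Tably's actual algorithm): insert-row
# operations addressed by position, plus a transform that rebases one
# operation past another already-applied operation.
from dataclasses import dataclass

@dataclass
class InsertRow:
    position: int
    row: dict

def transform(op, applied_op):
    """Rewrite `op` so that applying it after `applied_op` has the same
    logical effect as if it had happened at its original position."""
    if applied_op.position <= op.position:
        # an earlier insert shifted the rows below it, so our index moves by one
        return InsertRow(op.position + 1, op.row)
    return op

# Operations already materialised into the table (the existing branch):
history = [InsertRow(0, {"amount": 10}), InsertRow(5, {"amount": 20})]

# A new operation that logically branches off before that history:
new_op = InsertRow(2, {"amount": 99})

# Rebase it to the tip of the branch by transforming against each applied op.
for applied_op in history:
    new_op = transform(new_op, applied_op)

print(new_op.position)  # 3 – a minimal incremental change to apply to current state
```

Note the shape of the loop: the number of transforms is the length of the new branch times the length of the applied history, which is where the cost model described below comes from.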
It should be noted our approach isn’t the only one. Materialize is a new database that supports streaming SQL analytics based on differential dataflow – in fact, its author originally coined the term – and it isn’t based on OT techniques.
No free lunch
Our approach is a compromise where we trade off computations on data for computations on computations. Rather than spend CPU cycles crunching our data through the existing computation DAG, we spend cycles transforming new nodes through the existing DAG into minimal incremental changes. Then we crunch our data through these more efficient steps.
As you might imagine, our technique scales nicely with larger datasets.
This is because a dataset, no matter how large, only runs through the minimum computations required, and deriving those computation steps is independent of the dataset size. The derivation depends solely on the number of transformations: each operational transformation has a cost independent of the dataset size, and the number of transformations is proportional to how many nodes you are transforming (the length of the new query) multiplied by how many nodes you are transforming them against (the length of the old query). So you pay a small cost that depends only on how much the new query differs from the old one.
Still, “big data” isn’t our primary use case.
If you’re curious about why, check out our vision for the future of data.
This post was originally published on tably.com.
Thanks to Wes McKinney for pointing out Databricks’ recent Photon paper.