# Envision VM (part 2), Thunks and the Execution Model

This article is the second of a four-part series on the Envision virtual machine’s inner workings: the software that runs Envision scripts. See part 1, part 3 and part 4. This series doesn’t cover the Envision compiler (maybe some other time), so let’s just assume that the script has somehow been converted to the bytecode that the Envision virtual machine takes as input.

Like most other parallel execution systems, Envision produces a directed acyclic graph (DAG) where each node represents an operation that needs to be performed, and each edge represents a data dependency where the downstream node needs the output of the upstream node in order to run.

The nodes are called thunks, after the very similar concept from Haskell and other languages with lazy evaluation.

Examples of thunks that can be found in a typical Envision script:

• Parse an input file in .xlsx, .csv or .csv.gz format, and convert it to a columnar representation that will be used by the rest of the script.
• Load a range of lines $M..N$ from an individual column; this column can be taken either from the result of parsing an input file (see above), or from Lokad’s own .ion columnar file format optimized for storage in Microsoft Azure Blob Storage.
• Given a range of lines $M..N$ from a very large vector $A$, a smaller vector $B$, a projector $\pi$ which associates every line in $A$ with a line in $B$, and a function $f$, compute $f(A[l], B[\pi(l)])$. This is called a map-side join.
• Use Monte Carlo simulation to estimate the average, variance or distribution of the result of a random process. The result of multiple Monte Carlo thunks, executed in parallel, can then be combined by a final thunk.

In general, a thunk is expected to take between a few hundred milliseconds (for small-scale data manipulation) and a few minutes (for Monte Carlo simulations or gradient descent). This is a strong assumption: the Envision virtual machine is allowed to have significant overhead for the evaluation of each thunk, on the order of milliseconds. A script should produce a small number of thunks (between 1 000 and 100 000), with each thunk performing a fairly large unit of work.

### Referential transparency

Thunks are pure functions: they are deterministic and cannot have side-effects. They operate by reading their immutable inputs, and returning the same value on every execution. This important property helps in many ways:

1. Since the evaluation of a thunk has no side-effects, it will not interfere with the evaluation of another thunk, and so all thunks can be run concurrently (so long as their inputs are available) on several CPU cores, or even distributed on several workers. The Envision virtual machine keeps track of each script’s frontier (the set of thunks that can be executed because all their inputs are available), and picks a new thunk from it whenever a CPU becomes available.
2. Conversely, it’s possible to evaluate thunks one by one and reach the same result. For instance when the cluster is under heavy load, when cluster workers are unavailable, or when reproducing a script’s evaluation on a developer’s workstation in order to investigate an issue.
3. Two workers executing the same thunk is not an error, just a waste of time. As such, it’s not something that must be avoided (with all the difficulty involved in synchronization on a distributed system), it is enough to ensure that it does not happen too often1.
4. If the result of a thunk is lost (due to a worker crash or network unavailability), it is possible to run it again. Even if several thunks are lost, the original DAG remains available, and can be used as a data lineage to re-compute the needed values.

However, this also means that thunks cannot communicate with each other (for instance, by opening a channel and transmitting data between them). This restricts the available strategies for concurrency and parallelism.

### Thunk production

In many distributed computation frameworks, the execution DAG is produced outside the cluster (for instance, on a scheduler machine), and then portions of the graph are pushed to individual workers for execution. Quite often, the DAG must be produced in several steps: for example, a join operation can be optimized differently depending on the size of the tables2, and it is not always possible to know the size of a table before actually evaluating its contents, so it is worth waiting for the table sizes to be known before generating the DAG portion that performs the join. This means that a back-and-forth will occur between the scheduler and workers, where the scheduler will produce additional tasks based on the results from the workers.

This turns the scheduler into a single point of failure, and allowing multiple active schedulers, or a failover scheme between an active and a passive scheduler, would add quite some complexity. For Envision, our resilience target was instead to ensure that a single worker is able to compute an entire mission, without involving the scheduler. As such, even though a ten-minute scheduler downtime would prevent new missions from being submitted, it would not interrupt already-started missions from completing. However, this means that workers should be able to generate new portions of the DAG without help from the scheduler.

We achieve this by letting a thunk return a new thunk instead of a value - to reuse more Haskell terms, building the DAG involves monads instead of just functors. This new thunk has its own parents, which may be new thunks as well, and so on, forming a complete new DAG. In practice, the new DAG often shares many of its thunks with the old DAG, because it needs the results of those computations.

When submitting a new mission to the cluster, only a single thunk is submitted (containing the script to compile and execute, and the references to all the input files). This thunk then produces the initial execution DAG, which will grow a few more times until it becomes complete.

### Merkle graph

In order to be transmitted over the network, thunks are also serializable, using a custom binary format designed for having a low footprint. On a DAG with 100 000 thunks, a 10MiB budget can only support 104 bytes per thunk!

Support for binary serialization allowed us to turn the DAG into a Merkle DAG, where each thunk has an identifier determined by the binary contents of that thunk and all of the thunk’s ancestors3. We call this identifier the thunk’s hash.

Using a Merkle DAG has two main benefits. First, thunks that perform the same operation are automatically merged because, having the same contents and ancestors, they also have the same identifier.

Second, it’s possible for two scripts to share some of their thunks — maybe they read the same input files and apply the same operations to them, or maybe a Supply Chain Scientist is working on the script, changing a few lines at a time between executions. When this happens, the outputs of the shared thunks can be reused if they are still available in memory, greatly reducing the script’s execution time. Being able to edit and re-execute a script creates a short feedback loop that helps the Supply Chain Scientists’ productivity.

### Local thunk scheduling

We will go into more details in a future article about how thunk execution is distributed across several machines in a cluster. For now, just consider that each worker holds a copy of the entire DAG, knows which thunks have already been executed (and where to find their results), knows which thunks are currently being executed by the cluster, and is responsible for scheduling additional thunks to run on its 32 cores. This local scheduling is done by a single-threaded service called the kernel (which is not to be confused with the Linux kernel). The kernel, as well as the worker threads that will actually execute the thunks, all run in the same .NET process in order to share managed objects with each other.

Finding a new thunk is nearly instantaneous, since the kernel keeps a frontier of ready-to-execute thunks for each DAG, and only needs to pick one at random. Most of the kernel’s time is spent instead on updating the frontier whenever a thunk starts running (it needs to leave the frontier), finishes running (its descendants may join the frontier, depending on whether it has any non-executed parents left), or becomes lost due to the worker holding its result becoming unavailable (its descendants must leave the frontier, but the thunk itself may be added back to the frontier if its own parents are still available).

Tending to the frontiers is work with a very high variability, it can take between a microsecond and several seconds—over a million times longer! For example, a shuffle step has a layer of $N$ thunks that read the outputs of another layer of $M$ thunks. Each downstream thunk reads the outputs of all $M$ upstream thunks, resulting in $M\cdot N$ edges in the DAG. For $M = N = 1000$ (a very likely degree of parallelization, when dealing with billions of lines), that’s a million edges. If left unchecked, this phenomenon can cause the kernel to pause for seconds at a time, during which no new thunks are scheduled to run, and so up to 32 cores remain idle4.

We resolve this issue by introducing virtual nodes to the DAG to represent this kind of connection between layers. The virtual node has $M$ inputs (one for each thunk in the upstream layer) and $N$ outputs (one for each thunk in the downstream layer). This decreases the number of edges to $M + N$, which is significantly more manageable!

### Low-granularity code generation

The first versions of Envision, in 2013 and 2014, operated on the basis that each vector operation is performed by a single thunk. When executing T.A / (T.B + 1) there would be thunk one for broadcasting 1 into table T, thunk two for adding T.B to the result of thunk one, and thunk three for dividing T.A by the result of thunk two. This had the benefit that we could easily implement each operation as a C# function, executed as a single thunk, which is an excellent idea during the early implementation of a DSL. It has, of course, the downside that unnecessary amounts of memory are consumed (thunk one would produce a vector of millions of copies of the value 1), and memory takes time to be written and read back.

It was imperative to have thunks that evaluate several operations in succession, instead of having one thunk for each operation.

Many SQL databases operate on variations of the volcano model, where the query is turned into a tree of iterators. Each iterator acts as an impure function that returns the next value in the iteration every time it is called, and can recursively call other iterators. In this model, broadcasting a scalar into a table would be a constant-returning iterator, adding or dividing two vectors would hold references to two iterators, and reading from a vector would increment through it:

Func<float> BroadcastScalar(float x) = () => x;
Func<float> Add(Func<float> x, Func<float> y) = () => x() + y();
Func<float> Div(Func<float> x, Func<float> y) = () => x() / y();
Func<float> Read(float[] v) { var i = 0; return () => v[i++]; }


Compiling a query to the volcano model consists of building the tree of iterators:

Div(Read(A), Div(Read(B), BroadcastScalar(1)))


This has the advantage that no memory allocations are performed for the intermediate vectors. However, the overhead of calling functions dominates the simple arithmetic operations that those functions perform.

Because of this, in 2015 Envision moved to just-in-time code generation. The principle is quite similar to Apache Spark’s Tungsten execution engine: compile the operation T.A / (T.B + 1) to a function in an imperative language.

float[] GeneratedFunction(float[] a, float[] b) {
var result = new float[a.Length];
for (var i = 0; i < a.Length; ++i)
result[i] = a[i] / (b[i] + 1);
return result;
}


The target we use for this compilation is .NET IL, the bytecode language used by .NET for its assemblies. This lets us leverage the .NET JIT compiler to produce optimized machine code from our generated IL.

This runtime code generation proved to be the biggest hurdle when migrating Envision from .NET Framework to .NET Core in 2017. Indeed, while .NET Core supports the same System.Reflection APIs as .NET Framework for producing and running IL at runtime, it does not support saving that IL to disk as a DLL. While that is not a requirement for running Envision, it certainly is a requirement for developing the Envision compiler! System.Reflection does nothing to prevent invalid IL from being created, and only reports a rather useless InvalidProgramException when a method containing invalid IL is executed. The only reasonable approach to investigating such issues is to save an assembly file and use ILVerify or ILSpy. Because of this requirement, we actually kept targeting both .NET Framework and .NET Core for two years—production would run on .NET Core, and IL debugging would be done on .NET Framework. Finally, in 2019 we published our own library Lokad.ILPack as a replacement for this feature, and migrated away from .NET Framework.

This concludes today’s analysis of how Envision executes scripts. In the next article, we will discuss how intermediate results are stored.

Shameless plug: we are hiring software engineers. Remote work is possible.

1. Workers broadcast to the cluster whenever they start a new thunk, and avoid running thunks that other workers have claimed. There remains the rare case where two workers start the same thunk at nearly the same time; we avoid this by having each worker pick a random thunk from the frontier, and having the scheduler reduce the number of workers when the frontier shrinks too much. This means duplicate execution is not impossible, but very unlikely. ↩︎

2. A costly shuffle join is used for two large tables, and the cheaper map-side join is used when one of the tables is small enough to fit in memory. ↩︎

3. For thunks without ancestors, such as those that read from input files, we include the hash of the contents of those input files inside the thunk’s body. This ensures that if two thunks read the same input file, they will have the same hash, and if they read two different input files, including two different versions of the file at a given path, then they will have different hashes. ↩︎

4. This also has an effect on serialization size. Indeed, if all edges are represented in the serialized DAG, even with just two bytes per edge, it already represents 2MB of data! ↩︎

#### ‹ PREVIOUS

Envision VM (part 1), Environment and General Architecture

#### NEXT ›

Envision VM (part 3), Atoms and Data Storage