This article is the fourth of a four-part series on the Envision virtual machine’s inner workings: the software that runs Envision scripts. See part 1, part 2 and part 3. This series doesn’t cover the Envision compiler (maybe some other time), so let’s just assume that the script has somehow been converted to the bytecode that the Envision virtual machine takes as input.

The previous articles mostly examined how individual workers executed Envision scripts. However, both for resilience and for performance, Envision is actually executed across a cluster of machines.

Each layer in a worker communicates with the same layer in the other workers, or with other layers in the same worker. This ensures that network communication can stay a private implementation detail of every layer.

At a low level, each worker opens two TLS connections to each other machine in the cluster, and the communications from the various layers are multiplexed through these two connections (one connection is used for short messages, the other for large data transfers).
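
As a rough illustration, the multiplexing could look like the sketch below. The frame format, the 64 KB threshold for what counts as a "short" message, and the helper names are all assumptions made for the sketch, not Envision’s actual wire protocol.

```python
import socket
import ssl
import struct

SHORT_MESSAGE_LIMIT = 64 * 1024  # illustrative threshold, not the actual value

def open_peer_connections(host: str, port: int, ctx: ssl.SSLContext):
    """Open the two TLS connections kept towards one peer: one for short
    messages, one for large data transfers."""
    short = ctx.wrap_socket(socket.create_connection((host, port)), server_hostname=host)
    bulk = ctx.wrap_socket(socket.create_connection((host, port)), server_hostname=host)
    return short, bulk

def send_frame(connections, layer_id: int, payload: bytes) -> None:
    """Multiplex one layer's message over the shared connections. Each frame
    is tagged with its layer so the receiving side can demultiplex it."""
    short, bulk = connections
    sock = short if len(payload) <= SHORT_MESSAGE_LIMIT else bulk
    header = struct.pack(">BI", layer_id, len(payload))  # layer tag + length prefix
    sock.sendall(header + payload)
```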

Abstract distributed execution

Control Layer

This layer is used by the scheduler to assign missions to the workers and to unassign them, and involves no worker-to-worker communication. The main messages of this layer are:

  • Scheduler asks worker to start working on a mission.
  • Scheduler asks worker to stop working on a mission.
  • Worker tells scheduler that it has encountered a catastrophic error during the execution of a mission (usually a non-deterministic issue, such as “NVMe drive caught on fire”, which means the same mission can be attempted again in the future or on another worker).
  • Worker gives the scheduler statistics about its current state: list of missions, size of the frontier of each mission’s DAG, total number of thunks left to be executed in each mission’s DAG.

The scheduler uses these statistics to decide when to re-assign missions. The actual rules for doing so are quite complex, as they depend on priority rules, fairness between multiple tenants and between scripts of the same tenant, and the overall load of the cluster at that point, but the general trend is that missions with a large enough frontier can be spread to multiple workers, so long as those workers are not already overloaded. Given the same amount of work to perform, it is more efficient to run four missions on one worker each than to spread all four missions across all the workers.
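
For illustration only, the control-layer traffic and the spreading heuristic could be modelled along these lines. Every name, field and threshold below is an assumption made for the sketch, not Envision’s actual format.

```python
from dataclasses import dataclass

@dataclass
class StartMission:             # scheduler -> worker
    mission_id: str

@dataclass
class StopMission:              # scheduler -> worker
    mission_id: str

@dataclass
class MissionFailed:            # worker -> scheduler (catastrophic, usually non-deterministic)
    mission_id: str
    reason: str                 # e.g. "NVMe drive caught on fire"

@dataclass
class WorkerStats:              # worker -> scheduler, sent periodically
    frontier_sizes: dict[str, int]     # mission id -> size of the frontier of its DAG
    thunks_remaining: dict[str, int]   # mission id -> thunks left to execute

def may_spread(stats: WorkerStats, mission_id: str,
               target_is_overloaded: bool, min_frontier: int = 32) -> bool:
    """Illustrative rule of thumb only: a mission with a large enough frontier
    may be spread to an additional worker, as long as that worker is not
    already overloaded. The real scheduler also weighs priorities, fairness
    between tenants and between scripts, and the overall load of the cluster."""
    return (stats.frontier_sizes.get(mission_id, 0) >= min_frontier
            and not target_is_overloaded)
```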

Execution Layer

Each worker keeps track of which thunks it is currently executing, and it broadcasts this list to the other workers every time it schedules a new thunk1. This ensures that, outside of the very short window related to network latency, two workers will not start running the same thunk.

Of course, if a worker stops sending these updates (for instance, because it has crashed or become disconnected from the rest of the cluster), its peers will treat any list older than a few seconds as stale, and will allow themselves to run those thunks.
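
A minimal sketch of that deduplication rule, assuming each worker remembers the last list broadcast by each peer along with the moment it was received; the exact staleness threshold and all names are illustrative.

```python
import time

STALE_AFTER_SECONDS = 5.0  # "a few seconds"; the exact value is an assumption

class PeerExecutionView:
    """Tracks, for each peer, the thunks it last claimed to be executing."""

    def __init__(self):
        self._claims = {}  # peer id -> (set of thunk ids, time of last broadcast)

    def on_broadcast(self, peer_id: str, thunk_ids: set) -> None:
        self._claims[peer_id] = (thunk_ids, time.monotonic())

    def may_run(self, thunk_id: str) -> bool:
        """A thunk may be started locally unless a peer recently claimed it.
        Lists older than the staleness threshold are ignored, so a crashed or
        disconnected peer does not block execution forever."""
        now = time.monotonic()
        for thunk_ids, stamp in self._claims.values():
            if now - stamp <= STALE_AFTER_SECONDS and thunk_id in thunk_ids:
                return False
        return True
```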

Metadata Layer

Each worker tries to keep a copy of the full metadata, but there is no actual synchronization: we chose to provide no guarantee that all workers agree on the exact same metadata, and instead work with eventual consistency guarantees. This makes distributing the metadata layer the most challenging in terms of design2.

The eventual consistency of this layer follows three main rules (a short sketch in code follows the list):

  1. Every local change to the metadata layer is immediately broadcast to all other workers. This broadcast may fail, and will not be attempted again.
  2. Remote changes received from other workers are merged into the local metadata layer, based on a monotonic progression3: a “no result” value for a thunk can be overwritten by a “checkpoint” value (meaning the thunk has started, but not finished, executing), which can be overwritten by an “alias” value (meaning the thunk has returned a DAG to be executed in its place), which can be overwritten by a “result” value (which can either be a successful result with its associated atoms, or a fatal error).
  3. Whenever another layer sends a network response based on a value in the metadata layer, the metadata layer also broadcasts that value again.
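
Here is a minimal sketch of these three rules, with a plain integer ranking standing in for the monotonic progression; the class, the broadcast hook and the payloads are illustrative, not the actual implementation.

```python
from enum import IntEnum

class ThunkState(IntEnum):
    # Monotonic progression: a value can only be overwritten by a higher one.
    NO_RESULT = 0
    CHECKPOINT = 1   # the thunk has started, but not finished, executing
    ALIAS = 2        # the thunk returned a DAG to be executed in its place
    RESULT = 3       # successful result with its atoms, or a fatal error

class MetadataLayer:
    def __init__(self, broadcast):
        self._values = {}            # thunk id -> (state, payload)
        self._broadcast = broadcast  # sends a value to all peers; may fail silently

    def set_local(self, thunk_id, state, payload=None):
        """Rule 1: every local change is broadcast once, immediately."""
        if self._merge(thunk_id, state, payload):
            self._broadcast(thunk_id, state, payload)

    def on_remote(self, thunk_id, state, payload=None):
        """Rule 2: remote changes are merged, keeping the most advanced state."""
        self._merge(thunk_id, state, payload)

    def value_for_response(self, thunk_id):
        """Rule 3: when another layer answers a network request based on this
        value, broadcast the value again so peers that missed it catch up."""
        state, payload = self._values.get(thunk_id, (ThunkState.NO_RESULT, None))
        self._broadcast(thunk_id, state, payload)
        return state, payload

    def _merge(self, thunk_id, state, payload):
        current, _ = self._values.get(thunk_id, (ThunkState.NO_RESULT, None))
        if state > current:
            self._values[thunk_id] = (state, payload)
            return True
        return False
```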

The third rule is designed to force a level of synchronization when it is actually relevant. For instance, consider the following sequence of events:

  • The scheduler asks a worker to execute a mission (through the control layer)
  • The worker executes the mission and broadcasts the result (through the metadata layer), but the message is lost en route to the scheduler.
  • The scheduler notices that the worker is no longer running the mission (through the control layer), and asks it to run it again.
  • The worker observes that the mission’s thunk already has a result in the metadata layer, and does nothing, because nothing needs to be done.

This is a deadlock where the scheduler and the worker disagree on the state of a thunk in the metadata layer (the worker believes it is done, the scheduler believes it is not). The third rule resolves this: since the worker’s response of «I no longer work on this mission» is based on the worker’s observation that the thunk has a result, the metadata layer must broadcast that result again. The deadlock is then resolved:

  • The worker’s metadata layer broadcasts the thunk result again, and it is received by the scheduler.
  • The scheduler reacts to the appearance of a result for a mission’s thunk, by flagging that mission as complete, and notifying the client that requested that mission.

Atom Layer

The workers combine their atom layers to create a distributed blob store, where each atom can be requested by its identifier—the 128-bit hash of its contents, created with SpookyHash. This is not a distributed hash table (DHT), because that would provide the wrong trade-offs: in a DHT, finding an atom would be quick (given its hash, the identifier of the worker holding it can be computed with a simple function), but writing an atom would be slow (it would need to be sent from the machine that computed it, to the machine that is expected to hold it given the DHT’s current layout). Given that most atoms are expected to be consumed on the same machine that produced them, this is wasteful.

Instead, whenever a worker requests an atom from its own atom layer, it first looks for that atom on its own NVMe drives. If it is not found, then the other workers are queried for the existence of that atom. This is the largest performance challenge of Envision’s distributed design, since these queries must be completed as quickly as possible, and a complex timeout strategy is needed to deal with non-responsive workers: wait too long, and you’ve wasted seconds waiting for a response that never came; give up too soon, and you will need to re-compute an atom that could have been downloaded from another worker.
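
A sketch of that local-first lookup is shown below; `local_store`, `has_atom` and `download_atom` are hypothetical helpers, and the timeout is a single illustrative value standing in for the far more elaborate strategy described above.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

EXISTENCE_TIMEOUT_SECONDS = 0.5  # illustrative: wait too long and time is wasted,
                                 # give up too soon and the atom is re-computed

def find_atom(atom_id: bytes, local_store, peers):
    """Return the atom's contents, looking at the local NVMe store first,
    then asking the other workers whether they hold it."""
    data = local_store.read(atom_id)
    if data is not None:
        return data

    if not peers:
        return None

    # Ask every peer whether it holds the atom; take the first positive answer.
    pool = ThreadPoolExecutor(max_workers=len(peers))
    futures = {pool.submit(peer.has_atom, atom_id): peer for peer in peers}
    found_on = None
    try:
        for future in as_completed(futures, timeout=EXISTENCE_TIMEOUT_SECONDS):
            try:
                if future.result():
                    found_on = futures[future]
                    break
            except Exception:
                continue  # non-responsive or failing peer: skip it
    except Exception:
        pass  # overall timeout reached: stop waiting for stragglers
    finally:
        pool.shutdown(wait=False, cancel_futures=True)

    if found_on is not None:
        return found_on.download_atom(atom_id)
    return None  # not found anywhere: the atom will have to be re-computed
```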

To help with this, the atom layer also batches multiple requests together, to ensure that all other workers keep a full pipeline of requests they need to answer, and to more easily detect when a worker’s response times suddenly spike.

Once at least one other worker has confirmed the existence of the atom on its disk, a second request is sent to download the atom. Such download requests tend to be very spiky, since many thunks request their atoms first, and then start processing their contents. Because of this, the atom layer is aware that there is a single download queue for every pair of workers, and does not panic if a given atom request fails to receive its first byte for several seconds (if the queue is full and other atoms are receiving their bytes, then there is nothing to worry about). In a sense, the timeout is not at the atom request level, but at the level of the entire layer.
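
In other words, something along these lines: rather than timing out each atom request on its own, the layer only worries when the whole download queue towards a peer stops making progress. Names and thresholds below are illustrative.

```python
import time

QUEUE_STALL_SECONDS = 10.0  # illustrative: how long the whole queue may go without any bytes

class PeerDownloadQueue:
    """The single download queue kept for a pair of workers. An individual atom
    may wait several seconds for its first byte; the queue is only considered
    unhealthy when no atom in it has received bytes for a while."""

    def __init__(self):
        self._pending = []                      # atom ids still waiting for bytes
        self._last_progress = time.monotonic()  # last time any atom received bytes

    def enqueue(self, atom_id: bytes) -> None:
        self._pending.append(atom_id)

    def on_bytes_received(self, atom_id: bytes, completed: bool) -> None:
        self._last_progress = time.monotonic()
        if completed:
            self._pending.remove(atom_id)

    def is_stalled(self) -> bool:
        """The timeout lives at the level of the entire layer, not per request."""
        return bool(self._pending) and \
            time.monotonic() - self._last_progress > QUEUE_STALL_SECONDS
```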

In addition, there are two optimizations applied to the transfer queue, sketched in code after the list:

  1. Each request specifies what thunk needs the data, so that the sender will try and group together requests from the same thunk (the faster a given thunk is unblocked, the faster it will be able to start processing its inputs).
  2. When a thunk’s execution is canceled (because of an error, because of a change in priority, or because it is discovered that another worker has already finished it), the atom layer communicates this cancellation so that all of that thunk’s requests can be purged from the download queue.
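
A sketch of the sender side of these two optimizations, with illustrative names only:

```python
from collections import OrderedDict

class TransferQueue:
    """Outgoing atom transfers towards one peer, grouped by requesting thunk."""

    def __init__(self):
        self._by_thunk = OrderedDict()  # thunk id -> atom ids it requested

    def add_request(self, thunk_id: str, atom_id: bytes) -> None:
        # Optimization 1: requests carry the thunk that needs the data, so that
        # transfers for the same thunk are grouped and that thunk is unblocked sooner.
        self._by_thunk.setdefault(thunk_id, []).append(atom_id)

    def cancel_thunk(self, thunk_id: str) -> None:
        # Optimization 2: when a thunk's execution is canceled, all of its
        # pending requests are purged from the queue.
        self._by_thunk.pop(thunk_id, None)

    def next_batch(self):
        """Serve every pending atom of the oldest thunk before moving on."""
        if not self._by_thunk:
            return None
        return self._by_thunk.popitem(last=False)  # (thunk id, list of atom ids)
```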

A typical worker sends out data in bursts at 1GB/s, usually covering 7GB of data per burst.

Logging Layer

This layer preserves additional information about the state of execution, so that it can be reviewed afterwards to investigate issues or measure performance. It is very detailed, containing information such as which thunks were executed, how long they took to run and what kind of result they produced. Important events, such as the construction of a new DAG (including the serialized DAG itself), or the discovery that an atom is missing, are also logged. In total, several gigabytes are produced each day for every worker.

To minimize the performance impact, each worker writes out the accumulated logs every 60 seconds, or whenever 4 megabytes are accumulated (which often happens when there is a burst of activity). This is written to an Azure Blob Storage block blob4, and each worker has its own dedicated blob in order to avoid having to support multiple writers on a single blob.
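
A sketch of that flushing policy, assuming a hypothetical `append_block` callback that performs the actual write to the worker’s dedicated block blob:

```python
import threading
import time

FLUSH_INTERVAL_SECONDS = 60
FLUSH_SIZE_BYTES = 4 * 1024 * 1024

class LogFlusher:
    """Accumulates log lines and writes them out every 60 seconds, or as soon
    as 4 megabytes have accumulated, whichever comes first."""

    def __init__(self, append_block):
        self._append_block = append_block  # hypothetical: writes one block to this worker's blob
        self._buffer = bytearray()
        self._lock = threading.Lock()
        self._last_flush = time.monotonic()

    def log(self, line: str) -> None:
        with self._lock:
            self._buffer.extend(line.encode("utf-8") + b"\n")
            if len(self._buffer) >= FLUSH_SIZE_BYTES:
                self._flush_locked()

    def tick(self) -> None:
        """Called periodically by the worker's main loop."""
        with self._lock:
            if self._buffer and time.monotonic() - self._last_flush >= FLUSH_INTERVAL_SECONDS:
                self._flush_locked()

    def _flush_locked(self) -> None:
        self._append_block(bytes(self._buffer))
        self._buffer.clear()
        self._last_flush = time.monotonic()
```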

We then have other machines (outside the Envision production environment) that can read these log blobs after the fact, and compile detailed statistics about what happened on the cluster.

Shameless plug: we are hiring software engineers. Remote work is possible.


  1. This might seem wasteful in terms of bandwidth, but consider that each thunk identifier weighs 24 bytes, and there are up to 32 thunks per worker, so each update only takes 768 bytes—less than a TCP packet! ↩︎

  2. Although, in terms of performance, the atom layer is far more challenging. ↩︎

  3. The metadata layer is essentially a huge vector clock, where the clocks are kept per-thunk rather than per-worker. ↩︎

  4. Why not Append Blobs? Well, both Block Blobs and Append Blobs have major performance problems when reading a file composed of many small writes: read performance drops from ~60MB/s for a normal blob, to under ~2MB/s! A 5GB log blob takes around 40 minutes to read at that rate. We have contacted Microsoft about this issue, but there are no plans to fix it. To work around this issue, we rely on the fact that a Block Blob can be manually re-compacted (take the last 1000 small writes, erase them from the blob, and write them back again as a single large write), whereas an Append Blob cannot be modified in this way. ↩︎