Consistent Hashing


Daniel Lewin was an Israeli-American mathematician and entrepreneur. He was aboard American Airlines Flight 11, which was hijacked by al-Qaeda during the September 11 attacks.

Tom Leighton is a professor (on leave) of Applied Mathematics at MIT’s CSAIL and an expert on algorithms for network applications.

Together, Lewin and Leighton founded the company Akamai, which was a pioneer in the business of content delivery networks (CDNs) and is currently one of the top players in the segment. One of the key technologies employed by the company was the use of consistent hashing, which we’ll present in this post.


One of the main purposes of a CDN is to act as a cache for static data. Because of the large amount of data, we cannot store the cache on a single machine. Instead, we’ll have many servers, each responsible for storing a portion of the data.

We can see this as a distributed key-value store, and we have two main operations: read and write. For the write part, we provide the data to be written and an associated key (address). For the read part, we provide the key and the system either returns the stored data or decides it doesn’t exist.

In scenarios where we cannot make any assumptions about the pattern of data and keys, we can try to distribute the entries uniformly over the set of servers. One simple way to do this is to hash the key and take the remainder of its division by N (mod N), where N is the number of servers, then assign the entry (key, value) to the corresponding server.

The problem arises when the set of servers changes frequently. This can happen in practice, for example, if servers fail and need to be taken offline, or when we need to reintroduce servers after reboots or add new servers for scaling.

Changing the value of N causes an almost complete redistribution of the keys across servers, which is very inefficient: with a uniform hash, going from N to N+1 servers keeps only about 1/(N+1) of the keys on their original server. We need a way to hash the keys so that adding or removing a server only requires a few keys to change servers.
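To see how bad the mod-N scheme is, here is a small Rust sketch (not part of the original implementation) that hashes synthetic keys with the standard library’s DefaultHasher, used here only as a stand-in for any uniform hash, and measures how many keys change servers when the cluster grows by one. The key names and key count are arbitrary assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in uniform hash (SipHash via DefaultHasher).
fn hash_key(key: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish()
}

/// The naive scheme: server index = hash(key) mod N.
fn server_for(key: &str, n: u64) -> u64 {
    hash_key(key) % n
}

/// Fraction of `num_keys` synthetic keys whose server changes when the
/// cluster grows from `n` to `n + 1` servers.
fn fraction_moved(num_keys: usize, n: u64) -> f64 {
    let moved = (0..num_keys)
        .map(|i| format!("key-{}", i))
        .filter(|k| server_for(k, n) != server_for(k, n + 1))
        .count();
    moved as f64 / num_keys as f64
}
```

For N = 4, a key keeps its server only when its hash gives the same remainder mod 4 and mod 5, which happens for 4 out of every 20 residues, so `fraction_moved(10_000, 4)` should come out close to 0.8.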

Consistent Hashing

The key idea of the consistent hashing algorithm is to also hash the servers, placing them in the same hash space as the keys. A possible key for a server could be its IP address.

Say that our hash function h() generates a 32-bit integer. Then, to determine which server a key k is sent to, we find the server s whose hash h(s) is the smallest one larger than h(k). To make the process simpler, we treat the table as circular: if no server has a hash larger than h(k), we wrap around and start looking from the beginning of the table.
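The lookup rule can be written down directly as a linear scan. This is a minimal sketch, assuming server hashes are given as a plain slice of u32 values (in any order); it returns the index of the responsible server, or None when there are no servers.

```rust
/// Index of the server responsible for `key_hash`: the server whose hash is
/// the smallest one strictly greater than `key_hash`, wrapping around to the
/// server with the smallest hash overall if none is greater. O(n).
fn find_server(server_hashes: &[u32], key_hash: u32) -> Option<usize> {
    let mut successor: Option<usize> = None; // smallest hash > key_hash
    let mut smallest: Option<usize> = None; // wrap-around candidate
    for (i, &h) in server_hashes.iter().enumerate() {
        if h > key_hash && successor.map_or(true, |s| h < server_hashes[s]) {
            successor = Some(i);
        }
        if smallest.map_or(true, |s| h < server_hashes[s]) {
            smallest = Some(i);
        }
    }
    successor.or(smallest)
}
```

With servers at hashes [100, 200, 300], a key hashing to 150 goes to the server at 200, and a key hashing to 350 wraps around to the server at 100.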


Big blue circles are servers, orange circles are keys. Right: If we remove server S3, only entries corresponding to keys K5 and K4 need to be moved / re-assigned.

If we assume that the hash distributes all keys uniformly, including the server keys, each server will, in expectation, still receive an equal share of the keys.

The advantage shows up when adding servers to and removing servers from the list. When a new server sx is added to the system, its hash falls between two server hashes on the circle, say h(s1) and h(s2). Only the keys between h(s1) and h(sx), which belonged to s2, change servers, moving to sx. Conversely, when a server sx is removed, only the keys assigned to it need to go to a different server, in this case the server that immediately follows sx.
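We can check this property with a short sketch. It assumes the ring is a BTreeMap from server hash to server name (an illustrative choice, not the original code) and uses the "strictly greater" rule, so the keys that move are exactly those with hashes in [h(s1), h(sx)).

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Owner of `key_hash`: first server hash strictly greater than it,
/// wrapping around to the smallest server hash.
fn owner<'a>(ring: &BTreeMap<u32, &'a str>, key_hash: u32) -> Option<&'a str> {
    ring.range((Excluded(key_hash), Unbounded))
        .next()
        .or_else(|| ring.iter().next())
        .map(|(_, &name)| name)
}

/// Lists (key_hash, old_owner, new_owner) for every key whose owner changes
/// when `new_server` is added at `new_hash`.
fn moved_keys<'a>(
    ring: &BTreeMap<u32, &'a str>,
    key_hashes: &[u32],
    new_hash: u32,
    new_server: &'a str,
) -> Vec<(u32, &'a str, &'a str)> {
    let mut grown = ring.clone();
    grown.insert(new_hash, new_server);
    key_hashes
        .iter()
        .filter_map(|&k| {
            let before = owner(ring, k)?;
            let after = owner(&grown, k)?;
            if before != after {
                Some((k, before, after))
            } else {
                None
            }
        })
        .collect()
}
```

With servers s1, s2, s3 at 100, 200 and 300 and a new server sx at 150, only the keys hashing into [100, 150) move, all of them from s2 to sx.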

How can we find the server associated with a given key? The naive way is to scan the hashes linearly until we find a server hash. A more efficient way is to keep the server hashes in a balanced binary search tree, so we can find the node with the smallest value larger than h(k) in O(log n), while adding servers to and removing servers from the tree are also O(log n) operations.
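As a point of comparison with the red-black tree used below, here is a minimal sketch of the same ring on top of Rust’s standard BTreeMap. Note that BTreeMap is a B-tree rather than a binary tree, but it offers the same O(log n) bounds for insertion, removal and successor queries via `range`. The `Ring` type and its method names are hypothetical.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Hypothetical consistent-hash ring: server hash -> server name.
struct Ring {
    servers: BTreeMap<u32, String>,
}

impl Ring {
    fn new() -> Self {
        Ring { servers: BTreeMap::new() }
    }

    /// O(log n) insertion.
    fn add_server(&mut self, server_hash: u32, name: &str) {
        self.servers.insert(server_hash, name.to_string());
    }

    /// O(log n) removal.
    fn remove_server(&mut self, server_hash: u32) {
        self.servers.remove(&server_hash);
    }

    /// O(log n) successor query with wrap-around.
    fn server_for(&self, key_hash: u32) -> Option<&str> {
        self.servers
            .range((Excluded(key_hash), Unbounded))
            .next()
            .or_else(|| self.servers.iter().next())
            .map(|(_, name)| name.as_str())
    }
}
```

Removing a server only changes the answer for keys that were mapped to it: after removing the server at 200, a key hashing to 150 moves to the next server on the circle.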

Implementation in Rust

We will provide an implementation of the ideas above in Rust as an exercise. We define the interface of our structure as

use std::collections::HashSet;

pub struct ConsistentHashTable {
    containers: rbtree::RBTree<u32, Entry>,
    entries: HashSet<u32>,
    // The hash function must have the property of mapping strings to
    // the space of u32 numbers with uniform probability.
    hash_function: fn(&String) -> u32,
}

Note that we’ll store the list of servers (containers) and keys (entries) in separate structures. We can store the entries in a simple hash table, since we just need efficient insertion, deletion and lookup. For the containers we need insertion and deletion, but also finding the smallest element that is larger than a given value, which we’ll call the successor. As we discussed above, we can use a balanced binary search tree, such as a Red-Black tree, which allows all these operations in O(log n). I found this Rust implementation of the Red-Black tree [1].

Finally, we also include the hash function as part of the structure in case we want to customize the implementation (handy for testing), but we can provide a default implementation.

To “construct” a new structure, we define a method new() in the implementation section, and use farmhash as the default implementation for the hash function [2].

impl ConsistentHashTable {
    pub fn new() -> ConsistentHashTable {
        ConsistentHashTable {
            containers: rbtree::RBTree::new(),
            entries: HashSet::new(),
            // Use the default farmhash-based function defined below.
            hash_function,
        }
    }
}

// Default hash function: 32-bit FarmHash of the string's bytes [2].
fn hash_function(value: &String) -> u32 {
    farmhash::hash32(value.as_bytes())
}
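Because hash_function is a plain function pointer, a test can inject a deterministic hash and place servers and keys exactly where it wants on the ring. The sketch below is a hypothetical, stripped-down structure (`HashedRing`, `with_hash_function` and `numeric_hash` are illustrative names, not part of the original code) that keeps server hashes in a sorted Vec instead of the red-black tree.

```rust
/// Hypothetical simplified ring holding an injected hash function.
struct HashedRing {
    hash_function: fn(&String) -> u32,
    server_hashes: Vec<u32>, // kept sorted
}

impl HashedRing {
    fn with_hash_function(hash_function: fn(&String) -> u32) -> Self {
        HashedRing { hash_function, server_hashes: Vec::new() }
    }

    fn add_server(&mut self, name: &String) {
        let h = (self.hash_function)(name);
        // Insert at the sorted position so lookups can binary-search.
        let pos = self.server_hashes.partition_point(|&s| s < h);
        self.server_hashes.insert(pos, h);
    }

    /// Hash of the responsible server: first server hash strictly greater
    /// than the key's hash, wrapping around to the smallest one.
    fn server_hash_for(&self, key: &String) -> Option<u32> {
        let h = (self.hash_function)(key);
        let idx = self.server_hashes.partition_point(|&s| s <= h);
        self.server_hashes.get(idx).or(self.server_hashes.first()).copied()
    }
}

/// Test-only hash: interprets the string as a number, so the test decides
/// exactly where each server and key lands on the ring.
fn numeric_hash(value: &String) -> u32 {
    value.parse().unwrap_or(0)
}
```

With `numeric_hash`, a server named "300" sits at position 300 and a key "150" at position 150, which makes expected assignments easy to write by hand.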

The insertion and removal are already provided by the data structures, and are trivial to extend to ours. The interesting method is determining the server corresponding to a given key, namely get_container_id_for_entry().

There, we need to traverse the Red-Black tree to find the successor of our value v. The API of the Red-Black tree doesn’t have such a method, only one to search for an exact key. However, due to the nature of binary search trees, we can guarantee that the smallest element greater than the searched value v will be visited while searching for v (as long as v itself is not stored in the tree; on an exact match, the successor can sit in the right subtree of the matched node).

Thus, we can modify the search algorithm to include a visitor, that is, a callback that is called whenever a node is visited during the search. In the code below we start with a reference to the root, temp, and in a loop we keep traversing the tree depending on the comparison between the searched key and the key at the current node.

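// Inside impl<K: Ord, V> RBTree<K, V>; requires std::cmp::Ordering in scope.
fn find_node_with_visitor<F>(
    &self,
    k: &K,
    mut visitor: F
) -> NodePtr<K, V> where F: FnMut(&K) {
    if self.root.is_null() {
        return NodePtr::null();
    }
    let mut temp = &self.root;
    unsafe {
        loop {
            let key = &(*temp.0).key;
            // Report every node on the search path to the caller.
            visitor(key);
            let next = match k.cmp(key) {
                Ordering::Less => &mut (*temp.0).left,
                Ordering::Greater => &mut (*temp.0).right,
                Ordering::Equal => {
                    return *temp;
                }
            };
            // Reached a leaf without an exact match.
            if next.is_null() {
                break;
            }
            temp = next;
        }
    }
    NodePtr::null()
}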


Let’s take a detour to study the Rust code a bit. First, we see the unsafe block [3]. One of the things it allows is dereferencing a raw pointer. A raw pointer is similar to a C pointer, i.e. it points to a specific memory address. When we dereference the pointer, we get access to the value stored at that memory address. For example:

let num = 5;
let r1 = &num as *const i32;
unsafe {
    // Dereferencing a raw pointer is only allowed inside an unsafe block.
    println!("r1 is: {}", *r1);
}


The reason we need the unsafe block in our implementation is that self.root is a NodePtr, which wraps a raw pointer to RBTreeNode, as we can see in the definitions below:

struct NodePtr<K: Ord, V>(*mut RBTreeNode<K, V>);

pub struct RBTree<K: Ord, V> {
    root: NodePtr<K, V>,
    // (other fields omitted)
}

// in find_node_with_visitor(), inside the unsafe block
let mut temp = &self.root;
let key = &(*temp.0).key;


The other part worth mentioning is the type of the visitor function. It’s defined as

fn find_node_with_visitor<F>(
    &self,
    k: &K,
    mut visitor: F
) -> NodePtr<K, V> where F: FnMut(&K) {
    // ...
}


It relies on several concepts from Rust, including Traits, Closures, and Trait Bounds [4, 5]. The syntax indicates that the type of visitor must implement FnMut(&K), which in turn means a closure that takes a single parameter of type &K (K is the type of the key of the RB tree). There are three traits a closure can implement: Fn, FnMut and FnOnce. FnMut allows closures that capture and mutate variables in their environment (see “Capturing the Environment with Closures” [4]). We need this because our visitor will update a variable defined outside of the closure, as we’ll see next.
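Here is a tiny, self-contained sketch of why the FnMut bound matters. `visit_all` and `smallest_greater` are hypothetical helpers, not part of the rbtree crate: the closure mutates `closest`, which lives outside of it, so an Fn bound would be rejected by the compiler while FnMut is accepted.

```rust
/// Calls `visitor` once for every value, in order. The FnMut bound allows
/// the closure to mutate the state it captured.
fn visit_all<F>(values: &[u32], mut visitor: F)
where
    F: FnMut(&u32),
{
    for v in values {
        visitor(v);
    }
}

/// Tracks the smallest value strictly greater than `target` through a
/// visitor closure, mirroring the closest_key pattern used in the post.
fn smallest_greater(values: &[u32], target: u32) -> Option<u32> {
    let mut closest: Option<u32> = None;
    visit_all(values, |&v| {
        if v > target && closest.map_or(true, |c| v < c) {
            closest = Some(v); // mutating captured state requires FnMut
        }
    });
    closest
}
```

Using Option instead of a u32::MAX sentinel is a design choice of this sketch: it avoids ambiguity if a real key happens to equal the sentinel value.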

We are now done with our detour into the realm of Rust features, so we can analyze the closure we pass as the visitor. It’s a simple idea: whenever we visit a node, we check whether its key is greater than our searched value and closer to it than the best key found so far. Note that we define closest_key outside of the closure but mutate it inside it:

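let mut closest_key: u32 = std::u32::MAX;
// Passed as the visitor: remembers the closest key seen so far that is
// strictly greater than target_key.
|node_key: &u32| {
    if distance(closest_key, target_key) > distance(*node_key, target_key)
        && *node_key > target_key
    {
        closest_key = *node_key;
    }
}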


We also need to handle a corner case: if the hash of the value is larger than the hashes of all the containers, we wrap around our virtual circular table and return the container with the smallest hash:

// Every container key is smaller than the target_key. In this case we 'wrap around' the
// table and select the first element.
if closest_key == std::u32::MAX {
    let result = self.containers.get_first();
    match result {
        None => {
            return Err("Did not find first entry.");
        }
        Some((_, entry)) => {
            let container_id = &;
            return Ok(container_id);
        }
    }
}

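Putting the visitor and the wrap-around together, here is a self-contained sketch of the whole lookup. It is not the post’s code: it uses a plain, unbalanced binary search tree instead of the red-black tree from rbtree-rs (balancing is omitted for brevity), and all names are hypothetical. One deliberate difference is that ties go right during the search, so the successor is still visited when the key hash equals a container hash.

```rust
/// Unbalanced BST node holding a container hash.
struct Node {
    key: u32,
    left: Option<Box<Node>>,
    right: Option<Box<Node>>,
}

/// Inserts `key`; duplicates are ignored.
fn insert(node: &mut Option<Box<Node>>, key: u32) {
    match node {
        None => *node = Some(Box::new(Node { key, left: None, right: None })),
        Some(n) => {
            if key < n.key {
                insert(&mut n.left, key);
            } else if key > n.key {
                insert(&mut n.right, key);
            }
        }
    }
}

/// Walks towards `target`, calling `visitor` on every key on the path.
/// Ties go right because we want keys strictly greater than `target`.
fn search_with_visitor<F: FnMut(u32)>(root: &Option<Box<Node>>, target: u32, mut visitor: F) {
    let mut current = root.as_deref();
    while let Some(node) = current {
        visitor(node.key);
        current = if target < node.key {
            node.left.as_deref()
        } else {
            node.right.as_deref()
        };
    }
}

/// Smallest key in the tree (leftmost node).
fn min_key(root: &Option<Box<Node>>) -> Option<u32> {
    let mut node = root.as_deref()?;
    while let Some(left) = node.left.as_deref() {
        node = left;
    }
    Some(node.key)
}

/// Container hash for `key_hash`: successor on the search path, or the
/// smallest container hash when we have to wrap around.
fn container_for(root: &Option<Box<Node>>, key_hash: u32) -> Option<u32> {
    let mut closest: Option<u32> = None;
    search_with_visitor(root, key_hash, |k| {
        if k > key_hash && closest.map_or(true, |c| k < c) {
            closest = Some(k);
        }
    });
    closest.or_else(|| min_key(root))
}
```

For a tree built from 50, 30, 70, 20, 40, 60, 80, a key hashing to 35 maps to 40, a key hashing to exactly 50 maps to 60, and a key hashing to 85 wraps around to 20.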

The full implementation is on GitHub, and it also contains a set of basic unit tests.


The idea of consistent hashing is very clever. It relies on the fact that binary search trees can be used to search not only for exact values (those stored in the nodes) but also for the closest value to a given query.

In a sense, this use of binary trees is analogous to a common use of quad-trees, which is to subdivide 2D space into regions. In our case we’re subdividing the 1D line into segments, or more precisely, we’re subdividing a 1D circumference into segments, since our line wraps around.

I struggled quite a bit with Rust’s strict typing, especially around passing lambda functions as arguments, and also with setting up the tests. I found the mocking capabilities of the Rust toolchain lacking, so I used dependency injection for the hash function, which made the code easier to test. I did learn a ton, though!


[1] GitHub: tickbh/rbtree-rs
[2] GitHub: seiflotfy/rust-farmhash
[3] The Rust Programming Language book – Ch19: Unsafe Rust
[4] The Rust Programming Language book – Ch13: Closures: Anonymous Functions that Can Capture Their Environment
[5] The Rust Programming Language book – Ch13: Traits: Defining Shared Behavior


Blockchain has become a very popular technology recently due to the spread of Bitcoin. In this post, we’ll look at the details of blockchain from a Computer Science perspective, studying it as a distributed data structure.


Blockchain is a distributed data structure that can be used to implement a distributed ledger system. A distributed ledger orchestrates important operations (for example financial transactions) without the need for a centralized arbiter (such as a financial institution).

There are several reasons to prefer decentralized systems: operating costs (having a few financial institutions mediate all transactions requires a lot of work), reliability (avoiding a single point of failure), and not having to trust a few companies with our assets.

Additionally, being decentralized is expected to make it harder to regulate, and so far it has enabled global currencies like bitcoin.


The main challenge with a distributed ledger is how to distribute the work and data across nodes in a network and, more importantly, how to make sure we can trust that information.

An initial naive idea could be to store a copy of all account balances in every node of the network. The problem with storing only the balances is that it’s very hard to verify whether a given payment went through, or to make sure the balances are consistent. To address that, the blockchain also stores the whole history of transactions that led to the current balances (think of version control).

Safety. Even if we have the whole history of transactions, a bad actor could tamper with this history for their own benefit, so we need a safety mechanism to prevent that.

Consistency. Finally, because we store copies of data that is changing all the time in multiple machines, we’ll invariably hit problems with consistency and sources of truth.

Blockchain is a data structure designed to address these challenges as we shall see next. We’ll use bitcoin as the application when describing examples.

Data Structure

A blockchain is a chain of entities called blocks that is replicated across several nodes in a network. Each block has a hash, which is computed as a function of its contents and the hash of the previous block in the chain.
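The chaining of hashes can be sketched in a few lines. This is a minimal illustration under loud assumptions: DefaultHasher (SipHash) is NOT a cryptographic hash and stands in for SHA-256 only to keep the example dependency-free, block contents are plain strings, and the first block uses 0 as its "previous hash".

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in, non-cryptographic hash (bitcoin uses SHA-256).
fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut h = DefaultHasher::new();
    value.hash(&mut h);
    h.finish()
}

struct Block {
    contents: String,
    prev_hash: u64,
    hash: u64,
}

/// Appends a block whose hash covers its contents and the previous hash.
fn append(chain: &mut Vec<Block>, contents: &str) {
    let prev_hash = chain.last().map_or(0, |b| b.hash); // 0 for the first block
    let hash = hash_of(&(contents, prev_hash));
    chain.push(Block { contents: contents.to_string(), prev_hash, hash });
}

/// Recomputes every hash and link; tampering with any block breaks it.
fn is_valid(chain: &[Block]) -> bool {
    let mut expected_prev = 0;
    for block in chain {
        if block.prev_hash != expected_prev
            || block.hash != hash_of(&(block.contents.as_str(), block.prev_hash))
        {
            return false;
        }
        expected_prev = block.hash;
    }
    true
}
```

Editing a block’s contents invalidates its hash, and recomputing that hash invalidates the link stored in the next block, so a forger would have to rewrite every block after the edited one.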

For the bitcoin case, the content of a block is a set of transactions adding up to about 1MB. The hash of the contents is computed from the hashes of the individual transactions: these hashes are organized in a binary tree, known as a Merkle tree, where the transaction hashes are the leaves and the hash of each inner node is the hash of the concatenation of its children’s hashes.
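A Merkle root can be computed level by level. This sketch assumes a stand-in, non-cryptographic hash (DefaultHasher) and follows bitcoin’s convention of pairing the last hash with itself when a level has an odd number of nodes.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in leaf hash for a transaction (not cryptographic).
fn hash_tx(tx: &str) -> u64 {
    let mut h = DefaultHasher::new();
    tx.hash(&mut h);
    h.finish()
}

/// Stand-in hash of the concatenation of two child hashes.
fn hash_pair(left: u64, right: u64) -> u64 {
    let mut h = DefaultHasher::new();
    (left, right).hash(&mut h);
    h.finish()
}

/// Builds the tree bottom-up and returns the root; None if empty.
fn merkle_root(leaves: &[u64]) -> Option<u64> {
    if leaves.is_empty() {
        return None;
    }
    let mut level = leaves.to_vec();
    while level.len() > 1 {
        level = level
            .chunks(2)
            // An odd node out is paired with itself.
            .map(|pair| hash_pair(pair[0], *pair.get(1).unwrap_or(&pair[0])))
            .collect();
    }
    Some(level[0])
}
```

Changing any single transaction changes its leaf hash and, through the path of ancestors, the root, which is why the block header only needs to commit to the root.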


Merkle Tree

Source of truth. There might be several different versions of the chain around the network, either due to inconsistency or to bad actors. The assumption is that the longest chain (the one with the most blocks), as agreed upon by the majority of the nodes (over 50%), is the accurate and most up-to-date version of the chain and the one that should be trusted.



User C receiving the chains from A and B. Since B’s chain is longer, C will take it as the source of truth.

Inserting Blocks

The insertion consists of adding a new transaction to the blockchain. In terms of our bitcoin example, this is essentially user A sending some amount of money to user B.

To start the process, node A broadcasts a message with the transaction, signing it with its private key. The other nodes on the network have node A’s public key, so they can check the authenticity of the transaction.

The transaction stays in a pool of unresolved transactions. At any time, there are many nodes in the network constructing a new block, which contains several transactions. These nodes are also called miners. Not all nodes in the network need to be miners. Note that each miner can pick any transaction from the pool, so the set of transactions in a block under construction can differ between miners.

Adding a transaction to its block consists of verifying things like its authenticity and validity (e.g. making sure there are enough funds to be spent), then inserting it into the Merkle tree, which allows recomputing the root hash in O(log n) time.

Each block should be around 1MB in size, so when a miner is done collecting enough transactions, it can start wrapping up its work. It needs to find a string (called a nonce) such that hashing it together with the Merkle tree root hash and the hash of the previous block in the longest chain produces a hash with k leading 0s. Since the hash function is cryptographic, it cannot be easily reverse-engineered, so the only way to find such a string is brute force. The number k is a parameter that determines how hard it is to find the hash; in bitcoin it is not set by a central authority but adjusted periodically by the protocol itself, based on how quickly recent blocks were found. The point of this computation is that it’s purposely CPU-expensive, making it too costly for attackers to forge hashes, as we’ll see later.
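The brute-force search and the much cheaper verification can be sketched as follows. Assumptions: DefaultHasher stands in for bitcoin’s SHA-256, the header is reduced to (previous hash, Merkle root, nonce), and difficulty is expressed as k leading zero bits of a 64-bit hash (k must be at most 64).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in (non-cryptographic) hash over the simplified header fields.
fn header_hash(prev_hash: u64, merkle_root: u64, nonce: u64) -> u64 {
    let mut h = DefaultHasher::new();
    (prev_hash, merkle_root, nonce).hash(&mut h);
    h.finish()
}

/// Brute force: try nonces until the hash has at least `k` leading zero
/// bits. Expected work doubles with every extra bit of difficulty.
fn mine(prev_hash: u64, merkle_root: u64, k: u32) -> (u64, u64) {
    let mut nonce = 0u64;
    loop {
        let hash = header_hash(prev_hash, merkle_root, nonce);
        if hash.leading_zeros() >= k {
            return (nonce, hash);
        }
        nonce += 1;
    }
}

/// Verification costs a single hash evaluation.
fn verify(prev_hash: u64, merkle_root: u64, nonce: u64, k: u32) -> bool {
    header_hash(prev_hash, merkle_root, nonce).leading_zeros() >= k
}
```

With k = 12, `mine` needs about 2^12 = 4096 attempts on average, while `verify` always needs exactly one hash, which is the asymmetry the validation step below relies on.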

Once it finds the hash, it can broadcast the new block to the network where other miners can start to validate this block. They can check that:

  • The transactions in the block are valid
  • The block was added to the longest known chain
  • The hash generated for the block is correct

The verification step should be much cheaper than generating the hash. Once the block passes validation, the miner adds it to the longest chain. From then on, when there’s a call to get the longest chain, this last block will be included. On the other hand, if any of the validation steps fails, the block is rejected.

Because of this, miners building their blocks can also periodically check the network for longer chains. If a new, longer chain arrives while they’re building their block or computing the hash, it’s better to start over, since their block would be rejected later anyway.

Checking for valid transactions

In the bitcoin example, suppose I’m a vendor and I want to verify that your payment to me went through before I ship you my product.

To check whether a transaction was validated, a node just needs to ask for the current longest blockchain, which is the source of truth, and see if the transaction appears in any of the blocks.


Say user A has exactly 1 coin, which it sends to B as payment. Immediately after, A sends that same coin to another user C. How does C make sure that the coin it’s receiving hasn’t already been spent?

In a decentralized ledger, this could lead to problems, since different nodes could disagree on where the coin went. In the blockchain, there are two scenarios:

  • A to B got included in a block first, which later became part of the longest chain (and hence the source of truth). The transaction from A to C would be rejected from future blocks due to insufficient funds.
  • A to B and A to C got picked up to be included in the same block. The miner constructing the block would consider the second transaction it picked up invalid. Even if it was malicious and it included both transactions, it would fail validation when broadcasting it to the network.

The case in which A to C gets picked up first is analogous.
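The funds check that rejects the second payment can be sketched as a miner filtering candidate transactions in order. This is a simplification under stated assumptions: balances are a plain account-to-coins map (bitcoin actually tracks unspent transaction outputs), signatures are not modeled, and `Tx` and `select_valid` are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical minimal payment of `amount` coins from `from` to `to`.
struct Tx<'a> {
    from: &'a str,
    to: &'a str,
    amount: u64,
}

/// Applies candidates in order against the balances implied by the chain
/// so far and returns the indices of the accepted transactions. A second
/// spend of the same coin fails the funds check and is skipped.
fn select_valid<'a>(balances: &HashMap<&'a str, u64>, candidates: &[Tx<'a>]) -> Vec<usize> {
    let mut balances = balances.clone();
    let mut accepted = Vec::new();
    for (i, tx) in candidates.iter().enumerate() {
        let available = balances.get(tx.from).copied().unwrap_or(0);
        if available < tx.amount {
            continue; // insufficient funds, e.g. a double spend
        }
        balances.insert(tx.from, available - tx.amount);
        *balances.entry(tx.to).or_insert(0) += tx.amount;
        accepted.push(i);
    }
    accepted
}
```

Whichever of the two payments is processed first wins, and the other one is dropped, matching both scenarios above.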

Changing History

Suppose that user A, after performing a payment to and receiving its product from user B, wants to reuse the transaction and send the money to another user C. In theory it could edit the destination of the transaction from the chain (or even replace the block entirely), but to make this modified chain become the source of truth, it would also need to make it the longest.

As we’ve seen above, adding a block to a chain with a valid hash is extremely expensive. To make it the new source of truth, it would need to add at least one block in addition to the forged one. This means it would need to outpace all the other miners in the network that are also continuously computing new blocks in parallel. The paper [1] claims that unless the attacker controls over 50% of the CPU power in the network, this is extremely unlikely to succeed.
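To make this claim a bit more concrete, [1] models the race between the honest chain and an attacker as a gambler’s ruin problem. With p the probability that an honest node finds the next block, q the probability that the attacker does, and the attacker z blocks behind, the paper gives the probability that the attacker ever catches up as:

```latex
q_z =
\begin{cases}
1 & \text{if } p \le q \\
\left( q / p \right)^{z} & \text{if } p > q
\end{cases}
```

For example, an attacker with 10% of the CPU power (q = 0.1, p = 0.9) that is z = 6 blocks behind catches up with probability (1/9)^6 = 1/531441, roughly 1.9 × 10^-6. Once q reaches p, catching up becomes certain, which is where the 50% threshold comes from.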

Note that a user cannot modify the transactions it is not the sender of, since a transaction from user A to user B is signed with A’s private key. If user C wanted to redirect A’s money to itself, it would need to know A’s private key, so it could sign the transaction pretending it’s A.


As we mentioned earlier, not all nodes in the network need to be miners. So why would any node volunteer to spend energy, in the form of CPU cycles, to compute hashes? In bitcoin, the incentive is that the miner whose block gets added to the chain receives bitcoins as a reward. The reward consists of newly created bitcoins plus the fees attached to the transactions included in the block.


Bitcoin Mining Farms to compute block hashes and collect rewards

There must be incentives to make sure that a miner won’t keep building blocks with one or zero transactions just to skip validating transactions. The reward should take the transactions into account, possibly including their value and their age (to make sure “unimportant” transactions won’t be ignored forever).


In researching the material for this post, I ran into a lot of articles and videos covering blockchain without going into much detail. As always, I only realized my understanding had a lot of gaps once I started writing this post.

Overall I know a good deal more about what Blockchain is and what it’s not. I’m curious to see what other applications will make use of this technology and if we can come up with a trustworthy system that doesn’t require wasting a lot of CPU power.


[1] Bitcoin: A Peer-to-Peer Electronic Cash System
[2] Hashcash: A Denial of Service Counter-Measure – A proof-of-work algorithm
[3] Blockchain: how mining works and transactions are processed in seven steps
[4] Bitcoin P2P e-cash paper – The proof-of-work chain is a solution to the Byzantine Generals’ Problem

Paper Reading – F1: A Distributed SQL Database That Scales

In this post we’ll discuss F1: A Distributed SQL Database That Scales. We’ll provide a brief overview of the paper’s contents and study the architecture of the system and its implementation details in more depth. At the end, we provide an Appendix covering some distributed systems and database concepts mentioned throughout the paper.

F1 is a database used at Google to serve data for the AdWords product. This is the system that displays embedded ads in external pages using Google’s

F1 is named after a term from Genetics, F1 hybrid, in analogy to the idea that it combines the best aspects of relational databases and NoSQL systems (my initial thought was that it was named after Formula One).

At a high level, F1 is a layer on top of the Spanner database, which we covered in a previous post. It adds features such as a SQL interface (it’s worth noting that Spanner has since evolved to support SQL natively too). Spanner handles a lot of the distributed systems difficulties under the hood, providing some guarantees, and F1 builds relational database features on top of that.

The paper mentions that AdWords migrated from a sharded MySQL solution, which was very expensive to maintain, especially when the data outgrew an instance and re-sharding was necessary. Now this data redistribution is handled transparently by Spanner, and while F1 is theoretically slower, they were able to structure the data in such a way that it performs well in practice (5-10 ms for reads and 50-150 ms for writes).


F1 is deployed into multiple servers, across geographically distributed data centers. A client sends requests to a load balancer that then redirects to an F1 server, usually in the closest datacenter.


Architecture of F1: shown are two datacenters containing multiple machines running F1 servers, Spanner and CFS instances.

F1 servers are usually in the same datacenter as the Spanner servers that contain their data. F1 can communicate with Spanner servers outside of its datacenter directly, but Spanner only communicates with the Colossus File System (CFS) within the same datacenter, because CFS is not globally replicated.

F1 servers are stateless, except when the client performs pessimistic transactions, which makes them easy to scale, since no data movement is required.

In addition to F1 servers, there is an F1 master and a pool of F1 slaves. According to the paper, the master controls which processes are in the pool, and the F1 slaves execute parts of the query plan on behalf of F1 servers, but it’s not clear why they need to be a separate component.

Data Model

The table data can be organized in a hierarchical schema, much like in Spanner (see “Data Model” in the post about Spanner). This hierarchical schema is optional, but it’s a key piece in making the performance of F1 competitive with the previous sharded MySQL system AdWords used.

In this hierarchy, we have a root table and a child table. For each row in the root table, called root row, we have rows clustered under that row based on their matching keys.

For example, say that we have tables Customer and Campaign. The Customer table has the key customerID, and Campaign has the key (customerID, campaignID). A possible structure for the rows would be:


Here, Campaign rows with customerID=3 are clustered together under the corresponding Customer row.
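The clustering falls out of sorting rows by a key that starts with the root’s key. Here is a hypothetical illustration in Rust (not F1’s or Spanner’s actual key encoding): a root row is keyed (customerID, None) and a child row (customerID, Some(campaignID)). Since tuples compare field by field and None sorts before Some, a sorted map such as BTreeMap stores each Customer row immediately followed by its Campaign rows.

```rust
use std::collections::BTreeMap;

/// Hypothetical interleaved key: (customer_id, None) is a Customer (root)
/// row, (customer_id, Some(campaign_id)) is one of its Campaign rows.
type RowKey = (u32, Option<u32>);

/// A customer's root row plus all its Campaign rows, fetched with a single
/// contiguous range scan over the sorted keys.
fn cluster_for(rows: &BTreeMap<RowKey, String>, customer_id: u32) -> Vec<&str> {
    rows.range((customer_id, None)..=(customer_id, Some(u32::MAX)))
        .map(|(_, row)| row.as_str())
        .collect()
}
```

Because a whole cluster is one contiguous key range, it can live in a single storage unit, which is what lets a transaction over one customer touch a single server.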

The paper mentions that a given cluster of child rows falls within the same Spanner directory, which means that a transaction on it would query a single Spanner server, avoiding the overhead of synchronization between multiple servers (see Read-write transactions in the Spanner post).

Indexes. Tables support two types of indexes: local or global. Local index entries are stored in the same Spanner server as the rows they index, but the index key must include the key of the root table. Global indexes do not have this restriction, but they can be distributed across multiple Spanner servers and are very expensive to update within a transaction.

Schema changes. Updating the schema of a table is challenging because the rows of the table are distributed, which makes schema consistency expensive to achieve, and the system cannot have downtime.

The proposed solution is to break schema changes into smaller steps, such that the changes are safe as long as no two servers are more than one step apart (that is, at most two schema versions are in use at any time).



F1 supports 3 types of transactions: snapshot transactions, pessimistic transactions and optimistic transactions. It relies on the transactions supported by Spanner, so it’s worth re-reading the Implementation Details section of the Spanner post.

Snapshot transactions are read-only transactions and map to the corresponding read-only transactions from Spanner. The pessimistic transaction maps to the read-write transactions from Spanner.

The optimistic transaction consists of multiple reads followed by a single write. F1 stores an extra column with the last modified timestamp for each row. When the client performs the reads, it keeps the largest timestamp it saw among the rows it received, and sends this timestamp together with the final write request. F1 then performs a pessimistic transaction to read only the timestamps of the affected rows. If any of them is newer than the timestamp sent by the client, another write happened in between and F1 aborts; otherwise it performs the write.

It’s worth noting that F1 doesn’t create a transaction until the final write request is sent. It assumes that no other writes happen between the reads and the final write, hence the name optimistic. This is the default transaction type, and the paper describes a few advantages it has over the pessimistic type.
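The read-then-validate flow can be sketched with an in-memory table. Everything here is hypothetical and single-threaded (`Table`, `read`, `commit` are illustrative names, and a counter plays the role of commit timestamps): the client remembers the largest last-modified timestamp it saw during the lock-free reads, and the commit aborts if any re-read row has a newer timestamp.

```rust
use std::collections::HashMap;

/// Hypothetical table: row key -> (value, last-modified timestamp).
struct Table {
    rows: HashMap<String, (String, u64)>,
    clock: u64,
}

impl Table {
    fn new() -> Self {
        Table { rows: HashMap::new(), clock: 0 }
    }

    /// Every write bumps the row's last-modified timestamp.
    fn write(&mut self, key: &str, value: &str) {
        self.clock += 1;
        self.rows.insert(key.to_string(), (value.to_string(), self.clock));
    }

    /// Lock-free read; records the largest timestamp seen so far.
    fn read(&self, key: &str, max_seen: &mut u64) -> Option<String> {
        let (value, ts) = self.rows.get(key)?;
        *max_seen = (*max_seen).max(*ts);
        Some(value.clone())
    }

    /// Commit: re-check the timestamps of the rows read and abort if any
    /// was modified after the reads; otherwise apply the single write.
    fn commit(&mut self, read_keys: &[&str], max_seen: u64, key: &str, value: &str) -> Result<(), String> {
        for k in read_keys {
            if let Some((_, ts)) = self.rows.get(*k) {
                if *ts > max_seen {
                    return Err(format!("conflict on row {}", k));
                }
            }
        }
        self.write(key, value);
        Ok(())
    }
}
```

No locks are held between the reads and the commit, which is the optimistic part: the cost of a conflict is paid only when one actually happens.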


F1 supports very granular locking of the tables, including row level and cell (set of columns for a given row) level.

Change history

F1 stores the change history of the data. The changes are stored as regular tables, in a structure called ChangeBatch that is a child of a root table. If a transaction updates multiple root rows, several ChangeBatches are created, one under each affected root row.

This feature allows clients to subscribe to changes in F1 tables using a publish-subscribe system. Whenever a client receives a notification, it can ask for the incremental changes after a specific timestamp and apply them to the data it has stored locally.


F1 supports both NoSQL and SQL interfaces. The SQL dialect was extended to support Protocol Buffers, which are structured, strongly typed data types (as opposed to loosely typed structures such as JSON). This extension allows, for example, reading and updating individual fields inside such structures.

Local vs distributed. F1 supports centralized execution (running on a single F1 node) or distributed. The query optimizer decides which one to execute. The paper details the process of executing the query in a distributed fashion.

Distributed Query Example

The paper describes an example of a SQL query being mapped to an execution plan. It involves two joins, filters and aggregations:


This SQL query is parsed and converted into a query plan that is executed across multiple machines; each stage of the plan is called an operator. A possible execution plan for the sample query is:


The arrows indicate the data flow, starting at the bottom. The first join is a lookup join (hash join). The operation reads rows from the AdClick table until it has about 100k unique lookup keys stored in a hash table. Then it performs a single (batch) lookup to the other table, AdGroupCreative. The rows from the AdClick table are kept in memory for faster lookup.

As soon as the join for these keys is completed, the results are streamed to the next stage of the pipeline.
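The batching idea behind the lookup join can be sketched independently of F1. This is a hypothetical, single-machine illustration: left rows are (join key, payload) pairs, `batch_lookup` stands in for the single batched read against the other table, and `batch_size` plays the role of the roughly 100k distinct keys mentioned above.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical batched lookup join. Left rows are buffered until the batch
/// holds `batch_size` distinct keys; one `batch_lookup` call then fetches
/// the matching right rows, and the buffered rows are joined and emitted.
fn lookup_join<L: Clone, R: Clone>(
    left: &[(u32, L)],
    batch_size: usize,
    mut batch_lookup: impl FnMut(&[u32]) -> HashMap<u32, R>,
) -> Vec<(u32, L, R)> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut output = Vec::new();
    let mut start = 0;
    while start < left.len() {
        // Extend the batch until one more distinct key would exceed the limit.
        let mut seen = HashSet::new();
        let mut keys = Vec::new();
        let mut end = start;
        while end < left.len() {
            let key = left[end].0;
            if !seen.contains(&key) {
                if keys.len() == batch_size {
                    break;
                }
                seen.insert(key);
                keys.push(key);
            }
            end += 1;
        }
        // One lookup for the whole batch instead of one per row.
        let matches = batch_lookup(&keys);
        for (key, payload) in &left[start..end] {
            if let Some(r) = matches.get(key) {
                output.push((*key, payload.clone(), r.clone()));
            }
        }
        start = end;
    }
    output
}
```

Rows that share a key within a batch reuse the same lookup result, so the number of round trips depends on the number of distinct keys rather than the number of rows.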

The second join, with Creative, is a distributed join. It first repartitions the rows of each table based on the values of the columns listed in the USING clause. Each partition might end up on a different machine for the next stage, which consists of joining the matching rows.

Finally, the rows are again repartitioned, now by the values from the group by columns and then aggregators apply the aggregation for sets of rows under the same partition.
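Both the distributed join and the final aggregation rely on hash repartitioning: rows with equal key values always land in the same partition, so each partition can be processed independently. A minimal single-process sketch, assuming string keys, u64 values, a SUM aggregate and DefaultHasher as the partitioning hash (all illustrative choices):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Partition index for a key; equal keys always map to the same partition.
fn partition_of<K: Hash>(key: &K, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Repartitions (key, value) rows into `n` buckets (n > 0), then lets each
/// bucket compute a SUM per key on its own, as parallel aggregators would.
fn repartition_and_sum(rows: &[(String, u64)], n: usize) -> Vec<HashMap<String, u64>> {
    let mut partitions: Vec<Vec<(String, u64)>> = vec![Vec::new(); n];
    for (key, value) in rows {
        partitions[partition_of(key, n)].push((key.clone(), *value));
    }
    partitions
        .into_iter()
        .map(|part| {
            let mut sums = HashMap::new();
            for (key, value) in part {
                *sums.entry(key).or_insert(0) += value;
            }
            sums
        })
        .collect()
}
```

Since every row for a given key ends up in a single partition, each partition’s per-key sums are already final and no extra merge step is needed.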

Distributed Execution Overview

More generally, the query plan created by F1 is a DAG (directed acyclic graph) where each node is an operator, like the join or aggregator described above. Note that there are multiple operators running the same operation in parallel.

The paper says:

A technique frequently used by distributed database systems is to take advantage of an explicit co-partitioning of the stored data.

It’s not very clear to me what they mean by that, especially because they don’t cite any references, but from context it suggests that the base operators (the scan and lookup join) run on the same machine as the data (co-located) and do as much of the processing upfront as possible. This helps minimize data transfer, which can become the bottleneck in a distributed computation. F1 cannot do that because Spanner abstracts the data location away from F1.

A side effect is that there’s a lot of network traffic. The authors claim that Google has network switch hardware improvements that allow servers to communicate with each other at close to full network speed.

When the hash tables in memory grow too large, they write part of the data to disk. So while F1 doesn’t store data in a persistent way, it still needs to write to disk for intermediate operations.

For efficiency, F1 doesn’t write checkpoints to disk. The downside is that query execution is not fault tolerant: a failure in any stage of the execution causes the entire query to fail. Retries are done transparently, but long queries (>1h) are bound to fail.

Other features

F1 exposes data from intermediate nodes to clients. This avoids having all the data concentrated at the last node of the query execution. The paper cites MapReduce jobs as an example use of this feature.


In this post we learned about F1, one of Google’s many distributed databases. It’s a loosely coupled layer on top of Spanner that provides a more familiar level of abstraction: relational databases.

It seems that we could make an analogy between Google’s systems and similar open source solutions. The Colossus File System (CFS) is the distributed filesystem that could map to Hadoop Distributed File System (HDFS), and Spanner would map to Hadoop’s YARN, and F1 providing SQL semantics on top of Spanner could be mapped to Hive which does the same for Hadoop. It’s a very rough comparison, and maybe Spanner is more similar to Spark but it’s interesting to see the patterns and relationship between these systems.


[1] F1: A Distributed SQL Database That Scales
[2] Spanner: Google’s Globally-Distributed Database

Appendix: Terminology and Concepts

I was unfamiliar with some of the terminology and concepts used throughout the paper, so I had to do some extra research. Here I attempt to explain some of these topics by quoting snippets from the paper.

We also have a lot of experience with eventual consistency systems at Google

Eventual consistency means that a set of servers might not contain the most recent updates but eventually will. For example, the client might issue a write that affects multiple machines. If the system only provides eventual consistency then a subsequent read is not guaranteed to get the data up-to-date with those writes.

The quote mentions experience with eventual consistency in a negative way, because of the extra complexities that clients have to deal with to work around this limitation.

Paper Reading – Spanner: Google’s Globally-Distributed Database

This is the first post in the category “Paper Reading”, which will consist of summaries of my notes after reading a paper.

In this post we’ll discuss Spanner: Google’s Globally-Distributed Database. We’ll provide a brief overview of the paper’s contents and study the system’s architecture and implementation in more depth. At the end, we provide an Appendix covering some distributed systems and database concepts mentioned throughout the paper.

Paper’s Overview

Spanner is a distributed database used at Google. It provides a SQL API and several features such as reading data from a past timestamp, lock-free read-only transactions and atomic schema changes.

In Section 2, Implementation, the paper describes how the data is distributed across machines called spanservers, which contain data structures called tablets, which in turn are stored as B-tree-like files in a distributed file system called Colossus. It also describes the data model, which allows a hierarchy of tables, where the parent table’s rows are interleaved with the child tables’ rows.

The paper puts emphasis on the TrueTime API as the linchpin that addresses many challenges in real-world distributed systems, especially around latency. This is described in Section 3.

Section 4 describes technical details on how to implement the distributed transactions and safe schema changes.

In Section 5, the authors provide benchmarks and use cases, in particular F1, a database built on top of Spanner and used by Google’s AdWords team, to address some scaling limitations of a sharded MySQL database.



Figure 1: Spanner stack [1]

As we can see in Figure 1, Spanner is organized into a set of zones, which are the unit of administrative deployment. Even though there might be more than one zone per data center, each zone corresponds to a physically disjoint set of servers (cluster). Within a zone, we have the zonemaster, which the paper doesn’t delve into; the location proxy, which serves as an index that directs requests to the appropriate spanserver; and the spanserver itself, which is the main unit of the system, containing the actual data.

Figure 2: Spanner architecture

A spanserver contains multiple tablets, each of which is a map

(key, timestamp) -> value

They are stored in Colossus, a distributed file system (the successor of the Google File System). Each tablet is replicated across spanservers, and the set of replicas forms a Paxos group, which we can see in Figure 2. Reads can go directly to any replica that is sufficiently up-to-date. Writes must go through the leader, which is chosen via Paxos. The lock table depicted on top of the leader replica in Figure 2 implements concurrency control. The transaction manager is used to coordinate distributed transactions (that is, across multiple groups) and acts as the participant leader. If the transaction involves more than one Paxos group, one of the participant groups is chosen as the coordinator, and its participant leader acts as the coordinator leader.
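The `(key, timestamp) -> value` map is what makes reads at a past timestamp possible: a read at time t returns the newest version whose timestamp is at most t. A minimal in-memory Python sketch of that lookup (the `Tablet` class and its methods are hypothetical; it ignores Colossus, the B-tree-like file format, and replication):

```python
import bisect
from collections import defaultdict


class Tablet:
    """Toy multi-version map: (key, timestamp) -> value."""

    def __init__(self):
        self._timestamps = defaultdict(list)  # key -> sorted write timestamps
        self._values = defaultdict(list)      # key -> values, parallel to the above

    def put(self, key, timestamp, value):
        i = bisect.bisect_right(self._timestamps[key], timestamp)
        self._timestamps[key].insert(i, timestamp)
        self._values[key].insert(i, value)

    def read(self, key, timestamp):
        """Return the latest value written at or before `timestamp` (None if none)."""
        timestamps = self._timestamps.get(key, [])
        i = bisect.bisect_right(timestamps, timestamp)
        return self._values[key][i - 1] if i > 0 else None


tablet = Tablet()
tablet.put("user/1", 10, "Alice")
tablet.put("user/1", 20, "Alicia")
print(tablet.read("user/1", 15))  # Alice
print(tablet.read("user/1", 25))  # Alicia
print(tablet.read("user/1", 5))   # None
```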

Within a tablet, keys are organized into directories, each of which is a set of contiguous keys sharing a common prefix. A directory is the unit of data placement, meaning that data movement happens by directories, not by individual keys or rows. Directories can be moved between Paxos groups.
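A sketch of what “data moves a directory at a time” means. Here a directory is modeled as all keys that share the prefix before the first `/`, and each Paxos group is a plain dict; that prefix convention and the `move_directory` helper are assumptions for illustration, not Spanner’s actual key encoding or movement protocol.

```python
def directory_of(key):
    # Hypothetical convention: the directory is the first path component of the key.
    return key.split("/", 1)[0]


def move_directory(groups, directory, src, dst):
    """Move every key of `directory` from group `src` to group `dst` as one unit.
    `groups` maps a group id to a dict of key -> value."""
    moving = {k: v for k, v in groups[src].items() if directory_of(k) == directory}
    for key in moving:
        del groups[src][key]
    groups[dst].update(moving)
    return sorted(moving)


groups = {
    "g1": {"user1/profile": "...", "user1/album/1": "...", "user2/profile": "..."},
    "g2": {},
}
print(move_directory(groups, "user1", src="g1", dst="g2"))
# ['user1/album/1', 'user1/profile']
print(sorted(groups["g1"]))  # ['user2/profile']
```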

Data Model

The paper mentions that its data is stored in schematized semi-relational tables. They are semi-relational because they have characteristics of both relational tables (e.g. MySQL tables) and non-relational tables (e.g. HBase tables). They look like relational tables because they have rows, columns and versioned values. They look like key-value store tables because rows must have a unique identifier, which acts as a key, while the row is the value. The qualifier schematized seems to denote that these tables have well-defined schemas (including column types and primary keys).

In particular, the schema supports Protocol Buffers. This means that besides the native types existing in databases (string, int, etc.), the schema supports the strongly typed structures from Protocol Buffers.

Another feature of this database is that it can store related tables with their rows interleaved. This keeps data that is often queried together physically close (co-located). In the example of Figure 3, Users and Albums are stored interleaved. If we think of this as a use case for the Google Photos application, fetching all albums of a given user should be a pretty common operation.

Figure 3: Interleaved tables
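Interleaving can be pictured as sorting the rows of both tables together by primary key, where a child row’s key starts with its parent row’s key. A small Python illustration with made-up rows loosely mirroring the Users/Albums example (the tuple keys and row layout are assumptions, not Spanner’s storage format):

```python
# Hypothetical rows: Users are keyed by (uid,), Albums by (uid, aid).
users = [((2,), "Users", "bob@example.com"),
         ((1,), "Users", "ann@example.com")]
albums = [((1, 2), "Albums", "Trip"),
          ((2, 1), "Albums", "Pets"),
          ((1, 1), "Albums", "Family")]


def interleave(*tables):
    """Store rows of parent and child tables in primary-key order.
    A tuple sorts before any longer tuple it prefixes, so each user's
    row is immediately followed by that user's albums."""
    rows = [row for table in tables for row in table]
    return sorted(rows, key=lambda row: row[0])


for key, table, payload in interleave(users, albums):
    print(table, key, payload)
# Users (1,) ann@example.com
# Albums (1, 1) Family
# Albums (1, 2) Trip
# Users (2,) bob@example.com
# Albums (2, 1) Pets
```

Fetching all albums of user 1 then becomes a scan over one contiguous key range starting at `(1,)`.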

TrueTime API

The main method of the TrueTime API (TT) is TT.now(), which returns an interval [earliest, latest].

It returns the current universal time within a confidence interval. In other words, there’s a very strong guarantee that the actual time lies within the returned range. Let $\epsilon$ be half of a given confidence interval’s length and $\bar{\epsilon}$ the average across all $\epsilon$’s. According to their experiments, $\bar{\epsilon}$ varied from about 1 to 7 ms over each polling interval.

Of course, the strength doesn’t lie in the API itself but in the small confidence interval it can guarantee. They achieve this through a combination of atomic clocks and GPS receivers, which have different, uncorrelated failure sources, so even if one of them fails, the other is likely working properly.
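A toy model of the interface in Python. Besides `TT.now()`, the paper also defines `TT.after(t)` and `TT.before(t)`, which can be expressed in terms of `now()`. Here the uncertainty is a fixed, assumed `epsilon` of 4 ms (inside the 1 to 7 ms range above), whereas in Spanner it varies over time; the class name `ToyTrueTime` is hypothetical and the local clock stands in for the atomic/GPS time sources.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class TTInterval:
    earliest: float
    latest: float


class ToyTrueTime:
    """Local clock widened by an assumed, fixed uncertainty epsilon (seconds)."""

    def __init__(self, epsilon=0.004):
        self.epsilon = epsilon

    def now(self):
        t = time.time()
        return TTInterval(t - self.epsilon, t + self.epsilon)

    def after(self, t):
        """True if t has definitely passed."""
        return self.now().earliest > t

    def before(self, t):
        """True if t has definitely not arrived."""
        return self.now().latest < t


tt = ToyTrueTime(epsilon=0.004)
interval = tt.now()
print(round(interval.latest - interval.earliest, 3))  # 0.008, i.e. 2 * epsilon
```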

Implementation Details

Spanner has three main types of operations: read-write transaction, read-only transaction and snapshot read at a timestamp.

Read-write transactions

The writes in a read-write transaction, or RW for short, are buffered in the client until commit, which means the reads in that transaction do not see the effects of the writes.

First, the client performs the reads by issuing read requests to the leader of each (Paxos) group that has the data. The leader acquires read locks and reads the most recent data.

Then the client starts two-phase commit for the writes. It chooses a coordinator group (a representative among the Paxos groups) and notifies the leaders of the other groups with the identity of the coordinator plus the write requests. A non-coordinator group’s leader first acquires write locks and then chooses a prepare timestamp greater than all the timestamps issued by this group, to preserve monotonicity, and sends this timestamp to the coordinator’s leader.

The coordinator’s leader acquires write locks, receives the prepare timestamps from all the other leaders, and chooses a timestamp s that is greater than all of them, greater than TT.now().latest at the time it received the commit request, and greater than any timestamp previously issued within the coordinator group.

The coordinator’s leader might need to wait until it’s sure s is in the past, that is, until TT.now().earliest > s. After that, the coordinator sends s to all other leaders, which commit their transactions with timestamp s and release their write locks.

This choice of timestamp can be shown to guarantee the external consistency property, that is, if the start of a transaction T2 happens after the commit of transaction T1, then the commit timestamp assigned to T2 has to be greater than T1’s.
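Putting the coordinator’s rules together, here is a minimal Python sketch with a toy clock (local time plus or minus an assumed `EPSILON`; all function names are hypothetical). It illustrates why commit wait lasts on the order of twice the clock uncertainty: s is at least `TT.now().latest` when the commit request arrives, and the leader must then wait until `TT.now().earliest` passes s.

```python
import time

EPSILON = 0.002  # assumed clock uncertainty, in seconds


def tt_now():
    """Toy TT.now(): returns (earliest, latest) around the local clock."""
    t = time.time()
    return t - EPSILON, t + EPSILON


def choose_commit_timestamp(prepare_timestamps, commit_request_latest, last_issued):
    """Pick s greater than every participant's prepare timestamp, than
    TT.now().latest when the commit request arrived, and than any timestamp
    already issued. The small increment makes s strictly greater."""
    return max([*prepare_timestamps, commit_request_latest, last_issued]) + 1e-6


def commit_wait(s):
    """Block until s is guaranteed to be in the past: TT.now().earliest > s."""
    while tt_now()[0] <= s:
        time.sleep(EPSILON / 10)


arrival_latest = tt_now()[1]             # recorded when the commit request arrives
prepares = [time.time(), time.time()]    # hypothetical participant prepare timestamps
s = choose_commit_timestamp(prepares, arrival_latest, last_issued=0.0)
commit_wait(s)  # with this toy clock the wait is on the order of 2 * EPSILON
```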

Read-only transactions

If the read involves a single Paxos group, Spanner chooses as the read timestamp the timestamp of the latest committed write in the group (note that if there’s a write in progress, it would hold a lock). If the read involves more than one group, Spanner chooses TT.now().latest as the read timestamp, possibly waiting until it’s sure that timestamp is in the past.


In this post we learned about Spanner, one of the most recent databases from Google, the main pioneer in large-scale distributed systems. It addresses some limitations of solutions developed in the past: Bigtable, which lacks schemas and strong consistency; Megastore, which has poor write performance; and a sharded MySQL, which required the application to know the sharding scheme and made resharding a risky and lengthy process.

One thing I missed in this paper is whether Spanner can perform more advanced relational database operations such as aggregations, subqueries or joins. Performing these in a distributed system usually requires some extra component to store the intermediate values, which was not mentioned in the paper.


  • [1] Spanner: Google’s Globally-Distributed Database
  • [2] Principles of Computer Systems – Distributed Transactions
  • [3] Radar – Google’s Spanner is all about time
  • [4] Implementing Fault-Tolerant Services Using the State Machine Approach: A Tutorial
  • [5] System level concurrency control for distributed database systems

Appendix: Terminology and concepts

I was unfamiliar with several of the terms and concepts used throughout the paper, so I had to do some extra research. Here I attempt to explain some of these topics by quoting snippets from the paper.

Spanner is a database that shards data across many set of Paxos.

Sharding is basically distributing the data across multiple machines. In a database context, it commonly means partitioning a table’s rows across multiple machines. Paxos is a protocol used to solve the consensus problem, in which a set of machines (participants) in a distributed environment must agree on a value (consensus). We discussed it in a previous post; it guarantees that at least a quorum (a subset of more than half of the participants) agrees on a value. In “set of Paxos”, Paxos seems to be shorthand for a set of machines running the Paxos protocol.

Spanner has two features that are difficult to implement in a distributed database: it provides externally consistent reads and writes, and globally-consistent reads across the database at a timestamp.

In the context of transactions in a concurrent environment, we need a total order of the transactions. This is to avoid problems such as concurrent balance updates [2]. Imagine we have a bank account with balance B and two concurrent transactions: transaction 1 (T1) reads the current balance and adds $1, and transaction 2 (T2) reads the current balance and subtracts $1. After T1 and T2 are executed (no matter in which order), one would expect the final balance to remain B. However, if T2’s read happens before T1’s write and T2’s write happens after it, T1’s update will be overwritten and the final balance will be B-$1. The total ordering guarantees that T1 and T2 behave as if they were disjoint (in time).
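The problematic interleaving can be replayed step by step. This Python sketch uses plain variables instead of a database, and the function names are made up; it only shows how the read/write ordering loses T1’s update.

```python
def run_interleaved(balance):
    """T2 reads before T1 writes, and T2 writes last."""
    t1_read = balance        # T1 reads B
    t2_read = balance        # T2 reads B (before T1's write)
    balance = t1_read + 1    # T1 writes B + 1
    balance = t2_read - 1    # T2 writes B - 1, overwriting T1's update
    return balance


def run_serially(balance):
    """The same transactions in a total order: T1 finishes before T2 starts."""
    balance = balance + 1    # T1: read, then write
    balance = balance - 1    # T2: read, then write
    return balance


print(run_interleaved(100))  # 99: T1's deposit is lost
print(run_serially(100))     # 100
```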

To obtain external consistency, the following property must hold: if T1 commits before T2 starts, then T1 comes before T2 in the total order.

Google has datacenters all over the world. By providing globally-consistent reads at a timestamp, it guarantees that, given a timestamp, it will return the same data no matter which datacenter you ask the data from.

Running two-phase commit over Paxos mitigates the availability problems.

Two-phase commit, or 2PC, is a protocol that guarantees the consistency of distributed transactions: either all participants commit the transaction or none of them do (they roll back). The protocol requires the election of a coordinator among the participants, and such an election can be performed using the Paxos protocol, which seems to be the case here. In the first phase, the coordinator must obtain an agreement from all participants; only after that does it start the second phase, sending each participant a message informing it to proceed with the transaction.
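A failure-free sketch of the two phases in Python. It leaves out timeouts, durable logging, and the Paxos replication of each participant that the quote refers to; the `Participant` class and its `can_commit` flag are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Participant:
    name: str
    can_commit: bool = True  # hypothetical: whether the local prepare succeeds
    state: str = "idle"

    def prepare(self):
        """Phase 1: vote yes only if the local changes can be committed."""
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def finish(self, commit):
        """Phase 2: apply the coordinator's decision."""
        self.state = "committed" if commit else "aborted"


def two_phase_commit(participants):
    votes = [p.prepare() for p in participants]  # phase 1: collect every vote
    decision = all(votes)                         # commit only on a unanimous yes
    for p in participants:                        # phase 2: broadcast the decision
        p.finish(decision)
    return decision


group = [Participant("A"), Participant("B", can_commit=False)]
print(two_phase_commit(group), [p.state for p in group])
# False ['aborted', 'aborted']
```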

To support replication, each spanserver implements a single Paxos state machine on top of each tablet

A Paxos state machine is a method for implementing fault tolerance [4]. This seems complicated enough to warrant an entire post on the topic, but the basic idea seems to be to use replicas, each containing a copy of a state machine, and to use them to guarantee correctness and availability under failures.

Reads within read-write transactions use wound-wait to avoid deadlocks

Wound-wait is a locking mechanism used between transactions to prevent deadlocks [5]. More specifically, assume we have transactions T1 and T2 with associated unique timestamps, T2 is currently holding a lock, and T1 requests it. Let t(T) be the timestamp of a transaction T. We have two scenarios: either t(T1) < t(T2) or t(T1) > t(T2). In the first case, T1 is older than T2 and the protocol says that T2 should abort, roll back and retry later with the same timestamp. We say T1 wounds T2. In the second case, T1 is younger than T2 and it’s allowed to wait until the resource is available.

A converse approach is the wait-die mechanism. The comparison of these methods is explained here. I haven’t researched enough to understand what the tradeoffs between these two approaches are and why Spanner decided on the first.
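Both rules can be written as small decision functions for the situation above, where a requesting transaction asks for a lock held by another. The function names and return strings are made up for this sketch; a smaller timestamp means an older transaction.

```python
def wound_wait(requester_ts, holder_ts):
    """Wound-wait: an older requester wounds (aborts) the younger holder."""
    if requester_ts < holder_ts:
        return "holder aborts"     # the holder rolls back and retries later
    return "requester waits"       # a younger requester waits for the lock


def wait_die(requester_ts, holder_ts):
    """Wait-die: an older requester waits; a younger requester dies."""
    if requester_ts < holder_ts:
        return "requester waits"
    return "requester aborts"      # it retries later with the same timestamp


print(wound_wait(1, 2), "|", wait_die(1, 2))  # holder aborts | requester waits
print(wound_wait(2, 1), "|", wait_die(2, 1))  # requester waits | requester aborts
```

In both schemes only the younger transaction is ever aborted, and because it keeps its original timestamp on retry, it eventually becomes the oldest one and cannot be aborted forever.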

Notes on Zookeeper


Since I read about the Paxos protocol, I’ve been wanting to learn more about Zookeeper. I’ve collected some notes on this system based on the official documentation and blog posts about the subject.

We’ll see at a high level how Zookeeper works, which guarantees it provides, and some applications we can build using its basic API.


Zookeeper is a distributed coordination service for distributed applications. It was developed at Yahoo! by Ben Reed and Flavio Junqueira in 2006 and open-sourced in 2007.

It can be deployed on a set of servers (called an ensemble), in which one is elected leader and the others are called followers. If the leader fails, one of the followers is promoted to leader.

Each server running Zookeeper maintains a copy of a tree structure. The nodes in the tree, called znodes, can hold data and are referred to by filesystem-like paths, that is, a list of node names joined with “/”. Each server also keeps a replica of the tree in persistent storage, for recovery.

Clients can connect to one of these hosts to perform CRUD (Create, Read, Update, Delete) operations on the nodes of the tree. All write requests go to the leader, which broadcasts the update proposals to the followers. A quorum (a majority of the servers) has to agree on a proposal before it is committed, which keeps the replicas consistent (Zookeeper uses Zab, an atomic broadcast protocol similar to Paxos, for this purpose). Every update to a znode increases its version number.
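Because a quorum must be a strict majority, the number of servers an ensemble can lose while still accepting writes is easy to compute. A tiny Python helper (the names are ours):

```python
def quorum_size(ensemble_size):
    """Smallest strict majority of the ensemble."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    """Servers that can fail while a majority is still available."""
    return ensemble_size - quorum_size(ensemble_size)


for n in (3, 4, 5):
    print(n, quorum_size(n), tolerated_failures(n))
# 3 2 1
# 4 3 1
# 5 3 2
```

This is why ensembles usually have an odd number of servers: going from three to four servers raises the quorum size without tolerating any additional failure.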

A given znode can be marked as sequential and/or ephemeral. If it’s marked as sequential, a sequence number will be appended to the node name when it is created, which Zookeeper guarantees is unique. If it’s marked as ephemeral, the znode will be deleted when the session of the client that created it ends.

Clients can subscribe to changes on a given node of the tree via watches by providing callbacks, which are called when the event they subscribe to is fired. Watches fire only once, so clients have to set up the watch again after it fires.

Similar to a filesystem, it’s possible to associate create, read, update and delete permissions with znodes. These permissions are referred to as ACLs (access control lists).


Zookeeper is a simple system, but it provides a set of guarantees that is otherwise hard to get right in practice:

* Sequential consistency – Updates from a client are applied in the order in which they were sent
* Atomic updates – An update either succeeds or fails, with no partial updates
* Single System Image – The client sees the same tree no matter which server it connects to
* Reliability – Once applied, an update to the tree will never be lost
* Timeliness – The clients’ view of the tree is up-to-date within a certain amount of time


One of the most interesting things about Zookeeper is that it provides a very small API that does general things well, so it can be used for many different purposes. We’ll now cover some of them, described in [1].

Barrier. Barriers are useful for synchronizing distributed nodes. They all block until a condition is met and the barrier is removed. One way to implement this mechanism in Zookeeper is to have all clients watch for a given znode, and delete the znode when the barrier is complete.

The Zookeeper recipe wiki [1] describes a recipe for implementing double barriers.

Distributed queue. A distributed queue is a data structure similar to a regular queue, but it’s available to a distributed set of clients, which can either enqueue an element at the end of the queue or retrieve an element from the front.

We can use Zookeeper to implement a simple distributed queue. We create a root znode representing the queue. Inserting an element into the queue corresponds to creating a new znode under the root, while retrieving an element corresponds to removing the corresponding znode.

To make sure the order is respected, we can mark the nodes created as sequential, so they have a number consistent with their creation order. For insertion, Zookeeper will handle race conditions for us with the sequential nodes.

To dequeue an element, we need to handle race conditions in which two clients try to dequeue an element at the same time. One simple way to do this is to have the client get the list of elements in the queue (all children of the root node), sort them, and try to remove the element with the smallest sequence number. If that znode was already deleted by another client, it keeps trying with the next node in the sequence until it has tried all the elements it downloaded. After that, it may need to fetch a new list of elements, since new ones may have been inserted after the first call. If that list is empty, it can throw an exception signaling that the queue is empty.
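To make the dequeue loop concrete, the sketch below runs it against a hypothetical in-memory `FakeZooKeeper`. It only mimics sequential znodes (ZooKeeper appends a zero-padded 10-digit counter kept per parent znode) and a “no node” error; it is not the API of any real ZooKeeper client. `enqueue` and `dequeue` follow the steps described above.

```python
import threading
from collections import defaultdict


class NoNodeError(Exception):
    """Raised when a znode does not exist (e.g. another client deleted it)."""


class FakeZooKeeper:
    """Hypothetical in-memory stand-in for an ensemble; not a real client library."""

    def __init__(self):
        self._lock = threading.Lock()
        self._nodes = {}                   # path -> data
        self._counters = defaultdict(int)  # parent path -> next sequence number

    def create(self, path, data=b"", sequential=False):
        with self._lock:
            if sequential:
                parent = path.rsplit("/", 1)[0]
                path = f"{path}{self._counters[parent]:010d}"  # 10-digit suffix
                self._counters[parent] += 1
            self._nodes[path] = data
            return path

    def get_children(self, parent):
        prefix = parent + "/"
        with self._lock:
            return [p[len(prefix):] for p in self._nodes
                    if p.startswith(prefix) and "/" not in p[len(prefix):]]

    def get(self, path):
        with self._lock:
            if path not in self._nodes:
                raise NoNodeError(path)
            return self._nodes[path]

    def delete(self, path):
        with self._lock:
            if path not in self._nodes:
                raise NoNodeError(path)
            del self._nodes[path]


def enqueue(zk, root, data):
    # Sequential znodes give each element a unique, ordered name.
    return zk.create(f"{root}/item-", data, sequential=True)


def dequeue(zk, root):
    while True:
        # Same prefix everywhere, so name order equals sequence order.
        children = sorted(zk.get_children(root))
        if not children:
            raise IndexError("queue is empty")
        for child in children:
            try:
                data = zk.get(f"{root}/{child}")
                zk.delete(f"{root}/{child}")  # only one client can succeed here
                return data
            except NoNodeError:
                continue  # another client took this element; try the next one
        # Every element we saw was taken: refresh the list and try again.


zk = FakeZooKeeper()
for job in (b"a", b"b", b"c"):
    enqueue(zk, "/queue", job)
print(dequeue(zk, "/queue"), dequeue(zk, "/queue"))  # b'a' b'b'
```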

In [2], Henry Robson writes about Zookeeper and how to build a distributed queue using a Python API for it. He covers the design mentioned above in more detail.

One especially interesting consideration is dealing with failures during insertion. When enqueuing, the connection between the client and the Zookeeper server might go away before the client can tell whether the insertion succeeded. Retrying might lead to a node being inserted twice, and even checking that the node is not in the queue can’t guarantee it was not inserted, since it might have been dequeued already. One solution is to use a flag that the client sets atomically only once a node is known to be successfully enqueued. When dequeuing, a client ignores nodes that don’t have the flag set. This way, if the enqueuing client can’t confirm a node was inserted, that node won’t be processed, so the client can safely retry until it manages to set the flag.
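The flag idea can be modeled without a real ensemble. In this Python sketch, `FlaggedQueue` is a hypothetical in-memory class in which each node carries a `committed` flag, and consumers skip nodes whose flag is not set.

```python
import itertools


class FlaggedQueue:
    """In-memory model of the commit-flag idea (not a ZooKeeper client)."""

    def __init__(self):
        self._seq = itertools.count()
        self.nodes = {}  # sequence number -> {"data": ..., "committed": bool}

    def insert(self, data):
        seq = next(self._seq)
        self.nodes[seq] = {"data": data, "committed": False}
        return seq

    def set_flag(self, seq):
        # Done only once the enqueuer knows `seq` exists; makes it visible.
        self.nodes[seq]["committed"] = True

    def dequeue(self):
        for seq in sorted(self.nodes):
            if self.nodes[seq]["committed"]:  # unflagged nodes are ignored
                return self.nodes.pop(seq)["data"]
        raise IndexError("no committed elements")


q = FlaggedQueue()
q.insert("job-1")        # first attempt: the connection drops, outcome unknown
seq = q.insert("job-1")  # retry; this time the client learns the node's name
q.set_flag(seq)
print(q.dequeue())       # job-1, delivered once
```

The orphaned, unflagged copy from the first attempt is never processed; a real system would still need some way to clean such nodes up.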

[3] discusses some drawbacks of using Zookeeper to implement a queue.

Distributed lock. Distributed clients can try to acquire the lock. If the lock is available, the client successfully acquires it. Otherwise they block until the lock is released.

We can use an idea similar to the one we used for the queue: we create a root znode representing the lock, and a client tries to acquire the lock by creating an ephemeral sequential znode under it. It then gets the children of the root znode, and if its znode has the lowest sequence number, it has acquired the lock. Otherwise, it picks the znode with the highest sequence number that is less than its own, sets a watch on that znode, and waits.

When the watched znode is deleted, the client that owns the current znode wakes up and can try to acquire the lock again. Note that with this design, each znode is watched by at most one other client. If we instead chose to watch any changes in the directory, every time a znode was deleted, all waiting clients would wake up and try to acquire the lock at the same time, causing a thundering herd problem.
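The decision a client makes after listing the lock’s children can be written as a pure function; the client would then set a watch on the returned predecessor (the watch itself is not modeled). A Python sketch with hypothetical names, relying on the trailing 10-digit counter of sequential znodes:

```python
def sequence_number(znode_name):
    # Sequential znodes end in a zero-padded 10-digit counter.
    return int(znode_name[-10:])


def check_lock(my_znode, children):
    """Return (True, None) if `my_znode` holds the lock, otherwise
    (False, predecessor), where predecessor is the only znode to watch."""
    ordered = sorted(children, key=sequence_number)
    position = ordered.index(my_znode)
    if position == 0:
        return True, None
    return False, ordered[position - 1]


children = ["lock-0000000007", "lock-0000000005", "lock-0000000006"]
print(check_lock("lock-0000000005", children))  # (True, None)
print(check_lock("lock-0000000007", children))  # (False, 'lock-0000000006')
```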

The recipes [1] wiki page also describes a distributed read-write lock design.

Master election. A set of clients can use a Zookeeper ensemble to elect a master among them. First, each client creates an ephemeral sequential znode, and whoever has the lowest sequence number becomes the leader. The other clients find out which znode comes immediately before theirs in the sequence and set a watch on it.

The clients keep a connection open and keep sending heartbeats signaling they’re alive. If the current master crashes or hangs, its connection will be closed, its ephemeral znode deleted, and the client that was watching this znode can be promoted to master.
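The election state can be recomputed from the set of live znodes. In this Python sketch (hypothetical names; sessions and watches are not modeled), deleting the master’s entry stands in for its ephemeral znode disappearing when its session ends.

```python
def sequence_number(znode_name):
    return int(znode_name[-10:])  # trailing 10-digit counter of a sequential znode


def elect(znodes):
    """znodes maps client id -> its ephemeral sequential znode.
    Returns the master and, for every other client, the znode it watches."""
    ordered = sorted(znodes.items(), key=lambda item: sequence_number(item[1]))
    master = ordered[0][0]
    watches = {client: ordered[i - 1][1]
               for i, (client, _) in enumerate(ordered) if i > 0}
    return master, watches


znodes = {"a": "n_0000000001", "b": "n_0000000002", "c": "n_0000000003"}
print(elect(znodes))  # ('a', {'b': 'n_0000000001', 'c': 'n_0000000002'})
del znodes["a"]       # a's session ends, so its ephemeral znode is deleted
print(elect(znodes))  # ('b', {'c': 'n_0000000002'})
```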

Curator is an open source project, started by Netflix, that aims to implement common recipes on top of Zookeeper and provide them as a library.


[1] ZooKeeper Recipes and Solutions
[2] Cloudera – Building a distributed concurrent queue with Apache ZooKeeper
[3] Apache Curator – Tech Note 4


In this post we learned a little bit about Zookeeper and saw some applications that can be built on top of it.

The motivation for this post was a tech talk that Patrick Hunt, one of the early contributors to Zookeeper and now at Cloudera, gave at AirBnB.

The Paxos Protocol


Leslie Lamport is an American computer scientist. He’s known for his work in distributed systems and for the LaTeX system.

He recently received the 2013 Turing Award (announced in 2014) for his work in distributed systems. Among his contributions, the Paxos protocol for solving the consensus problem is one of the most famous. Several real-world systems are built on Paxos or closely related protocols, such as Yahoo’s Zookeeper and Google’s Chubby.

In this post we’ll provide a brief overview of this protocol.


Consider a set of processes running in a network, where failures may happen. The consensus problem requires these distributed processes to agree on a single value.

More precisely, processes communicate with each other through asynchronous messages. These messages can take arbitrarily long to be delivered, can be duplicated and can be lost, but we assume no Byzantine failures may happen.

Also, processes may fail, but they can be restarted and restore the information they had before the failure.

To solve the consensus problem, Leslie Lamport devised a protocol named Paxos. There are actually many variants of this algorithm with different trade-offs but here we’ll discuss one of the simplest versions.

The protocol is named after the Greek island of Paxos. According to [3], Leo Guibas, a Greek professor at Stanford, suggested the name.


There are three roles that processes can have in the protocol: the proposer, the acceptor and the learner.

A proposer is the process that makes proposals of values. Most of the time, only one process acts as proposer (we’ll discuss later when we can have multiple proposers), thus it’s also called the leader.

An acceptor can accept (multiple) proposals. A subset of acceptors that represents a majority is known as a quorum. When a proposal has been accepted by a quorum, we say that the proposal was chosen.

After a proposal is chosen, the proposer informs the processes known as learners about the chosen value.


In the Paxos protocol the proposals have unique labels, which are natural numbers used to order the proposals.

The Paxos protocol has 2 phases. In the first phase, the leader will basically try to learn if any value has already been accepted by the acceptors. It will then pick a value that will guarantee the correctness of the algorithm.

In the second phase, it proposes the value picked in phase 1 to the acceptors. After a quorum of acceptors acknowledge to the leader that they accepted the proposal, the leader can inform the learners.

More specifically, the protocol is as follows:

Phase 1 – Prepare

* Proposer:

First, the proposer picks a unique number n and broadcasts a prepare request with label n to the acceptors.

* Acceptor:

When an acceptor receives a prepare request with label n, it will ignore it if it has already received a prepare request with a label greater than n. Otherwise, it will promise not to accept any request (prepare or accept) with a label less than n from now on. It will then return the proposal with the highest label less than n that it has already accepted (or null if it has no such proposal).

Phase 2 – Accept

* Proposer:

If the proposer receives responses from a quorum of the acceptors, it selects, among all the proposals returned, the one with the highest label. Let v be the value of that proposal (if no proposal was returned, the proposer is free to pick any value). The proposer then sends an accept request with value v and label n to each of those acceptors.

If it didn’t get a response from a majority of acceptors, it may repeat phase 1 after a timeout.

* Acceptor:

If an acceptor receives an accept request with value v and label n it accepts this request, unless it has already responded to some other prepare request with a label higher than n.

If the acceptor accepts the request, it sends a response notifying the proposer. If the proposer gets a confirmation from a quorum, it can now broadcast the value to the learners.
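A single-process Python simulation of one instance of the protocol as described above. Messages are plain method calls delivered instantly, a crash is modeled by a hypothetical `alive` flag, and the acceptor’s state lives in memory rather than on the stable storage a real implementation needs for restarts; all names are ours.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Acceptor:
    promised: int = -1                          # highest label promised so far
    accepted: Optional[Tuple[int, str]] = None  # (label, value) last accepted
    alive: bool = True                          # False simulates a crash

    def on_prepare(self, n):
        """Phase 1: ignore lower labels; otherwise promise n and report what we accepted."""
        if not self.alive or n < self.promised:
            return None
        self.promised = n
        return ("promise", self.accepted)

    def on_accept(self, n, value):
        """Phase 2: accept unless we already promised a higher label."""
        if not self.alive or n < self.promised:
            return False
        self.promised = n
        self.accepted = (n, value)
        return True


def propose(acceptors, n, my_value):
    """One round by a proposer using label n; returns the chosen value or None."""
    quorum = len(acceptors) // 2 + 1
    responders, previous = [], []
    for acceptor in acceptors:
        reply = acceptor.on_prepare(n)
        if reply is not None:
            responders.append(acceptor)
            if reply[1] is not None:
                previous.append(reply[1])
    if len(responders) < quorum:
        return None  # no quorum: retry later with a higher label
    # Use the value of the highest-labelled accepted proposal, if any.
    value = max(previous)[1] if previous else my_value
    acks = sum(acceptor.on_accept(n, value) for acceptor in responders)
    return value if acks >= quorum else None


acceptors = [Acceptor() for _ in range(5)]
print(propose(acceptors, n=1, my_value="A"))  # A
acceptors[0].alive = acceptors[1].alive = False
print(propose(acceptors, n=2, my_value="B"))  # A
```

In the second call two acceptors are down, yet the remaining three still form a quorum; since they report having accepted (1, "A"), the new proposer re-proposes A instead of B.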


Intuitively, when a proposal with value v gets accepted by a majority of acceptors S, then the next value a proposer will get from phase 1 is necessarily v, since any other majority includes some element from S. Inductively, at any point the value obtained from phase 1 by a proposer will be v, thus we have the following property [2]:

Property 1. If a proposal with value v and label n is chosen, then every proposal issued with a higher label has value v.
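The key step in the argument for Property 1 is that any two majorities of the same set of acceptors share at least one member. A brute-force check in Python for small numbers of acceptors (helper names are ours); the last call shows why a quorum must be more than half of the acceptors, not merely half:

```python
from itertools import combinations


def quorums(n, size):
    """Every subset of the acceptors {0, ..., n-1} with at least `size` members."""
    return [set(c) for k in range(size, n + 1) for c in combinations(range(n), k)]


def all_intersect(n, size):
    """True if any two such subsets share at least one acceptor."""
    qs = quorums(n, size)
    return all(a & b for a in qs for b in qs)


print(all(all_intersect(n, n // 2 + 1) for n in range(1, 8)))  # True: strict majorities
print(all_intersect(4, 2))  # False: {0, 1} and {2, 3} are disjoint
```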

What could go wrong?

We can think of some scenarios of failures to understand why some steps are necessary for the correctness of the algorithm.

Failure of a learner or failure of less than half of the acceptors. In this case the protocol is still capable of obtaining a quorum and informing the live learners about the proposed value. By relying on a majority instead of all acceptors, the algorithm becomes more fault-tolerant.

Failure of leader. A leader may fail in different stages of the protocol. If it fails after getting a response from a quorum but before broadcasting the accept requests, the new leader will start fresh, because acceptors only store a value after they accept an accept request.

The leader might be sending accept requests, and a quorum of acceptors accepts them, but the leader fails while sending the value v to the learners. In this case a proposal was chosen, but the new leader might not know about it. It might try to propose a new value to the acceptors, but due to Property 1, if its proposal is accepted, it has the same value v, so we can guarantee that all learners will get the same value.

Multiple leaders. Since a leader can fail, the system needs to be able to elect a new leader. The problem is that the original leader might recover, in which case we may have multiple leaders. Even in this case, Property 1 guarantees the correctness of the algorithm, but it may never terminate.

Failure and recovery of an acceptor. The leader might be sending accept requests, and the minimum possible quorum accepts them, but one of the acceptors fails before sending its accept response. The original leader will think it didn’t get a quorum. Consider the case in which another leader tries to propose a different value and manages to get a quorum of acceptors. By this time, the acceptor that failed can recover and finally send its accept response to the first leader. In this scenario it might look like we wouldn’t have achieved consensus. But again, due to Property 1, the value proposed by the second leader is guaranteed to be the same as the one accepted by the first quorum.


It has been a while since I’ve been wanting to start studying distributed algorithms. I had a distributed systems course during my undergrad, but I didn’t learn much at the time.

The paper “Paxos Made Simple” [2] was very hard for me to understand, and I’m not completely sure I did. Complementing it with other resources on the internet helped me get a better grasp of the protocol [1, 4, 5].


[1] Wikipedia – Paxos
[2] Paxos Made Simple
[3] My Writings – Leslie Lamport
[4] Distributed Systems – Lecture 17: Agreement in Distributed Systems: Three Phase Commit, Paxos
[5] Consensus Protocols: Paxos