Paper Reading – F1: A Distributed SQL Database That Scales

In this post we’ll discuss F1: A Distributed SQL Database That Scales. We’ll provide a brief overview of the paper’s contents and study the system’s architecture and implementation details more closely. At the end, we provide an Appendix covering some distributed systems and databases concepts mentioned throughout the paper.

F1 is a database used at Google to serve data for the AdWords product. This is the system that displays ads embedded in external pages using Google’s infrastructure.

F1 is named after a term from Genetics, F1 hybrid, in analogy to the idea that it combines the best aspects of relational databases and NoSQL systems (my initial thought was that it was named after Formula One).

At a high level, F1 is a layer on top of the Spanner database, which we covered in a previous post. It adds features such as a SQL interface (worth noting that Spanner has since evolved to support SQL natively too). Spanner handles a lot of the distributed systems difficulties under the hood, providing some guarantees, and F1 builds relational database features on top of that.

The paper mentions that AdWords migrated from a sharded MySQL solution, which was very expensive to maintain, especially when the data outgrew the instances and re-sharding was necessary. This data redistribution is now handled transparently by Spanner, and while F1 is theoretically slower, they were able to structure the data in such a way that it’s performant in practice (5-10 ms for reads and 50-150 ms for writes).

Architecture

F1 is deployed on multiple servers across geographically distributed datacenters. A client sends requests to a load balancer, which then routes them to an F1 server, usually in the closest datacenter.


Architecture of F1: two datacenters are shown, each containing multiple machines running F1 servers, Spanner and CFS instances.

F1 servers are usually in the same Data Center as the Spanner servers that contain their data. F1 can communicate to Spanner servers outside of its datacenter directly, but Spanner only communicates with the Colossus file system (CFS) within the same datacenter because CFS is not globally replicated.

F1 servers are stateless, except when a client performs a pessimistic transaction, which makes them easy to scale since no data movement is required.

In addition to F1 servers, there is an F1 master and a pool of F1 slaves. According to the paper, the master controls which processes are in the pool, and the F1 slaves execute parts of the query plan on behalf of F1 servers, but it’s not clear why they need to be a separate component.

Data Model

The table data can be organized in a hierarchical schema much like Spanner (see “Data Model” in the post about Spanner). This hierarchical schema is optional, but it’s a key piece in making the performance of F1 competitive with the previous sharded MySQL system AdWords used.

In this hierarchy, we have a root table and child tables. For each row in the root table, called a root row, the child rows whose keys match that root row’s key are clustered under it.

For example, say that we have tables Customer and Campaign. The Customer table has key (customerID) and the Campaign table has key (customerID, campaignID). A possible structure for the rows would be:

[Figure: Customer and Campaign rows stored hierarchically, with Campaign rows clustered under their parent Customer row]

Where Campaign rows with customerID=3 are clustered together under the corresponding Customer row.
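
To make the clustering concrete, here is a minimal sketch (with hypothetical keys, not F1’s actual storage format) showing how prefixing a child’s key with its root key makes sorting by key interleave children under their parents:

    # Hypothetical rows: a child key is prefixed by its root key, so sorting by key
    # interleaves Campaign rows under their parent Customer row.
    customers = {(1,): "Customer(1)", (2,): "Customer(2)", (3,): "Customer(3)"}
    campaigns = {(1, 10): "Campaign(1,10)", (3, 11): "Campaign(3,11)", (3, 12): "Campaign(3,12)"}

    rows = {**customers, **campaigns}
    for key in sorted(rows):
        print(key, rows[key])
    # (1,) Customer(1), (1, 10) Campaign(1,10), (2,) Customer(2),
    # (3,) Customer(3), (3, 11) Campaign(3,11), (3, 12) Campaign(3,12)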

The paper mentions that a given cluster of child rows falls within the same Spanner directory, which means that a transaction would query a single Spanner server, avoiding the overhead of synchronization between multiple servers (see Read-write transactions in the Spanner post).

Indexes. Tables support two types of indexes: local and global. Local index entries are stored in the same Spanner server as the rows they index, which requires the index key to include the key of the root table. Global indexes have no such restriction, but they can be distributed across multiple Spanner servers and are very expensive to update within a transaction.

Schema changes. Updating the schema of a table is challenging because the rows of the table are distributed, which makes schema consistency expensive to achieve, and the system cannot have downtime.

The proposed solution is to break a schema change into smaller steps, such that if no two servers are more than two steps apart, the change is safe.

Writes

Transactions

F1 supports 3 types of transactions: snapshot transactions, pessimistic transactions and optimistic transactions. It relies on the transactions supported by Spanner, so it’s worth re-reading the Implementation Details of the Spanner post.

Snapshot transactions are read-only transactions and map to the corresponding read-only transactions from Spanner. The pessimistic transaction maps to the read-write transactions from Spanner.

The optimistic transaction consists of multiple reads followed by a single write. F1 stores an extra column with the last modified timestamp for each row. When the client performs the reads, it stores the largest timestamp it saw for any row it received. It sends this timestamp together with the final write request. F1 then performs a pessimistic transaction to read only the timestamps of the affected rows. If any of them differ from the timestamp sent by the client, it aborts. Otherwise it performs the write.

It’s worth noting that F1 doesn’t open a transaction until the very last write request is sent. It assumes there were no other writes between the reads and the final write, which is why we call it optimistic. This is the default transaction type, and the paper describes a few advantages it has over the pessimistic type.
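
Here is a minimal sketch of the idea, assuming a hypothetical single-node in-memory store (this is not F1’s API, just an illustration of the timestamp check):

    # Hypothetical row store: key -> (value, last_modified_timestamp).
    store = {"row1": ("a", 100), "row2": ("b", 120)}

    def optimistic_read(keys):
        """Client side: read values and remember the largest timestamp seen."""
        values = {k: store[k][0] for k in keys}
        read_ts = max(store[k][1] for k in keys)
        return values, read_ts

    def optimistic_write(updates, read_ts):
        """Server side: a short pessimistic check-and-write (single node, simplified)."""
        # Abort if any affected row was modified after the client's reads.
        if any(store[k][1] > read_ts for k in updates):
            raise RuntimeError("conflict: row modified since read, aborting transaction")
        commit_ts = max(ts for _, ts in store.values()) + 1
        for k, v in updates.items():
            store[k] = (v, commit_ts)

    values, ts = optimistic_read(["row1", "row2"])
    optimistic_write({"row1": values["row1"] + "!"}, ts)   # succeeds: nothing changed in between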

Locks

F1 supports fine-grained locking of tables, including row-level and cell-level (a set of columns for a given row) locks.

Change history

F1 stores the change history of the data. The changes are stored as regular tables in a structure called ChangeBatch, a child of the root table. If a transaction updates multiple root rows, several ChangeBatches are created under their corresponding root tables.

This feature allows clients to subscribe to changes in F1 tables using a publish-subscribe system. Whenever a client receives a notification, it can ask for the incremental changes after a specific timestamp and apply them to the data it has stored locally.

Reads

F1 supports both NoSQL and SQL interfaces. The SQL dialect was extended to support Protocol Buffers, which are complex, strongly typed data structures (as opposed to loosely typed structures such as JSON). This extension allows, for example, reading and updating internal fields of such structures.

Local vs distributed. F1 supports either centralized execution (running on a single F1 node) or distributed execution. The query optimizer decides which one to use. The paper details the process of executing the query in a distributed fashion.

Distributed Query Example

The paper describes an example of a SQL query being mapped to an execution plan. It involves two joins, filters and aggregations:

[Figure: sample SQL query joining the AdClick, AdGroupCreative and Creative tables, with filters and a GROUP BY aggregation]

This SQL query is parsed and converted into a query plan whose nodes, called operators, are executed by multiple machines. A possible execution plan for the sample query is:

[Figure: distributed execution plan for the sample query: table scans feed a lookup join, then a distributed hash join, repartitioning and aggregation]

The arrows indicate the data flow, starting at the bottom. The first join is a lookup join (hash join). The operator reads rows from the AdClick table until it has about 100k unique lookup keys stored in a hash table. Then it performs a single batched lookup against the other table, AdGroupCreative. The rows from the AdClick table are kept in memory for faster lookup.

As soon as the join for these keys is completed, the results are streamed to the next stage of the pipeline.
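
A minimal sketch of a batched lookup join (hypothetical tables and a tiny batch size; F1 buffers on the order of 100k unique keys): buffer keys from one input, issue a single batched lookup against the other table, and stream the joined rows downstream.

    from collections import defaultdict

    BATCH_SIZE = 3  # hypothetical; F1 uses roughly 100k unique keys per batch

    def lookup_join(outer_rows, batched_lookup, key_fn):
        """Join a stream of outer rows against another table via batched lookups."""
        batch = defaultdict(list)                  # lookup key -> buffered outer rows
        for row in outer_rows:
            batch[key_fn(row)].append(row)
            if len(batch) >= BATCH_SIZE:
                yield from flush(batch, batched_lookup)
                batch.clear()
        yield from flush(batch, batched_lookup)

    def flush(batch, batched_lookup):
        for key, inner_row in batched_lookup(list(batch)):   # one batched RPC in a real system
            for outer_row in batch[key]:
                yield (outer_row, inner_row)

    # Hypothetical tables: AdClick rows joined to AdGroupCreative on creative_id.
    ad_clicks = [{"click_id": i, "creative_id": i % 4} for i in range(10)]
    ad_group_creative = {0: "banner", 1: "video", 2: "text", 3: "search"}
    lookup = lambda keys: [(k, ad_group_creative[k]) for k in keys if k in ad_group_creative]

    for joined_row in lookup_join(ad_clicks, lookup, key_fn=lambda r: r["creative_id"]):
        print(joined_row)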

The second join, with Creative, is a distributed join. It first repartitions each row from each table based on the values of the columns listed in the USING clause. Each partition may end up on a different machine for the next stage, which consists of joining the matching rows.

Finally, the rows are repartitioned again, now by the values of the GROUP BY columns, and aggregators apply the aggregation to the sets of rows within each partition.
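
A minimal sketch of hash repartitioning (hypothetical partition count and rows): each row is routed to the partition determined by a hash of the join or GROUP BY key, so all rows sharing a key end up on the same machine for the next stage.

    NUM_PARTITIONS = 4  # hypothetical; in practice one partition per downstream operator

    def partition_of(key):
        """Every row with the same key goes to the same downstream operator."""
        return hash(key) % NUM_PARTITIONS

    # Hypothetical rows being repartitioned by the GROUP BY column campaign_id.
    rows = [{"campaign_id": c, "clicks": n} for c, n in [(1, 5), (2, 3), (1, 7), (3, 1)]]

    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        partitions[partition_of(row["campaign_id"])].append(row)

    # Each downstream aggregator sums clicks over the rows in its own partition only.
    for i, part in enumerate(partitions):
        totals = {}
        for row in part:
            totals[row["campaign_id"]] = totals.get(row["campaign_id"], 0) + row["clicks"]
        print("partition", i, totals)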

Distributed Execution Overview

More generally, the query plan created by F1 is a DAG (directed acyclic graph) where each node is an operator like the join or aggregator described above. Note that there are multiple operators running the same operation in parallel.

The paper says:

A technique frequently used by distributed database systems is to take advantage of an explicit co-partitioning of the stored data.

It’s not very clear to me what they mean by that, especially because they don’t cite any references, but from context it suggests that the base operators (the scan and the lookup join) run on the same machines as the data (co-located) and do as much of the processing upfront as possible. This helps minimize data transfer, which can become the bottleneck in a distributed computation. F1 cannot do that because Spanner abstracts the data location away from F1.

A side effect is that there’s a lot of network traffic. The authors claim that Google’s network switch hardware improvements allow servers to communicate with each other at close to full network speed.

When the hash tables in memory grow too large, they write part of the data to disk. So while F1 doesn’t store data in a persistent way, it still needs to write to disk for intermediate operations.

For efficiency, F1 doesn’t write checkpoints to disk. The downside is that the execution is not fault tolerant: a failure in any stage causes the entire query to fail. Retries are done transparently, but long queries (over an hour) are bound to fail.

Other features

F1 exposes data from intermediate nodes to clients. This avoids having all the data concentrated at the last node of the query execution. The paper cites MapReduce jobs as consumers of this feature.

Conclusion

In this post we learned about one of Google’s many distributed databases, F1. It’s a loosely coupled layer on top of Spanner that provides a more familiar level of abstraction: a relational database.

It seems that we could make an analogy between Google’s systems and similar open source solutions. The Colossus File System (CFS) could map to the Hadoop Distributed File System (HDFS), Spanner would map to Hadoop’s YARN, and F1, providing SQL semantics on top of Spanner, could be mapped to Hive, which does the same for Hadoop. It’s a very rough comparison, and maybe Spanner is more similar to Spark, but it’s interesting to see the patterns and relationships between these systems.

References

[1] F1: A Distributed SQL Database That Scales
[2] Spanner: Google’s Globally-Distributed Database

Appendix: Terminology and Concepts

I was unfamiliar with several of the terms and concepts used throughout the paper, so I had to do some extra research. Here I attempt to explain some of these topics by quoting snippets from the paper.

We also have a lot of experience with eventual consistency systems at Google

Eventual consistency means that a set of servers might not contain the most recent updates but eventually will. For example, the client might issue a write that affects multiple machines. If the system only provides eventual consistency then a subsequent read is not guaranteed to get the data up-to-date with those writes.

The quote mentions experience with eventual consistency in a negative way, because of the extra complexities that clients have to deal with to work around this limitation.


Paper Reading – Spanner: Google’s Globally-Distributed Database

This is the first post in the category “Paper Reading”, which will consist of summaries of my notes after reading a paper.

In this post we’ll discuss Spanner: Google’s Globally-Distributed Database. We’ll provide a brief overview of the paper’s contents and study the system’s architecture and implementation details more closely. At the end, we provide an Appendix covering some distributed systems and databases concepts mentioned throughout the paper.

Paper’s Overview

Spanner is a distributed database used at Google. It provides a SQL API and several features such as reading data from a past timestamp, lock-free read-only transactions and atomic schema changes.

Section 2, Implementation, describes how data is distributed across machines called spanservers, which themselves contain data structures called tablets, stored as B-tree-like files in a distributed file system called Colossus. It also describes the data model, which allows a hierarchy of tables, where the parent table’s rows are interleaved with the child tables’ rows.

The paper puts emphasis on the TrueTime API as the linchpin that addresses many challenges in real-world distributed systems, especially clock uncertainty. This is described in Section 3.

Section 4 describes technical details on how to implement the distributed transactions and safe schema changes.

In Section 5, the authors provide benchmarks and use cases, in particular F1, a database built on top of Spanner and used by Google’s AdWords team, to address some scaling limitations of a sharded MySQL database.

Architecture

Spanservers

Figure 1: Spanner stack [1]

As we can see in Figure 1, Spanner is organized as a set of zones, which are the unit of administrative deployment. Even though there might be more than one zone per data center, each zone corresponds to a physically disjoint set of servers (a cluster). Within a zone, we have the zonemaster, which the paper doesn’t delve into; the location proxy, which serves as an index that directs requests to the appropriate spanserver; and the spanserver itself, which is the main unit in the system, containing the actual data.

Figure 2: Spanner architecture

A spanserver contains multiple tablets, which are a map

(key, timestamp) -> value

They are stored in Colossus, a distributed file system (the successor of the Google File System). Each tablet is replicated across multiple spanservers, and the set of replicas forms a Paxos group, which we can see in Figure 2. Reads can go directly to any replica that is sufficiently up-to-date. Writes must go through a leader, which is chosen via Paxos. The lock table depicted on top of the leader replica in Figure 2 provides concurrency control. The transaction manager is used to coordinate distributed transactions (that is, across multiple groups), and the leader of each participating group acts as a participant leader. If a transaction involves more than one Paxos group, a coordinator leader must be elected among the participant leaders of the groups.

Within a tablet, keys are organized into directories; a directory is a set of contiguous keys sharing a common prefix. A directory is the unit of data placement, meaning that data movement happens by directories, not individual keys or rows. Directories can be moved between Paxos groups.
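
A toy sketch of the tablet mapping (a plain Python dict, nothing like the actual B-tree-like files): keeping every (key, timestamp) version around is what makes snapshot reads at a past timestamp possible.

    # Toy tablet: key -> list of (timestamp, value) versions, in ascending timestamp order.
    tablet = {
        "user/1": [(10, "Alice"), (25, "Alicia")],
    }

    def snapshot_read(key, ts):
        """Return the value of `key` as of timestamp `ts` (latest write with timestamp <= ts)."""
        candidates = [value for t, value in tablet.get(key, []) if t <= ts]
        return candidates[-1] if candidates else None

    print(snapshot_read("user/1", 20))  # "Alice": the write at timestamp 25 is not visible yet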

Data Model

The paper mentions that data is stored in semi-relational tables. This is because they have characteristics of both relational tables (e.g. MySQL tables) and non-relational tables (e.g. HBase tables). They look relational because they have rows, columns and versioned values. They look like key-value store tables because rows must have a unique identifier, which acts as a key, while the row is the value. The qualifier schematized seems to denote that these tables have well-defined schemas (including column types and primary keys).

In particular, the schema supports Protocol Buffers. This means that besides the native types existing in databases (string, int, etc.), the schema supports the strongly typed structures from Protocol Buffers.

Another feature of this database is that it allows storing related tables with their rows interleaved. This helps data that is often queried together be located physically close (co-located). In the example of Figure 3, the Users and Albums tables are stored interleaved. If we think of this as a use case for the Google Photos application, fetching all albums of a given user should be a pretty common operation.

Figure 3: Interleaved tables

TrueTime API

The main method of the TrueTime API (TT) is:

TT.now(): [earliest, latest]

It returns the current universal time within a confidence interval. In other words, there’s a very strong guarantee that the actual time lies within the returned time range. Let \epsilon be half the length of a given confidence interval and \bar\epsilon the average across all \epsilon's. According to their experiments, \bar\epsilon varied from 1 to 7 ms during their measurements.

Of course, the strength doesn’t lie in the API itself but in the small confidence interval it can guarantee. They achieve this through a combination of atomic clocks and GPS receivers, which have different, uncorrelated synchronization and failure sources, meaning that even if one of them fails, the other is likely still working properly.
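
A minimal sketch of how a client could use such an interval API (a hypothetical TrueTime stub with a fixed uncertainty, not Google’s implementation). The primitive that matters later is “wait until timestamp s is guaranteed to be in the past”:

    import time
    from collections import namedtuple

    TTInterval = namedtuple("TTInterval", ["earliest", "latest"])
    EPSILON = 0.004  # hypothetical 4 ms uncertainty, within the 1-7 ms range reported

    def tt_now():
        """Return an interval guaranteed to contain the true current time."""
        t = time.time()
        return TTInterval(t - EPSILON, t + EPSILON)

    def wait_until_after(s):
        """Commit wait: block until timestamp s is guaranteed to be in the past."""
        while tt_now().earliest <= s:
            time.sleep(EPSILON / 2)

    s = tt_now().latest   # a timestamp assigned "now"
    wait_until_after(s)   # after this returns, no server can believe s is still in the future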

Implementation Details

Spanner has three main types of operations: read-write transaction, read-only transaction and snapshot read at a timestamp.

Read-write transactions

The writes in a read-write transaction, or RW for short, are buffered in the client until commit, which means the reads in that transaction do not see the effects of the writes.

First, the client performs the reads by issuing read requests to the leader of each (Paxos) group that has the data. The leader acquires read locks and reads the most recent data.

Then the client starts the two-phase commit for the writes. It chooses a coordinator group (a representative among the Paxos groups) and notifies the leaders of the other groups of the coordinator’s identity, along with the write requests. A non-coordinator group’s leader first acquires write locks and then chooses a prepare timestamp that is greater than all the timestamps previously issued by this group, to preserve monotonicity, and sends this timestamp to the coordinator’s leader.

The coordinator’s leader acquires write locks, receives all the prepare timestamps from the other leaders, and chooses a timestamp s that is greater than all of them, greater than TT.now().latest at the time it received the commit request, and greater than any timestamp issued within the coordinator group.

The coordinator’s leader might need to wait until it’s sure s is in the past, that is, until TT.now().earliest > s. After that, the coordinator sends s to all other leaders which commit their transactions with timestamp s and release their write locks.

This choice of timestamp can be shown to guarantee the external consistency property: if the start of a transaction T2 happens after the commit of transaction T1, then the commit timestamp assigned to T2 has to be greater than T1’s.
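
A minimal sketch of the coordinator leader’s timestamp choice (hypothetical values and a stubbed TrueTime; the real protocol also involves Paxos writes and lock management): the commit timestamp must dominate the participants’ prepare timestamps, the coordinator group’s own issued timestamps and TT.now().latest, and it is only revealed after the commit wait.

    import time

    EPSILON = 0.004  # hypothetical clock uncertainty in seconds
    tt_latest = lambda: time.time() + EPSILON
    tt_earliest = lambda: time.time() - EPSILON

    def wait_until_after(s):
        """Commit wait: block until s is guaranteed to be in the past."""
        while tt_earliest() <= s:
            time.sleep(EPSILON / 2)

    # Hypothetical inputs seen by the coordinator's leader.
    prepare_timestamps = [time.time() + 0.001, time.time() + 0.002]  # from the other leaders
    largest_issued_locally = time.time()                             # within the coordinator group

    s = max(prepare_timestamps + [largest_issued_locally, tt_latest()])
    wait_until_after(s)
    # The coordinator now commits at timestamp s, tells the other leaders to commit at s,
    # and all of them release their write locks.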

Read-only transactions

If the read involves a single Paxos group, then it chooses as the read timestamp the timestamp of the latest committed write in the group (note that if there’s a write going on, it would be holding a lock). If the read involves more than one group, Spanner chooses TT.now().latest as the timestamp, possibly waiting until it’s sure that timestamp is in the past.

Conclusion

In this post we learned about one of Google’s most recent databases. Google is a main pioneer in large-scale distributed systems, and Spanner addresses limitations of solutions developed in the past: Bigtable, which lacks a schema and strong consistency; Megastore, which has poor write performance; and sharded MySQL, which required the application to know about the sharding scheme, with resharding being a risky and lengthy process.

One thing I missed from this paper is whether Spanner can perform more advanced relational database operations such as aggregations, subqueries or joins. Usually performing these in a distributed system requires some extra component to store the intermediate values, which was not mentioned in the paper.

References

  • [1] Spanner: Google’s Globally-Distributed Database
  • [2] Principles of Computer Systems – Distributed Transactions
  • [3] Radar – Google’s Spanner is all about time
  • [4] Implementing Fault-Tolerant Services Using the State Machine Approach: A Tutorial
  • [5] System level concurrency control for distributed database systems

Appendix: Terminology and concepts

I was unfamiliar with several of the terms and concepts used throughout the paper, so I had to do some extra research. Here I attempt to explain some of these topics by quoting snippets from the paper.

Spanner is a database that shards data across many sets of Paxos.

Sharding basically means distributing data across multiple machines. It’s commonly used in a database context to mean partitioning a table’s rows across multiple machines. Paxos is a protocol used to solve the consensus problem, in which a set of machines (participants) in a distributed environment must agree on a value (consensus). We discussed it in a previous post; it guarantees that at least a quorum (a subset of more than half of the participants) agrees on a value. In “sets of Paxos”, Paxos seems to be shorthand for a set of machines running the Paxos protocol.

Spanner has two features that are difficult to implement in a distributed database: it provides externally consistent reads and writes, and globally-consistent reads across the database at a timestamp.

In the context of transactions in a concurrent environment, we need a total order of the transactions. This is to avoid problems such as the concurrent balance update [2]: imagine we have a bank account with balance B and two concurrent transactions: transaction 1 (T1) reads the current balance and adds $1, and transaction 2 (T2) reads the current balance and subtracts $1. After T1 and T2 are executed (no matter in which order), one would expect that the final balance remains B. However, if the read from T2 happens before the write from T1 and the write from T2 happens after it, T1’s update will be overwritten and the final balance will be B-$1. The total ordering guarantees that T1 and T2 are disjoint (in time).
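
A minimal sketch of that lost update (plain Python standing in for two interleaved transactions): because T2 reads before T1 writes, T1’s update is silently overwritten.

    balance = 100  # B

    t1_read = balance        # T1 reads B = 100
    t2_read = balance        # T2 reads B = 100, before T1 writes
    balance = t1_read + 1    # T1 writes 101
    balance = t2_read - 1    # T2 writes 99, overwriting T1's update

    print(balance)           # 99 instead of the expected 100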

To obtain external consistency, the following property must hold: if T1 commits before T2 starts, then T1 comes before T2 in the total order.

Google has datacenters all over the world. By providing globally-consistent reads at a timestamp, it guarantees that, given a timestamp, it will return the same data no matter which datacenter you ask the data from.

Running two-phase commit over Paxos mitigates the availability problems.

Two-phase commit, or 2PC, is a protocol to guarantee the consistency of distributed transactions: either all participants commit or none do (they roll back). The protocol requires a coordinator to be chosen among the participants, and such an election can be performed using the Paxos protocol, which seems to be the case here. In the first phase, the coordinator must obtain agreement from all participants; after that it starts the second phase and sends a message to each participant telling it to proceed with the transaction.
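
A minimal sketch of the two phases (toy in-memory participants, with no network or failure handling, which is where the real difficulty lies):

    class Participant:
        def __init__(self, can_commit=True):
            self.can_commit = can_commit
            self.state = "pending"

        def prepare(self):            # phase 1: vote yes/no
            return self.can_commit

        def finish(self, decision):   # phase 2: apply the coordinator's decision
            self.state = "committed" if decision == "commit" else "rolled back"

    def two_phase_commit(participants):
        votes = [p.prepare() for p in participants]           # phase 1
        decision = "commit" if all(votes) else "abort"
        for p in participants:                                 # phase 2
            p.finish(decision)
        return decision

    print(two_phase_commit([Participant(), Participant()]))                      # commit
    print(two_phase_commit([Participant(), Participant(can_commit=False)]))      # abort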

To support replication, each spanserver implements a single Paxos state machine on top of each tablet

A Paxos state machine is a method for implementing fault tolerance [4]. This seems complicated enough to warrant an entire post, but the basic idea is to have replicas, each containing a copy of a state machine, and to use this redundancy to guarantee correctness and availability under failures.

Reads within read-write transactions use wound-wait to avoid deadlocks

Wound-wait locking is a mechanism used between transactions to prevent deadlock [5]. More specifically, assume we have transactions T1 and T2 with associated unique timestamps, and T1 requests a lock that T2 currently holds. Let t(T) be the timestamp of a transaction T. We have two scenarios: either t(T1) < t(T2) or t(T1) > t(T2). In the first case, T1 is older than T2 and the protocol says that T2 should abort, roll back and retry later with the same timestamp. We say T1 wounds T2. In the second case, T1 is younger than T2 and is allowed to wait until the resource is available.

A converse approach is the wait-die mechanism. The comparison of these methods is explained here. I haven’t researched enough to understand what the tradeoffs between these two approaches are and why Spanner decided on the first.
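
A minimal sketch of both policies side by side (hypothetical transactions reduced to timestamps): the comparison happens when a requester hits a lock held by another transaction, and both schemes consistently favor the older transaction, which is what rules out cycles of waiting.

    def wound_wait(requester_ts, holder_ts):
        """Requester wants a lock currently held by holder."""
        if requester_ts < holder_ts:     # requester is older
            return "holder is wounded: aborts and retries later with its original timestamp"
        return "requester waits for the lock"

    def wait_die(requester_ts, holder_ts):
        if requester_ts < holder_ts:     # requester is older
            return "requester waits for the lock"
        return "requester dies: aborts and retries later with its original timestamp"

    # An older transaction (smaller timestamp) requesting a lock held by a younger one:
    print(wound_wait(requester_ts=1, holder_ts=2))   # the younger holder aborts
    print(wait_die(requester_ts=1, holder_ts=2))     # the older requester waits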

Notes on Zookeeper


Since I read about the Paxos protocol, I’ve been wanting to learn more about Zookeeper. I’ve collected some notes on this system based on the official documentation and blog posts about the subject.

We’ll understand in a high-level how Zookeeper works, which guarantees it provides and some applications we can construct using the base API.

Introduction

Zookeeper is a distributed coordination service for distributed applications. It was developed at Yahoo! by Ben Reed and Flavio Junqueira in 2006 and open-sourced in 2007.

It can be deployed in a set of servers (called ensemble), in which one is elected a leader and the others are called followers. If a leader fails, one of the followers is promoted to a leader.

Each server running Zookeeper maintains a copy of a tree structure. The nodes in the tree, called znodes, can hold data and are referred to by filesystem-like paths, that is, a list of node names concatenated with “/”. Each server also has a replica of the tree in persistent storage, for recovery.

Clients can connect to one of these hosts to perform CRUD (Create, Read, Update, Delete) operations on the nodes of the tree. All write requests go to the leader. It then broadcasts the update proposals to the followers, and a quorum (a majority) of them has to agree on each proposal, which guarantees that the data in all followers is consistent (Zookeeper uses an atomic broadcast protocol called Zab, similar in spirit to Paxos, for this purpose). Every update to a znode increases its version number.

A given znode can be marked as sequential and/or ephemeral. If it’s marked as sequential, a sequence number will be appended to the node name when creating it, which Zookeeper guarantees is unique. If it’s marked as ephemeral, the znode will be deleted when the client that created the node ends the connection.

Clients can subscribe to changes on a given node of the tree via watches by providing callbacks, which are called when the event they subscribe to is fired. Events are fired only once, so clients have to set up the watch again after it fires.

Similar to a filesystem, it’s possible to associate create, read, update and delete permissions with znodes. These permissions are referred to as ACLs (access control lists).

Guarantees

Zookeeper is a simple system, but it provides a set of guarantees that is otherwise hard to get right in practice:

* Sequential consistency – If a client requests a sequence of updates to the tree, they will be applied in the order they were sent
* Atomic updates – An update either succeeds or fails, with no partial updates
* Single System Image – A client will see the same tree no matter which server it connects to
* Reliability – Updates made to the tree will never be lost
* Timeliness – The clients’ view of the tree is up-to-date within a certain amount of time.

Applications

One of the main interesting things about Zookeeper is that it provides a very small API that does general things well, so it can be used for different purposes. We’ll now cover some of them described in [1].

Barrier. Barriers are useful for synchronizing distributed nodes. They all block until a condition is met and the barrier is removed. One way to implement this mechanism in Zookeeper is to have all clients watch a given znode, and delete that znode when the barrier is complete.
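
As a concrete illustration, here is a minimal sketch of a simple (single) barrier using the kazoo Python client; the znode path and ensemble address are hypothetical, and a real implementation would also handle connection loss.

    import threading
    from kazoo.client import KazooClient

    zk = KazooClient(hosts="127.0.0.1:2181")   # hypothetical ensemble address
    zk.start()

    BARRIER = "/barrier"

    def set_barrier():
        """Coordinator raises the barrier by creating the znode."""
        if not zk.exists(BARRIER):
            zk.create(BARRIER)

    def enter_barrier():
        """Clients block while the barrier znode exists."""
        released = threading.Event()
        if zk.exists(BARRIER, watch=lambda event: released.set()):
            released.wait()

    def lift_barrier():
        """Coordinator removes the znode, waking every blocked client."""
        zk.delete(BARRIER)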

The Zookeeper recipe wiki [1] describes a recipe for implementing double barriers.

Distributed queue. A distributed queue is a data structure similar to a regular queue, but it’s available to a distributed set of clients, which can either enqueue an element at the end of the queue or retrieve an element from the front.

We can use Zookeeper to implement a simple distributed queue. We create a root znode representing the queue. Inserting an element corresponds to creating a new znode under that root, while retrieving an element corresponds to removing the corresponding znode.

To make sure the order is respected, we can mark the nodes created as sequential, so they have a number consistent with their creation order. For insertion, Zookeeper will handle race conditions for us with the sequential nodes.

To dequeue an element we need to handle race conditions in which two clients try to dequeue an element at the same time. One simple way to do this is to have the client get the list of elements in the queue (all children of the root node), sort them, and try to remove the element with the smallest sequence number. If that znode was already deleted by another client, it keeps trying with the next node in the sequence until it has tried all the elements it downloaded. After that, it may need to fetch a fresh list of elements, since new ones may have been inserted since the first call. If the list comes back empty, it can raise an exception for the empty queue.

In [2], Henry Robson writes about Zookeeper and how to write a distributed queue using a Python API. He covers the design mentioned above in more detail.
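
Here is a minimal sketch of that dequeue loop using the kazoo Python client (which may differ from the API used in [2]); the znode paths and ensemble address are hypothetical:

    from kazoo.client import KazooClient
    from kazoo.exceptions import NoNodeError

    zk = KazooClient(hosts="127.0.0.1:2181")   # hypothetical ensemble address
    zk.start()
    zk.ensure_path("/queue")

    def enqueue(data: bytes):
        # Sequential znode: Zookeeper appends a monotonically increasing counter to the name.
        zk.create("/queue/item-", data, sequence=True)

    def dequeue():
        while True:
            children = zk.get_children("/queue")
            if not children:
                raise IndexError("queue is empty")
            for name in sorted(children):            # smallest sequence number first
                try:
                    data, _stat = zk.get("/queue/" + name)
                    zk.delete("/queue/" + name)      # raises if another client got here first
                    return data
                except NoNodeError:
                    continue                         # already dequeued by someone else; try the next one
            # Every candidate was taken by other clients; fetch a fresh list and retry.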

One especially interesting consideration is dealing with failures during insertion. When performing an enqueue, the connection between the client and the Zookeeper server might go away before the client can tell whether it succeeded. Retrying might lead to a node being inserted twice, and even checking whether the node is in the queue can’t guarantee it was not inserted, since it might have been dequeued already. One solution is using a flag that the client can set atomically only when a node is successfully enqueued. When dequeuing a node, the client ignores it if it doesn’t have the flag set. This way, if the enqueuing client can’t confirm a node was inserted, the node won’t be processed, so the client can retry until it manages to set the flag.

[3] discusses some drawbacks of using Zookeeper to implement a queue.

Distributed lock. Distributed clients can try to acquire the lock. If the lock is available, the client successfully acquires it. Otherwise they block until the lock is released.

We can use a similar idea to the queue: we create a root znode representing the lock, and when a client tries to acquire the lock, it creates an ephemeral sequential znode under it. It then gets the children of the directory, and if it has the lowest sequence number, it has acquired the lock. Otherwise, it picks the znode with the highest sequence number that is less than its own, sets a watch on that znode, and waits.

When the watched znode is deleted, the waiting client wakes up and can try to acquire the lock again. Note that with this design, each znode is watched by at most one other client. If we instead chose to watch any change in the directory, every deletion would wake up all waiting clients, which would all try to acquire the lock at the same time, causing a thundering herd problem.
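
A minimal sketch of this lock with the kazoo Python client (hypothetical paths and ensemble address; kazoo also ships a ready-made Lock recipe, so in practice you would use that rather than hand-rolling it):

    import threading
    from kazoo.client import KazooClient

    zk = KazooClient(hosts="127.0.0.1:2181")   # hypothetical ensemble address
    zk.start()
    zk.ensure_path("/lock")

    def acquire():
        # Ephemeral: the lock is released automatically if this client dies.
        path = zk.create("/lock/candidate-", ephemeral=True, sequence=True)
        my_name = path.rsplit("/", 1)[1]
        while True:
            children = sorted(zk.get_children("/lock"))
            if children[0] == my_name:
                return path                            # lowest sequence number: lock acquired
            predecessor = children[children.index(my_name) - 1]
            released = threading.Event()
            # Watch only the znode immediately before ours to avoid a thundering herd.
            if zk.exists("/lock/" + predecessor, watch=lambda event: released.set()):
                released.wait()
            # If the predecessor already vanished, loop and re-check immediately.

    def release(path):
        zk.delete(path)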

The recipes [1] wiki page also describes a distributed read-write lock design.

Master election. A set of clients can use a Zookeeper ensemble to elect a master among themselves. First, each client creates an ephemeral sequential znode, and whoever has the lowest sequence number gets to be the leader. The other clients find out which znode is immediately before theirs in the sequence and set a watch on it.

The clients keep a connection open and keep sending heartbeats signaling they’re alive. If the current master crashes or hangs, its session will be closed, its ephemeral znode deleted, and the client that was watching that znode can be promoted to master.

Curator is an open source project, started by Netflix, that implements common recipes on top of Zookeeper and provides them as a library.

References

[1] ZooKeeper Recipes and Solutions
[2] Cloudera – Building a distributed concurrent queue with Apache ZooKeeper
[3] Apache Curator – Tech Note 4

Conclusion

In this post we learned a little bit about Zookeeper and saw some applications that can be built on top of it.

The motivation for this post was a tech talk that Patrick Hunt – one of the early contributors to Zookeeper and now at Cloudera – gave at Airbnb.

The Paxos Protocol


Leslie Lamport is an American computer scientist. He’s known for his work in distributed systems and for the LaTeX system.

He was recently the recipient of the 2014 Turing Award for his work in distributed systems. Among his contributions, the Paxos protocol for solving the consensus problem is one of the most famous. Several real-world systems are built using Paxos or Paxos-like protocols, such as Google’s Chubby and Yahoo’s Zookeeper.

In this post we’ll provide a brief overview of this protocol.

Introduction

Consider a set of processes running in a network, where failures may happen. The consensus problem requires these distributed processes to agree on a single value.

More precisely, processes communicate with each other through asynchronous messages. These messages can take arbitrarily long to be delivered, can be duplicated and can be lost, but we assume no Byzantine failures may happen.

Also, processes may fail, but they can be restarted and can restore information from before the failure.

To solve the consensus problem, Leslie Lamport devised a protocol named Paxos. There are actually many variants of this algorithm with different trade-offs but here we’ll discuss one of the simplest versions.

The protocol is named after the Greek island of Paxos. According to [3], Leo Guibas, a Greek professor at Stanford, suggested the name.

Definitions

There are three roles that processes can have in the protocol: the proposer, the acceptor and the learner.

A proposer is the process that makes proposals of values. Most of the time, only one process acts as proposer (we’ll discuss later when we can have multiple proposers), thus it’s also called the leader.

An acceptor can accept (multiple) proposals. A subset of acceptors that represents a majority is known as a quorum. When a proposal has been accepted by a quorum, we say that the proposal was chosen.

After a proposal is chosen, the proposer informs the processes known as learners about the chosen value.

Protocol

In the Paxos protocol the proposals have unique labels, which are natural numbers used to order the proposals.

The Paxos protocol has 2 phases. In the first phase, the leader will basically try to learn if any value has already been accepted by the acceptors. It will then pick a value that will guarantee the correctness of the algorithm.

In the second phase, it will propose the picked value from phase 1 to the acceptors. After a quorum of acceptors acknowledge the leader that they accepted the proposal, the leader can inform the learners.

More specifically, the protocol is as follows:

Phase 1 – Prepare

* Proposer:

First, the proposer picks a unique number n and broadcasts a prepare request with label n to the acceptors.

* Acceptor:

When an acceptor receives a prepare request with label n, it will ignore it if it has already received a prepare request with label greater than n. Otherwise, it will promise not to accept any request (prepare or accept) with label less than n from now on. It will then return a proposal with the highest label less than n that it has already accepted (or null if it has no such proposal).

Phase 2 – Accept

* Proposer:

If the proposer receives responses from a quorum of the acceptors, it selects, among all the proposals returned, the one with the highest label. Let v be the value of that proposal (if no proposal was returned, the proposer is free to pick any value). The proposer then sends an accept request to each of those acceptors with value v and label n.

If it didn’t get a response from a majority of acceptors, it may repeat phase 1 after a timeout.

* Acceptor:

If an acceptor receives an accept request with value v and label n it accepts this request, unless it has already responded to some other prepare request with a label higher than n.

If the acceptor accepts the request, it sends a response notifying the proposer. If the proposer gets a confirmation from a quorum, it can now broadcast the value to the learners.
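
A minimal single-process sketch of the protocol (plain objects instead of real network messages; labels and values are hypothetical): the acceptor keeps the highest label it has promised and the last proposal it accepted, and the proposer must reuse the highest-labeled value it hears about.

    class Acceptor:
        def __init__(self):
            self.promised = -1        # highest prepare label promised so far
            self.accepted = None      # (label, value) of the last accepted proposal

        def on_prepare(self, n):
            if n <= self.promised:
                return None                          # ignore: already promised a higher label
            self.promised = n
            return ("promise", self.accepted)        # report a previously accepted proposal, if any

        def on_accept(self, n, value):
            if n < self.promised:
                return False                         # promised not to accept anything below `promised`
            self.promised = n
            self.accepted = (n, value)
            return True

    # A proposer with label 1 runs both phases against three acceptors (quorum = 2).
    acceptors = [Acceptor() for _ in range(3)]
    replies = [a.on_prepare(1) for a in acceptors]
    promises = [r for r in replies if r is not None]
    assert len(promises) >= 2                              # quorum of promises
    prior = [accepted for _, accepted in promises if accepted]
    value = max(prior)[1] if prior else "my value"         # reuse the highest-labeled value, if any
    accepted_count = sum(a.on_accept(1, value) for a in acceptors)
    assert accepted_count >= 2                             # quorum accepted: the value is chosen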

Properties

Intuitively, when a proposal with value v gets accepted by a majority of acceptors S, then the next value a proposer will get from phase 1 is necessarily v, since any other majority includes some element from S. Inductively, at any point the value obtained from phase 1 by a proposer will be v, thus we have the following property [2]:

Property 1. If a proposal with value v and label n is chosen, then every proposal issued with a higher label has value v.

What could go wrong?

We can think of some scenarios of failures to understand why some steps are necessary for the correctness of the algorithm.

Failure of a learner or failure of less than half of the acceptors. In this case the protocol is still able to obtain a quorum and inform the surviving learners about the proposed value. By relying on a majority instead of all acceptors, the algorithm becomes more fault-tolerant.

Failure of the leader. A leader may fail at different stages of the protocol. If it fails after getting a response from a quorum, but before broadcasting the accept requests, the new leader will start fresh, because acceptors only store information after they accept an accept request.

The leader might be sending accept requests, and a quorum of acceptors might accept them, but the leader fails while it is sending the value v to the learners. What happened here is that a proposal was chosen but the new leader might not know about it. It might try to propose a new value to the acceptors, but due to Property 1, if it’s accepted we can assume it has the same value v, so we can guarantee that all learners will get the same value.

Multiple leaders. Since a leader can fail, the system needs to be able to elect a new leader. The problem is that the original leader might recover in which case we may have multiple leaders. Even in this case, Property 1 guarantees the correctness of the algorithm, but it may never terminate.

Failure and recovery of an acceptor. The leader might be sending accept requests, and the minimum possible quorum accepts them, but one of the acceptors fails before sending its accept response. The original leader will think it didn’t get a quorum. Consider the case in which another leader proposed a different value and managed to get a quorum of acceptors. By this time, the acceptor that failed may recover and finally send its accept response to the first leader. In this scenario it looks like we failed to achieve consensus, but again, due to Property 1, the value proposed by the second leader is guaranteed to be the same as the one accepted by the first quorum.

Conclusion

It has been a while since I’ve been wanting to start studying distributed algorithms. I had a distributed systems course during my undergrad, but I didn’t learn much at the time.

The paper “Paxos Made Simple” [2] was very hard for me to understand, and I’m not completely sure I did. Complementing it with other resources on the internet helped me get a better grasp of the protocol [1, 4, 5].

References

[1] Wikipedia – Paxos
[2] Paxos Made Simple
[3] My Writings – Leslie Lamport
[4] Distributed Systems – Lecture 17: Agreement in Distributed Systems: Three Phase Commit, Paxos
[5] Consensus Protocols: Paxos