Designing Data-Intensive Applications


I spent the last few months working through Designing Data-Intensive Applications. Not gunna lie, without an engineering background this was tough! But I did learn a lot, and I think it will be particularly useful for me when trying to understand why things went wrong (even if I won’t be able to proactively apply a lot of it without more practice).

This blog is a summary of my notes from the book.

Chapter 1: Reliable, Scalable, and Maintainable Applications

The author sets the stage for the book by pointing out that most modern applications are data-intensive rather than compute-intensive, meaning that CPU power is rarely a limiting factor for these applications—bigger problems are usually the amount of data, the complexity of data, and the speed at which it is changing.

Increasingly, many applications have requirements that are too demanding or wide-ranging for a single tool to meet all of their data processing and storage needs. Instead, tools focus on individual tasks, and many tools are used together in one system.

This can lead to complexity. For example, how do you ensure that the data remains correct and complete, even when things go wrong internally? How do you provide consistently good performance to clients, even when parts of your system are degraded? How do you scale to handle an increase in load? Or, what does a good API for the service look like?

To address these questions, the author highlights three important considerations: reliability, scalability, and maintainability:

  • Reliability: The system should continue to work correctly even in the face of adversity (hardware or software faults, and even human error).
  • Scalability: As the system grows (in terms of data volume, traffic volume, or complexity), there should be reasonable ways of dealing with that growth.
  • Maintainability: Over time, many different people will work on the system (either to maintain or to add new functionality), and they should all be able to work on it productively.

Taking a closer look at reliability, this essentially means that the application performs the function the user expects, it can tolerate the user making mistakes or using the software in unexpected ways, the performance is good under the expected load and data volume, and the system prevents any unauthorized access or abuse.

The things that can go wrong are called faults, and systems that anticipate faults and can cope with them are called fault-tolerant or resilient. Note that a fault is different from a failure: a fault is usually defined as one component of the system deviating from its spec, whereas a failure is when the system as a whole stops providing the required service to the user. The goal is to be able to tolerate faults without causing failures.

Unfortunately, we can’t be resilient to all faults (e.g., a black hole swallowing the earth), so we focus on certain types of faults.

Hardware faults are things like faulty RAM, a power grid blackout, or a hard disk crash. A first response to these kinds of faults is typically hardware redundancy, so that we have a backup if something goes wrong, which has traditionally worked well. However, as data volumes and applications’ computing demands have increased, more applications have begun using larger numbers of machines, which proportionally increases the rate of hardware faults. Moreover, in some cloud platforms such as Amazon Web Services (AWS) it is fairly common for virtual machine instances to become unavailable without warning, as the platforms are designed to prioritize flexibility and elasticity over single-machine reliability.

So, there is a move toward systems that can tolerate the loss of entire machines, by using software fault-tolerance techniques instead of or in addition to hardware redundancy.

While hardware faults tend to be random and independent from each other, software faults are systematic errors within the system. Such faults are harder to anticipate, and because they are correlated across nodes, they tend to cause many more system failures than uncorrelated hardware faults. Some examples of software faults include a bug that causes a server to crash when given a particular bad input, a runaway process that uses up some shared resource (like memory or disk space), or cascading failures, where a small fault in one component triggers a fault in another component, which in turn triggers further faults.

There is no quick solution to the problem of systematic faults in software, but lots of small things can help, like thorough testing, monitoring, and analyzing system behavior in production.

Finally, human error is actually responsible for the majority of outages. Some preventive measures include:

  • Designing systems in a way that minimizes opportunities for error (the book mentioned this briefly, but I’m curious about more details here, since that seems obvious but I’m not sure about best practices)
  • Decoupling the places where people make the most mistakes from the places where they can cause failures (basically, sandbox environments)
  • Testing, from unit tests to integration testing.
  • Setting up monitoring.
  • Training people well.

Next, turning to scalability, we first need to be able to describe load: Load can be described with a few numbers called load parameters, which differ depending on the architecture of your system. Some examples of load parameters include requests per second to a web server, the ratio of reads to writes in a database, the number of simultaneously active users in a chat room, the hit rate on a cache, etc. Load can either be described by the average case, or extreme cases.

Once we have a way to describe load, we then need clarity on how to describe performance. In a batch processing system such as Hadoop, we usually care about throughput—so, the number of records we can process per second, or the total time it takes to run a job on a dataset of a certain size. In online systems, what’s usually more important is the service’s response time (the time between a client sending a request and receiving a response). Note that response time will always vary slightly with each request, so it’s better to think of it as a distribution of values. Also, average response time is often reported, but this misses outliers (users who experienced a delay), so it’s typically better to use percentiles.
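
To make the percentile point concrete, here's a quick Python sketch of mine (the numbers are made up):

```python
# Toy illustration: mean vs. percentiles of response times (values are made up).
response_times_ms = [12, 15, 14, 13, 200, 16, 14, 13, 15, 950]

def percentile(values, p):
    """Return the p-th percentile (0-100) using nearest-rank on sorted values."""
    ordered = sorted(values)
    rank = max(1, round(p / 100 * len(ordered)))  # nearest-rank method
    return ordered[rank - 1]

mean = sum(response_times_ms) / len(response_times_ms)
print(f"mean: {mean:.0f} ms")                            # dragged up by the outliers
print(f"p50:  {percentile(response_times_ms, 50)} ms")   # typical user experience
print(f"p99:  {percentile(response_times_ms, 99)} ms")   # what the unluckiest users see
```

Half of these requests finish in about 14 ms, but the mean is pulled up to roughly 126 ms by two slow outliers, and the p99 shows the tail, which is why percentiles are the more useful summary.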

Scaling to accommodate increased load can be accomplished in multiple ways:

  • Scaling up (also called vertical scaling) refers to using a more powerful machine.
  • Scaling out (horizontal scaling) means distributing the load across multiple smaller machines.
  • Some systems are elastic, meaning that they can automatically add computing resources when they detect a load increase, whereas other systems are scaled manually. An elastic system can be useful if load is highly unpredictable, but manually scaled systems are simpler and may have fewer operational surprises.

The author notes that the majority of the cost of software is not in its initial development, but in its ongoing maintenance—fixing bugs, keeping its systems operational, investigating failures, adapting it to new platforms, modifying it for new use cases, repaying technical debt, and adding new features.

There are a few design principles to minimize pain during maintenance:

First, operability. Operability aims to make life easy for the team responsible for things like monitoring the health of the system, keeping systems up to date, coordinating systems, anticipating future problems, and maintenance tasks. This can be accomplished by:

  • Providing visibility into the runtime behavior and internals of the system with good monitoring
  • Providing good support for automation and integration with standard tools.
  • Avoiding dependency on individual machines (thereby allowing machines to be taken down for maintenance while the system as a whole continues running uninterrupted).
  • Providing good documentation.
  • Providing good default behavior, but also giving administrators the freedom to override defaults when needed.
  • Self-healing where appropriate, but also giving administrators manual control over the system state when needed.
  • Exhibiting predictable behavior and minimizing surprises.

Next, we should (of course) avoid complex code (where complex code refers to things like a tight coupling of modules, tangled dependencies, inconsistent naming and terminology, and hacks aimed at solving performance problems). Complexity makes it easier to introduce bugs and harder for engineers to learn/maintain code, so simplicity is key to maintainability.

One of the best tools we have for removing accidental complexity is abstraction. A good abstraction can hide a great deal of implementation detail behind a clean, simple-to-understand facade.

Finally, evolvability refers to the ease with which you can modify a data system and adapt it to changing requirements. From what I understand this is basically the same idea as agility, but for data systems.

Chapter 2: Data Models and Query Languages

Data Models

“A data model is an abstract model that organizes elements of data and standardizes how they relate to one another and to the properties of real-world entities.” In other words, it is how we choose to represent data, often in layers.

For example:

  1. A user generates data that is captured in a data structure in application code.
  2. That data is then stored in another data model, such as a relational database.
  3. The database converts that data into bytes on disk.
  4. Beyond that, representing those bytes in hardware is yet another layer.

Adding to this, application code is often object-oriented but data is often stored in relational databases, leading to the need for object-relational mapping tools or complex translation layers.

Historically, data started out being represented as one big tree (the hierarchical model), but that wasn’t good for representing many-to-many relationships, so the relational model was invented to solve that problem.

The best-known implementation of the relational model is SQL, used by relational databases.

  • Data is organized into relations (tables).
  • Each relation is an unordered collection of tuples (rows).
  • Naturally handles many-to-one relationships.
  • Joins are easy.
  • Used for transaction processing (e.g. stock in a warehouse, airline reservations) and batch processing (e.g. customer invoicing, payroll, reporting).

More recently, developers found that some applications don’t fit well in the relational model either:

  • Scalability for big data and high-write throughput.
  • Preference for open source.
  • Operations not well supported by relational models.
  • Need for dynamic or expressive data models.

So, NoSQL came into being. These datastores have diverged in two main directions:

Document models target the use cases where data comes in self-contained documents, and relationships between one document and another are rare.

  • More scalable.
  • Fewer restrictions than relational databases impose.
  • Data is stored in ‘documents’ instead of tables (JSON or XML format, basically a string).
  • Can be more flexible, since there is no enforced schema.
  • All the info is in one place, so no need to query multiple tables (a one-to-many relationship between id and info).
  • Many-to-many relationships can be a bit more challenging.

For example, if we think of a resume or LinkedIn profile:

  • A relational model might require multiple queries or a complex join for a single resume, but in a document model like JSON one query can populate everything.
  • Many developers find this reduces mismatch between application code and storage.
  • Can significantly improve data locality, particularly in one-to-many relationships
  • May make the structure of data more explicit - for example the implied tree structure of one-to-many relationships.
  • Best used when relationships between documents are rare - joins are rare/unsupported
  • New changes to applications may introduce complex many-to-one or many-to-many relationships - in the LinkedIn example, what if organizations become their own entity?

The main arguments in favor of the document data model are schema flexibility, better performance due to locality, and that for some applications it is closer to the data structures used by the application. If the data in your application has a document-like structure (i.e., a tree of one-to-many relationships, where typically the entire tree is loaded at once), then it’s probably a good idea to use a document model. The relational technique of shredding—splitting a document-like structure into multiple tables (like positions, education, and contact_info)—can lead to cumbersome schemas and unnecessarily complicated application code.
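
As a sketch of what this looks like (my own toy example, with made-up field names, not the book's exact figure):

```python
import json

# A resume/profile as one self-contained document (hypothetical field names).
# In a relational schema, positions and education would probably be separate
# tables joined on user_id; here the whole tree comes back with a single lookup.
profile = {
    "user_id": 251,
    "first_name": "Ada",
    "last_name": "Lovelace",
    "positions": [  # one-to-many: one profile, many positions
        {"title": "Analyst", "org": "Analytical Engines Ltd"},
        {"title": "Mathematician", "org": "Independent"},
    ],
    "education": [
        {"school": "Private tutoring", "years": "1830s"},
    ],
    "contact_info": {"city": "London"},
}

print(json.dumps(profile, indent=2))  # essentially the JSON document that gets stored
```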

Graph-like data models go in the opposite direction, targeting use cases where anything is potentially related to everything. For example, social connections (vertices are people and edges represent connections between people) or a web graph (web pages are vertices and edges are hyperlinks connecting them).

  • Relations are edges connecting entities at vertices in a graph.
  • Effective for highly-connected data with mostly many-to-many relationships.
  • Allows for use of well-studied algorithms for traversing graphs like pathfinding or shortest distance algorithms.
  • Can easily relate many types of objects.

If your data has lots of many-to-many relationships, graph models are the best bet. Graphs are good for evolvability: as you add features to your application, a graph can easily be extended to accommodate changes in your application’s data structures.

One thing that document and graph databases have in common is that they typically don’t enforce a schema for the data they store, which can make it easier to adapt applications to changing requirements. However, your application most likely still assumes that data has a certain structure; it’s just a question of whether the schema is explicit (enforced on write) or implicit (handled on read).

Also, in general the different data models are starting to converge, since many relational databases allow JSON, and some NoSQL allow joins, etc.

Query Languages

Like SQL, many query languages are declarative rather than imperative.

  • Imperative languages tell the computer how to perform a task, step by step and in a particular order. For example, C/C++ and Java.
  • Declarative languages tell the computer the output you want and leave it free to accomplish this however it likes, which is nice because the system can optimize without user input, and it is easier to parallelize. Examples are SQL and CSS.
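
Here's a toy contrast in Python (the book uses JavaScript and SQL; this version is just my own illustration):

```python
animals = [
    {"name": "Great white", "family": "Lamnidae"},
    {"name": "Dolphin", "family": "Delphinidae"},
]

# Imperative: spell out *how* to get the result, step by step, in a fixed order.
sharks = []
for animal in animals:
    if animal["family"] == "Lamnidae":
        sharks.append(animal)

# Declarative in spirit: state *what* you want and let the system decide how.
# A SQL query like "SELECT * FROM animals WHERE family = 'Lamnidae'" is the
# fully declarative version; the database is free to pick indexes, ordering,
# and parallelism on its own.
sharks_too = [a for a in animals if a["family"] == "Lamnidae"]

assert sharks == sharks_too
```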

We also have MapReduce, which processes large volumes of data in bulk and is neither fully declarative nor fully imperative, but rather based on snippets of code that the framework calls repeatedly. It is a fairly low-level programming model compared to other query languages.
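
And a minimal, single-machine sketch of the MapReduce idea (real MapReduce distributes the map and reduce steps across many nodes):

```python
from collections import defaultdict

documents = ["the quick brown fox", "the lazy dog", "the quick dog"]

# Map step: emit (key, value) pairs from every input record.
mapped = [(word, 1) for doc in documents for word in doc.split()]

# Shuffle step: group the emitted values by key.
groups = defaultdict(list)
for word, count in mapped:
    groups[word].append(count)

# Reduce step: combine the values for each key.
word_counts = {word: sum(counts) for word, counts in groups.items()}
print(word_counts)  # {'the': 3, 'quick': 2, 'brown': 1, ...}
```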

Chapter 3: Storage and Retrieval

Databases need to do two things: store data, and then retrieve it when requested.

One of the key factors in designing data systems is understanding your use case, and which kinds of reads and writes are going to need to be supported:

  • OLTP: Online transaction processing (OLTP) systems are user-facing and deal with a high volume of relatively simple requests. They use an index and a key to access data, and disk seek time is often the bottleneck. These are typically row-based, where all of the data is in one row (like user_id, order #, items ordered, etc. all in one row).

  • OLAP: Online analytic processing (OLAP) systems are used for making business decisions. Analytic queries often scan over a huge number of records, reading only a few columns per record and calculating aggregate statistics rather than returning raw data to the user. They handle a lower volume of more-demanding queries, and disk bandwidth is the typical bottleneck. These are column-based, which helps to make read queries faster, though writes are more difficult.

An index is an additional structure that is derived from the primary data. It can help to improve the performance of queries, but this will incur overhead, especially on writes. For this reason databases don’t index everything by default, but require the application developer to choose them manually.

There are two families of storage engines that these databases use:

  • Log-structured: Files are append-only (no in-place updates); old segments are eventually merged and deleted. Writes are very fast, but a plain log has linear (O(n)) read performance – not great for big data. The chapter goes into a lot of detail on index structures that address this (hash indexes, SSTables, and LSM-trees); there is a toy sketch of the basic idea below.
  • Update-in-place: In this case we have fixed-sized pages that can be overwritten. This is faster for reads, and B-trees are the most common structure.

These are both answers to limitations on disk space. In practice, most storage engines use some combinations of the above.
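
To make the log-structured idea concrete, here's the toy sketch I mentioned above: an append-only key-value log with an in-memory hash index of byte offsets (loosely in the spirit of the chapter's hash-index example, definitely not production code):

```python
class TinyLogStore:
    """Append-only log file plus an in-memory hash index of key -> byte offset."""

    def __init__(self, path):
        self.f = open(path, "w+b")  # fresh log file for this toy example
        self.index = {}

    def set(self, key, value):
        self.f.seek(0, 2)                                  # jump to the end of the log
        offset = self.f.tell()
        self.f.write(f"{key},{value}\n".encode("utf-8"))   # writes are cheap appends
        self.index[key] = offset                           # remember where the latest value lives

    def get(self, key):
        offset = self.index.get(key)
        if offset is None:
            return None
        self.f.seek(offset)                                # one seek instead of an O(n) scan
        line = self.f.readline().decode("utf-8").rstrip("\n")
        _, value = line.split(",", 1)
        return value

store = TinyLogStore("tiny.log")
store.set("user:1", "Ada")
store.set("user:1", "Ada Lovelace")  # an "update" is just a newer append
print(store.get("user:1"))           # Ada Lovelace
store.f.close()
```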

There are way more details about all of the above in the book. Here are several other resources with good notes 1, 2, and 3.

Chapter 4: Encoding and Evolution

In chapter 1, the author introduced the concept of evolvability. Since applications change over time, we should aim to build systems that make it easy to adapt to that change.

In most cases, changing an application’s features also requires changing the data that it stores or uses. Maybe we need to add a new field, or change the way existing data is presented, etc.

Different data models have different approaches to dealing with this change. For example, relational databases generally assume that all data in the database conforms to one schema: although that schema can be changed (through schema migrations; i.e., ALTER statements), there is exactly one schema in force at any given point in time. By contrast, schema-on-read (“schemaless”) databases don’t enforce a schema, so the database can contain a mixture of older and newer data formats written at different times.

When we have data or schema changes, we also often need to make changes to application code. However, this usually can’t happen all at once. For example, we may want to do a rolling upgrade to make sure everything is working properly as we go, and with client-side applications we have to wait for them to upgrade on their own. This means we need to be able to support both old and new versions of the code at the same time.

To facilitate this, we need forward and backward compatibility.

Forward compatibility means that older code can read data that was written by newer code, while backward compatibility means that newer code can read data that was written by older code.

Backward compatibility is normally not hard to achieve: as author of the newer code, you know the format of data written by older code, and so you can explicitly handle it (if necessary by simply keeping the old code to read the old data). Forward compatibility can be trickier, because it requires older code to ignore additions made by a newer version of the code.

More generally, we can think of compatibility as a relationship between one process that encodes the data, and another process that decodes it.

Programs usually work with data in (at least) two different representations:

  1. In memory, data is kept in objects, structs, lists, arrays, hash tables, trees, and so on. These data structures are optimized for efficient access and manipulation by the CPU (typically using pointers, which are references to locations in memory).
  2. However, whenever you want to send some data to another process with which you don’t share memory—for example, whenever you want to send data over the network or write it to a file—you need to encode it as a sequence of bytes.

So, we need some way to translate between these two representations. The translation from the in-memory representation to a byte sequence is called encoding (also known as serialization or marshalling), and the reverse is called decoding (or parsing, deserialization, and unmarshalling).

The first way to encode data is language-specific methods, such as “pickling” in Python.

This is easy to do and requires minimal extra code, but it has some disadvantages:

  • Reading the data in other languages may be difficult.
  • The decoding process needs to be able to instantiate arbitrary classes. This is frequently a source of security problems.
  • Versioning is not robust.
  • Efficiency (specifically, CPU time taken to encode or decode, and the size of the encoded structure) is also often an afterthought.
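
For example, Python's built-in pickling (a quick sketch; the caveats from the list above apply):

```python
import pickle

record = {"user_id": 42, "interests": ["databases", "hiking"]}

encoded = pickle.dumps(record)   # in-memory object -> byte sequence
decoded = pickle.loads(encoded)  # byte sequence -> in-memory object
assert decoded == record

# The downsides listed above apply here: the bytes are hard to read from other
# languages, unpickling untrusted input can execute arbitrary code, and the
# format isn't designed for long-term schema evolution or compactness.
```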

Next are textual formats such as JSON, XML, and CSV. These have the advantage of being fairly human-readable, but also have some disadvantages:

  • There is often ambiguity about encoding numbers. In XML and CSV, you cannot distinguish between a number and a string that happens to consist of digits (except by referring to an external schema). JSON distinguishes strings and numbers, but it doesn’t distinguish integers and floating-point numbers, and it doesn’t specify a precision. This is a problem when dealing with large numbers in particular.
  • There is no support for binary strings.
  • JSON and XML have support for schemas, but this can be complicated to learn and implement. Many JSON-based tools don’t even bother with schemas.
  • CSV doesn’t have a schema at all.
  • Plus, these formats aren’t the most efficient.

A slightly more space-efficient option is binary encoding formats for JSON and XML, though this results in a loss of human-readability. Apache Thrift and Protocol Buffers (protobuf) are binary encoding libraries that are based on the same principle, though with some key differences:

First, they are more space efficient. Critically, they require a schema:

  • Fields are identified using a tag number. You can change the name of a field in the schema, since the encoded data never refers to field names, but you cannot change a field’s tag, since that would make all existing encoded data invalid. You can also add new fields to the schema, provided that you give each field a new tag number.
  • Forward compatibility is achieved by old code being able to just ignore new tag numbers it doesn’t recognize.
  • And, as long as tag numbers are unique, backwards compatibility is also achieved because new code will always know what the old tag numbers meant. A key consideration, though, is that any new tags must be optional, because otherwise the old code wouldn’t be able to function properly given it isn’t aware of those tags.
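
Here's my own toy simplification of the tag-number idea (not the actual Thrift/protobuf wire format): records are encoded as (tag, value) pairs, and old code simply skips tags it doesn't recognize.

```python
# Schema known to OLD code: tag number -> field name.
OLD_SCHEMA = {1: "user_name", 2: "favorite_number"}

# NEW code added an "interests" field with tag 3 and encodes (tag, value) pairs.
new_record = [(1, "Martin"), (2, 1337), (3, ["daydreaming", "hacking"])]

def decode(pairs, schema):
    """Keep fields whose tags the schema knows; skip unknown tags (forward compat)."""
    decoded = {}
    for tag, value in pairs:
        if tag in schema:
            decoded[schema[tag]] = value
        # else: a tag written by newer code that we don't recognize -> ignore it
    return decoded

print(decode(new_record, OLD_SCHEMA))
# {'user_name': 'Martin', 'favorite_number': 1337} -- the new field is ignored
```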

Apache Avro is another binary encoding format, but it is a bit different from Thrift and protobuf. Specifically, while Avro also uses a schema to specify the structure of the data being encoded, it has two schema languages: one (Avro IDL) intended for human editing, and one (based on JSON) that is more easily machine-readable. It does not use tag numbers, and there is nothing in the byte sequence to identify fields or their datatypes. The encoding simply consists of values concatenated together.

To parse the binary data, you go through the fields in the order that they appear in the schema and use the schema to tell you the datatype of each field. This means that the binary data can only be decoded correctly if the code reading the data is using the exact same schema as the code that wrote the data. Any mismatch in the schema between the reader and the writer would mean incorrectly decoded data.

So, how does Avro support schema evolution?

  • When an application wants to encode some data (for example, to write it to a file or database or send it over the network), it encodes the data using whatever version of the schema it knows about—for example, that schema may be compiled into the application. This is known as the writer’s schema.
  • When an application wants to decode some data (for example, to read it from a file or database, receive it from the network, etc.), it is expecting the data to be in some schema, which is known as the reader’s schema. That is the schema the application code is relying on —code may have been generated from that schema during the application’s build process.
  • The key idea with Avro is that the writer’s schema and the reader’s schema don’t have to be the same—they only need to be compatible.
  • When data is decoded, the Avro library resolves the differences by looking at the writer’s schema and the reader’s schema side by side and translating the data from the writer’s schema into the reader’s schema.

With Avro, forward compatibility means that you can have a new version of the schema as writer and an old version of the schema as reader. Conversely, backward compatibility means that you can have a new version of the schema as reader and an old version as writer.

This approach without tag numbers has the advantage of being able to have dynamically-generated schemas. Now, if the database schema changes (for example, a table has one column added and one column removed), you can just generate a new Avro schema from the updated database schema and export data in the new Avro schema. By contrast, if you were using Thrift or Protocol Buffers for this purpose, the field tags would likely have to be assigned by hand: every time the database schema changes, an administrator would have to manually update the mapping from database column names to field tags.
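
Here's a simplified sketch of how reader/writer schema resolution behaves (a toy that matches fields by name only; the real Avro library also handles type resolution, aliases, and so on):

```python
def resolve(record, writer_fields, reader_fields):
    """Translate data written with the writer's schema into the reader's shape.

    writer_fields / reader_fields map field name -> default value (or None).
    Fields only the writer knows are dropped; fields only the reader knows
    are filled in from the reader's default.
    """
    resolved = {}
    for name, default in reader_fields.items():
        resolved[name] = record[name] if name in writer_fields else default
    return resolved

writer_fields = {"user_name": None, "favorite_number": None, "interests": None}
reader_fields = {"user_name": None, "favorite_number": -1, "favorite_color": None}

data = {"user_name": "Martin", "favorite_number": 1337, "interests": ["hacking"]}
print(resolve(data, writer_fields, reader_fields))
# {'user_name': 'Martin', 'favorite_number': 1337, 'favorite_color': None}
```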

Now that we have an understanding of ways data can be encoded and decoded, we turn to how data can flow from one process to another (in other words, who is doing the encoding and decoding).

  • In a database, the process that writes to the database encodes the data, and the process that reads from the database decodes it.
  • If there is only one process accessing the database, the reader is simply a later version of the same process—basically, this is like sending a message to your future self.
  • Backward compatibility is clearly necessary here; otherwise your future self won’t be able to decode what you previously wrote.
  • Generally, we have more than one process accessing a database at once. In this case simultaneously running older and newer code could happen if we were doing a rolling upgrade, for example. This means that a value in the database may be written by a newer version of the code, and subsequently read by an older version of the code that is still running. So, forward compatibility is also often required for databases.

When you have processes that need to communicate over a network, there are a few different ways of arranging that communication. The most common arrangement is to have two roles: clients and servers. The servers expose an API over the network, and the clients can connect to the servers to make requests to that API. The API exposed by the server is known as a service.

For example, this is how the web works: clients (web browsers) make requests to web servers, making GET requests to download HTML, CSS, JavaScript, images, etc., and making POST requests to submit data to the server.

As another example, a native app running on a mobile device or a desktop computer can also make network requests to a server.

Or, a server can itself be a client to another service (for example, a typical web app server acts as client to a database). This approach is often used to decompose a large application into smaller services by area of functionality, such that one service makes a request to another when it requires some functionality or data from that other service. This way of building applications has traditionally been called a service-oriented architecture, also known as microservices architecture.

When HTTP is used as the underlying protocol for talking to the service, it is called a web service. REST APIs are currently the most widely used, but the book does go into detail about others (specifically, SOAP, and remote procedure call/RPCs). The author mostly talks about why he doesn’t like these, and I think we’re unlikely to come across them too much in our work, so I’ll leave out the details here.

Since APIs are often used by many people across organizations, the provider of a service often has no control over its clients and cannot force them to upgrade. Thus, compatibility needs to be maintained for a long time, perhaps indefinitely. If a compatibility-breaking change is required, the service provider often ends up maintaining multiple versions of the service API side by side. There is no agreement on how API versioning should work (i.e., how a client can indicate which version of the API it wants to use). For RESTful APIs, common approaches are to use a version number in the URL or in the HTTP Accept header.
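
For instance, against a hypothetical RESTful API (the endpoint and media type are made up), the two common approaches look like this:

```python
import requests

# Option 1: version number in the URL path.
resp = requests.get("https://api.example.com/v2/users/42")

# Option 2: version requested via the HTTP Accept header (a vendor media type).
resp = requests.get(
    "https://api.example.com/users/42",
    headers={"Accept": "application/vnd.example.v2+json"},
)
```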

Asynchronous message-passing systems are somewhere between RPC and databases. They are similar to RPC in that a client’s request (usually called a message) is delivered to another process with low latency. They are similar to databases in that the message is not sent via a direct network connection, but goes via an intermediary called a message broker (also called a message queue or message-oriented middleware), which stores the message temporarily.

Using a message broker has several advantages compared to direct RPC:

  • It can act as a buffer if the recipient is unavailable or overloaded, and thus improve system reliability.
  • It can automatically redeliver messages to a process that has crashed, and thus prevent messages from being lost.
  • It avoids the sender needing to know the IP address and port number of the recipient (which is particularly useful in a cloud deployment where virtual machines often come and go).
  • It allows one message to be sent to several recipients.
  • It logically decouples the sender from the recipient (the sender just publishes messages and doesn’t care who consumes them).

However, a difference compared to RPC is that message-passing communication is usually one-way: a sender normally doesn’t expect to receive a reply to its messages. It is possible for a process to send a response, but this would usually be done on a separate channel. This communication pattern is asynchronous: the sender doesn’t wait for the message to be delivered, but simply sends it and then forgets about it.

While different message brokers operate differently, in general one process sends a message to a named queue or topic, and the broker ensures that the message is delivered to one or more consumers of or subscribers to that queue or topic. There can be many producers and many consumers on the same topic.

Message brokers typically don’t enforce any particular data model—a message is just a sequence of bytes with some metadata, so you can use any encoding format. If the encoding is backward and forward compatible, you have the greatest flexibility to change publishers and consumers independently and deploy them in any order.
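
Here's a minimal in-process sketch of the publish/consume pattern, using Python's standard queue module as a stand-in for a real broker like RabbitMQ or Kafka:

```python
import queue

# One named topic backed by an in-memory queue; the "broker" buffers messages.
topics = {"user_signups": queue.Queue()}

def publish(topic, message: bytes):
    """The sender only needs the topic name, not the consumer's address."""
    topics[topic].put(message)

def consume(topic):
    """A consumer pulls whatever messages are waiting on the topic."""
    q = topics[topic]
    while not q.empty():
        yield q.get()

# A message is just a sequence of bytes plus metadata; any encoding can be used.
publish("user_signups", b'{"user_id": 42}')
publish("user_signups", b'{"user_id": 43}')

for msg in consume("user_signups"):
    print(msg)
```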

Chapter 5: Replication

Replication is keeping multiple copies of data in different locations. You might want to do this to improve availability (in case some part of the system fails), latency (by bringing the data closer to users), and scalability (there are only so many reads and writes that a single copy of the data can handle).

One of the big trade-offs when implementing replication is between consistency of the data and speed of information retrieval. If consistency is most important, write operations are costly, since every write needs to be replicated to all (or many) of the copies. If writes need to be frequent and cheap, there are more opportunities for data to be improperly replicated.

Replication can be done a few ways:

  • Statement-based: The same statement is run on all the machines (like a SQL insert statement). This would be a problem for non-deterministic functions (like queries with current times or random numbers), and statements can depend on past data in a table, which might be different for each partition.
  • Write-ahead log shipping: Byte-level logs from on-disk storage are sent around. This addresses issues with non-deterministic functions, but since logs are very low-level they might not be compatible with all versions of a database, so there are no zero-downtime upgrades.
  • Logical (row-based): Instead of logging the byte-level store of the data, you translate each action into a row-based log. You basically have a unique identifier for each row that is changing on each action, and you record what actually changed with it. This addresses all the problems with non-determinism, allows between-version compatibility, and is very compatible with external stores.
  • Trigger-based replication: Replication is done by an outside application vs. the database. Allows for flexible conflict resolution, but needs to be defined for each application individually.

There are three main data paradigms described in the chapter:

  • Single-leader replication: You only ever write to the leader, which replicates its data to followers; you can read from the leader or from any of the followers. This is good for situations with few writes and many reads.
  • Multi-leader replication: If we have many writes, we can add more leaders. This also improves tolerance of outages. A downside is that now write conflicts are possible.
  • Leaderless: Weakens the assumption that all data needs to be replicated all the time. In this system you write to several nodes in parallel and read from several nodes in parallel. Reads return data with a version attached, and you take the most recent version. This is a more fault-tolerant system, but now reads and writes need to reach a quorum, given that different nodes will hold different versions.

Each of these options has their own set of challenges:

A big problem that can occur with replication is replication lag. This shows up in a few ways:

  • If there is a short period between writes and reads (before replication is done), we could have a situation where a user reads from a follower replica that hasn’t received the write yet. A solution is read-after-write consistency (users read from the leader for any data they may have written themselves).
  • Another issue with replication lag is “time-traveling,” where a user reads the same thing twice from different replicas. For example, maybe a user reads a comment, then refreshes, and it’s gone if they now read from a replica that hasn’t updated yet. The solution here is monotonic reads, where users are guaranteed that reads in sequence do not result in older data being shown after newer data. This could be accomplished by having a user always read from the same replica for some period of time.
  • Lastly, we could have a situation where events that happen in sequence appear to happen out of sequence. This can be tricky to fix because of the difficulty of determining event order across partitions. One approach is to attach version information to the data, or to make sure causally related writes go to the same partition, so that sequences of writes are read back in the order they were written (the book calls this guarantee consistent prefix reads).
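
Going back to monotonic reads for a second: one simple way to get them is to always route a given user to the same replica. A quick sketch (the replica names are made up):

```python
import hashlib

REPLICAS = ["replica-0", "replica-1", "replica-2"]  # made-up follower names

def replica_for_user(user_id: str) -> str:
    """Pin each user to one replica so their reads never move backwards in time."""
    digest = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    return REPLICAS[int(digest, 16) % len(REPLICAS)]

print(replica_for_user("alice"))  # the same user always hits the same replica
print(replica_for_user("alice"))
print(replica_for_user("bob"))
```

(If that replica fails, the user has to be rerouted, which is where this simple scheme gets more complicated.)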

Handling write conflicts:

  • Avoid them altogether by only allowing a user to write to a single leader.
  • If that is not possible, you can do convergent conflict resolution (pick some rule to consistently resolve conflicts), like always taking the highest id of an edit. This is simple and understandable, but prone to data loss.
  • Merging values.
  • Have a data store that allows for the recording of the conflict and application code that resolves conflicts later.

Reaching quorum: A general rule is that the number of nodes you read from plus the number of nodes you write to must exceed the total number of nodes. This guarantees that, for every read/write pair, there is at least one node that both received the write and is included in the read.
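
In symbols, with n replicas, w write acknowledgments required, and r read responses required, the rule is w + r > n:

```python
def quorum_ok(n: int, w: int, r: int) -> bool:
    """True if every read set overlaps every successful write set in at least one node."""
    return w + r > n

print(quorum_ok(n=3, w=2, r=2))  # True: a common configuration
print(quorum_ok(n=5, w=1, r=1))  # False: a read could miss the latest write entirely
```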

Chapter 6: Partitioning

For very large datasets, or very high query throughput, replication is not sufficient: we need to break the data up into partitions, also known as sharding. Partitioning is necessary when you have so much data that storing and processing it on a single machine is no longer feasible.

Normally, partitions are defined in such a way that each piece of data (each record, row, or document) belongs to exactly one partition. In effect, each partition is a small database of its own, although the database may support operations that touch multiple partitions at the same time. Partitioning is usually combined with replication so that copies of each partition are stored on multiple nodes. This means that even though each record belongs to exactly one partition, it may still be stored on several different nodes for fault tolerance.

The main reason for wanting to partition data is scalability. Different partitions can be placed on different nodes in a shared-nothing cluster. Thus, a large dataset can be distributed across many disks, and the query load can be distributed across many processors. The goal of partitioning is to spread the data and query load evenly across multiple machines, avoiding hot spots (nodes with disproportionately high load).

For queries that operate on a single partition, each node can independently execute the queries for its own partition, so query throughput can be scaled by adding more nodes. Large, complex queries can potentially be parallelized across many nodes, although this gets significantly harder.

Our goal with partitioning is to spread the data and the query load evenly across nodes. If every node takes a fair share, then—in theory—10 nodes should be able to handle 10 times as much data and 10 times the read and write throughput of a single node (ignoring replication for now).

If the partitioning is unfair, so that some partitions have more data or queries than others, we call it skewed. The presence of skew makes partitioning much less effective. In an extreme case, all the load could end up on one partition, so 9 out of 10 nodes are idle and your bottleneck is the single busy node. A partition with disproportionately high load is called a hot spot.

The simplest approach for avoiding hot spots would be to assign records to nodes randomly. That would distribute the data quite evenly across the nodes, but it has a big disadvantage: when you’re trying to read a particular item, you have no way of knowing which node it is on, so you have to query all nodes in parallel.

If we assume we have a key-value data model where we always access records by PK, we have a few ways to partition data:

Key-range partitioning: Assign a continuous range of keys to each partition.

  • If you know the boundaries between the ranges, you can easily determine which partition contains a given key. If you also know which partition is assigned to which node, then you can make your request directly to the appropriate node.
  • The ranges of keys are not necessarily evenly spaced, because your data may not be evenly distributed; simply having one shelf per two letters of the alphabet would lead to some shelves being much bigger than others. In order to distribute the data evenly, the partition boundaries need to adapt to the data.
  • Within each partition, we can keep keys in sorted order. This has the advantage that range scans are easy, and you can treat the key as a concatenated index in order to fetch several related records in one query.
  • On the flip side, certain access patterns can lead to hotspots (for example, if partitioning by date, all writes for a certain day may go to the same node). A solution is to add a prefix to the key so that data is first partitioned by prefix.
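
A quick sketch of key-range partitioning, with partition boundaries chosen to fit the data (the boundaries and partition names are made up):

```python
import bisect

# Upper boundaries of each partition's key range, chosen to fit the data.
BOUNDARIES = ["f", "m", "t"]          # partition 0: keys < "f", 1: < "m", 2: < "t", 3: the rest
PARTITIONS = ["p0", "p1", "p2", "p3"]

def partition_for_key(key: str) -> str:
    return PARTITIONS[bisect.bisect_right(BOUNDARIES, key)]

print(partition_for_key("alice"))    # p0
print(partition_for_key("mallory"))  # p2
print(partition_for_key("zoe"))      # p3
# Range scans stay cheap: adjacent keys live in the same (or an adjacent) partition.
```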

Hash partitioning: Because of this risk of skew and hot spots, many distributed datastores use a hash function to determine the partition for a given key. A good hash function takes skewed data and makes it uniformly distributed.

  • Say you have a 32-bit hash function that takes a string. Whenever you give it a new string, it returns a seemingly random number between 0 and 2^32 − 1. Even if the input strings are very similar, their hashes are evenly distributed across that range of numbers. Once you have a suitable hash function for keys, you can assign each partition a range of hashes (rather than a range of keys), and every key whose hash falls within a partition’s range will be stored in that partition.
  • This technique is good at distributing keys fairly among the partitions. The partition boundaries can be evenly spaced, or they can be chosen pseudorandomly (in which case the technique is sometimes known as consistent hashing).
  • Unfortunately however, by using the hash of the key for partitioning we lose a nice property of key-range partitioning: the ability to do efficient range queries. Keys that were once adjacent are now scattered across all the partitions, so their sort order is lost.
  • A combination of the above: Use a compound primary key consisting of several columns. Only the first part of that key is hashed to determine the partition, but the other columns are used as a concatenated index for sorting the data.
  • A query therefore cannot search for a range of values within the first column of a compound key, but if it specifies a fixed value for the first column, it can perform an efficient range scan over the other columns of the key.
  • The concatenated index approach enables an elegant data model for one-to-many relationships. For example, on a social media site, one user may post many updates. If the primary key for updates is chosen to be (user_id, update_timestamp), then you can efficiently retrieve all updates made by a particular user within some time interval, sorted by timestamp. Different users may be stored on different partitions, but within each user, the updates are stored ordered by timestamp on a single partition.
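
And the corresponding sketch for hash partitioning (using a stable hash like MD5 rather than Python's built-in hash(), which is randomized between runs):

```python
import hashlib

NUM_PARTITIONS = 4

def hash_partition(key: str) -> int:
    """Map a key to a partition via a stable hash (not Python's randomized hash())."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

for key in ["2024-06-01", "2024-06-02", "2024-06-03"]:
    print(key, "->", hash_partition(key))
# Adjacent keys scatter across partitions, which removes hot spots on sequential
# keys but also removes the ability to do efficient range scans.
```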

All of the above assumes there is only one primary key index. It gets more complicated if there are secondary indices. There are two main approaches to partitioning a database with secondary indexes:

Document-based partitioning: Each partition is completely different and maintains its own secondary index.

  • For example, imagine you are operating a website for selling used cars. Each listing has a unique ID—call it the document ID—and you partition the database by the document ID (for example, IDs 0 to 499 in partition 0, IDs 500 to 999 in partition 1, etc.). You want to let users search for cars, allowing them to filter by color and by make, so you need a secondary index on color and make (in a document database these would be fields; in a relational database they would be columns). If you have declared the index, the database can perform the indexing automatically. For example, whenever a red car is added to the database, the database partition automatically adds it to the list of document IDs for the index entry color:red.
  • In this indexing approach, each partition is completely separate: each partition maintains its own secondary indexes, covering only the documents in that partition. It doesn’t care what data is stored in other partitions. Whenever you need to write to the database—to add, remove, or update a document—you only need to deal with the partition that contains the document ID that you are writing. For that reason, a document-partitioned index is also known as a local index.
  • However, reading from a document-partitioned index requires care: unless you have done something special with the document IDs, there is no reason why all the cars with a particular color or a particular make would be in the same partition. Thus, if you want to search for red cars, you need to send the query to all partitions, and combine all the results you get back. This approach to querying a partitioned database is sometimes known as scatter/gather, and it can make read queries on secondary indexes quite expensive.

Term-based partitioning: Rather than each partition having its own secondary index (a local index), we can construct a global index that covers data in all partitions. However, we can’t just store that index on one node, since it would likely become a bottleneck and defeat the purpose of partitioning. A global index must also be partitioned, but it can be partitioned differently from the primary key index.

  • For example, red cars from all partitions appear under color:red in the index, but the index is partitioned so that colors starting with the letters a to r appear in partition 0 and colors starting with s to z appear in partition 1. The index on the make of car is partitioned similarly (with the partition boundary being between f and h). We call this kind of index term-partitioned, because the term we’re looking for determines the partition of the index. Here, a term would be color:red, for example. The name term comes from full-text indexes (a particular kind of secondary index), where the terms are all the words that occur in a document.
  • The advantage of a global (term-partitioned) index over a document-partitioned index is that it can make reads more efficient: rather than doing scatter/gather over all partitions, a client only needs to make a request to the partition containing the term that it wants. However, the downside of a global index is that writes are slower and more complicated, because a write to a single document may now affect multiple partitions of the index (every term in the document might be on a different partition, on a different node).

Rebalancing

Any changes that require moving load from one node in the cluster to another are called rebalancing. For example, maybe we add a new node, or a node fails. The existing partitions need to be rebalanced to account for this.

There are three strategies for rebalancing:

  • Fixed number of partitions: The number of partitions doesn’t change throughout rebalancing. This is tricky because you have to choose the right number of partitions – you might have too many partitions (leading to high management overhead) or too few (making rebalancing expensive).
  • Dynamic partitioning: If a partition exceeds a configured size it will split into two halves, one of which will be moved to another node. Conversely, if a partition is too small, it can be merged with another one.
  • Partitioning proportionally to nodes: In the first two strategies, the number of partitions is independent of the number of nodes. In this case there are a fixed number of partitions per node. When a new node is added, it randomly chooses a fixed number of existing partitions to split, and takes ownership of one half of each split partition.

No matter which partitioning scheme is used, rebalancing is usually expected to meet some minimum requirements:

  • After rebalancing, the load (data storage, read and write requests) should be shared fairly between the nodes in the cluster.
  • While rebalancing is happening, the database should continue accepting reads and writes.
  • No more data than necessary should be moved between nodes, to make rebalancing fast and to minimize the network and disk I/O load.

Rebalancing can be done automatically or manually. A manual approach is recommended in the book.

Chapter 7: Transactions

Many things can go wrong in data systems. For example, the system may fail mid-operation, or multiple operations may happen at the same time, causing unexpected bugs.

Transactions have been the mechanism of choice for simplifying the issue of fault-tolerance.

A transaction is a way for an application to group several reads and writes together into a logical unit. All the reads and writes in a transaction are executed as one operation: either the entire transaction succeeds or it fails and is aborted or rolled back. If it fails, the application can safely retry.

Transactions can be for a single object (for example, updating a row in one table), or multiple objects (like updating a row in one table that has a foreign key reference to a row in another table).

With transactions, error handling becomes much simpler for an application, because it doesn’t need to worry about partial failure—i.e., the case where some operations succeed and some fail (for whatever reason).

Not every application needs transactions, though. For example, an application with very simple access patterns, such as reading and writing only a single record, can probably manage without transactions. However, for more complex access patterns, transactions can hugely reduce the number of potential error cases you need to think about.

The question is, how do you figure out whether you need transactions? In order to answer that question, we first need to understand exactly what safety guarantees transactions can provide, and what costs are associated with them.

The safety guarantees provided by transactions are often described by the well-known acronym ACID, which stands for Atomicity, Consistency, Isolation, and Durability. There is some ambiguity about the definitions of each of these terms, so one system that is ACID-compliant may be very different from another that claims the same thing. Generally, though:

  • Atomicity: In general, atomic refers to something that cannot be broken down into smaller parts. An atomic transaction is an indivisible series of database operations such that either all occurs, or nothing occurs. An example of an atomic transaction is a monetary transfer from bank account A to account B. It consists of two operations, withdrawing the money from account A and saving it to account B. Performing these operations in an atomic transaction ensures that the database remains in a consistent state, that is, money is neither lost nor created if either of those two operations fail.
  • Consistency: The idea of ACID consistency is that you have certain statements about your data that must always be true—for example, in an accounting system, credits and debits across all accounts must always be balanced. However, if you want to write data that violates these invariants, the database generally won’t stop you. Unlike the other three, consistency is a property of the application: the application may rely on the database’s atomicity and isolation properties in order to achieve consistency, but it’s not up to the database alone. Thus, the letter C doesn’t really belong in ACID.
  • Isolation: Most databases are accessed by several clients at the same time. That is no problem if they are reading and writing different parts of the database, but if they are accessing the same database records, you can run into concurrency problems (race conditions). Isolation in the sense of ACID means that concurrently executing transactions are isolated from each other: they cannot step on each other’s toes.
  • Durability: Durability is the promise that once a transaction has committed successfully, any data it has written will not be forgotten, even if there is a hardware fault or the database crashes.

Concurrency is tricky, so databases have long tried to hide concurrency issues from application developers by providing “transaction isolation.” In theory, isolation should make your life easier by letting you pretend that no concurrency is happening: serializable isolation means that the database guarantees that transactions have the same effect as if they ran serially.

However, serializable isolation has a performance cost, and many databases don’t want to pay that price. It’s therefore common for systems to use weaker levels of isolation, which protect against some concurrency issues, but not all. Even many popular relational database systems (which are usually considered “ACID”) use weak isolation.

Different levels of isolation protect against different levels of bugs:

Read Committed

The most basic level of transaction isolation is read committed. It makes two guarantees: When reading from the database, you will only see data that has been committed (no dirty reads), and when writing to the database, you will only overwrite data that has been committed (no dirty writes).

Dirty reads are when a transaction can see data that is being written by another transaction, but has not yet committed or aborted (basically, it isn’t final). Dirty writes are similar: One client overwrites data that another client has written, but not yet committed.

These are two good protections, but the read committed level of isolation does allow for read skew, also known as non-repeatable reads. This is when a client sees different parts of the database at different points in time. For example, taking a backup requires making a copy of the entire database, which may take hours on a large database. During the time that the backup process is running, writes will continue to be made to the database. Thus, you could end up with some parts of the backup containing an older version of the data, and other parts containing a newer version. If you need to restore from such a backup, the inconsistencies become permanent.

Snapshot Isolation

Read skew is most commonly prevented with snapshot isolation, which allows a transaction to only read from a consistent snapshot at one point in time.

In order to implement this, the database must potentially keep several different committed versions of an object, because various in-progress transactions may need to see the state of the database at different points in time. Because it maintains several versions of an object side by side, this technique is known as multi-version concurrency control (MVCC).

So, snapshot isolation protects against dirty reads, dirty writes, and read skew. But there are other kinds of conflicts that can occur with concurrent writes. For example, the lost update problem can occur if an application reads some value from the database, modifies it, and writes back the modified value (a read-modify-write cycle). If two transactions do this concurrently, one of the modifications can be lost, because the second write does not include the first modification. This can be addressed through:

  • Atomic write operations, which lock an object when it is read so that no other transaction can read it until the update has been applied.
  • Explicit locking can be an option if the database’s built-in atomic operations don’t provide the necessary functionality. This is where the application explicitly locks objects that are going to be updated. Then the application can perform a read-modify-write cycle, and if any other transaction tries to concurrently read the same object, it is forced to wait until the first read-modify-write cycle has completed. In this case, the onus is on the application, not the database.
  • In databases that don’t provide transactions, you sometimes find an atomic compare-and-set operation. The purpose of this operation is to avoid lost updates by allowing an update to happen only if the value has not changed since you last read it. If the current value does not match what you previously read, the update has no effect, and the read-modify-write cycle must be retried.
  • In replicated systems, a common approach is to allow concurrent writes to create several conflicting versions of a value, and to use application code or special data structures to resolve and merge these versions after the fact.
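
Here's a toy compare-and-set against an in-memory dict (in a real database this would be a single atomic operation, e.g. an UPDATE whose WHERE clause checks the old value):

```python
import threading

store = {"page:42": "old content"}
lock = threading.Lock()  # stands in for the atomicity a real database provides

def compare_and_set(key, expected, new_value) -> bool:
    """Apply the update only if the value hasn't changed since we read it."""
    with lock:
        if store.get(key) != expected:
            return False  # someone else wrote in between; the caller must retry
        store[key] = new_value
        return True

snapshot = store["page:42"]                           # read ...
edited = snapshot + " + my edit"                      # ... modify ...
if not compare_and_set("page:42", snapshot, edited):  # ... write, safely
    print("conflict: re-run the read-modify-write cycle")
```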

Phantom reads are another potential issue. Phantom reads are when a transaction reads objects that match some search condition, and another client makes a write that affects the results of that search.

Snapshot isolation prevents straightforward phantom reads, but phantoms in the context of “write skew” require special treatment, such as index-range locks. Write skew occurs when there is some invariant across multiple objects (such as “there must always be at least one doctor on call”) and concurrent transactions write to different objects: both doctors request to go off call at the same time, both requests are approved, and the invariant is broken. The anomalous behavior was only possible because the transactions ran concurrently, and it is only preventable with serializable isolation.

Actual Serial Execution

Serializable isolation is usually regarded as the strongest isolation level. It guarantees that even though transactions may execute in parallel, the end result is the same as if they had executed one at a time, serially, without any concurrency. There are a few approaches. The most straightforward is to literally execute transactions in a serial order. In other words, we execute only one transaction at a time, in serial order, on a single thread.

In order for this to work, a few conditions must be met:

  • Every transaction must be small and fast, because it takes only one slow transaction to stall all transaction processing.
  • It is limited to use cases where the active dataset can fit in memory.
  • Write throughput must be low enough to be handled on a single CPU core, or else transactions need to be partitioned without requiring cross-partition coordination.
  • Cross-partition transactions are possible, but there is a hard limit to the extent to which they can be used.

Generally, actual serial execution does not scale well.

Two-Phase Locking (2PL)

Another approach is two-phase locking, which was the standard for decades.

Several transactions are allowed to concurrently read the same object as long as nobody is writing to it. But as soon as anyone wants to write (modify or delete) an object, exclusive access is required. So A can’t read an object B is writing to until it is done, and A can’t write to an object while B is reading.

In 2PL, writers don’t just block other writers; they also block readers and vice versa.

The big downside of two-phase locking, and the reason why it hasn’t been used by everybody since the 1970s, is performance: transaction throughput and response times of queries are significantly worse under two-phase locking than under weak isolation.

Serializable Snapshot Isolation

Two-phase locking is a so-called pessimistic concurrency control mechanism: it is based on the principle that if anything might possibly go wrong (as indicated by a lock held by another transaction), it’s better to wait until the situation is safe again before doing anything.

Serial execution is even more pessimistic: it is essentially equivalent to each transaction having an exclusive lock on the entire database (or one partition of the database) for the duration of the transaction.

By contrast, serializable snapshot isolation is an optimistic concurrency control technique. Optimistic in this context means that instead of blocking if something potentially dangerous happens, transactions continue anyway, in the hope that everything will turn out all right. When a transaction wants to commit, the database checks whether anything bad happened (i.e., whether isolation was violated); if so, the transaction is aborted and has to be retried.

Only transactions that executed serializably are allowed to commit. Others are aborted.

It performs badly if there is high contention (many transactions trying to access the same objects), as this leads to a high proportion of transactions needing to abort. If the system is already close to its maximum throughput, the additional transaction load from retried transactions can make performance worse. However, if there is enough spare capacity, and if contention between transactions is not too high, optimistic concurrency control techniques tend to perform better than pessimistic ones.
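
From the application’s point of view, the optimistic pattern is basically “run the transaction, and retry if the database aborts it at commit time.” Here is a minimal sketch of my own; the TransactionAborted exception is a hypothetical stand-in for whatever error your database driver raises on a serialization failure:

```python
class TransactionAborted(Exception):
    """Hypothetical: raised when commit-time validation detects a conflict."""

def run_with_retries(transaction_fn, max_retries=3):
    # Under optimistic concurrency control the transaction proceeds without
    # blocking, but it may be aborted at commit time; the application is
    # expected to simply retry it.
    for _ in range(max_retries):
        try:
            return transaction_fn()
        except TransactionAborted:
            continue
    raise RuntimeError("transaction kept conflicting; giving up")
```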

Chapter 8: The Trouble with Distributed Systems

When working with a single computer, we typically expect deterministic behavior: Either things work, or they don’t, and it’s typically pretty consistent. This is no longer the case with distributed systems, where many things can go wrong, either entirely or partially.

The fact that partial failures can occur is the defining characteristic of distributed systems. The difficulty is that partial failures are nondeterministic: it may sometimes work and sometimes unpredictably fail. The process could also randomly go slow, or not respond at all (and eventually time out).

Three common families of issues with distributed systems are problems with networks, problems with clocks, and process pauses.

Problems with Networks

The internet and most internal networks in data centers are “asynchronous packet networks.” In this kind of network, one node can send a message (called a packet) to another node, but the network gives no guarantees as to when it will arrive, or whether it will arrive at all. If you send a request and expect a response, many things could go wrong. For example, your request could get lost, could get stuck in a queue, the remote node may have failed or temporarily stopped responding, the remote node may have responded but the response got lost, or the response has been delayed.

If you send a request and don’t get a response, it’s not possible to distinguish exactly what happened out of any of the previous options.

The usual way of handling this issue is a timeout: after some time you give up waiting and assume that the response is not going to arrive. However, when a timeout occurs, you still don’t know whether the remote node got your request or not (and if the request is still queued somewhere, it may still be delivered to the recipient, even if the sender has given up on it).
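
As a tiny illustration (assuming the third-party requests library and a made-up URL), a timeout tells you only that no response arrived in time, not what actually happened to the request:

```python
import requests

try:
    response = requests.get("https://example.com/api/ping", timeout=2.0)
except requests.Timeout:
    # All we know is that no response arrived within 2 seconds. The request
    # may have been lost, may still be queued somewhere, or may have been
    # processed with the reply lost or delayed; we can't tell which.
    pass
```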

Problems with Clocks

In a distributed system, time is a tricky business, because communication is not instantaneous: it takes time for a message to travel across the network from one machine to another. The time when a message is received is always later than the time when it is sent, but due to variable delays in the network, we don’t know how much later. This fact sometimes makes it difficult to determine the order in which things happened when multiple machines are involved.

Moreover, each machine on the network has its own clock, which is an actual hardware device: usually a quartz crystal oscillator. These devices are not perfectly accurate, so each machine has its own notion of time, which may be slightly faster or slower than on other machines. They can also drift depending on things like temperature.

Modern computers have at least two different kinds of clocks: a time-of-day clock and a monotonic clock.

A time-of-day clock does what you intuitively expect of a clock: it returns the current date and time according to some calendar. Since each computer has its own, these rarely line up. It is possible to synchronize time-of-day clocks to some degree: the most commonly used mechanism is the Network Time Protocol (NTP), which allows the computer clock to be adjusted according to the time reported by a group of servers. The servers in turn get their time from a more accurate time source, such as a GPS receiver.

A monotonic clock is suitable for measuring a duration (like a time interval), such as a timeout or a service’s response time. You can check the value of the monotonic clock at one point in time, do something, and then check the clock again at a later time. The difference between the two values tells you how much time elapsed between the two checks. However, the absolute value of the clock is meaningless: it might be the number of nanoseconds since the computer was started, or something similarly arbitrary. In particular, it makes no sense to compare monotonic clock values from two different computers, because they don’t mean the same thing.

In a distributed system, using a monotonic clock for measuring elapsed time (e.g., timeouts) is usually fine, because it doesn’t assume any synchronization between different nodes’ clocks and is not sensitive to slight inaccuracies of measurement.
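
In Python, for example, time.time() reads the time-of-day clock and time.monotonic() reads the monotonic clock; only the latter is safe for measuring elapsed time (a small sketch of my own):

```python
import time

start = time.monotonic()   # arbitrary origin, but guaranteed never to jump backward
time.sleep(0.1)            # stand-in for some work or a network call
elapsed = time.monotonic() - start   # safe for timeouts and response-time measurement

wall_clock = time.time()   # time-of-day clock: can jump forward or backward when
                           # NTP adjusts it, so differences between readings can mislead
```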

The problem with clocks is that while they seem simple and easy to use, they have a surprising number of pitfalls: a day may not have exactly 86,400 seconds, time-of-day clocks may move backward in time (if they are re-synced using NTP), and the time on one node may be quite different from the time on another node.

This matters for applications that rely on clocks to order events across multiple nodes, such as deciding which of two writes happened more recently. In systems that use last write wins (LWW), we could have a major issue if the clocks aren’t synced properly: we could silently lose data when a node with a fast clock overwrites values that were actually written later by a node with a slower clock.
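
A toy illustration of how LWW plus skewed clocks can silently drop data (values and timestamps are made up): node B’s write happened later in real time, but node A’s fast clock stamps its write with a higher timestamp, so A’s value wins.

```python
# Each replica stamps its write with its own time-of-day clock.
write_a = {"value": "from node A", "ts": 100.005}  # node A's clock runs a few ms fast
write_b = {"value": "from node B", "ts": 100.002}  # actually written later in real time

winner = max(write_a, write_b, key=lambda w: w["ts"])
# winner is write_a: node B's more recent write is silently discarded
```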

Generally, it doesn’t make sense to think of a clock reading as a point in time—it is more like a range of times, within a confidence interval: for example, a system may be 95% confident that the time now is between 10.3 and 10.5 seconds past the minute.

Process Pauses

Another time-related family of issues is process pauses. In this case, consider a node whose execution is paused, even in the middle of some function (for example, by a stop-the-world garbage collection pause or a suspended virtual machine). During the pause, the rest of the nodes keep up their work, and may even declare the paused node dead since it isn’t responding. Eventually, the paused node may continue running, without even noticing that it was asleep until it checks its clock sometime later. It might overwrite work or duplicate some process when it wakes up.

In some environments, a situation like this (or any failure to respond within a specified time) can cause serious damage: computers that control aircraft, rockets, robots, cars, and other physical objects must respond quickly and predictably to their sensor inputs. In these systems, there is a specified deadline by which the software must respond; if it doesn’t meet the deadline, that may cause a failure of the entire system. These are so-called hard real-time systems. But implementing this is expensive, so rarely done unless totally necessary.

Detecting Faults

To tolerate faults, the first step is to detect them, but even that is hard. Most systems don’t have an accurate mechanism of detecting whether a node has failed, so most distributed algorithms rely on timeouts to determine whether a remote node is still available.

If you’re going to use timeouts, the next question is how long the timeout should be. There’s a trade-off here because a long timeout means a long wait until a node is declared dead (and during this time, users may have to wait or see error messages). A short timeout detects faults faster, but carries a higher risk of incorrectly declaring a node dead when in fact it has only suffered a temporary slowdown.

Prematurely declaring a node dead is problematic, because if the node is actually alive and in the middle of performing some action (for example, sending an email), and another node takes over, the action may end up being performed twice (like we talked about with process pauses).

Also, when a node is declared dead, its responsibilities need to be transferred to other nodes, which places additional load on other nodes and the network. If the system is already struggling with high load, declaring nodes dead prematurely can make the problem worse. In particular, it could happen that the node actually wasn’t dead but only slow to respond due to overload; transferring its load to other nodes can cause a cascading failure (in the extreme case, all nodes declare each other dead, and everything stops working).

It’s hard to say exactly what the perfect length of time to wait is, because there is often a lot of variability in packet delays, most often caused by queueing (basically, too much traffic).

In such environments, you can only choose timeouts experimentally: measure the distribution of network round-trip times over an extended period, and over many machines, to determine the expected variability of delays. Then, taking into account your application’s characteristics, you can determine an appropriate trade-off between failure detection delay and risk of premature timeouts.

Even better, rather than using configured constant timeouts, systems can continually measure response times and their variability (jitter), and automatically adjust timeouts according to the observed response time distribution.
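
As one way this could look (a rough sketch of my own, loosely modeled on how TCP derives its retransmission timeout from a smoothed round-trip time and its variance, not a mechanism described in the book):

```python
class AdaptiveTimeout:
    """Adjust a timeout based on observed round-trip times and their jitter."""

    def __init__(self, initial_rtt=1.0, alpha=0.125, beta=0.25):
        self.srtt = initial_rtt        # smoothed round-trip time
        self.rttvar = initial_rtt / 2  # smoothed deviation (jitter)
        self.alpha = alpha
        self.beta = beta

    def observe(self, rtt):
        # Exponentially weighted moving averages of the RTT and its deviation.
        self.rttvar = (1 - self.beta) * self.rttvar + self.beta * abs(self.srtt - rtt)
        self.srtt = (1 - self.alpha) * self.srtt + self.alpha * rtt

    def timeout(self):
        # Allow headroom of a few deviations above the typical response time.
        return self.srtt + 4 * self.rttvar

detector = AdaptiveTimeout()
for rtt in (0.20, 0.25, 0.22, 0.90):   # observed response times, in seconds
    detector.observe(rtt)
print(round(detector.timeout(), 3))
```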

Another issue entirely is that sometimes a node can be in a degraded state: for example, a Gigabit network interface could suddenly drop to 1 Kb/s throughput due to a driver bug. Such a node that is “limping” but not dead can be even more difficult to deal with than a cleanly failed node.

Making Decisions

Once a fault is detected, making a system tolerate it is not easy either: there is no global variable, no shared memory, no common knowledge or any other kind of shared state between the machines. In order to make decisions, we need a quorum among the nodes:

  • That includes decisions about declaring nodes dead. If a quorum of nodes declares another node dead, then it must be considered dead, even if that node still very much feels alive. The individual node must abide by the quorum decision and step down.
  • Most commonly, the quorum is an absolute majority of more than half the nodes (although other kinds of quorums are possible). A majority quorum allows the system to continue working if individual nodes have failed (with three nodes, one failure can be tolerated; with five nodes, two failures can be tolerated). It is still safe, because there can only be one majority in the system; there cannot be two majorities with conflicting decisions at the same time (a tiny sketch of the majority rule follows this list).
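
The majority rule itself is simple to express (a trivial sketch of my own):

```python
def is_majority(votes: int, total_nodes: int) -> bool:
    # A strict majority: more than half of *all* nodes, not just the reachable ones.
    return votes > total_nodes // 2

def tolerated_failures(total_nodes: int) -> int:
    # With 3 nodes we can tolerate 1 failure, with 5 nodes 2, and so on.
    return (total_nodes - 1) // 2

assert is_majority(2, 3) and not is_majority(3, 6)
assert tolerated_failures(5) == 2
```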

Distributed systems problems become much harder if there is a risk that nodes may “lie” (send arbitrary faulty or corrupted responses)—for example, if a node may claim to have received a particular message when in fact it didn’t. Such behavior is known as a Byzantine fault. However, the author says that in the kinds of systems we discuss in this book, we can usually safely assume that there are no Byzantine faults.

System Models

There are various system models that outline what kinds of faults are expected in a system.

The following models consider timing delays:

  • Synchronous model: The synchronous model assumes bounded network delay, bounded process pauses, and bounded clock error. This does not imply exactly synchronized clocks or zero network delay; it just means you know that network delay, pauses, and clock drift will never exceed some fixed upper bound. The synchronous model is not a realistic model of most practical systems, because (as discussed in this chapter) unbounded delays and pauses do occur.
  • Partially synchronous model: Partial synchrony means that a system behaves like a synchronous system most of the time, but it sometimes exceeds the bounds for network delay, process pauses, and clock drift. This is a realistic model of many systems: most of the time, networks and processes are quite well behaved—otherwise we would never be able to get anything done—but we have to reckon with the fact that any timing assumptions may be shattered occasionally. When this happens, network delay, pauses, and clock error may become arbitrarily large.
  • Asynchronous model: In this model, an algorithm is not allowed to make any timing assumptions—in fact, it does not even have a clock (so it cannot use timeouts). Some algorithms can be designed for the asynchronous model, but it is very restrictive.

Besides timing issues, we have to consider node failures. The three most common system models for nodes are:

  • Crash-stop faults: In the crash-stop model, an algorithm may assume that a node can fail in only one way, namely by crashing. This means that the node may suddenly stop responding at any moment, and thereafter that node is gone forever—it never comes back.
  • Crash-recovery faults: We assume that nodes may crash at any moment, and perhaps start responding again after some unknown time. In the crash-recovery model, nodes are assumed to have stable storage (i.e., nonvolatile disk storage) that is preserved across crashes, while the in-memory state is assumed to be lost.
  • Byzantine (arbitrary) faults: Nodes may do absolutely anything, including trying to trick and deceive other nodes, as described in the last section.

For modeling real systems, the partially synchronous model with crash-recovery faults is generally the most useful model.

Chapter 9: Consistency and Consensus

Now that we’ve covered potential faults in distributed systems in the last chapter, this chapter is about tolerating those faults.

Consistency Guarantees

If you look at two database nodes at the same moment in time, you’re likely to see different data on the two nodes, because write requests arrive at different times. These inconsistencies occur no matter what replication method the database uses (single-leader, multi-leader, or leaderless replication). Most replicated databases provide at least eventual consistency, which means that if you stop writing to the database and wait for some unspecified length of time, then eventually all read requests will return the same value. In other words, the inconsistency is temporary, and it eventually resolves itself. But it doesn’t say anything about when this will happen.

When working with a database that provides only weak guarantees, you need to be constantly aware of its limitations and not accidentally assume too much. Bugs are often subtle and hard to find by testing, because the application may work well most of the time. The edge cases of eventual consistency only become apparent when there is a fault in the system (e.g., a network interruption) or at high concurrency.

However, stronger guarantees come at the cost of performance and fault tolerance.

Linearizability (also known as atomic consistency, strong consistency, immediate consistency, or external consistency) is a popular consistency model: its goal is to make replicated data appear as though there were only a single copy, and to make all operations act on it atomically.

In a linearizable system, as soon as one client successfully completes a write, all clients reading from the database must be able to see the value just written. Maintaining the illusion of a single copy of the data means guaranteeing that the value read is the most recent, up-to-date value, and doesn’t come from a stale cache or replica. In other words, linearizability is a recency guarantee.

Linearizability is not the same as serializability. Serializability is an isolation property of transactions, where every transaction may read and write multiple objects (rows, documents, records). It guarantees that transactions behave the same as if they had executed in some serial order (each transaction running to completion before the next transaction starts). It is okay for that serial order to be different from the order in which transactions were actually run.

Linearizability is a recency guarantee on reads and writes of a register (an individual object). It doesn’t group operations together into transactions, so it does not prevent problems such as write skew, unless you take additional measures such as materializing conflicts.

A database may provide both serializability and linearizability, and this combination is known as strict serializability or strong one-copy serializability.

Although linearizability is appealing because it is easy to understand—it makes a database behave like a variable in a single-threaded program—it has the downside of being slow, especially in environments with large network delays. It is also less tolerant to network problems.

The Unhelpful CAP Theorem

CAP is sometimes presented as Consistency, Availability, Partition tolerance: pick two out of three. Unfortunately, putting it this way is misleading because network partitions are a kind of fault, so they aren’t something about which you have a choice: they will happen whether you like it or not.

At times when the network is working correctly, a system can provide both consistency (linearizability) and total availability. When a network fault occurs, you have to choose between either linearizability or total availability. Thus, a better way of phrasing CAP would be either Consistent or Available when Partitioned. A more reliable network needs to make this choice less often, but at some point the choice is inevitable.

In discussions of CAP there are several contradictory definitions of the term availability, and the formalization as a theorem does not match its usual meaning. Many so-called “highly available” (fault-tolerant) systems actually do not meet CAP’s idiosyncratic definition of availability.

The CAP theorem as formally defined is also of very narrow scope: it only considers one consistency model (namely linearizability) and one kind of fault (network partitions, or nodes that are alive but disconnected from each other). It doesn’t say anything about network delays, dead nodes, or other trade-offs. Thus, although CAP has been historically influential, there is a lot of misunderstanding and confusion around CAP, and it does not help us understand systems better, so it is best avoided.

Causality is another consistency model which imposes an ordering on events in a system (what happened before what, based on cause and effect). Unlike linearizability, which puts all operations in a single, totally ordered timeline, causality provides us with a weaker consistency model: some things can be concurrent, so the version history is like a timeline with branching and merging. Causal consistency does not have the coordination overhead of linearizability and is much less sensitive to network problems.

However, even if we capture the causal ordering, some things cannot be implemented this way: For example, ensuring that a username is unique and rejecting concurrent registrations for the same username. If one node is going to accept a registration, it needs to somehow know that another node isn’t concurrently in the process of registering the same name. This problem is why we often need consensus.

Consensus

Achieving consensus means deciding something in such a way that all nodes agree on what was decided, and such that the decision is irrevocable. For example, a database must decide whether to commit or abort a distributed transaction, or a messaging system must decide on the order in which to deliver messages.

These issues are straightforward if you only have a single node, or if you are willing to assign the decision-making capability to a single node. This is what happens in a single-leader database: all the power to make decisions is vested in the leader, which is why such databases are able to provide linearizable operations, uniqueness constraints, a totally ordered replication log, and more.

However, if that single leader fails, or if a network interruption makes the leader unreachable, such a system becomes unable to make any progress. There are three ways of handling that situation:

  1. Wait for the leader to recover, and accept that the system will be blocked in the meantime. This approach does not fully solve consensus because it does not satisfy the termination property: if the leader does not recover, the system can be blocked forever.
  2. Manually fail over by getting humans to choose a new leader node and reconfigure the system to use it. Many relational databases take this approach. The speed of failover is limited by the speed at which humans can act, which is generally slower than computers.
  3. Use an algorithm to automatically choose a new leader. This approach requires a consensus algorithm, and it is advisable to use a proven algorithm that correctly handles adverse network conditions.

Although a single-leader database can provide linearizability without executing a consensus algorithm on every write, it still requires consensus to maintain its leadership and for leadership changes. Thus, in some sense, having a leader only “kicks the can down the road”: consensus is still required, only in a different place, and less frequently. The good news is that fault-tolerant algorithms and systems for consensus exist.

Tools like ZooKeeper play an important role in providing an “outsourced” consensus, failure detection, and membership service that applications can use.

Note that not every system necessarily requires consensus: for example, leaderless and multi-leader replication systems typically do not use global consensus. The conflicts that occur in these systems are a consequence of not having consensus across different leaders, but maybe that’s okay: maybe we simply need to cope without linearizability and learn to work better with data that has branching and merging version histories.

Chapter 10: Batch Processing

Batch processing is where you have some job that runs periodically over a large, bounded input, optimizing for throughput rather than latency. For example, building a pre-computed search index.

The chapter motivates batch processing with unix principles:

  1. Inputs are immutable (unchanging over time).
  2. Outputs are intended to be used as inputs (basically, piping the output of one program into the input of the next).
  3. Complex problems are solved with small, specialized tools.

These principles are carried over into MapReduce, where instead of the immutable input being a text file as in the unix case, it is a set of files on a distributed filesystem.

MapReduce works as follows:

  • The goal is to have a unix-like process that can do tasks on very large, highly distributed data systems. We have a bunch of parallel nodes, each storing some partition of the data.
  • On each parallel node you first run a Map task that extracts key-value pairs.
  • On each Map node, the key-value pairs are sorted by key within each partition.
  • Next, values with the same key are transferred to the same node.
  • Finally, on each of the many parallel reduce nodes, a Reducer function aggregates all the rows with the same key.

For example, suppose you wanted to compute the average age of users in each country. The Map function would emit a key-value pair for each record, where the key is the user’s country code and the value is the user’s age. These pairs are then sorted by key within each partition, and all the pairs with the same country code are moved to the same reduce partition. Finally, the Reducer function computes the average age for each country.
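
Here is a toy, single-process sketch of that flow (my own illustration with made-up records; in real MapReduce the grouping/shuffle happens across many machines and the sort is done by the framework):

```python
from collections import defaultdict

users = [
    {"country": "US", "age": 34},
    {"country": "SE", "age": 29},
    {"country": "US", "age": 41},
]

def mapper(record):
    # Emit a (key, value) pair per record: country code -> age.
    yield record["country"], record["age"]

# "Shuffle": collect all values with the same key onto the same reducer.
grouped = defaultdict(list)
for record in users:
    for key, value in mapper(record):
        grouped[key].append(value)

def reducer(key, values):
    # Aggregate all values for one key: here, the average age per country.
    return key, sum(values) / len(values)

print(dict(reducer(k, v) for k, v in grouped.items()))
# {'US': 37.5, 'SE': 29.0}
```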

This is a very flexible paradigm that can be used to solve lots of different problems. It allows for computation across multiple nodes, and better fault tolerance, since each step is written to disk and jobs can be re-started.

Joins can also be done:

  • Sort-merge join: The Map stage emits the join key as the key for both datasets being joined, with the record as the value. Data is partitioned by key so all the records with the same key (from both datasets) end up on the same reduce partition. The Reduce task then produces the joined records for each key. It’s a flexible approach because no assumptions are made about the structure of the data beforehand, but it’s slow.
  • Broadcast hash join: A small input is loaded into an in-memory hash table on each partition running the map step. The mapper scans over its partition of the large dataset and looks each record up in the in-memory hash table. The reduce step just pulls together all the matched rows from the map stage (no real computation is done by the reduce). This breaks down once the “small” input no longer fits in memory (a tiny sketch of this one follows the list).
  • Partitioned hash join: Both datasets are partitioned in the same way (say, by user_id), so each mapper only needs to build an in-memory hash table of the corresponding partition of the other dataset and probe its own partition against it. The reduce step just pulls together all the matched rows from the map stage (no real computation is done by the reduce). Much faster than a sort-merge join.
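
As a rough single-process illustration of the broadcast hash join idea (toy data of my own): the small dataset is loaded into an in-memory dict and each record of the big dataset is probed against it.

```python
# Small dataset: user_id -> country, loaded entirely into memory ("broadcast").
users = {1: "US", 2: "SE"}

# Large dataset: a stream of activity events keyed by user_id.
events = [(1, "click"), (2, "view"), (1, "view"), (3, "click")]

joined = [
    (user_id, action, users[user_id])
    for user_id, action in events
    if user_id in users            # events for unknown users are dropped (inner join)
]
# [(1, 'click', 'US'), (2, 'view', 'SE'), (1, 'view', 'US')]
```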

Limitations of MapReduce:

  • MapReduce needs all tasks in one job to succeed before the next job can start (and because of the rigid framework, workflows often chain together many, many jobs; a join, a grouped aggregation, and a sort would be three separate steps).
  • Mappers can often be redundant; even just doing some simple sorting takes a whole extra step.
  • Intermediate (temporary) data gets written to disk and replicated across several nodes, and this replication is often overkill (in the name of fault tolerance) for data that is thrown away once the next job has consumed it.

Chapter 11: Stream Processing

In reality, a lot of data is unbounded because it arrives gradually over time: your users produced data yesterday and today, and they will continue to produce more data tomorrow. Unless you go out of business, this process never ends, and so the dataset is never “complete” in any meaningful way. Thus, batch processors must artificially divide the data into chunks of fixed duration: for example, processing a day’s worth of data at the end of every day, or processing an hour’s worth of data at the end of every hour.

The problem with daily batch processes is that changes in the input are only reflected in the output a day later, which is too slow for many impatient users. To reduce the delay, we can run the processing more frequently—say, processing a second’s worth of data at the end of every second—or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. That is the idea behind stream processing.

In the batch processing world, the inputs and outputs of a job are files (perhaps on a distributed filesystem). For stream processing, if the input is a file (a sequence of bytes), the first processing step is usually to parse it into a sequence of records. In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock.

In a streaming system, related events are usually grouped together into a topic or stream.

A common approach for notifying consumers of topics about new events is to use a messaging system: a producer sends a message containing the event, which is then pushed to consumers.

A widely used approach is to send messages via a message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams. It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
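
As a loose in-process analogy (a sketch of my own; a real broker such as Kafka or RabbitMQ runs as a separate server and adds durability, acknowledgements, consumer groups, and so on), Python’s queue module can stand in for the broker:

```python
import queue
import threading

broker = queue.Queue()                 # stand-in for a topic on a message broker

def producer():
    for i in range(3):
        broker.put({"event": "page_view", "seq": i})   # publish an event

def consumer():
    while True:
        event = broker.get()           # blocks until a message is available
        print("processing", event)
        broker.task_done()             # acknowledge the message

threading.Thread(target=consumer, daemon=True).start()
producer()
broker.join()                          # wait until all published events are handled
```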

The chapter then goes into a bunch of detail on the different types of message brokers, fault tolerance in stream processing frameworks, the difficulties of reasoning about time in a stream processor, and joins with streams. I’m skipping all that here because I am lazy.

Once you have a stream, you can process it. There are three main ways to do this:

  1. Write the data in the events to a database, cache, search index, etc. to be queried by other clients.
  2. Push events to users in some way, like via email alerts or push notifications.
  3. Process the input stream(s) to produce output stream(s).
