Idempotent Updates with Cassandra
One of the things we don’t get with NoSQL databases is ACID transactions. We get Durability, which means data is persistently stored, but we don’t get:
- Atomicity: If we update multiple records in a single transaction, there’s no guarantee that all or none of them will be applied when a failure occurs.
- Consistency: NoSQL databases don’t use locking or data integrity rules to guarantee that every transaction moves the database from one consistent state to another.
- Isolation: There’s no guarantee that the records updated in one transaction won’t be read (or even updated) by another transaction before the first transaction is complete.
Some NoSQL databases offer some A, C, or I features in a limited context such as for “single partition” updates. We could emulate ACID features by supplementing the database with a synchronization framework such as ZooKeeper, but this reduces performance and scalability. If we’re using a NoSQL database, it’s no doubt largely because we want its high throughput. So, we have to design our updates to be compatible with its eventually consistent model.
With Cassandra, the closest we get to a transaction is a “batch mutate”, which allows us to submit multiple updates in one request. But while those updates are being applied, they can be immediately accessed by other applications. If our application (or Cassandra) crashes, there’s no guarantee about which updates in the batch will succeed. Without ACID transactions, how do we prevent conflicts between applications? How do we recover if a crash occurs?
One strategy is to design updates to be idempotent, which means that applying the same update twice produces the same result as applying it once. In other words, once an update has been successfully processed, applying it again is essentially a no-op. As we’ll see in a moment, this strategy not only simplifies restarts after a crash but can also prevent conflicts between multiple updaters. But given our long dependency on ACID transactions, it might not be obvious how to do this.
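To make the definition concrete, here is a minimal sketch that models a row as a plain Python dict of column name to value (an assumption for illustration, not a Cassandra client API). Setting columns to fixed values is idempotent; incrementing a counter is not, because its result depends on how many times it is applied. The function names are hypothetical.

```python
def set_columns(row, columns):
    """Idempotent: assign fixed values to named columns."""
    updated = dict(row)
    updated.update(columns)
    return updated


def increment_counter(row, column, amount):
    """Not idempotent: each application changes the result again."""
    updated = dict(row)
    updated[column] = updated.get(column, 0) + amount
    return updated


row = {}
once = set_columns(row, {"C1": "V1", "C2": "V2"})
twice = set_columns(once, {"C1": "V1", "C2": "V2"})
print(once == twice)  # True: the second application is a no-op

inc_once = increment_counter(row, "hits", 1)
inc_twice = increment_counter(inc_once, "hits", 1)
print(inc_once == inc_twice)  # False: replaying changed the row
```

The contrast is the whole point: if a crash forces us to replay work, updates shaped like `set_columns` are safe to repeat, while updates shaped like `increment_counter` are not.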
To start, we must shed some baggage from SQL programming. First, consider that in Cassandra there is no distinction between inserting a new row and modifying an existing row; we merely update a row. Second, each row can have an arbitrary number of columns, and we can add new columns to an existing row at any time. Here’s an example: for a specific table (which Cassandra calls a ColumnFamily), a typical update looks something like this:
“Update the row with key k with columns: C1=V1, C2=V2, …”
Cassandra interprets this update as follows:
- If no row exists in the specified table with the key k, a new row is created with the specified column names and values.
- If a row already exists with key k, its columns are updated as follows:
o If a given column Cn does not exist, it is added to the row with value Vn.
o If column Cn already exists, its value is replaced with value Vn.
So, the first time we say “update the row with key k…”, the row is created. But if we process the same update again, since the row already exists and no columns are actually changed, nothing happens. (What happens under the hood is not so simple, but this is effectively what an application sees.) Batch mutates can also delete columns and remove rows and, as we’d expect, deleting a column or row that is already deleted has no effect.
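The update semantics above can be sketched with a toy in-memory model. The class and method names (`ColumnFamily`, `update`, `delete_columns`, `remove_row`) are hypothetical; they mimic what an application sees, not any real Cassandra driver, and they ignore everything that happens under the hood (timestamps, tombstones, replication).

```python
class ColumnFamily:
    """In-memory stand-in for one ColumnFamily: row key -> {column: value}."""

    def __init__(self):
        self.rows = {}

    def update(self, key, columns):
        # No separate insert: a missing row is created, new columns are
        # added, and existing columns have their values replaced.
        self.rows.setdefault(key, {}).update(columns)

    def delete_columns(self, key, names):
        # Deleting a column that is already gone is silently ignored.
        row = self.rows.get(key)
        if row is None:
            return
        for name in names:
            row.pop(name, None)
        if not row:
            # Modeling choice: this toy drops rows that have no columns left.
            del self.rows[key]

    def remove_row(self, key):
        # Removing a row that does not exist has no effect.
        self.rows.pop(key, None)


cf = ColumnFamily()
cf.update("k", {"C1": "V1", "C2": "V2"})
first = {key: dict(cols) for key, cols in cf.rows.items()}
cf.update("k", {"C1": "V1", "C2": "V2"})  # replay the same update
print(cf.rows == first)  # True

cf.delete_columns("k", ["C2"])
cf.delete_columns("k", ["C2"])  # deleting again changes nothing
print(cf.rows)  # {'k': {'C1': 'V1'}}
cf.remove_row("k")
cf.remove_row("k")  # removing twice changes nothing
print(cf.rows)  # {}
```

Every operation here can be applied twice with the same effect as applying it once, which is the property the rest of the article builds on.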
If every batch merely created a single row, it’s pretty obvious that if our application (or Cassandra itself) crashed, we could just figure out where we left off and reprocess the most recent batches. But of course, not all batches are this simple. What happens, for example, if batches from separate applications need to update the same row?
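Here is a minimal sketch of that restart strategy, assuming the application durably records the index of the last batch it knows completed (the checkpoint mechanism, and the names `apply_mutation` and `run`, are hypothetical). A crash in the middle of a batch leaves it half applied; on restart we simply reprocess the most recent batches, and because each mutation is an idempotent upsert, the final state matches a run that never crashed.

```python
def apply_mutation(rows, key, columns):
    # Upsert semantics: create the row if needed, add or replace columns.
    rows.setdefault(key, {}).update(columns)


def run(batches, rows, start=0, crash_at=None):
    """Apply batches[start:] to rows. If crash_at == (batch, mutation), stop
    just before that mutation to simulate a crash mid-batch. Returns the
    index of the last batch that fully completed (start - 1 if none did)."""
    completed = start - 1
    for b in range(start, len(batches)):
        for m, (key, columns) in enumerate(batches[b]):
            if crash_at == (b, m):
                return completed
            apply_mutation(rows, key, columns)
        completed = b
    return completed


batches = [
    [(1265040242124, {"User": "Bob", "Request": "1"})],
    [(1265040242678, {"User": "Mary", "Request": "1"}),
     (1265043781000, {"User": "Bob", "Request": "5"})],
]

clean = {}
run(batches, clean)

crashed = {}
last_done = run(batches, crashed, crash_at=(1, 1))  # batch 1 only half applied
# Restart by reprocessing the most recent batches, including the last one
# that completed; re-applying already-applied mutations is harmless.
run(batches, crashed, start=max(last_done, 0))
print(crashed == clean)  # True
```

Restarting one batch earlier than strictly necessary costs a little redundant work but removes any need to know exactly which mutations in the interrupted batch made it to disk.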
Let’s take a simple example. Suppose we have a database that captures log records and maintains an index of values found within specific log fields. Say we want to easily find all log records with a given value for “User” or “Request” fields. A reasonable approach is to create one table to store log records and a second table as a field-specific index into the log records. For example:
LogRecords:

| Key (timestamp) | Columns   |           |   |
|-----------------|-----------|-----------|---|
| 1265040242124   | User=Bob  | Request=1 | … |
| 1265040242678   | User=Mary | Request=1 | … |
| 1265043781000   | User=Bob  | Request=5 | … |

Fields:

| Key (field) | Columns                           |                      |   |
|-------------|-----------------------------------|----------------------|---|
| User        | Bob=[1265040242124,1265043781000] | Mary=[1265040242678] | … |
| Request     | 1=[1265040242124,1265040242678]   | 5=[1265043781000]    | … |
In this approach, each log record is stored as a row in LogRecords using its timestamp (in milliseconds) as the key and its fields stored as individual columns. For each field we want to index, we create or update a row within Fields whose key is the indexed field name. For each value found for that field, we create a column whose name is the field value and whose value is a list of log record keys that contained that value (perhaps stored as a long[]). Hence, to find all the log records where “Bob” is mentioned in the “User” field, we fetch the Fields row with key=User, find the column whose name=Bob, and the column value leads us to the appropriate log records. Likewise, we can quickly find all log records where the Request value is “1”. (This approach is a little naïve for a scalable application, but it will help make our point.)
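The first schema can be sketched with plain dicts standing in for the LogRecords and Fields tables (an assumption for illustration; a Python list plays the role of the long[]). The helper names `build_fields_index` and `find` are hypothetical. Note that this builds the index in a single process, so it shows the data layout and lookup path, not how concurrent updaters would behave.

```python
log_records = {
    1265040242124: {"User": "Bob", "Request": "1"},
    1265040242678: {"User": "Mary", "Request": "1"},
    1265043781000: {"User": "Bob", "Request": "5"},
}

INDEXED_FIELDS = ("User", "Request")


def build_fields_index(records):
    """Fields row per field name; one column per value, holding record keys."""
    fields = {}
    for ts in sorted(records):
        for field in INDEXED_FIELDS:
            value = records[ts].get(field)
            if value is not None:
                fields.setdefault(field, {}).setdefault(value, []).append(ts)
    return fields


def find(fields, records, field, value):
    """Fetch the Fields row for `field`, read column `value`, follow the keys."""
    keys = fields.get(field, {}).get(value, [])
    return [records[k] for k in keys]


fields = build_fields_index(log_records)
print(fields["User"]["Bob"])  # [1265040242124, 1265043781000]
print(len(find(fields, log_records, "Request", "1")))  # 2
```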
Although this approach basically works, it is problematic when we consider how we might update these tables. In a concurrent environment, suppose two application instances simultaneously attempt to update the Fields row with key=User, and both attempt to set the column named “Bob”. Even if each first reads the “Bob” column to see whether it already exists, both may conclude they are the first. One will set “Bob” to one list of log record keys; the other will set “Bob” to a different list. Because there is no locking, Cassandra resolves such conflicts by keeping the last write (the one with the latest timestamp), so the “Bob” column ends up holding only the last update, and the first update is wiped out. If the timing is reversed, we get a different result.
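The lost update can be reproduced with a scripted interleaving. This is a minimal sketch, not real concurrency: two hypothetical writers, A and B, both read the “Bob” column before either writes, append their own log record key, and write the whole list back. Writes are plain dict assignments, so the later one replaces the earlier one, mimicking last-write-wins.

```python
def read_list(fields, field, value):
    return list(fields.get(field, {}).get(value, []))


def write_list(fields, field, value, keys):
    # The whole column value is replaced; nothing is merged.
    fields.setdefault(field, {})[value] = keys


def race(order):
    fields = {}
    # Both writers read before either writes, so both see an empty column.
    seen_a = read_list(fields, "User", "Bob")
    seen_b = read_list(fields, "User", "Bob")
    writes = {
        "A": seen_a + [1265040242124],
        "B": seen_b + [1265043781000],
    }
    for writer in order:
        write_list(fields, "User", "Bob", writes[writer])
    return fields["User"]["Bob"]


print(race(["A", "B"]))  # [1265043781000]: A's key is lost
print(race(["B", "A"]))  # [1265040242124]: B's key is lost
```

Either way one key disappears, and which one depends only on timing, which is exactly why reading first does not help.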
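Notice that multiple applications can add columns to the same row without conflicting; they conflict only if they try to update a column with the same name. Hence, we can resolve the conflict by “moving” the column name in the Fields table into the row key, creating a compound key such as User/Bob, and by storing each log record key as a column name of its own with an empty (null) value. Two applications indexing different log records now write different columns, and reprocessing a log record after a crash simply rewrites a column that already exists: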
Fields:

| Key (field) | Columns            |                    |
|-------------|--------------------|--------------------|
| User/Bob    | 1265040242124=null | 1265043781000=null |
| User/Mary   | 1265040242678=null |                    |
| Request/1   | 1265040242124=null | 1265040242678=null |
| Request/5   | 1265043781000=null |                    |
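A minimal sketch of the compound-key layout, again with dicts standing in for the Fields table and `None` standing in for null (the helper name `index_record` is hypothetical). Each indexing step only adds a column named after the log record key, so no read is needed. The check below applies the updates in every possible order and then replays all of them, as after a crash; every run should end in the same state.

```python
from itertools import permutations

log_records = {
    1265040242124: {"User": "Bob", "Request": "1"},
    1265040242678: {"User": "Mary", "Request": "1"},
    1265043781000: {"User": "Bob", "Request": "5"},
}


def index_record(fields, ts, record, indexed=("User", "Request")):
    for field in indexed:
        if field in record:
            row_key = f"{field}/{record[field]}"  # compound row key, e.g. "User/Bob"
            fields.setdefault(row_key, {})[ts] = None  # add a column; no read needed


results = []
for order in permutations(log_records):
    fields = {}
    for ts in order:
        index_record(fields, ts, log_records[ts])
    for ts in order:  # replay everything, as after a crash
        index_record(fields, ts, log_records[ts])
    results.append(fields)

print(all(r == results[0] for r in results))  # True
print(sorted(results[0]["User/Bob"]))  # [1265040242124, 1265043781000]
```

Because two writers touch the same column only when they index the same log record, and then they write identical data, the order of updates and the number of replays stop mattering.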
There are more complex scenarios in which multiple applications may need to update the same row in the same “transaction”. One example is statistical aggregates, such as maintaining a sum or average of some data value. Even within the confines of the eventually consistent model used by NoSQL databases such as Cassandra, there are ways to accommodate these needs. The key is to use an idempotent update strategy that prevents update conflicts and facilitates application restarts.
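As one illustration of that idea (our own assumption, not a technique described in the text above), a sum or average can follow the same pattern as the compound-key index: rather than reading and rewriting a single “total” column, each contribution is stored as its own column keyed by the log record that produced it, and the aggregate is computed at read time. The function names and the values 120 and 80 are hypothetical.

```python
def record_contribution(stats_row, record_key, amount):
    # Same record key and same amount => replaying this is a no-op.
    stats_row[record_key] = amount


def aggregate(stats_row):
    """Compute sum and average at read time from the per-record columns."""
    total = sum(stats_row.values())
    count = len(stats_row)
    return total, (total / count if count else None)


stats = {}
record_contribution(stats, 1265040242124, 120)  # hypothetical values
record_contribution(stats, 1265040242678, 80)
record_contribution(stats, 1265040242124, 120)  # replay after a crash
print(aggregate(stats))  # (200, 100.0)
```

The trade-off is that reads get more expensive as contributions accumulate; collapsing them into a stored total would need its own conflict-free design.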