NewSQL = NoSQL+ACID

Until recently, Odnoklassniki stored about 50 TB of real-time data in SQL Server. For such a volume it is almost impossible to provide fast, reliable, and datacenter-fault-tolerant access using a SQL DBMS. In such cases one usually turns to NoSQL storage, but not everything can be moved to NoSQL: some entities require ACID transaction guarantees.

This led us to NewSQL storage: a DBMS that provides the fault tolerance, scalability, and performance of NoSQL systems while retaining the ACID guarantees familiar from classical systems. There are few working industrial systems of this new class, so we implemented one ourselves and put it into production.

Read on to find out how it works and how it turned out.

Today the monthly audience of Odnoklassniki is more than 70 million unique visitors. We are among the five largest social networks in the world and in the top twenty sites on which users spend the most time. The OK infrastructure handles very high loads: over a million HTTP requests per second hit our frontends. Our fleet of more than 8,000 servers sits close together in four Moscow data centers, which keeps network latency between them under 1 ms.

We have been using Cassandra since 2010, starting with version 0.6. Today, several dozen clusters are in operation. The fastest cluster processes over 4 million operations per second, while the largest one stores 260 TB.

However, these are all ordinary NoSQL clusters used to store weakly consistent data. We also wanted to replace our main consistent storage, Microsoft SQL Server, which had been in use since the founding of Odnoklassniki. That storage consisted of more than 300 SQL Server Standard Edition machines holding 50 TB of data - business entities. This data is modified within ACID transactions and requires strong consistency.

To distribute data across SQL Server nodes, we used both vertical and horizontal partitioning (sharding). Historically, we used a simple sharding scheme: each entity was associated with a token - a function of the entity ID. Entities with the same token were placed on the same SQL server. The master-detail relationship was implemented so that the tokens of master and child records always match, so they always live on the same server. In a social network, almost all records are created on behalf of a user, which means that all user data within one functional subsystem is stored on one server. That is, a business transaction almost always touched tables of a single SQL server, making it possible to ensure data consistency with local ACID transactions, without slow and unreliable distributed ACID transactions.
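
As an illustration, here is a minimal Java sketch of such token routing. The modulo token function, the bucket count, and the shard map are assumptions for the example; the article does not describe the real implementation.

import java.util.Map;

// Hypothetical sketch of token-based shard routing; the real token
// function and shard map are not described here.
public final class ShardRouter {
    private static final int TOKEN_COUNT = 100;      // assumed number of token buckets
    private final Map<Integer, String> shardByToken; // token -> SQL server address

    public ShardRouter(Map<Integer, String> shardByToken) {
        this.shardByToken = shardByToken;
    }

    // The token is a pure function of the entity ID, so a master record and
    // all its detail records (which share the master's token) always land on
    // the same SQL server, keeping transactions local to one node.
    public int tokenOf(long entityId) {
        return (int) Math.floorMod(entityId, (long) TOKEN_COUNT);
    }

    public String serverFor(long entityId) {
        return shardByToken.get(tokenOf(entityId));
    }
}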

To make sharding work and to speed up SQL processing, we adopted the following rules:

  • We do not use foreign key constraints, since with sharding the referenced record may live on another server.
  • We do not use stored procedures and triggers due to the additional load on the DBMS CPU.
  • We do not use JOINs, both because of the above and because they generate a lot of random disk reads.
  • Outside of a transaction, we use the Read Uncommitted isolation level to reduce deadlocks.
  • We only execute short transactions (less than 100ms on average).
  • We do not use multi-row UPDATE and DELETE due to a large number of deadlocks - we update only one record at a time.
  • Queries are always executed via indexes only - for us, a query plan with a full table scan means the database is overloaded and headed for failure.

These measures let us squeeze almost the maximum performance out of the SQL servers. Still, the problems kept multiplying. Let's look at them.

Problems with SQL

  • Since we used home-grown sharding, administrators added new shards by hand, and all the while the data replicas being rebalanced did not serve requests.
  • As the number of records in a table grows, insert and update speed degrades; adding an index to an existing table slows it down severalfold; creating or rebuilding an index requires downtime.
  • Having a small number of Windows machines for SQL Server in production makes infrastructure management difficult.

But the main problem is

fault tolerance

A classic SQL server has poor fault tolerance. Say you have a single database server and it fails once every three years. During the failure the site is down for 20 minutes - acceptable. With 64 servers, the site is down once every three weeks. With 200 servers, the site is down every week. This is a problem.

What can be done to improve the fault tolerance of an SQL server? Wikipedia suggests building a highly available cluster, in which a standby exists for any component that might fail.

This requires a fleet of expensive equipment - extensive duplication, fiber optics, shared storage - and failover does not work reliably: about 10% of failovers end with the backup node failing right after the primary.

But the main disadvantage of such a highly available cluster is zero availability if the data center hosting it fails. Odnoklassniki has four data centers, and we need to keep working if one of them fails completely.

For this we could have used the Multi-Master replication built into SQL Server. This solution is much more expensive because of software costs, and it suffers from the well-known replication problems: unpredictable transaction latency with synchronous replication, and replication lag - and, as a result, lost modifications - with asynchronous replication. The implied manual conflict resolution makes this option completely inapplicable for us.

All these problems called for a radical solution, so we set about analyzing them in detail. First, let's get acquainted with what SQL Server mostly does: transactions.

Simple transaction

Consider the simplest transaction from the point of view of an application SQL programmer: adding a photo to an album. Albums and photos live in different tables. The album has a public-photo counter. The transaction breaks down into the following steps:

  1. We block the album by key.
  2. Create an entry in the photo table.
  3. If the photo has public status, increment the album's public-photo counter, update the album record, and commit the transaction.

Or in pseudocode:

TX.start("Albums", id);           // open a transaction keyed by the album
Album album = albums.lock(id);    // lock and read the album row
Photo photo = photos.create(…);   // insert the new photo row

if (photo.status == PUBLIC) {
    album.incPublicPhotosCount(); // maintain the public-photo counter
}
album.update();                   // write the album back

TX.commit();                      // release locks, make the changes visible

We can see that the most common scenario for a business transaction is to read data from the database into the application server's memory, change something, and save the new values back to the database. Usually such a transaction updates several entities across several tables.

While a transaction is executing, a concurrent modification of the same data may arrive from another system. For example, Antispam may decide that a user is suspicious, so all of the user's photos must lose their public status and be sent for moderation - which means changing photo.status to some other value and decrementing the corresponding counters. Obviously, if this happens without the atomicity and isolation guarantees of ACID, the result will be wrong: either the photo counter will show a wrong value, or not all photos will be sent for moderation.

A lot of such code manipulating various business entities within a single transaction has been written over the lifetime of Odnoklassniki. From our experience of migrating to NoSQL with eventual consistency, we know that the biggest (and most time-consuming) challenge is developing code to maintain data consistency. We therefore made real ACID transactions for application logic the main requirement for the new storage.

Other equally important requirements were:

  • In the event of a data center failure, both reading and writing to the new storage must be available.
  • Maintaining the current speed of development. That is, working with the new storage should take roughly the same amount of code; there should be no need to bolt extra logic onto the application, to develop conflict-resolution algorithms, to maintain secondary indexes by hand, and so on.
  • The new storage had to be fast, both for reading data and for processing transactions, which effectively ruled out academically rigorous, general-purpose, but slow solutions such as two-phase commit.
  • Automatic scaling on the fly.
  • Using ordinary cheap servers, with no need to buy exotic hardware.
  • The possibility of evolving the storage with the company's own developers. In other words, priority went to in-house or open-source solutions, preferably in Java.

Decisions, decisions

Analyzing possible solutions, we arrived at two architectural choices:

The first is to take any SQL server and implement the required fault tolerance, scaling mechanism, failover clustering, conflict resolution, and distributed, reliable, and fast ACID transactions. We assessed this option as highly non-trivial and time-consuming.

The second option: take a ready-made NoSQL storage with scaling, failover clustering, and conflict resolution already implemented, and build transactions and SQL on top ourselves. At first glance, even implementing SQL, never mind ACID transactions, looks like a multi-year task. But then we realized that the set of SQL features we actually use in practice is about as far from ANSI SQL as Cassandra CQL is. Taking a closer look at CQL, we saw it was close enough to what we need.

Cassandra and CQL

So, what is interesting about Cassandra, what features does it have?

First, you can create tables with support for various data types, and you can SELECT or UPDATE by primary key:

CREATE TABLE photos (id bigint PRIMARY KEY, owner bigint, …);
SELECT * FROM photos WHERE id=?;
UPDATE photos SET … WHERE id=?;

To keep replica data consistent, Cassandra uses a quorum approach. In the simplest case, with three replicas of a row placed on different cluster nodes, a write is considered successful when a majority of the nodes (two out of three) confirm it; a read is considered consistent when a majority of the nodes are polled and confirm the data. With three replicas, this guarantees full, immediate consistency if one node fails. It also let us implement an even more reliable scheme: always send the request to all three replicas and wait for the two fastest to respond. The late response of the third replica is then discarded. That lagging node may be in serious trouble - slowdowns, JVM garbage collection, direct memory reclaim in the Linux kernel, hardware failure, network disconnection - but client operations and data are not affected in any way.

The approach of querying three nodes and accepting responses from two is called speculation: the request to the extra replica goes out before the first ones have had a chance to time out.
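
The idea can be sketched in Java roughly as follows, with a made-up Replica interface standing in for the real Cassandra read path:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;

// Sketch of a speculative quorum read: fire the request at all three
// replicas at once and return as soon as the two fastest have answered;
// the late third reply is simply discarded.
public final class SpeculativeRead {
    public interface Replica<R> { R read(long key) throws Exception; }

    public static <R> List<R> quorumRead(List<Replica<R>> replicas, long key,
                                         ExecutorService pool) throws Exception {
        CompletionService<R> done = new ExecutorCompletionService<>(pool);
        for (Replica<R> replica : replicas) {
            done.submit(() -> replica.read(key)); // all 3 requests go out immediately
        }
        List<R> answers = new ArrayList<>();
        while (answers.size() < 2) {              // quorum: wait only for the 2 fastest
            answers.add(done.take().get());
        }
        return answers;                           // caller reconciles the two by timestamp
    }
}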

Another Cassandra benefit is Batchlog, a mechanism that guarantees a batch of changes is either applied completely or not applied at all. This gives us the A in ACID - atomicity - out of the box.

The closest thing to transactions in Cassandra is so-called "lightweight transactions", but they are far from "real" ACID transactions: in essence, they are a CAS on the data of a single record, reaching consensus through the heavyweight Paxos protocol. So the throughput of such transactions is low.

What we missed in Cassandra

So we had to implement real ACID transactions in Cassandra ourselves. With them in place, we could easily build two other convenient features of a classic DBMS: consistent fast indexes, allowing data selection by more than just the primary key, and an ordinary generator of monotonic auto-increment IDs.

C*One

Thus the new DBMS C*One was born, consisting of three types of server nodes:

  • Storages are (almost) standard Cassandra servers responsible for storing data on local drives. As the load and volume of data grows, their number can be easily scaled up to tens and hundreds.
  • Transaction coordinators - ensure the execution of transactions.
  • Clients are application servers that implement business operations and initiate transactions. There may be thousands of such clients.

Servers of all types form a single cluster, use Cassandra's internal message protocol to communicate with each other, and use gossip to exchange cluster information. Via heartbeats, servers learn about each other's failures and maintain a single data schema - the tables, their structure and replication - as well as the partitioning scheme, cluster topology, and so on.

Client

Instead of standard drivers, Fat Client mode is used. Such a node stores no data, but it can coordinate request execution: the Client itself acts as the coordinator of its own requests, polling the storage replicas and resolving conflicts. This is not only more reliable and faster than a standard driver, which has to talk to a remote coordinator, it also gives control over how requests are routed. Outside of a transaction opened on the client, requests go to the storages. If the client has opened a transaction, all requests within it go to the transaction coordinator.

C*One Transaction Coordinator

The coordinator is what we implemented for C*One from scratch. It is responsible for managing transactions, locks, and the order in which transactions are applied.

For each transaction it serves, the coordinator generates a timestamp, each one greater than that of the previous transaction. Since Cassandra's conflict resolution is based on timestamps (of two conflicting records, the one with the later timestamp wins), any conflict is always resolved in favor of the later transaction. We have thereby implemented a Lamport clock - a cheap way to resolve conflicts in a distributed system.
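
A sketch of such a timestamp generator is below; combining wall-clock microseconds with a monotonic bump is an assumption for the example, not necessarily the exact scheme used in C*One.

import java.util.concurrent.atomic.AtomicLong;

// Lamport-clock-style timestamp generator: every issued timestamp is
// strictly greater than the previous one even if the wall clock stalls or
// steps backwards, so a later transaction always wins Cassandra's
// timestamp-based conflict resolution.
public final class TxTimestamps {
    private final AtomicLong last = new AtomicLong();

    public long next() {
        long now = System.currentTimeMillis() * 1000;              // wall clock in microseconds
        return last.updateAndGet(prev -> Math.max(prev + 1, now)); // never repeats, never goes back
    }
}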

Locks

To ensure isolation, we decided to use the easiest way - pessimistic locks on the primary key of the record. In other words, in a transaction, a record must first be locked, only then read, modified, and saved. Only after a successful commit can a record be unlocked so that competing transactions can use it.

Implementing such a lock is simple in a non-distributed environment. In a distributed system, there are two main ways: either implement distributed locking on a cluster, or distribute transactions so that transactions involving the same record are always serviced by the same coordinator.

Since in our case the data is already distributed among groups of local transactions in SQL, it was decided to assign groups of local transactions to the coordinators: one coordinator performs all transactions with a token from 0 to 9, the second - with a token from 10 to 19, and so on. As a result, each of the instances of the coordinator becomes the master of the transaction group.

Locks can then be implemented as a plain HashMap in the coordinator's memory.
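
For example, a minimal sketch of such a lock table (names and the composite string key are illustrative):

import java.util.HashMap;
import java.util.Map;

// Sketch of the per-coordinator lock table. Because one coordinator owns a
// whole transaction group, an ordinary in-memory map suffices - no
// distributed locking required.
public final class LockTable {
    private final Map<String, Long> holder = new HashMap<>(); // lock key -> transaction id

    public synchronized boolean tryLock(String table, long pk, long txId) {
        return holder.putIfAbsent(table + ":" + pk, txId) == null;
    }

    public synchronized void unlock(String table, long pk, long txId) {
        holder.remove(table + ":" + pk, txId); // release only if held by this transaction
    }
}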

Coordinator failures

Since one coordinator exclusively serves a group of transactions, it is very important to detect its failure quickly, so that a retry of the transaction still fits within the timeout. To make detection fast and reliable, we used a fully meshed quorum heartbeat protocol:

Each data center hosts at least two coordinator nodes. Periodically, each coordinator sends a heartbeat message to the other coordinators and informs them about its functioning, as well as about the last time it received heartbeat messages from which coordinators in the cluster.

Receiving the same kind of information from the others in their heartbeat messages, each coordinator decides for itself which cluster nodes are up and which are not, guided by the quorum principle: if node X has heard from a majority of cluster nodes that messages from node Y are arriving normally, then Y is up. Conversely, as soon as a majority reports missing messages from node Y, Y has failed. Curiously, if the quorum tells node X that messages from X itself are no longer arriving, node X will consider itself failed.

Heartbeat messages are sent at a high frequency - about 20 times per second, with a 50 ms period. In Java it is hard to guarantee a response within 50 ms because the garbage collector causes pauses of comparable length. We achieved this response time with the G1 garbage collector, which lets you set a target for GC pause duration. Occasionally, though rarely, collector pauses still exceed 50 ms, which can lead to false failure detection. To avoid this, the coordinator does not report a remote node as failed when the first heartbeat message from it goes missing, only when several in a row do. This way we detect the failure of a coordinator node within 200 ms.
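
The quorum decision itself can be sketched like this; the counters and the threshold of three missed beats are illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of quorum failure detection: node Y is declared dead only when a
// majority of coordinators report several consecutive missed heartbeats
// from it, so a single 50 ms GC pause is forgiven.
public final class QuorumFailureDetector {
    private static final int MISSED_TOLERATED = 3; // ~200 ms at a 50 ms heartbeat period
    private final int clusterSize;
    // reporter node -> (observed node -> consecutive heartbeats missed)
    private final Map<String, Map<String, Integer>> reports = new ConcurrentHashMap<>();

    public QuorumFailureDetector(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    public void onHeartbeat(String reporter, Map<String, Integer> missedByNode) {
        reports.put(reporter, missedByNode);       // piggybacked on heartbeat messages
    }

    public boolean isDead(String node) {
        long accusers = reports.values().stream()
                .filter(m -> m.getOrDefault(node, 0) > MISSED_TOLERATED)
                .count();
        return accusers > clusterSize / 2;         // a strict majority decides
    }
}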

But it is not enough to quickly understand which node has ceased to function. Something needs to be done about it.

Reserves

The classical approach is, when the master fails, to elect a new one using one of the fashionable universal algorithms. But such algorithms have well-known problems with time to convergence and with the duration of the election itself. We avoided these extra delays by using a coordinator replacement scheme in a fully connected network:

Say we want to execute a transaction in group 50. Let's define the replacement scheme in advance, that is, which nodes will execute group 50 transactions if the main coordinator fails. Our goal is to keep the system working even if a data center fails, so we define the first reserve to be a node in another data center and the second reserve a node in a third. This scheme is chosen once and does not change until the cluster topology changes, that is, until new nodes join (which happens very rarely). The order of choosing a new active master if the old one fails is always: the first reserve becomes the active master, and if it too has stopped working, the second reserve does.

Such a scheme is more reliable than a universal algorithm, since to activate a new master, it is enough to determine the fact of the failure of the old one.

But how do clients find out which master is currently active? It is impossible to push information to thousands of clients within 50 ms. A client could well send a request to open a transaction without knowing that the master is no longer working, and the request would hang until timeout. To prevent this, clients speculatively send the open-transaction request to the group's master and both of its reserves at once, but only the currently active master replies to it. All subsequent communication within the transaction happens only with the active master.

The standby masters place the requests they receive for transactions that are not theirs into a queue of unborn transactions, where they are kept for some time. If the active master dies, the new master processes the open requests from its queue and responds to the clients. If a client has already managed to open the transaction with the old master, the second response is ignored (and, obviously, such a transaction will not complete and will be retried by the client).
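
The client side of this speculative open might look like the following sketch; Coordinator and TxHandle are made-up types, not the real client API:

import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;

// Sketch of a speculative transaction open: the request goes to the group's
// master and both reserves at once; standbys queue it silently, so the one
// response that arrives comes from the currently active master.
public final class SpeculativeTxOpen {
    public interface Coordinator { TxHandle open(long token) throws Exception; }

    public static final class TxHandle {
        public final String coordinatorId;
        public final long txId;
        public TxHandle(String coordinatorId, long txId) {
            this.coordinatorId = coordinatorId;
            this.txId = txId;
        }
    }

    public static TxHandle open(List<Coordinator> masterAndReserves, long token,
                                ExecutorService pool) throws Exception {
        CompletionService<TxHandle> done = new ExecutorCompletionService<>(pool);
        for (Coordinator c : masterAndReserves) {
            done.submit(() -> c.open(token)); // master + first reserve + second reserve
        }
        return done.take().get();             // first (and only) response wins; any later
                                              // duplicate from a new master is ignored
    }
}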

How a transaction works

Suppose a client sends the coordinator a request to open a transaction for such-and-such an entity with such-and-such a primary key. The coordinator locks the entity and puts the lock into its in-memory lock table. If needed, the coordinator reads the entity from storage and keeps the resulting data in an in-memory transaction state on the coordinator.

When a client wants to change data within the transaction, it sends the coordinator a request to modify the entity, and the coordinator puts the new data into the in-memory transaction state table. That completes the write - nothing is written to storage.

When a client requests its own modified data as part of an active transaction, the coordinator acts as follows:

  • if the data for this ID is already in the transaction state, it is taken from memory;
  • if the ID is not in memory, the missing data is read from the storage nodes, combined with what is already in memory, and the result is returned to the client.

Thus, the client can read its own changes, and other clients do not see these changes, because they are stored only in the memory of the coordinator, they are not yet in the Cassandra nodes.
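
A sketch of this read path, with Storage as an illustrative placeholder; the real coordinator merges at column level, which is simplified here to whole rows:

import java.util.HashMap;
import java.util.Map;

// Sketch of read-your-own-writes: pending changes live only in the
// coordinator's memory and shadow whatever is currently stored in Cassandra.
public final class TxState {
    public interface Storage { Map<String, Object> read(long pk); }

    private final Map<Long, Map<String, Object>> pending = new HashMap<>(); // pk -> staged row

    public void stage(long pk, Map<String, Object> row) {
        pending.put(pk, row);        // a "write": memory only, storage untouched
    }

    public Map<String, Object> read(long pk, Storage storage) {
        Map<String, Object> mine = pending.get(pk);
        if (mine != null) {
            return mine;             // our own uncommitted change, served from memory
        }
        return storage.read(pk);     // untouched in this transaction: read from storage
    }
}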

When the client sends a commit, the coordinator packs the transaction's in-memory state into a logged batch and sends it to the Cassandra storages. The storages do everything needed to apply this batch atomically (completely) and return a response to the coordinator, which releases the locks and confirms the success of the transaction to the client.

And for rollback, the coordinator only needs to free the memory occupied by the state of the transaction.
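
Commit and rollback side by side, as a sketch; Mutation and Storage are illustrative stand-ins for the coordinator's internals:

import java.util.List;
import java.util.Map;

// Sketch of transaction completion on the coordinator. Commit ships the
// whole in-memory state to Cassandra as one logged batch (atomic thanks to
// the batchlog); rollback just drops the state - nothing was ever written.
public final class TxCompletion {
    public interface Storage { void applyLoggedBatch(List<Mutation> mutations); }

    public static final class Mutation {
        public final String table;
        public final long pk;
        public final Map<String, Object> columns;
        public Mutation(String table, long pk, Map<String, Object> columns) {
            this.table = table;
            this.pk = pk;
            this.columns = columns;
        }
    }

    public static void commit(List<Mutation> txState, Storage storage, Runnable releaseLocks) {
        storage.applyLoggedBatch(txState); // all-or-nothing via Cassandra's batchlog
        releaseLocks.run();                // unlock only after the batch is applied
    }

    public static void rollback(List<Mutation> txState, Runnable releaseLocks) {
        txState.clear();                   // no storage I/O at all
        releaseLocks.run();
    }
}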

With the additions described above, we implemented the principles of ACID:

  • Atomicity. A guarantee that no transaction is partially committed: either all of its sub-operations execute, or none do. In our case this is ensured by the logged batch in Cassandra.
  • Consistency. Each successful transaction, by definition, commits only valid results. If, after opening a transaction and performing some of the operations, it is found that the result is invalid, a rollback is performed.
  • Isolation. When a transaction is executed, parallel transactions should not affect its result. Concurrent transactions are isolated with pessimistic locks on the coordinator. For readings outside of a transaction, the principle of isolation at the Read Committed level is respected.
  • Durability. Regardless of problems at the lower levels - a power blackout, a hardware failure - the changes made by a successfully completed transaction must remain saved when operation resumes.

Reading by indexes

Let's take a simple table:

CREATE TABLE photos (
    id bigint primary key,
    owner bigint,
    modified timestamp,
    …)

It has an ID (the primary key), an owner, and a modification date. We want a very simple query: select a given owner's data modified within the last day.

SELECT *
FROM photos
WHERE owner=?
AND modified>?

In order to process such a query quickly, in a classic SQL DBMS, you need to build an index on the columns (owner, modified). We can do this quite simply, since we now have ACID guarantees!

Indexes in C*One

We start with the original photos table, in which the record ID is the primary key.

For an index, C*One creates a new table that is a copy of the original table. The key is the same as the index expression, but it also includes the primary key of the record from the source table:

Now the query "an owner's photos for the last 24 hours" can be rewritten as a select from the index table:

SELECT * FROM i1_test
WHERE owner=?
AND modified>?

Consistency between the source photos table and the index table i1 is maintained automatically by the coordinator. Based on the data schema alone, when a change arrives, the coordinator generates and remembers changes not only to the main table but also to its copies. Nothing extra is done to the index table: no logs are read, no locks are taken. That is, adding indexes consumes almost no resources and has practically no effect on the speed of applying modifications.
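
That schema-driven derivation can be sketched for the photos example as follows; the types and the delete/insert representation are illustrative:

import java.util.List;
import java.util.Map;

// Sketch of schema-driven index maintenance: from a single photos-table
// update the coordinator derives a delete of the old index row and an
// insert of the new one, and all three mutations go into the same logged
// batch, so the base table and the index commit or fail together.
public final class IndexMaintainer {
    public static final class IndexMutation {
        public final String table;
        public final boolean delete;
        public final Map<String, Object> row;
        public IndexMutation(String table, boolean delete, Map<String, Object> row) {
            this.table = table;
            this.delete = delete;
            this.row = row;
        }
    }

    public static List<IndexMutation> forPhotoUpdate(Map<String, Object> oldRow,
                                                     Map<String, Object> newRow) {
        return List.of(
            new IndexMutation("photos", false, newRow), // base-table update
            new IndexMutation("i1", true, oldRow),      // drop the old (owner, modified, id) row
            new IndexMutation("i1", false, newRow));    // insert the new index row
    }
}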

With the help of ACID, we managed to implement indexes "like in SQL". They are consistent, scalable, fast, composable, and built into the CQL query language. Index support does not require any changes to the application code. Everything is simple, as in SQL. And most importantly, indexes do not affect the speed of execution of modifications to the original transaction table.

What happened

We developed C*One three years ago and put it into commercial operation.

What did we end up with? Let's judge by the subsystem for processing and storing photos, one of the most important data types in a social network. This is not about the photo images themselves, but about all kinds of meta-information. Odnoklassniki now holds about 20 billion such records; the system handles 80,000 read requests per second and up to 8,000 data-modifying ACID transactions per second.

When we used SQL with replication factor = 1 (but with RAID 10), photo meta-information was stored on a highly available cluster of 32 Microsoft SQL Server machines (plus 11 spares). Another 10 servers were allocated for backups: 50 expensive machines in total. And the system ran at its rated load, with no headroom.

After migrating to the new system we got replication factor = 3, a copy in each data center. The system consists of 63 Cassandra storage nodes and 6 coordinator machines, 69 servers in total. These machines are much cheaper, though, costing about 30% of the SQL system - and the load stays at around 30%.

With the introduction of C*One, latency also decreased: in SQL a write operation took about 4.5 ms; in C*One, about 1.6 ms. A transaction lasts less than 40 ms on average, a commit completes in 2 ms, and reads and writes take 2 ms on average. The 99th percentile is only 3-3.1 ms, and the number of timeouts has dropped 100-fold - all thanks to the widespread use of speculation.

To date, most SQL Server nodes have been decommissioned, and new products are developed only on C*One. We adapted C*One to run in our one-cloud, which sped up the deployment of new clusters and simplified configuration and automated operation. Without the source code, this would have been much harder and hackier.

Now we are working on transferring our other storages to the cloud - but that's a completely different story.

Source: habr.com
