The battle of two yokozuna, or Cassandra vs HBase. Experience of the Sberbank team

This is not even a joke: it seems this particular picture most accurately reflects the essence of these databases, and in the end it will be clear why:

[Cover image]

According to the DB-Engines Ranking, the two most popular NoSQL column-oriented databases are Cassandra (CS) and HBase (HB).

[Image: DB-Engines Ranking]

As fate would have it, our data-loading team at Sberbank has been working closely with HB for a long time. During this time we have studied its strengths and weaknesses quite well and learned how to cook it. However, the existence of an alternative in the form of CS kept making us doubt ourselves a little: did we make the right choice? Moreover, a comparison published by DataStax claimed that CS easily beats HB by an almost crushing score. On the other hand, DataStax is an interested party, so you should not take their word for it. We were also put off by the rather small amount of information about the testing conditions, so we decided to find out for ourselves who is the king of BigData NoSQL, and the results turned out to be very interesting.

However, before moving on to the test results, it is necessary to describe the significant aspects of the environment configuration. The point is that CS can be used in a mode that allows data loss. That is, only one server (node) is responsible for the data of a given key, and if that node goes down for some reason, the value of that key is lost. For many tasks this is not critical, but for the banking sector it is the exception rather than the rule. In our case it is important to have several copies of the data for reliable storage.

Therefore, only CS operation with triple replication was considered, i.e. the keyspace was created with the following parameters:

CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3};

Next, the required level of consistency has to be ensured. The general rule is:
NW + NR > RF

which means that the number of nodes acknowledging a write (NW) plus the number of nodes acknowledging a read (NR) must be greater than the replication factor. In our case RF = 3, which means the following options are suitable:
2 + 2 > 3
3 + 1 > 3

Since it is fundamentally important for us to store data as reliably as possible, the 3 + 1 scheme was chosen: each write is acknowledged by all three replicas, while reads are served by one. In addition, HB works on a similar principle, so such a comparison is fairer.
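
In driver terms, the 3 + 1 scheme simply means writing at consistency level ALL and reading at ONE. A minimal sketch of setting this per statement, assuming the DataStax Java driver 3.x that the test code below appears to use (the statements themselves are just examples):

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

// 'session' is the driver Session used throughout the test code.
// Writes must be acknowledged by all 3 replicas (NW = 3)...
Statement write = new SimpleStatement(
        "INSERT INTO ks.t1 (id, title) VALUES (1, 'x')")
        .setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(write);

// ...so reads can be served by a single replica (NR = 1): 3 + 1 > 3.
Statement read = new SimpleStatement("SELECT * FROM ks.t1 WHERE id = 1")
        .setConsistencyLevel(ConsistencyLevel.ONE);
session.execute(read);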

It should be noted that DataStax did the opposite in their study: they set RF = 1 for both CS and HB (for the latter, by changing the HDFS settings). This is a really important aspect, because the impact on CS performance in this case is huge. For example, the picture below shows the growth in the time required to load data into CS:

[Chart: time to load data into CS as the number of writer threads grows, RF = 1 vs RF = 3]

Here we see the following: the more concurrent threads write data, the longer it takes. That is natural, but it is important that the performance degradation for RF = 3 is much greater. In other words, if we write to 4 tables with 5 threads each (20 in total), RF = 3 loses by about a factor of 2 (150 seconds for RF = 3 versus 75 for RF = 1). But if we increase the load by writing into 8 tables with 5 threads each (40 in total), RF = 3 already loses by a factor of 2.7 (375 seconds versus 138).

Perhaps this is partly the secret of DataStax's successful load testing of CS, because on our test bench changing the replication factor from 2 to 3 had no effect on HB. That is, disks are not the bottleneck for HB in our configuration. However, there are many other pitfalls here: our version of HB is slightly patched and tuned, the environments are completely different, and so on. It is also worth noting that maybe I simply do not know how to cook CS properly and there are more efficient ways to work with it; I hope we will figure that out in the comments. But first things first.

All tests were performed on a bare-metal cluster of 4 servers, each with the following configuration:

CPU: Xeon E5-2680 v4 @ 2.40GHz, 64 threads
Disks: 12 × SATA HDD
Java version: 1.8.0_111

CS version: 3.11.5

cassandra.yaml parameters:
num_tokens: 256
hinted_handoff_enabled: true
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
hints_directory: /data10/cassandra/hints
hints_flush_period_in_ms: 10000
max_hints_file_size_in_mb: 128
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
role_manager: CassandraRoleManager
roles_validity_in_ms: 2000
permissions_validity_in_ms: 2000
credentials_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- /data1/cassandra/data # each dataN directory is a separate disk
- /data2/cassandra/data
- /data3/cassandra/data
- /data4/cassandra/data
- /data5/cassandra/data
- /data6/cassandra/data
- /data7/cassandra/data
- /data8/cassandra/data
commitlog_directory: /data9/cassandra/commitlog
cdc_enabled: false
disk_failure_policy: stop
commit_failure_policy: stop
prepared_statements_cache_size_mb:
thrift_prepared_statements_cache_size_mb:
key_cache_size_in_mb:
key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data10/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_segment_size_in_mb: 32
seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "*,*"
concurrent_reads: 256 # tried 64 - no difference
concurrent_writes: 256 # tried 64 - no difference
concurrent_counter_writes: 256 # tried 64 - no difference seen
concurrent_materialized_view_writes: 32
memtable_heap_space_in_mb: 2048 # tried 16 GB - it was slower
memtable_allocation_type: heap_buffers
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: *
broadcast_address: *
listen_on_broadcast_address: true
internode_authenticator: org.apache.cassandra.auth.AllowAllInternodeAuthenticator
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: *
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
column_index_size_in_kb: 64
column_index_cache_size_in_kb: 2
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 1600
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 100000
range_request_timeout_in_ms: 200000
write_request_timeout_in_ms: 40000
counter_write_request_timeout_in_ms: 100000
cas_contention_timeout_in_ms: 20000
truncate_request_timeout_in_ms: 60000
request_timeout_in_ms: 200000
slow_query_log_timeout_in_ms: 500
cross_node_timeout: false
endpoint_snitch: GossipingPropertyFileSnitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
server_encryption_options:
internode_encryption: none
client_encryption_options:
enabled: false
internode_compression: dc
inter_dc_tcp_nodelay: false
tracetype_query_ttl: 86400
tracetype_repair_ttl: 604800
enable_user_defined_functions: false
enable_scripted_user_defined_functions: false
windows_timer_interval: 1
transparent_data_encryption_options:
enabled: false
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
batch_size_warn_threshold_in_kb: 200
batch_size_fail_threshold_in_kb: 250
unlogged_batch_across_partitions_warn_threshold: 10
compaction_large_partition_warning_threshold_mb: 100
gc_warn_threshold_in_ms: 1000
back_pressure_enabled: false
enable_materialized_views: true
enable_sasi_indexes: true

GC settings:

### CMS Settings
-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:SurvivorRatio=8
-XX:MaxTenuringThreshold=1
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSWaitDuration=10000
-XX:+CMSParallelInitialMarkEnabled
-XX:+CMSEdenChunksRecordAlways
-XX:+CMSClassUnloadingEnabled

16 GB of heap was allocated in jvm.options (we also tried 32 GB; no difference was noticed).

Tables were created with the command:

CREATE TABLE ks.t1 (id bigint PRIMARY KEY, title text) WITH compression = {'sstable_compression': 'LZ4Compressor', 'chunk_length_kb': 64};

HB version: 1.2.0-cdh5.14.2 (in the org.apache.hadoop.hbase.regionserver.HRegion class we excluded MetricsRegion, which led to GC problems when a RegionServer held more than 1000 regions)

Non-default HBase options:
zookeeper.session.timeout: 120000
hbase.rpc.timeout: 2 minute(s)
hbase.client.scanner.timeout.period: 2 minute(s)
hbase.master.handler.count: 10
hbase.regionserver.lease.period, hbase.client.scanner.timeout.period: 2 minute(s)
hbase.regionserver.handler.count: 160
hbase.regionserver.metahandler.count: 30
hbase.regionserver.logroll.period: 4 hour(s)
hbase.regionserver.maxlogs: 200
hbase.hregion.memstore.flush.size: 1 GiB
hbase.hregion.memstore.block.multiplier: 6
hbase.hstore.compactionThreshold: 5
hbase.hstore.blockingStoreFiles: 200
hbase.hregion.majorcompaction: 1 day(s)
HBase Service Advanced Configuration Snippet (Safety Valve) for hbase-site.xml:
hbase.regionserver.wal.codec: org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec
hbase.master.namespace.init.timeout: 3600000
hbase.regionserver.optionalcacheflushinterval: 18000000
hbase.regionserver.thread.compaction.large: 12
hbase.regionserver.wal.enablecompression: true
hbase.hstore.compaction.max.size: 1073741824
hbase.server.compactchecker.interval.multiplier: 200
Java Configuration Options for HBase RegionServer:
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:ReservedCodeCacheSize=256m
hbase.snapshot.master.timeoutMillis: 2 minute(s)
hbase.snapshot.region.timeout: 2 minute(s)
hbase.snapshot.master.timeout.millis: 2 minute(s)
HBase REST Server Max Log Size: 100 MiB
HBase REST Server Maximum Log File Backups: 5
HBase Thrift Server Max Log Size: 100 MiB
HBase Thrift Server Maximum Log File Backups: 5
Master Max Log Size: 100 MiB
Master Maximum Log File Backups: 5
RegionServer Max Log Size: 100 MiB
RegionServer Maximum Log File Backups: 5
HBase Active Master Detection Window: 4 minute(s)
dfs.client.hedged.read.threadpool.size: 40
dfs.client.hedged.read.threshold.millis: 10 millisecond(s)
hbase.rest.threads.min: 8
hbase.rest.threads.max: 150
Maximum Process File Descriptors: 180000
hbase.thrift.minWorkerThreads: 200
hbase.master.executor.openregion.threads: 30
hbase.master.executor.closeregion.threads: 30
hbase.master.executor.serverops.threads: 60
hbase.regionserver.thread.compaction.small: 6
hbase.ipc.server.read.threadpool.size: 20
Region Mover Threads: 6
Client Java Heap Size in Bytes: 1 GiB
HBase REST Server Default Group: 3 GiB
HBase Thrift Server Default Group: 3 GiB
Java Heap Size of HBase Master in Bytes: 16 GiB
Java Heap Size of HBase RegionServer in Bytes: 32 GiB

+ZooKeeper
maxClientCnxns: 601
maxSessionTimeout: 120000
Table creation:
hbase org.apache.hadoop.hbase.util.RegionSplitter ns:t1 UniformSplit -c 64 -f cf
alter 'ns:t1', {NAME => 'cf', DATA_BLOCK_ENCODING => 'FAST_DIFF', COMPRESSION => 'GZ'}

There is one important point here: the DataStax description does not say how many regions were used when creating the HB tables, although this is critical for large volumes. Therefore, 64 regions were chosen for the tests, which allows storing up to 640 GB, i.e. a medium-sized table.
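
The same pre-splitting can also be done from Java through the Admin API instead of the RegionSplitter CLI. A rough sketch under the HBase 1.x client, with the column-family settings from the alter command above (scheduling and error handling omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class CreatePresplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("ns:t1"));
            desc.addFamily(new HColumnDescriptor("cf")
                    .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
                    .setCompressionType(Compression.Algorithm.GZ));
            // 64 regions split uniformly over the key space,
            // equivalent to: RegionSplitter ns:t1 UniformSplit -c 64 -f cf
            byte[][] splits = new RegionSplitter.UniformSplit().split(64);
            admin.createTable(desc, splits);
        }
    }
}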

At the time of the test, HBase had 22 thousand tables and 67 thousand regions (this would have been fatal for version 1.2.0, if not for the patch mentioned above).

Now for the code. Since it was not clear which configuration favors which database, the tests were run in various combinations: in some tests data was loaded into 4 tables simultaneously (all 4 nodes were used for connections), in others into 8 different tables. In some cases the batch size was 100, in others 200 (the batch parameter; see the code below). The data size per value is 10 or 100 bytes (dataSize). In total, 5 million records were written to, and then read from, each table every time. Each table was written to and read from by 5 threads (the thread number is thNum), each of which used its own key range (count = 1 million):

if (opType.equals("insert")) {
    // key += 0: the key is advanced inside the inner batch loop below
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        StringBuilder sb = new StringBuilder("BEGIN BATCH ");
        for (int i = 0; i < batch; i++) {
            String value = RandomStringUtils.random(dataSize, true, true);
            sb.append("INSERT INTO ")
                    .append(tableName)
                    .append("(id, title) ")
                    .append("VALUES (")
                    .append(key)
                    .append(", '")
                    .append(value)
                    .append("');");
            key++;
        }
        sb.append("APPLY BATCH;");
        final String query = sb.toString();
        session.execute(query);
    }
} else {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        StringBuilder sb = new StringBuilder("SELECT * FROM ").append(tableName).append(" WHERE id IN (");
        for (int i = 0; i < batch; i++) {
            sb = sb.append(key);
            if (i+1 < batch)
                sb.append(",");
            key++;
        }
        sb = sb.append(");");
        final String query = sb.toString();
        ResultSet rs = session.execute(query);
    }
}

Accordingly, a similar workload was implemented for HB:

Configuration conf = getConf();
HTable table = new HTable(conf, keyspace + ":" + tableName);
table.setAutoFlush(false, false); // buffer Puts on the client side; flushCommits() sends the accumulated batch
List<Get> lGet = new ArrayList<>();
List<Put> lPut = new ArrayList<>();
byte[] cf = Bytes.toBytes("cf");
byte[] qf = Bytes.toBytes("value");
if (opType.equals("insert")) {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        lPut.clear();
        for (int i = 0; i < batch; i++) {
            Put p = new Put(makeHbaseRowKey(key));
            String value = RandomStringUtils.random(dataSize, true, true);
            p.addColumn(cf, qf, value.getBytes());
            lPut.add(p);
            key++;
        }
        table.put(lPut);
        table.flushCommits();
    }
} else {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        lGet.clear();
        for (int i = 0; i < batch; i++) {
            Get g = new Get(makeHbaseRowKey(key));
            lGet.add(g);
            key++;
        }
        Result[] rs = table.get(lGet);
    }
}
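
The listings above do not show how these per-table worker threads are launched; below is a hypothetical harness sketch (the class and method names are mine, not from the original code) that wires together the tableName/thNum/count/batch/dataSize parameters used above:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LoadHarness {
    public static void main(String[] args) throws InterruptedException {
        final long count = 1_000_000L;        // keys per thread
        final int threadsPerTable = 5;
        final String[] tables = {"t1", "t2", "t3", "t4"};

        ExecutorService pool = Executors.newFixedThreadPool(tables.length * threadsPerTable);
        for (String tableName : tables) {
            for (int thNum = 0; thNum < threadsPerTable; thNum++) {
                final int th = thNum;
                final String t = tableName;
                // runWorker(...) would contain the CS or HB loop from the listings above
                pool.submit(() -> runWorker("insert", t, th, count, 100, 100));
            }
        }
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.HOURS);
    }

    // opType: "insert" for writes, anything else for reads; batch and dataSize as in the article
    static void runWorker(String opType, String tableName, int thNum,
                          long count, int batch, int dataSize) {
        // ... the body is one of the two loops shown above ...
    }
}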

Since in HB the client has to take care of distributing data uniformly, the key salting function looked like this:

public static byte[] makeHbaseRowKey(long key) {
    byte[] nonSaltedRowKey = Bytes.toBytes(key);
    CRC32 crc32 = new CRC32();
    crc32.update(nonSaltedRowKey);
    long crc32Value = crc32.getValue();
    byte[] salt = Arrays.copyOfRange(Bytes.toBytes(crc32Value), 5, 7); // 2 bytes of the CRC32 become the salt prefix
    return ArrayUtils.addAll(salt, nonSaltedRowKey);
}

Now the most interesting part, the results:

[Table: test results]

The same as a graph:

[Chart: test results]

The advantage of HB is so striking that there is a suspicion of some bottleneck in the CS setup. However, googling and tweaking the most obvious parameters (such as concurrent_writes or memtable_heap_space_in_mb) gave no speedup. Meanwhile the logs are clean and complain about nothing.

The data was distributed evenly across the nodes; the statistics from all nodes are approximately the same.

This is what the table statistics look like on one of the nodes:
Keyspace: ks
Read Count: 9383707
Read Latency: 0.04287025042448576 ms
Write Count: 15462012
Write Latency: 0.1350068438699957ms
Pending Flushes: 0
Table: t1
SSTable count: 16
Space used (live): 148.59 MiB
Space used (total): 148.59 MiB
Space used by snapshots (total): 0 bytes
Off heap memory used (total): 5.17 MiB
SSTable Compression Ratio: 0.5720989576459437
Number of partitions (estimate): 3970323
Memtable cell count: 0
Memtable data size: 0 bytes
Memtable off heap memory used: 0 bytes
Memtable switch count: 5
Local read count: 2346045
Local read latency: NaN ms
Local write count: 3865503
Local write latency: NaN ms
Pending flushes: 0
Percent repaired: 0.0
Bloom filter false positives: 25
Bloom filter false ratio: 0.00000
Bloom filter space used: 4.57 MiB
Bloom filter off heap memory used: 4.57 MiB
Index summary off heap memory used: 590.02 KiB
Compression metadata off heap memory used: 19.45 KiB
Compacted partition minimum bytes: 36
Compacted partition maximum bytes: 42
Compacted partition mean bytes: 42
Average live cells per slice (last five minutes): NaN
Maximum live cells per slice (last five minutes): 0
Average tombstones per slice (last five minutes): NaN
Maximum tombstones per slice (last five minutes): 0
Dropped Mutations: 0 bytes

An attempt to reduce the batch size (down to sending records one by one) had no effect; it only made things worse. It is possible that this really is the maximum performance for CS, since the results we obtained are similar to those obtained by DataStax: about a hundred thousand operations per second. In addition, if we look at resource utilization, we can see that CS uses much more CPU and disk:

[Chart: resource utilization]
The figure shows the utilization during the run of all tests in a row for both databases.

Now about HB's strong advantage in reads. Here it can be seen that for both databases disk utilization during reads is extremely low (the read tests are the final part of the testing cycle for each database; for CS, for example, they ran from 15:20 to 15:40). In the case of HB the reason is clear: most of the data sits in memory, in the memstore, and some of it is cached in the block cache. As for CS, it is not entirely clear how it works, but disk utilization is not visible there either. Just in case, an attempt was made to enable the row cache with row_cache_size_in_mb = 2048 and to set caching = {'keys': 'ALL', 'rows_per_partition': '2000000'}, but that made things even slightly worse.
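
For reference, the caching option above is a table-level setting, while row_cache_size_in_mb lives in cassandra.yaml and requires a node restart; a sketch of applying the table-level part through the same driver session used in the test code:

// Hypothetical: enable the partition row cache for the test table.
// row_cache_size_in_mb: 2048 must additionally be set in cassandra.yaml.
session.execute("ALTER TABLE ks.t1 WITH caching = "
        + "{'keys': 'ALL', 'rows_per_partition': '2000000'};");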

It is also worth reiterating the important point about the number of regions in HB. In our case the value 64 was used. If we decrease it to, say, 4, the read speed drops by a factor of 2. The reason is that the memstore fills up faster, files are flushed more often, and more files have to be processed on reads, which is a rather expensive operation for HB. In real life this is handled by a well-thought-out pre-splitting and compaction strategy; in particular, we use a home-grown utility that constantly collects garbage and compacts HFiles in the background. It is quite possible that for the DataStax tests only 1 region per table was allocated (which is not correct), and that would somewhat explain why HB lost so badly in their read tests.
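
Our background compaction utility itself is not published, but the core of such a job is simply asking HBase to major-compact a table through the Admin API; a minimal sketch under the HBase 1.x client (scheduling, HFile-count checks and error handling are omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CompactionKicker {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // Asks the RegionServers to major-compact all regions of the table;
            // the call returns immediately, the work happens in the background.
            admin.majorCompact(TableName.valueOf("ns:t1"));
        }
    }
}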

The preliminary conclusions are as follows. Assuming no blunders were made during testing, Cassandra looks like a colossus with feet of clay. More precisely, while it balances on one leg, as in the picture at the beginning of the article, it shows relatively good results, but in a fight under equal conditions it loses outright. At the same time, taking into account the low CPU utilization on our hardware, we learned to run two HB RegionServers per host and thereby doubled the performance. That is, taking resource utilization into account, the situation for CS is even more deplorable.

Of course, these tests are fairly synthetic and the amount of data used here is relatively modest. It is possible that with a move to terabytes the picture would be different, but while we can load terabytes into HB, for CS this turned out to be problematic: it often threw an OperationTimedOutException even at these volumes, although the response timeouts had already been increased several times over the defaults.
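
For reference, besides the server-side timeouts from cassandra.yaml shown above, the DataStax Java driver has its own per-request read timeout that also has to be raised for heavy batches; a sketch of that knob for driver 3.x (the address and values are illustrative, not the exact ones we used):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.SocketOptions;

Cluster cluster = Cluster.builder()
        .addContactPoint("10.0.0.1")                 // illustrative contact point
        .withSocketOptions(new SocketOptions()
                .setConnectTimeoutMillis(30000)
                .setReadTimeoutMillis(120000))       // per-request read timeout
        .build();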

I hope that through joint efforts we will find the bottlenecks in CS, and if we manage to speed it up, I will definitely add the final results at the end of the post.

UPD: Thanks to advice from commenters, reads were sped up. Before:
159 ops (4 tables, 5 threads, batch = 100).
I added:
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
and experimented with the number of threads. The result:
4 tables, 100 threads, batch = 1 (one record at a time): 301 ops
4 tables, 100 threads, batch = 10: 447 ops
4 tables, 100 threads, batch = 100: 625 ops
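
For completeness, this is where that load-balancing policy is plugged in when the driver's Cluster is built (DataStax Java driver 3.x; the contact point is illustrative):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;

// Token awareness lets the driver send each request directly to a replica
// that owns the key instead of going through a random coordinator.
Cluster cluster = Cluster.builder()
        .addContactPoint("10.0.0.1")   // illustrative address
        .withLoadBalancingPolicy(
                new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
        .build();
Session session = cluster.connect();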

Later I will apply other tuning tips, run a full test cycle and add the results at the end of the post.

Source: habr.com
