Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку

Це навіть не жарт, схоже, що саме ця картинка найбільше точно відображає суть цих БД, і в кінці буде зрозуміло чому:

Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку

Згідно DB-Engines Ranking, дві найпопулярніші NoSQL колонкові бази - це Cassandra (далі CS) і HBase (HB).

Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку

Волею доль наша команда управління завантаження даних у Ощадбанку вже давно та щільно працює з HB. За цей час ми досить добре вивчили її сильні та слабкі сторони та навчилися її готувати. Однак наявність альтернативи у вигляді CS весь час змушувало трохи мучитися сумнівами: а чи правильний вибір ми зробили? Тим більше, що результати порівняння, виконаного DataStax, говорили, що CS легко перемагає HB практично з розгромним рахунком. З іншого боку, DataStax - зацікавлена ​​особа, і вірити на слово не варто. Також бентежила досить мала інформація про умови тестування, тому ми вирішили з'ясувати самостійно, хто ж є королем BigData NoSql, і отримані результати виявилися дуже цікавими.

Однак, перш ніж перейти до результатів виконаних тестів, необхідно описати суттєві аспекти конфігурацій середовища. Справа в тому, що CS може використовуватися в режимі, що допускає втрату даних. Тобто. це коли лише один сервер (нода) відповідає за дані якогось ключа і якщо він з якоїсь причини відвалився, то значення цього ключа буде втрачено. Для багатьох завдань це не критично, проте для банківської сфери це скоріше виняток, ніж правило. У нашому випадку важливо мати кілька копій даних для надійного зберігання.

Тому розглядався виключно режим роботи CS як потрійний реплікації, тобто. створення кейспейсу виконувалося з такими параметрами:

CREATE KEYSPACE ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3};

Далі є два способи забезпечити необхідний рівень консистентності. Загальне правило:
NW + NR > RF

Що означає, що кількість підтверджень від нод під час запису (NW) плюс кількість підтверджень від нод під час читання (NR) має бути більшою за фактор реплікації. У разі RF = 3 і отже підходять такі варианты:
2 + 2 > 3
3 + 1 > 3

Оскільки нам важливо максимально надійно зберегти дані, була обрана схема 3+1. До того ж HB працює з аналогічним принципом, тобто. таке порівняння буде більш чесним.

Необхідно відзначити, що DataStax у своєму дослідженні робили навпаки, вони ставили RF = 1 і для CS та для HB (для останньої шляхом зміни налаштувань HDFS). Це дійсно важливий аспект, тому що вплив на продуктивність CS у цьому випадку є величезним. Наприклад, на малюнку нижче показано зростання часу, необхідного для завантаження даних у CS:

Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку

Тут ми бачимо наступне, що більше конкуруючих потоків пише дані, то довше це займає. Це природно, але важливо, що при цьому деградація продуктивності RF=3 істотно вище. Іншими словами, якщо ми пишемо в 4 таблиці в кожну по 5 потоків (разом 20), RF=3 програє приблизно в 2 рази (150 секунд RF=3 проти 75 для RF=1). Але якщо ми збільшимо навантаження, завантажуючи дані у 8 таблиць у кожну по 5 потоків (всього 40), то програш RF=3 вже у 2,7 разів (375 секунд проти 138).

Можливо, в цьому полягає секрет успішного для CS навантажувального тестування виконаного DataStax, тому що для HB на нашому стенді зміна фактора реплікації з 2 до 3 не вплинула. Тобто. диски не є вузьким місцем HB для нашої конфігурації. Однак тут є багато й інших підводних каменів, тому що треба відзначити, що наша версія HB була трохи пропатчена і затюнена, середовища абсолютно різні і т.д. Також варто відзначити, що, можливо, я просто не знаю як правильно готувати CS і існують якісь більш ефективні способи працювати з нею і сподіваюся в коментарях ми з'ясуємо це. Але про все по порядку.

Всі тести проводилися на залізному кластері, що складається з 4 серверів, кожен у конфігурації:

CPU: Xeon E5-2680 v4 @ 2.40GHz 64 threads.
Диски: 12 штук SATA HDD
java version: 1.8.0_111

Версія CS: 3.11.5

Параметри cassandra.ymlnum_tokens: 256
hinted_handoff_enabled: true
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
hints_directory: /data10/cassandra/hints
hints_flush_period_in_ms: 10000
max_hints_file_size_in_mb: 128
batchlog_replay_throttle_in_kb: 1024
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
role_manager: CassandraRoleManager
roles_validity_in_ms: 2000
permissions_validity_in_ms: 2000
credentials_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- /data1/cassandra/data # кожна директорія dataN - окремий диск
- /data2/cassandra/data
- /data3/cassandra/data
- /data4/cassandra/data
- /data5/cassandra/data
- /data6/cassandra/data
- /data7/cassandra/data
- /data8/cassandra/data
commitlog_directory: /data9/cassandra/commitlog
cdc_enabled: false
disk_failure_policy: stop
commit_failure_policy: stop
prepared_statements_cache_size_mb:
thrift_prepared_statements_cache_size_mb:
key_cache_size_in_mb:
key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
counter_cache_size_in_mb:
counter_cache_save_period: 7200
saved_caches_directory: /data10/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_segment_size_in_mb: 32
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
параметрами:
- seeds: "*, *"
concurrent_reads: 256 # пробували 64 - різниці не помічено
concurrent_writes: 256 # пробували 64 - різниці не помічено
concurrent_counter_writes: 256 # пробували 64 - різниці не помічено
concurrent_materialized_view_writes: 32
memtable_heap_space_in_mb: 2048 # пробували 16 Гб - було повільніше
memtable_allocation_type: heap_buffers
index_summary_capacity_in_mb:
index_summary_resize_interval_in_minutes: 60
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: *
broadcast_address: *
listen_on_broadcast_address: true
internode_authenticator: org.apache.cassandra.auth.AllowAllInternodeAuthenticator
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: *
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
column_index_size_in_kb: 64
column_index_cache_size_in_kb: 2
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 1600
sstable_preemptive_open_interval_in_mb: 50
read_request_timeout_in_ms: 100000
range_request_timeout_in_ms: 200000
write_request_timeout_in_ms: 40000
counter_write_request_timeout_in_ms: 100000
cas_contention_timeout_in_ms: 20000
truncate_request_timeout_in_ms: 60000
request_timeout_in_ms: 200000
slow_query_log_timeout_in_ms: 500
cross_node_timeout: false
endpoint_snitch: GossipingPropertyFileSnitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
server_encryption_options:
internode_encryption: none
client_encryption_options:
enabled: false
internode_compression: dc
inter_dc_tcp_nodelay: false
tracetype_query_ttl: 86400
tracetype_repair_ttl: 604800
enable_user_defined_functions: false
enable_scripted_user_defined_functions: false
windows_timer_interval: 1
transparent_data_encryption_options:
enabled: false
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
batch_size_warn_threshold_in_kb: 200
batch_size_fail_threshold_in_kb: 250
unlogged_batch_across_partitions_warn_threshold: 10
compaction_large_partition_warning_threshold_mb: 100
gc_warn_threshold_in_ms: 1000
back_pressure_enabled: false
enable_materialized_views: true
enable_sasi_indexes: true

Налаштування GC:

### CMS Settings-XX:+UseParNewGC
-XX:+UseConcMarkSweepGC
-XX:+CMSParallelRemarkEnabled
-XX:SurvivorRatio=8
-XX:MaxTenuringThreshold=1
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSWaitDuration=10000
-XX:+CMSParallelInitialMarkEnabled
-XX:+CMSEdenChunksRecordAlways
-XX:+CMSClassUnloadingEnabled

Пам'яті jvm.options виділялося 16Gb (ще пробували 32 Gb, різниці непомічено).

Створення таблиць виконувалося командою:

CREATE TABLE ks.t1 (id bigint PRIMARY KEY, title text) WITH compression = {'sstable_compression': 'LZ4Compressor', 'chunk_length_kb': 64};

Версія HB: 1.2.0-cdh5.14.2.

Параметри non-default HBasezookeeper.session.timeout: 120000
hbase.rpc.timeout: 2 хвилини
hbase.client.scanner.timeout.period: 2 minute(s)
hbase.master.handler.count: 10
hbase.regionserver.lease.period, hbase.client.scanner.timeout.period: 2 minute(s)
hbase.regionserver.handler.count: 160
hbase.regionserver.metahandler.count: 30
hbase.regionserver.logroll.period: 4 hour(s)
hbase.regionserver.maxlogs: 200
hbase.hregion.memstore.flush.size: 1 GiB
hbase.hregion.memstore.block.multiplier: 6
hbase.hstore.compactionThreshold: 5
hbase.hstore.blockingStoreFiles: 200
hbase.hregion.majorcompaction: 1 день(s)
HBase Service Advanced Configuration Snippet (Safety Valve) для hbase-site.xml:
hbase.regionserver.wal.codecorg.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec
hbase.master.namespace.init.timeout3600000
hbase.regionserver.optionalcacheflushinterval18000000
hbase.regionserver.thread.compaction.large12
hbase.regionserver.wal.enablecompressiontrue
hbase.hstore.compaction.max.size1073741824
hbase.server.compactchecker.interval.multiplier200
Java Configuration Options for HBase RegionServer:
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:ReservedCodeCacheSize=256m
hbase.snapshot.master.timeoutMillis: 2 minute(s)
hbase.snapshot.region.timeout: 2 хвилини
hbase.snapshot.master.timeout.millis: 2 хвилини
HBase REST Server Max Log Size: 100 MiB
HBase REST Server Maximum Log File Backups: 5
HBase Thrift Server Max Log Size: 100 MiB
HBase Thrift Server Maximum Log File Backups: 5
Master Max Log Size: 100 MiB
Master Maximum Log File Backups: 5
RegionServer Max Log Size: 100 MiB
RegionServer Maximum Log File Backups: 5
HBase Active Master Detection Window: 4 хвилин(и)
dfs.client.hedged.read.threadpool.size: 40
dfs.client.hedged.read.threshold.millis: 10 millisecond(s)
hbase.rest.threads.min: 8
hbase.rest.threads.max: 150
Maximum Process File Descriptors: 180000
hbase.thrift.minWorkerThreads: 200
hbase.master.executor.openregion.threads: 30
hbase.master.executor.closeregion.threads: 30
hbase.master.executor.serverops.threads: 60
hbase.regionserver.thread.compaction.small: 6
hbase.ipc.server.read.threadpool.size: 20
Region Mover Threads: 6
Client Java Heap Size in Bytes: 1 GiB
HBase REST Server Default Group: 3 GiB
HBase Thrift Server Default Group: 3 GiB
Java Heap Size of HBase Master in Bytes: 16 GiB
Java Heap Size of HBase RegionServer in Bytes: 32 GiB

+ZooKeeper
maxClientCnxns: 601
maxSessionTimeout: 120000
Створення таблиць:
hbase org.apache.hadoop.hbase.util.RegionSplitter ns:t1 UniformSplit -c 64 -f cf
alter 'ns:t1', {NAME => 'cf', DATA_BLOCK_ENCODING => 'FAST_DIFF', COMPRESSION => 'GZ'}

Тут є один важливий момент — в описі DataStax не сказано, скільки регіонів використовувалося під час створення таблиць HB, хоча це критично великих обсягів. Тому для тестів було обрано кількість = 64, що дозволяє зберігати до 640 ГБ, тобто. таблицю середнього розміру.

У HBase на момент проведення тесту було 22 тисяч таблиць і 67 тисяч регіонів (це було б вбивчо для версії 1.2.0, якби не патч про який сказано вище).

Тепер щодо коду. Так як не було ясності, які конфігурації є більш виграшним для тієї чи іншої БД, тести проводилися у різних комбінаціях. Тобто. в одних тестах завантаження йшло одночасно в 4 таблиці (для підключення використовувалися всі 4 ноди). В інших тестах працювали із 8 різними таблицями. У деяких випадках розмір батча дорівнював 100, в інших 200 (параметр batch - див. код нижче). Розмір даних для значення 10 байт або 100 байт (dataSize). Всього щоразу записувалося та вичитувалося по 5 млн. записів у кожну таблицю. У кожну таблицю писали/читали 5 потоків (номер потоку — thNum), кожен із яких використовував свій діапазон ключів (count = 1 млн):

if (opType.equals("insert")) {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        StringBuilder sb = new StringBuilder("BEGIN BATCH ");
        for (int i = 0; i < batch; i++) {
            String value = RandomStringUtils.random(dataSize, true, true);
            sb.append("INSERT INTO ")
                    .append(tableName)
                    .append("(id, title) ")
                    .append("VALUES (")
                    .append(key)
                    .append(", '")
                    .append(value)
                    .append("');");
            key++;
        }
        sb.append("APPLY BATCH;");
        final String query = sb.toString();
        session.execute(query);
    }
} else {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        StringBuilder sb = new StringBuilder("SELECT * FROM ").append(tableName).append(" WHERE id IN (");
        for (int i = 0; i < batch; i++) {
            sb = sb.append(key);
            if (i+1 < batch)
                sb.append(",");
            key++;
        }
        sb = sb.append(");");
        final String query = sb.toString();
        ResultSet rs = session.execute(query);
    }
}

Відповідно, аналогічний функціонал був передбачений для HB:

Configuration conf = getConf();
HTable table = new HTable(conf, keyspace + ":" + tableName);
table.setAutoFlush(false, false);
List<Get> lGet = new ArrayList<>();
List<Put> lPut = new ArrayList<>();
byte[] cf = Bytes.toBytes("cf");
byte[] qf = Bytes.toBytes("value");
if (opType.equals("insert")) {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        lPut.clear();
        for (int i = 0; i < batch; i++) {
            Put p = new Put(makeHbaseRowKey(key));
            String value = RandomStringUtils.random(dataSize, true, true);
            p.addColumn(cf, qf, value.getBytes());
            lPut.add(p);
            key++;
        }
        table.put(lPut);
        table.flushCommits();
    }
} else {
    for (Long key = count * thNum; key < count * (thNum + 1); key += 0) {
        lGet.clear();
        for (int i = 0; i < batch; i++) {
            Get g = new Get(makeHbaseRowKey(key));
            lGet.add(g);
            key++;
        }
        Result[] rs = table.get(lGet);
    }
}

Так як у HB про рівномірний розподіл даних повинен піклуватися клієнт, то функція соління ключа виглядала так:

public static byte[] makeHbaseRowKey(long key) {
    byte[] nonSaltedRowKey = Bytes.toBytes(key);
    CRC32 crc32 = new CRC32();
    crc32.update(nonSaltedRowKey);
    long crc32Value = crc32.getValue();
    byte[] salt = Arrays.copyOfRange(Bytes.toBytes(crc32Value), 5, 7);
    return ArrayUtils.addAll(salt, nonSaltedRowKey);
}

Тепер найцікавіше результати:

Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку

Теж саме у вигляді графіка:

Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку

Перевага HB настільки дивна, що є підозра про наявність якогось вузького місця у налаштуванні CS. Однак гуглеж і кручення найбільш очевидних параметрів (на зразок concurrent_writes або memtable_heap_space_in_mb) прискорення не дало. При цьому в логах чисто, ні на що не свариться.

Дані лягли по нодах поступово, статистика з усіх нод приблизно однакова.

Ось як виглядає статистика по таблиці з однією з нідKeyspace: ks
Read Count: 9383707
Read Latency: 0.04287025042448576 ms
Write Count: 15462012
Write Latency: 0.1350068438699957 ms
Pending Flushes: 0
Table: t1
SSTable count: 16
Space used (live): 148.59 MiB
Space used (total): 148.59 MiB
Space used by snapshots (total): 0 bytes
Off heap memory used (total): 5.17 MiB
SSTable Compression Ratio: 0.5720989576459437
Номер партій (estimate): 3970323
Memtable cell count: 0
Memtable data size: 0 bytes
Memtable off heap memory used: 0 bytes
Memtable switch count: 5
Local read count: 2346045
Local read latency: NaN ms
Local write count: 3865503
Local write latency: NaN ms
Pending flushes: 0
Percent repaired: 0.0
Bloom filter false positives: 25
Bloom filter false ratio: 0.00000
Bloom filter space used: 4.57 MiB
Bloom filter off heap memory використаний: 4.57 MiB
Index summary off heap memory used: 590.02 KiB
Compression metadata off heap memory used: 19.45 KiB
Compacted partition minimum bytes: 36
Compacted partition maximum bytes: 42
Compacted partition mean bytes: 42
Average live cells per slice (last five minutes): NaN
Maximum live cells per slice (останні п'ять хвилин): 0
Average tombstones per slice (last five minutes): NaN
Maximum tombstones per slice (last five minutes): 0
Dropped Mutations: 0 bytes

Спроба зменшувати розмір батчу (аж до відправки поштучно) не дала ефекту, стало лише гірше. Можливо, що насправді це дійсно максимум продуктивності для CS, оскільки отримані результати CS схожі на ті, що вийшли і в DataStax - близько сотні тисяч операцій в секунду. Крім того, якщо подивитися на утилізацію ресурсів, то побачимо, що CS використовує набагато більше ЦПУ і дисків:

Битва двох якодзун, або Cassandra vs HBase. Досвід команди Ощадбанку
На малюнку показано утилізацію під час прогону всіх тестів поспіль для обох БД.

Щодо потужної переваги HB при читанні. Тут видно, що обох БД утилізація дисків під час читання вкрай низька (тести читання це завершальна частина циклу тестування кожної БД, наприклад для CS це з 15:20 до 15:40). У випадку HB причина зрозуміла — більша частина даних висить у пам'яті, в memstore і частина закеширувалася в блок-кашти. Що стосується CS, то тут не дуже зрозуміло як вона влаштована, але також утилізації дисків не видно, але про всяк випадок була спроба включити кеш row_cache_size_in_mb = 2048 і встановлено caching = {'keys': 'ALL', 'rows_per_partition': ' 2000000'}, але від цього стало навіть трохи гірше.

Також варто ще раз проговорити істотний момент про кількість регіонів в HB. У нашому випадку було вказано значення 64. Якщо ж зменшувати його і зробити рівним, наприклад, 4, то при читанні швидкість падає в 2 рази. Причина в тому, що memstore забиватиметься швидше і файли будуть флашитися частіше і при читанні потрібно буде обробляти більше файлів, що для HB досить складна операція. У реальних умовах це лікується продумуванням стратегії пресплітінгу та компактифікації, зокрема ми використовуємо самописну утиліту, яка займається складанням сміття та стисненням HFiles постійно у фоновому режимі. Цілком можливо, що для тестів DataStax виділяли взагалі 1 регіон на таблицю (що не правильно) і це дещо прояснило б, чому HB так програвав у їх тестах на читання.

Попередні висновки звідси виходять такі. Якщо припустити, що в ході тестування не було допущено грубих помилок, то Cassandra схожа на колос на глиняних ногах. Точніше, поки вона балансує на одній нозі, як на картинці на початку статті, вона показує відносно непогані результати, але при сутичці в однакових умовах програє начисто. При цьому з огляду на низьку утилізацію CPU на нашому залозі ми навчилися висаджувати по два RegionServer HB на хост і тим самим подвоїли продуктивність. Тобто. з урахуванням утилізації ресурсів ситуація для CS виходить ще плачевніша.

Безумовно, ці тести досить синтетичні та обсяг даних, який використовувався тут, відносно скромний. Не виключено, що при переході на терабайти ситуація була б іншою, проте якщо для HB ми вміємо вантажити терабайти, то для CS це проблематично. Вона часто видавала OperationTimedOutException навіть при цих обсягах, хоча параметри очікування відгуку і так були збільшені в рази, ніж дефолтні.

Сподіваюся, що спільними зусиллями ми знайдемо вузькі місця CS і якщо вдасться її прискорити, то наприкінці посту обов'язково додам інформацію про підсумкові результати.

UPD: Завдяки порадам камрадів удалося прискорити читання. Було:
159 ops (644 таблиці, 4 потоків, батч 5).
Додано:
.withLoadBalancingPolicy(new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()))
І пограв із кількістю потоків. Вийшло таке:
4 таблиці, 100 потоків, батч = 1 (поштучно): 301 969 ops
4 таблиці, 100 потоків, батч = 10: 447 ops
4 таблиці, 100 потоків, батч = 100: 625 ops

Пізніше застосую інші поради з тюнінгу, прожену повний цикл тестування та додам результати наприкінці посту.

Джерело: habr.com

Додати коментар або відгук