Olá a todos. Estamos desenvolvendo um produto para análise de tráfego offline. O projeto tem uma tarefa relacionada à análise estatística das rotas dos visitantes entre regiões.
Como parte desta tarefa, os usuários podem fazer consultas ao sistema do seguinte tipo:
- quantos visitantes passaram da área “A” para a área “B”;
- quantos visitantes passaram da área “A” para a área “B”, passando pela área “C” e depois pela área “D”;
- quanto tempo levou para um determinado tipo de visitante viajar da área “A” para a área “B”.
e uma série de consultas analíticas semelhantes.
O movimento do visitante pelas áreas é um gráfico direcionado. Depois de ler na Internet, descobri que SGBDs gráficos também são usados para relatórios analíticos. Eu queria ver como os SGBDs gráficos lidariam com essas consultas (TL; DR; mal).
Eu escolhi usar o SGBD
- Back-end de armazenamento BerkeleyDB, Apache Cassandra, Scylla;
- índices complexos podem ser armazenados em Lucene, Elasticsearch, Solr.
Os autores do JanusGraph escrevem que ele é adequado tanto para OLTP quanto para OLAP.
Trabalhei com BerkeleyDB, Apache Cassandra, Scylla e ES, e esses produtos são frequentemente usados em nossos sistemas, então fiquei otimista em testar este SGBD gráfico. Achei estranho escolher BerkeleyDB em vez de RocksDB, mas isso provavelmente se deve aos requisitos de transação. Em qualquer caso, para uso escalável do produto, sugere-se usar um backend em Cassandra ou Scylla.
Não considerei o Neo4j porque o clustering requer uma versão comercial, ou seja, o produto não é open source.
Os SGBDs gráficos dizem: “Se parece um gráfico, trate-o como um gráfico!” - beleza!
Primeiro, desenhei um gráfico, que foi feito exatamente de acordo com os cânones dos SGBDs gráficos:
Existe uma essência Zone
, responsável pela área. Se ZoneStep
pertence a isso Zone
, então ele se refere a isso. Na essência Area
, ZoneTrack
, Person
Não preste atenção, eles pertencem ao domínio e não são considerados parte do teste. No total, uma consulta de pesquisa em cadeia para tal estrutura gráfica seria semelhante a:
g.V().hasLabel('Zone').has('id',0).in_()
.repeat(__.out()).until(__.out().hasLabel('Zone').has('id',19)).count().next()
O que em russo é algo assim: encontre uma Zona com ID = 0, pegue todos os vértices dos quais uma aresta vai até ela (ZoneStep), pise sem voltar até encontrar aqueles ZoneSteps dos quais há uma aresta para a Zona com ID = 19, conte o número dessas cadeias.
Não pretendo conhecer todos os meandros da pesquisa em gráficos, mas esta consulta foi gerada com base neste livro (
Carreguei 50 mil trilhas variando de 3 a 20 pontos de comprimento em um banco de dados gráfico JanusGraph usando o backend BerkeleyDB, criei índices de acordo com
Script de download do Python:
from random import random
from time import time
from init import g, graph
if __name__ == '__main__':
points = []
max_zones = 19
zcache = dict()
for i in range(0, max_zones + 1):
zcache[i] = g.addV('Zone').property('id', i).next()
startZ = zcache[0]
endZ = zcache[max_zones]
for i in range(0, 10000):
if not i % 100:
print(i)
start = g.addV('ZoneStep').property('time', int(time())).next()
g.V(start).addE('belongs').to(startZ).iterate()
while True:
pt = g.addV('ZoneStep').property('time', int(time())).next()
end_chain = random()
if end_chain < 0.3:
g.V(pt).addE('belongs').to(endZ).iterate()
g.V(start).addE('goes').to(pt).iterate()
break
else:
zone_id = int(random() * max_zones)
g.V(pt).addE('belongs').to(zcache[zone_id]).iterate()
g.V(start).addE('goes').to(pt).iterate()
start = pt
count = g.V().count().next()
print(count)
Usamos uma VM com 4 núcleos e 16 GB de RAM em um SSD. JanusGraph foi implantado usando este comando:
docker run --name janusgraph -p8182:8182 janusgraph/janusgraph:latest
Nesse caso, os dados e índices usados para pesquisas de correspondência exata são armazenados no BerkeleyDB. Tendo executado a solicitação fornecida anteriormente, recebi um tempo igual a várias dezenas de segundos.
Ao executar os 4 scripts acima em paralelo, consegui transformar o DBMS em uma abóbora com um fluxo alegre de stacktraces Java (e todos nós adoramos ler stacktraces Java) nos logs do Docker.
Depois de pensar um pouco, decidi simplificar o diagrama gráfico da seguinte forma:
Decidir que a pesquisa por atributos de entidade seria mais rápida do que a pesquisa por arestas. Como resultado, meu pedido se transformou no seguinte:
g.V().hasLabel('ZoneStep').has('id',0).repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',19)).count().next()
O que em russo é algo assim: encontre ZoneStep com ID=0, pise sem voltar até encontrar ZoneStep com ID=19, conte o número dessas cadeias.
Também simplifiquei o script de carregamento fornecido acima para não criar conexões desnecessárias, limitando-me aos atributos.
O pedido ainda demorava vários segundos a ser concluído, o que era totalmente inaceitável para a nossa tarefa, uma vez que não era de todo adequado para efeitos de pedidos AdHoc de qualquer tipo.
Tentei implantar o JanusGraph usando Scylla como a implementação mais rápida do Cassandra, mas isso também não levou a nenhuma alteração significativa no desempenho.
Portanto, apesar de "parecer um gráfico", não consegui fazer com que o SGBD gráfico o processasse rapidamente. Presumo plenamente que há algo que não sei e que o JanusGraph pode ser feito para realizar essa pesquisa em uma fração de segundo, porém, não consegui fazê-lo.
Como o problema ainda precisava ser resolvido, comecei a pensar em JOINs e Pivots de tabelas, o que não inspirava otimismo em termos de elegância, mas poderia ser uma opção completamente viável na prática.
Nosso projeto já utiliza Apache ClickHouse, então resolvi testar minha pesquisa neste SGBD analítico.
Implantei o ClickHouse usando uma receita simples:
sudo docker run -d --name clickhouse_1
--ulimit nofile=262144:262144
-v /opt/clickhouse/log:/var/log/clickhouse-server
-v /opt/clickhouse/data:/var/lib/clickhouse
yandex/clickhouse-server
Criei um banco de dados e uma tabela assim:
CREATE TABLE
db.steps (`area` Int64, `when` DateTime64(1, 'Europe/Moscow') DEFAULT now64(), `zone` Int64, `person` Int64)
ENGINE = MergeTree() ORDER BY (area, zone, person) SETTINGS index_granularity = 8192
Preenchi-o com dados usando o seguinte script:
from time import time
from clickhouse_driver import Client
from random import random
client = Client('vm-12c2c34c-df68-4a98-b1e5-a4d1cef1acff.domain',
database='db',
password='secret')
max = 20
for r in range(0, 100000):
if r % 1000 == 0:
print("CNT: {}, TS: {}".format(r, time()))
data = [{
'area': 0,
'zone': 0,
'person': r
}]
while True:
if random() < 0.3:
break
data.append({
'area': 0,
'zone': int(random() * (max - 2)) + 1,
'person': r
})
data.append({
'area': 0,
'zone': max - 1,
'person': r
})
client.execute(
'INSERT INTO steps (area, zone, person) VALUES',
data
)
Como as inserções vêm em lotes, o preenchimento foi muito mais rápido do que no JanusGraph.
Construímos duas consultas usando JOIN. Para passar do ponto A ao ponto B:
SELECT s1.person AS person,
s1.zone,
s1.when,
s2.zone,
s2.when
FROM
(SELECT *
FROM steps
WHERE (area = 0)
AND (zone = 0)) AS s1 ANY INNER JOIN
(SELECT *
FROM steps AS s2
WHERE (area = 0)
AND (zone = 19)) AS s2 USING person
WHERE s1.when <= s2.when
Para passar por 3 pontos:
SELECT s3.person,
s1z,
s1w,
s2z,
s2w,
s3.zone,
s3.when
FROM
(SELECT s1.person AS person,
s1.zone AS s1z,
s1.when AS s1w,
s2.zone AS s2z,
s2.when AS s2w
FROM
(SELECT *
FROM steps
WHERE (area = 0)
AND (zone = 0)) AS s1 ANY INNER JOIN
(SELECT *
FROM steps AS s2
WHERE (area = 0)
AND (zone = 3)) AS s2 USING person
WHERE s1.when <= s2.when) p ANY INNER JOIN
(SELECT *
FROM steps
WHERE (area = 0)
AND (zone = 19)) AS s3 USING person
WHERE p.s2w <= s3.when
As solicitações, é claro, parecem bastante assustadoras: para uso real, você precisa criar um gerador de software. No entanto, eles funcionam e funcionam rapidamente. Tanto a primeira quanto a segunda solicitação são concluídas em menos de 0.1 segundos. Aqui está um exemplo do tempo de execução da consulta para count(*) passando por 3 pontos:
SELECT count(*)
FROM
(
SELECT
s1.person AS person,
s1.zone AS s1z,
s1.when AS s1w,
s2.zone AS s2z,
s2.when AS s2w
FROM
(
SELECT *
FROM steps
WHERE (area = 0) AND (zone = 0)
) AS s1
ANY INNER JOIN
(
SELECT *
FROM steps AS s2
WHERE (area = 0) AND (zone = 3)
) AS s2 USING (person)
WHERE s1.when <= s2.when
) AS p
ANY INNER JOIN
(
SELECT *
FROM steps
WHERE (area = 0) AND (zone = 19)
) AS s3 USING (person)
WHERE p.s2w <= s3.when
┌─count()─┐
│ 11592 │
└─────────┘
1 rows in set. Elapsed: 0.068 sec. Processed 250.03 thousand rows, 8.00 MB (3.69 million rows/s., 117.98 MB/s.)
Uma observação sobre IOPS. Ao preencher os dados, o JanusGraph gerou um número bastante alto de IOPS (1000-1300 para quatro threads de preenchimento de dados) e o IOWAIT foi bastante alto. Ao mesmo tempo, ClickHouse gerou carga mínima no subsistema de disco.
Conclusão
Decidimos usar o ClickHouse para atender esse tipo de solicitação. Sempre podemos otimizar ainda mais as consultas usando visualizações materializadas e paralelização, pré-processando o fluxo de eventos usando Apache Flink antes de carregá-los no ClickHouse.
O desempenho é tão bom que provavelmente nem precisaremos pensar em dinamizar tabelas programaticamente. Anteriormente, tínhamos que fazer pivôs de dados recuperados do Vertica por meio de upload para o Apache Parquet.
Infelizmente, outra tentativa de usar um SGBD gráfico não teve sucesso. Não achei que o JanusGraph tivesse um ecossistema amigável que facilitasse o conhecimento do produto. Ao mesmo tempo, para configurar o servidor, utiliza-se o método Java tradicional, que fará quem não conhece Java chorar lágrimas de sangue:
host: 0.0.0.0
port: 8182
threadPoolWorker: 1
gremlinPool: 8
scriptEvaluationTimeout: 30000
channelizer: org.janusgraph.channelizers.JanusGraphWsAndHttpChannelizer
graphManager: org.janusgraph.graphdb.management.JanusGraphManager
graphs: {
ConfigurationManagementGraph: conf/janusgraph-cql-configurationgraph.properties,
airlines: conf/airlines.properties
}
scriptEngines: {
gremlin-groovy: {
plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
org.apache.tinkerpop.gremlin.tinkergraph.jsr223.TinkerGraphGremlinPlugin: {},
org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {classImports: [java.lang.Math], methodImports: [java.lang.Math#*]},
org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/airline-sample.groovy]}}}}
serializers:
# GraphBinary is here to replace Gryo and Graphson
- { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { serializeResultToString: true }}
# Gryo and Graphson, latest versions
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { serializeResultToString: true }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
# Older serialization versions for backwards compatibility:
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoLiteMessageSerializerV1d0, config: {ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV2d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
processors:
- { className: org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor, config: { sessionTimeout: 28800000 }}
- { className: org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor, config: { cacheExpirationTime: 600000, cacheMaxSize: 1000 }}
metrics: {
consoleReporter: {enabled: false, interval: 180000},
csvReporter: {enabled: false, interval: 180000, fileName: /tmp/gremlin-server-metrics.csv},
jmxReporter: {enabled: false},
slf4jReporter: {enabled: true, interval: 180000},
gangliaReporter: {enabled: false, interval: 180000, addressingMode: MULTICAST},
graphiteReporter: {enabled: false, interval: 180000}}
threadPoolBoss: 1
maxInitialLineLength: 4096
maxHeaderSize: 8192
maxChunkSize: 8192
maxContentLength: 65536
maxAccumulationBufferComponents: 1024
resultIterationBatchSize: 64
writeBufferHighWaterMark: 32768
writeBufferHighWaterMark: 65536
ssl: {
enabled: false}
Consegui "colocar" acidentalmente a versão BerkeleyDB do JanusGraph.
A documentação é bastante distorcida em termos de índices, já que o gerenciamento de índices exige que você execute algum xamanismo bastante estranho no Groovy. Por exemplo, a criação de um índice deve ser feita escrevendo o código no console do Gremlin (que, aliás, não funciona imediatamente). Da documentação oficial do JanusGraph:
graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
age = mgmt.getPropertyKey('age')
mgmt.buildIndex('byNameComposite', Vertex.class).addKey(name).buildCompositeIndex()
mgmt.buildIndex('byNameAndAgeComposite', Vertex.class).addKey(name).addKey(age).buildCompositeIndex()
mgmt.commit()
//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameComposite').call()
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameAndAgeComposite').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byNameComposite"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("byNameAndAgeComposite"), SchemaAction.REINDEX).get()
mgmt.commit()
Posfácio
De certo modo, a experiência acima é uma comparação entre quente e suave. Se você pensar bem, um SGBD gráfico realiza outras operações para obter os mesmos resultados. Porém, como parte dos testes, também conduzi um experimento com uma solicitação como:
g.V().hasLabel('ZoneStep').has('id',0)
.repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',1)).count().next()
o que reflete a distância a pé. Porém, mesmo nesses dados, o gráfico SGBD apresentou resultados que ultrapassaram alguns segundos... Isso, claro, se deve ao fato de existirem caminhos como 0 -> X -> Y ... -> 1
, que o mecanismo gráfico também verificou.
Mesmo para uma consulta como:
g.V().hasLabel('ZoneStep').has('id',0).out().has('id',1)).count().next()
Não consegui obter uma resposta produtiva com um tempo de processamento inferior a um segundo.
A moral da história é que uma bela ideia e uma modelagem paradigmática não levam ao resultado desejado, o que é demonstrado com muito maior eficiência no exemplo do ClickHouse. O caso de uso apresentado neste artigo é um claro antipadrão para SGBDs gráficos, embora pareça adequado para modelagem em seu paradigma.
Fonte: habr.com