Un esperimento per testare l'applicabilità del DBMS a grafo JanusGraph per risolvere il problema di trovare percorsi adeguati

Un esperimento per testare l'applicabilità del DBMS a grafo JanusGraph per risolvere il problema di trovare percorsi adeguati

Ciao a tutti. Stiamo sviluppando un prodotto per l'analisi del traffico offline. Il progetto ha un compito legato all'analisi statistica dei percorsi dei visitatori attraverso le regioni.

Come parte di questa attività, gli utenti possono porre al sistema query del seguente tipo:

  • quanti visitatori sono passati dall'area “A” all'area “B”;
  • quanti visitatori sono passati dall'area “A” all'area “B” passando per l'area “C” e poi per l'area “D”;
  • quanto tempo impiega un certo tipo di visitatore per viaggiare dall'area “A” all'area “B”.

e una serie di query analitiche simili.

Il movimento del visitatore attraverso le aree è un grafico diretto. Dopo aver letto Internet, ho scoperto che i DBMS a grafico vengono utilizzati anche per report analitici. Avevo il desiderio di vedere come i DBMS a grafo avrebbero affrontato tali query (TL; DR; male).

Ho scelto di utilizzare il DBMS JanusGraph, come eccezionale rappresentante del DBMS grafico open source, che si basa su una serie di tecnologie mature, che (a mio parere) dovrebbero fornirgli caratteristiche operative decenti:

  • Backend di archiviazione BerkeleyDB, Apache Cassandra, Scylla;
  • gli indici complessi possono essere archiviati in Lucene, Elasticsearch, Solr.

Gli autori di JanusGraph scrivono che è adatto sia per OLTP che per OLAP.

Ho lavorato con BerkeleyDB, Apache Cassandra, Scylla ed ES e questi prodotti sono spesso utilizzati nei nostri sistemi, quindi ero ottimista nel testare questo DBMS a grafico. Ho trovato strano scegliere BerkeleyDB rispetto a RocksDB, ma probabilmente è dovuto ai requisiti di transazione. In ogni caso, per un utilizzo scalabile del prodotto, si consiglia di utilizzare un backend su Cassandra o Scylla.

Non ho considerato Neo4j perché il clustering richiede una versione commerciale, cioè il prodotto non è open source.

I DBMS a grafico dicono: “Se sembra un grafico, trattalo come un grafico!” - bellezza!

Per prima cosa ho disegnato un grafico, che è stato realizzato esattamente secondo i canoni dei DBMS a grafici:

Un esperimento per testare l'applicabilità del DBMS a grafo JanusGraph per risolvere il problema di trovare percorsi adeguati

C'è un'essenza Zone, responsabile del territorio. Se ZoneStep appartiene a questo Zone, poi si riferisce ad esso. Sull'essenza Area, ZoneTrack, Person Non prestare attenzione, appartengono al dominio e non sono considerati parte del test. In totale, una query di ricerca a catena per una tale struttura grafica sarebbe simile a:

g.V().hasLabel('Zone').has('id',0).in_()
       .repeat(__.out()).until(__.out().hasLabel('Zone').has('id',19)).count().next()

Ciò che in russo è qualcosa del genere: trova una Zona con ID=0, prendi tutti i vertici da cui arriva un bordo (ZoneStep), calpesta senza tornare indietro finché non trovi quegli ZoneStep da cui c'è un bordo alla Zona con ID=19, conta il numero di tali catene.

Non pretendo di conoscere tutte le complessità della ricerca sui grafici, ma questa query è stata generata sulla base di questo libro (https://kelvinlawrence.net/book/Gremlin-Graph-Guide.html).

Ho caricato 50mila tracce di lunghezza compresa tra 3 e 20 punti in un database a grafo JanusGraph utilizzando il backend BerkeleyDB, ho creato indici secondo gestione.

Script di download di Python:


from random import random
from time import time

from init import g, graph

if __name__ == '__main__':

    points = []
    max_zones = 19
    zcache = dict()
    for i in range(0, max_zones + 1):
        zcache[i] = g.addV('Zone').property('id', i).next()

    startZ = zcache[0]
    endZ = zcache[max_zones]

    for i in range(0, 10000):

        if not i % 100:
            print(i)

        start = g.addV('ZoneStep').property('time', int(time())).next()
        g.V(start).addE('belongs').to(startZ).iterate()

        while True:
            pt = g.addV('ZoneStep').property('time', int(time())).next()
            end_chain = random()
            if end_chain < 0.3:
                g.V(pt).addE('belongs').to(endZ).iterate()
                g.V(start).addE('goes').to(pt).iterate()
                break
            else:
                zone_id = int(random() * max_zones)
                g.V(pt).addE('belongs').to(zcache[zone_id]).iterate()
                g.V(start).addE('goes').to(pt).iterate()

            start = pt

    count = g.V().count().next()
    print(count)

Abbiamo utilizzato una VM con 4 core e 16 GB di RAM su un SSD. JanusGraph è stato distribuito utilizzando questo comando:

docker run --name janusgraph -p8182:8182 janusgraph/janusgraph:latest

In questo caso, i dati e gli indici utilizzati per le ricerche con corrispondenza esatta vengono archiviati in BerkeleyDB. Eseguendo la richiesta fatta in precedenza, ho ricevuto un tempo pari a diverse decine di secondi.

Eseguendo i 4 script sopra in parallelo, sono riuscito a trasformare il DBMS in una zucca con un allegro flusso di stacktrace Java (e tutti noi adoriamo leggere gli stacktrace Java) nei log di Docker.

Dopo qualche riflessione, ho deciso di semplificare il diagramma grafico nel modo seguente:

Un esperimento per testare l'applicabilità del DBMS a grafo JanusGraph per risolvere il problema di trovare percorsi adeguati

Decidere che la ricerca per attributi di entità sarebbe più veloce della ricerca per bordi. Di conseguenza, la mia richiesta si è trasformata in quanto segue:

g.V().hasLabel('ZoneStep').has('id',0).repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',19)).count().next()

Quello che in russo è qualcosa del genere: trova ZoneStep con ID=0, calpesta senza tornare indietro finché non trovi ZoneStep con ID=19, conta il numero di tali catene.

Ho inoltre semplificato lo script di caricamento sopra riportato per non creare collegamenti inutili, limitandomi agli attributi.

La richiesta ha comunque richiesto diversi secondi per essere completata, il che era del tutto inaccettabile per il nostro compito, poiché non era affatto adatta agli scopi di richieste AdHoc di qualsiasi tipo.

Ho provato a distribuire JanusGraph utilizzando Scylla come l'implementazione di Cassandra più veloce, ma anche questo non ha portato a cambiamenti significativi nelle prestazioni.

Quindi, nonostante il fatto che "sembra un grafico", non sono riuscito a far sì che il DBMS del grafico lo elaborasse rapidamente. Presumo pienamente che ci sia qualcosa che non so e che JanusGraph possa essere in grado di eseguire questa ricerca in una frazione di secondo, tuttavia, non sono stato in grado di farlo.

Dato che il problema doveva ancora essere risolto, ho iniziato a pensare ai JOIN e ai Pivot delle tabelle, che non ispiravano ottimismo in termini di eleganza, ma potevano essere un'opzione completamente realizzabile nella pratica.

Il nostro progetto utilizza già Apache ClickHouse, quindi ho deciso di testare la mia ricerca su questo DBMS analitico.

Distribuito ClickHouse utilizzando una semplice ricetta:

sudo docker run -d --name clickhouse_1 
     --ulimit nofile=262144:262144 
     -v /opt/clickhouse/log:/var/log/clickhouse-server 
     -v /opt/clickhouse/data:/var/lib/clickhouse 
     yandex/clickhouse-server

Ho creato un database e una tabella al suo interno in questo modo:

CREATE TABLE 
db.steps (`area` Int64, `when` DateTime64(1, 'Europe/Moscow') DEFAULT now64(), `zone` Int64, `person` Int64) 
ENGINE = MergeTree() ORDER BY (area, zone, person) SETTINGS index_granularity = 8192

L'ho riempito con i dati usando il seguente script:

from time import time

from clickhouse_driver import Client
from random import random

client = Client('vm-12c2c34c-df68-4a98-b1e5-a4d1cef1acff.domain',
                database='db',
                password='secret')

max = 20

for r in range(0, 100000):

    if r % 1000 == 0:
        print("CNT: {}, TS: {}".format(r, time()))

    data = [{
            'area': 0,
            'zone': 0,
            'person': r
        }]

    while True:
        if random() < 0.3:
            break

        data.append({
                'area': 0,
                'zone': int(random() * (max - 2)) + 1,
                'person': r
            })

    data.append({
            'area': 0,
            'zone': max - 1,
            'person': r
        })

    client.execute(
        'INSERT INTO steps (area, zone, person) VALUES',
        data
    )

Poiché gli inserti vengono forniti in lotti, il riempimento è stato molto più rapido rispetto a JanusGraph.

Costruite due query utilizzando JOIN. Per spostarsi dal punto A al punto B:

SELECT s1.person AS person,
       s1.zone,
       s1.when,
       s2.zone,
       s2.when
FROM
  (SELECT *
   FROM steps
   WHERE (area = 0)
     AND (zone = 0)) AS s1 ANY INNER JOIN
  (SELECT *
   FROM steps AS s2
   WHERE (area = 0)
     AND (zone = 19)) AS s2 USING person
WHERE s1.when <= s2.when

Per passare attraverso 3 punti:

SELECT s3.person,
       s1z,
       s1w,
       s2z,
       s2w,
       s3.zone,
       s3.when
FROM
  (SELECT s1.person AS person,
          s1.zone AS s1z,
          s1.when AS s1w,
          s2.zone AS s2z,
          s2.when AS s2w
   FROM
     (SELECT *
      FROM steps
      WHERE (area = 0)
        AND (zone = 0)) AS s1 ANY INNER JOIN
     (SELECT *
      FROM steps AS s2
      WHERE (area = 0)
        AND (zone = 3)) AS s2 USING person
   WHERE s1.when <= s2.when) p ANY INNER JOIN
  (SELECT *
   FROM steps
   WHERE (area = 0)
     AND (zone = 19)) AS s3 USING person
WHERE p.s2w <= s3.when

Le richieste, ovviamente, sembrano piuttosto spaventose; per un utilizzo reale, è necessario creare un cablaggio del generatore software. Tuttavia, funzionano e funzionano rapidamente. Sia la prima che la seconda richiesta vengono completate in meno di 0.1 secondi. Ecco un esempio del tempo di esecuzione della query per count(*) che passa attraverso 3 punti:

SELECT count(*)
FROM 
(
    SELECT 
        s1.person AS person, 
        s1.zone AS s1z, 
        s1.when AS s1w, 
        s2.zone AS s2z, 
        s2.when AS s2w
    FROM 
    (
        SELECT *
        FROM steps
        WHERE (area = 0) AND (zone = 0)
    ) AS s1
    ANY INNER JOIN 
    (
        SELECT *
        FROM steps AS s2
        WHERE (area = 0) AND (zone = 3)
    ) AS s2 USING (person)
    WHERE s1.when <= s2.when
) AS p
ANY INNER JOIN 
(
    SELECT *
    FROM steps
    WHERE (area = 0) AND (zone = 19)
) AS s3 USING (person)
WHERE p.s2w <= s3.when

┌─count()─┐
│   11592 │
└─────────┘

1 rows in set. Elapsed: 0.068 sec. Processed 250.03 thousand rows, 8.00 MB (3.69 million rows/s., 117.98 MB/s.)

Una nota sull'IOPS. Durante il popolamento dei dati, JanusGraph ha generato un numero piuttosto elevato di IOPS (1000-1300 per quattro thread di popolamento dei dati) e IOWAIT era piuttosto elevato. Allo stesso tempo, ClickHouse ha generato un carico minimo sul sottosistema del disco.

conclusione

Abbiamo deciso di utilizzare ClickHouse per soddisfare questo tipo di richiesta. Possiamo sempre ottimizzare ulteriormente le query utilizzando visualizzazioni materializzate e parallelizzazione pre-elaborando il flusso di eventi utilizzando Apache Flink prima di caricarli in ClickHouse.

Le prestazioni sono così buone che probabilmente non dovremo nemmeno pensare di ruotare le tabelle a livello di codice. In precedenza, dovevamo eseguire i pivot dei dati recuperati da Vertica tramite caricamento su Apache Parquet.

Sfortunatamente, un altro tentativo di utilizzare un DBMS a grafo non ha avuto successo. Non ho trovato che JanusGraph avesse un ecosistema amichevole che rendesse facile mettersi al passo con il prodotto. Allo stesso tempo, per configurare il server, viene utilizzato il tradizionale metodo Java, che farà piangere lacrime di sangue alle persone che non hanno familiarità con Java:

host: 0.0.0.0
port: 8182
threadPoolWorker: 1
gremlinPool: 8
scriptEvaluationTimeout: 30000
channelizer: org.janusgraph.channelizers.JanusGraphWsAndHttpChannelizer

graphManager: org.janusgraph.graphdb.management.JanusGraphManager
graphs: {
  ConfigurationManagementGraph: conf/janusgraph-cql-configurationgraph.properties,
  airlines: conf/airlines.properties
}

scriptEngines: {
  gremlin-groovy: {
    plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.tinkergraph.jsr223.TinkerGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {classImports: [java.lang.Math], methodImports: [java.lang.Math#*]},
               org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/airline-sample.groovy]}}}}

serializers:
# GraphBinary is here to replace Gryo and Graphson
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { serializeResultToString: true }}
  # Gryo and Graphson, latest versions
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  # Older serialization versions for backwards compatibility:
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoLiteMessageSerializerV1d0, config: {ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV2d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}

processors:
  - { className: org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor, config: { sessionTimeout: 28800000 }}
  - { className: org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor, config: { cacheExpirationTime: 600000, cacheMaxSize: 1000 }}

metrics: {
  consoleReporter: {enabled: false, interval: 180000},
  csvReporter: {enabled: false, interval: 180000, fileName: /tmp/gremlin-server-metrics.csv},
  jmxReporter: {enabled: false},
  slf4jReporter: {enabled: true, interval: 180000},
  gangliaReporter: {enabled: false, interval: 180000, addressingMode: MULTICAST},
  graphiteReporter: {enabled: false, interval: 180000}}
threadPoolBoss: 1
maxInitialLineLength: 4096
maxHeaderSize: 8192
maxChunkSize: 8192
maxContentLength: 65536
maxAccumulationBufferComponents: 1024
resultIterationBatchSize: 64
writeBufferHighWaterMark: 32768
writeBufferHighWaterMark: 65536
ssl: {
  enabled: false}

Sono riuscito a "mettere" accidentalmente la versione BerkeleyDB di JanusGraph.

La documentazione è piuttosto contorta in termini di indici, poiché la gestione degli indici richiede di eseguire uno sciamanesimo piuttosto strano in Groovy. Ad esempio, la creazione di un indice deve essere eseguita scrivendo il codice nella console Gremlin (che, tra l'altro, non funziona immediatamente). Dalla documentazione ufficiale di JanusGraph:

graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
age = mgmt.getPropertyKey('age')
mgmt.buildIndex('byNameComposite', Vertex.class).addKey(name).buildCompositeIndex()
mgmt.buildIndex('byNameAndAgeComposite', Vertex.class).addKey(name).addKey(age).buildCompositeIndex()
mgmt.commit()

//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameComposite').call()
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameAndAgeComposite').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byNameComposite"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("byNameAndAgeComposite"), SchemaAction.REINDEX).get()
mgmt.commit()

postfazione

In un certo senso, l'esperimento di cui sopra è un confronto tra caldo e morbido. Se ci pensi, un DBMS a grafo esegue altre operazioni per ottenere gli stessi risultati. Tuttavia, come parte dei test, ho anche condotto un esperimento con una richiesta del tipo:

g.V().hasLabel('ZoneStep').has('id',0)
    .repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',1)).count().next()

che riflette la distanza percorribile a piedi. Tuttavia, anche su tali dati, il grafico DBMS ha mostrato risultati che andavano oltre pochi secondi... Ciò, ovviamente, è dovuto al fatto che c'erano percorsi come 0 -> X -> Y ... -> 1, controllato anche dal motore grafico.

Anche per una query del tipo:

g.V().hasLabel('ZoneStep').has('id',0).out().has('id',1)).count().next()

Non sono riuscito a ottenere una risposta produttiva con un tempo di elaborazione inferiore a un secondo.

La morale della storia è che una bella idea e un modello paradigmatico non portano al risultato desiderato, cosa che viene dimostrata con efficienza molto maggiore usando l'esempio di ClickHouse. Il caso d'uso presentato in questo articolo è un chiaro anti-modello per i DBMS a grafo, sebbene sembri adatto per la modellazione nel loro paradigma.

Fonte: habr.com

Aggiungi un commento