Un experimento que prueba la aplicabilidad del DBMS gráfico JanusGraph para resolver el problema de encontrar rutas adecuadas

Un experimento que prueba la aplicabilidad del DBMS gráfico JanusGraph para resolver el problema de encontrar rutas adecuadas

Hola a todos. Estamos desarrollando un producto para el análisis del tráfico fuera de línea. El proyecto tiene una tarea relacionada con el análisis estadístico de las rutas de los visitantes entre regiones.

Como parte de esta tarea, los usuarios pueden realizar al sistema consultas del siguiente tipo:

  • cuántos visitantes pasaron del área "A" al área "B";
  • cuántos visitantes pasaron del área "A" al área "B" pasando por el área "C" y luego por el área "D";
  • cuánto tiempo le tomó a un determinado tipo de visitante viajar del área “A” al área “B”.

y una serie de consultas analíticas similares.

El movimiento del visitante a través de áreas es un gráfico dirigido. Después de leer Internet, descubrí que los DBMS gráficos también se utilizan para informes analíticos. Tenía el deseo de ver cómo los DBMS gráficos harían frente a este tipo de consultas (TL; DR; mal).

Elegí usar el DBMS JanusGraph, como un destacado representante del DBMS gráfico de código abierto, que se basa en una pila de tecnologías maduras, que (en mi opinión) deberían proporcionarle características operativas decentes:

  • backend de almacenamiento BerkeleyDB, Apache Cassandra, Scylla;
  • Los índices complejos se pueden almacenar en Lucene, Elasticsearch, Solr.

Los autores de JanusGraph escriben que es adecuado tanto para OLTP como para OLAP.

He trabajado con BerkeleyDB, Apache Cassandra, Scylla y ES, y estos productos se utilizan a menudo en nuestros sistemas, por lo que me sentí optimista acerca de probar este DBMS gráfico. Me pareció extraño elegir BerkeleyDB en lugar de RocksDB, pero probablemente se deba a los requisitos de transacción. En cualquier caso, para el uso de productos escalables, se sugiere utilizar un backend en Cassandra o Scylla.

No consideré Neo4j porque el clustering requiere una versión comercial, es decir, el producto no es de código abierto.

Los DBMS de gráficos dicen: "Si parece un gráfico, trátelo como un gráfico". - ¡belleza!

Primero, dibujé un gráfico, que se hizo exactamente de acuerdo con los cánones de los DBMS gráficos:

Un experimento que prueba la aplicabilidad del DBMS gráfico JanusGraph para resolver el problema de encontrar rutas adecuadas

Hay una esencia Zone, responsable del área. Si ZoneStep pertenece a esto Zone, entonces se refiere a ello. En esencia Area, ZoneTrack, Person No prestes atención, pertenecen al dominio y no se consideran parte de la prueba. En total, una consulta de búsqueda en cadena para dicha estructura gráfica se vería así:

g.V().hasLabel('Zone').has('id',0).in_()
       .repeat(__.out()).until(__.out().hasLabel('Zone').has('id',19)).count().next()

Lo que en ruso es algo como esto: encuentra una Zona con ID=0, toma todos los vértices desde los cuales va un borde hacia ella (ZoneStep), pisa fuerte sin retroceder hasta encontrar esos ZoneSteps desde los cuales hay un borde hacia la Zona con ID = 19, cuente el número de dichas cadenas.

No pretendo conocer todas las complejidades de la búsqueda en gráficos, pero esta consulta se generó en base a este libro (https://kelvinlawrence.net/book/Gremlin-Graph-Guide.html).

Cargué 50 mil pistas con una longitud de entre 3 y 20 puntos en una base de datos de gráficos JanusGraph usando el backend BerkeleyDB, creé índices de acuerdo con administración.

Script de descarga de Python:


from random import random
from time import time

from init import g, graph

if __name__ == '__main__':

    points = []
    max_zones = 19
    zcache = dict()
    for i in range(0, max_zones + 1):
        zcache[i] = g.addV('Zone').property('id', i).next()

    startZ = zcache[0]
    endZ = zcache[max_zones]

    for i in range(0, 10000):

        if not i % 100:
            print(i)

        start = g.addV('ZoneStep').property('time', int(time())).next()
        g.V(start).addE('belongs').to(startZ).iterate()

        while True:
            pt = g.addV('ZoneStep').property('time', int(time())).next()
            end_chain = random()
            if end_chain < 0.3:
                g.V(pt).addE('belongs').to(endZ).iterate()
                g.V(start).addE('goes').to(pt).iterate()
                break
            else:
                zone_id = int(random() * max_zones)
                g.V(pt).addE('belongs').to(zcache[zone_id]).iterate()
                g.V(start).addE('goes').to(pt).iterate()

            start = pt

    count = g.V().count().next()
    print(count)

Usamos una VM con 4 núcleos y 16 GB de RAM en un SSD. JanusGraph se implementó usando este comando:

docker run --name janusgraph -p8182:8182 janusgraph/janusgraph:latest

En este caso, los datos y los índices que se utilizan para las búsquedas de coincidencias exactas se almacenan en BerkeleyDB. Habiendo ejecutado la solicitud dada anteriormente, recibí un tiempo igual a varias decenas de segundos.

Al ejecutar los 4 scripts anteriores en paralelo, logré convertir el DBMS en una calabaza con un alegre flujo de seguimientos de pila de Java (y a todos nos encanta leer seguimientos de pila de Java) en los registros de Docker.

Después de pensarlo un poco, decidí simplificar el diagrama gráfico a lo siguiente:

Un experimento que prueba la aplicabilidad del DBMS gráfico JanusGraph para resolver el problema de encontrar rutas adecuadas

Decidir que buscar por atributos de entidad sería más rápido que buscar por aristas. Como resultado, mi solicitud resultó ser la siguiente:

g.V().hasLabel('ZoneStep').has('id',0).repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',19)).count().next()

Lo que en ruso es algo como esto: encuentra ZoneStep con ID=0, pisa fuerte sin retroceder hasta encontrar ZoneStep con ID=19, cuenta el número de dichas cadenas.

También simplifiqué el script de carga indicado anteriormente para no crear conexiones innecesarias, limitándome a los atributos.

La solicitud todavía tardó varios segundos en completarse, lo cual era completamente inaceptable para nuestra tarea, ya que no era en absoluto adecuado para los propósitos de solicitudes AdHoc de ningún tipo.

Intenté implementar JanusGraph usando Scylla como la implementación más rápida de Cassandra, pero esto tampoco generó ningún cambio significativo en el rendimiento.

Entonces, a pesar de que "parece un gráfico", no pude lograr que el DBMS del gráfico lo procesara rápidamente. Asumo plenamente que hay algo que no sé y que se puede hacer que JanusGraph realice esta búsqueda en una fracción de segundo, sin embargo, no pude hacerlo.

Como el problema aún estaba por resolver, comencé a pensar en JOINs y Pivots de tablas, que no inspiraban optimismo en términos de elegancia, pero que podrían ser una opción completamente viable en la práctica.

Nuestro proyecto ya utiliza Apache ClickHouse, así que decidí probar mi investigación en este DBMS analítico.

Implementé ClickHouse usando una receta simple:

sudo docker run -d --name clickhouse_1 
     --ulimit nofile=262144:262144 
     -v /opt/clickhouse/log:/var/log/clickhouse-server 
     -v /opt/clickhouse/data:/var/lib/clickhouse 
     yandex/clickhouse-server

Creé una base de datos y una tabla como esta:

CREATE TABLE 
db.steps (`area` Int64, `when` DateTime64(1, 'Europe/Moscow') DEFAULT now64(), `zone` Int64, `person` Int64) 
ENGINE = MergeTree() ORDER BY (area, zone, person) SETTINGS index_granularity = 8192

Lo llené con datos usando el siguiente script:

from time import time

from clickhouse_driver import Client
from random import random

client = Client('vm-12c2c34c-df68-4a98-b1e5-a4d1cef1acff.domain',
                database='db',
                password='secret')

max = 20

for r in range(0, 100000):

    if r % 1000 == 0:
        print("CNT: {}, TS: {}".format(r, time()))

    data = [{
            'area': 0,
            'zone': 0,
            'person': r
        }]

    while True:
        if random() < 0.3:
            break

        data.append({
                'area': 0,
                'zone': int(random() * (max - 2)) + 1,
                'person': r
            })

    data.append({
            'area': 0,
            'zone': max - 1,
            'person': r
        })

    client.execute(
        'INSERT INTO steps (area, zone, person) VALUES',
        data
    )

Dado que los insertos vienen en lotes, el llenado fue mucho más rápido que con JanusGraph.

Construyó dos consultas usando JOIN. Para pasar del punto A al punto B:

SELECT s1.person AS person,
       s1.zone,
       s1.when,
       s2.zone,
       s2.when
FROM
  (SELECT *
   FROM steps
   WHERE (area = 0)
     AND (zone = 0)) AS s1 ANY INNER JOIN
  (SELECT *
   FROM steps AS s2
   WHERE (area = 0)
     AND (zone = 19)) AS s2 USING person
WHERE s1.when <= s2.when

Para pasar por 3 puntos:

SELECT s3.person,
       s1z,
       s1w,
       s2z,
       s2w,
       s3.zone,
       s3.when
FROM
  (SELECT s1.person AS person,
          s1.zone AS s1z,
          s1.when AS s1w,
          s2.zone AS s2z,
          s2.when AS s2w
   FROM
     (SELECT *
      FROM steps
      WHERE (area = 0)
        AND (zone = 0)) AS s1 ANY INNER JOIN
     (SELECT *
      FROM steps AS s2
      WHERE (area = 0)
        AND (zone = 3)) AS s2 USING person
   WHERE s1.when <= s2.when) p ANY INNER JOIN
  (SELECT *
   FROM steps
   WHERE (area = 0)
     AND (zone = 19)) AS s3 USING person
WHERE p.s2w <= s3.when

Las solicitudes, por supuesto, parecen bastante aterradoras; para un uso real, es necesario crear un arnés generador de software. Sin embargo, funcionan y funcionan rápidamente. Tanto la primera como la segunda solicitud se completan en menos de 0.1 segundos. A continuación se muestra un ejemplo del tiempo de ejecución de la consulta para el recuento (*) que pasa por 3 puntos:

SELECT count(*)
FROM 
(
    SELECT 
        s1.person AS person, 
        s1.zone AS s1z, 
        s1.when AS s1w, 
        s2.zone AS s2z, 
        s2.when AS s2w
    FROM 
    (
        SELECT *
        FROM steps
        WHERE (area = 0) AND (zone = 0)
    ) AS s1
    ANY INNER JOIN 
    (
        SELECT *
        FROM steps AS s2
        WHERE (area = 0) AND (zone = 3)
    ) AS s2 USING (person)
    WHERE s1.when <= s2.when
) AS p
ANY INNER JOIN 
(
    SELECT *
    FROM steps
    WHERE (area = 0) AND (zone = 19)
) AS s3 USING (person)
WHERE p.s2w <= s3.when

┌─count()─┐
│   11592 │
└─────────┘

1 rows in set. Elapsed: 0.068 sec. Processed 250.03 thousand rows, 8.00 MB (3.69 million rows/s., 117.98 MB/s.)

Una nota sobre IOPS. Al completar los datos, JanusGraph generó una cantidad bastante alta de IOPS (1000-1300 para cuatro subprocesos de población de datos) y IOWAIT fue bastante alta. Al mismo tiempo, ClickHouse generó una carga mínima en el subsistema de disco.

Conclusión

Decidimos utilizar ClickHouse para atender este tipo de solicitudes. Siempre podemos optimizar aún más las consultas utilizando vistas materializadas y paralelización preprocesando el flujo de eventos usando Apache Flink antes de cargarlas en ClickHouse.

El rendimiento es tan bueno que probablemente ni siquiera tendremos que pensar en tablas dinámicas mediante programación. Anteriormente, teníamos que realizar pivotes de los datos recuperados de Vertica mediante la carga en Apache Parquet.

Desafortunadamente, otro intento de utilizar un DBMS gráfico no tuvo éxito. No encontré que JanusGraph tuviera un ecosistema amigable que facilitara la puesta al día con el producto. Al mismo tiempo, para configurar el servidor se utiliza el método tradicional de Java, lo que hará llorar lágrimas de sangre a las personas que no están familiarizadas con Java:

host: 0.0.0.0
port: 8182
threadPoolWorker: 1
gremlinPool: 8
scriptEvaluationTimeout: 30000
channelizer: org.janusgraph.channelizers.JanusGraphWsAndHttpChannelizer

graphManager: org.janusgraph.graphdb.management.JanusGraphManager
graphs: {
  ConfigurationManagementGraph: conf/janusgraph-cql-configurationgraph.properties,
  airlines: conf/airlines.properties
}

scriptEngines: {
  gremlin-groovy: {
    plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.tinkergraph.jsr223.TinkerGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {classImports: [java.lang.Math], methodImports: [java.lang.Math#*]},
               org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/airline-sample.groovy]}}}}

serializers:
# GraphBinary is here to replace Gryo and Graphson
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { serializeResultToString: true }}
  # Gryo and Graphson, latest versions
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  # Older serialization versions for backwards compatibility:
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoLiteMessageSerializerV1d0, config: {ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV2d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}

processors:
  - { className: org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor, config: { sessionTimeout: 28800000 }}
  - { className: org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor, config: { cacheExpirationTime: 600000, cacheMaxSize: 1000 }}

metrics: {
  consoleReporter: {enabled: false, interval: 180000},
  csvReporter: {enabled: false, interval: 180000, fileName: /tmp/gremlin-server-metrics.csv},
  jmxReporter: {enabled: false},
  slf4jReporter: {enabled: true, interval: 180000},
  gangliaReporter: {enabled: false, interval: 180000, addressingMode: MULTICAST},
  graphiteReporter: {enabled: false, interval: 180000}}
threadPoolBoss: 1
maxInitialLineLength: 4096
maxHeaderSize: 8192
maxChunkSize: 8192
maxContentLength: 65536
maxAccumulationBufferComponents: 1024
resultIterationBatchSize: 64
writeBufferHighWaterMark: 32768
writeBufferHighWaterMark: 65536
ssl: {
  enabled: false}

Me las arreglé para "poner" accidentalmente la versión BerkeleyDB de JanusGraph.

La documentación es bastante torcida en términos de índices, ya que administrar índices requiere que realices algún chamanismo bastante extraño en Groovy. Por ejemplo, la creación de un índice debe realizarse escribiendo código en la consola Gremlin (que, por cierto, no funciona de fábrica). De la documentación oficial de JanusGraph:

graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
age = mgmt.getPropertyKey('age')
mgmt.buildIndex('byNameComposite', Vertex.class).addKey(name).buildCompositeIndex()
mgmt.buildIndex('byNameAndAgeComposite', Vertex.class).addKey(name).addKey(age).buildCompositeIndex()
mgmt.commit()

//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameComposite').call()
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameAndAgeComposite').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byNameComposite"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("byNameAndAgeComposite"), SchemaAction.REINDEX).get()
mgmt.commit()

Epílogo

En cierto sentido, el experimento anterior es una comparación entre lo cálido y lo suave. Si lo piensas bien, un DBMS gráfico realiza otras operaciones para obtener los mismos resultados. Sin embargo, como parte de las pruebas, también realicé un experimento con una solicitud como:

g.V().hasLabel('ZoneStep').has('id',0)
    .repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',1)).count().next()

que refleja la distancia a pie. Sin embargo, incluso con tales datos, el gráfico DBMS mostró resultados que fueron más allá de unos pocos segundos... Esto, por supuesto, se debe al hecho de que había caminos como 0 -> X -> Y ... -> 1, que el motor gráfico también comprobó.

Incluso para una consulta como:

g.V().hasLabel('ZoneStep').has('id',0).out().has('id',1)).count().next()

No pude obtener una respuesta productiva con un tiempo de procesamiento de menos de un segundo.

La moraleja de la historia es que una idea hermosa y un modelado paradigmático no conducen al resultado deseado, lo que se demuestra con mucha mayor eficiencia en el ejemplo de ClickHouse. El caso de uso presentado en este artículo es un antipatrón claro para los DBMS gráficos, aunque parece adecuado para modelar en su paradigma.

Fuente: habr.com

Añadir un comentario