An experiment to test the applicability of the JanusGraph graph DBMS for solving the problem of finding suitable paths

Hi all. We are developing a product for offline traffic analysis. The project includes a task related to the statistical analysis of visitors' movement paths across regions.

As part of this task, users can ask the system queries of the following form:

  • how many visitors passed from area "A" to area "B";
  • how many visitors passed from area "A" to area "B" through area "C" and then through area "D";
  • how long it took for a particular type of visitor to travel from area "A" to area "B".

and a number of similar analytical queries.

The movement of a visitor through the regions is a directed graph. After some reading, I found that graph DBMSs are also marketed for analytical reporting, and I wanted to see how a graph DBMS would cope with such queries (TL;DR: poorly).
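To make the model concrete, here is a minimal sketch (names and data are mine, not from the project) of how visitor tracks induce a directed graph of zone transitions:

```python
# Toy model: each visitor track is a sequence of zone IDs; together the
# tracks induce a directed graph of zone-to-zone transitions.
from collections import defaultdict

def build_transition_graph(tracks):
    """Return adjacency sets: zone -> set of zones visited directly after it."""
    adj = defaultdict(set)
    for track in tracks:
        for src, dst in zip(track, track[1:]):
            adj[src].add(dst)
    return adj

tracks = [
    [0, 5, 7, 19],   # visitor went 0 -> 5 -> 7 -> 19
    [0, 5, 19],
    [0, 3, 19],
]
tg = build_transition_graph(tracks)
print(sorted(tg[0]))  # zones reachable directly from zone 0 -> [3, 5]
```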

I chose the JanusGraph DBMS as a prominent representative of open-source graph DBMSs. It relies on a stack of mature technologies that (in my opinion) should give it decent operational characteristics:

  • storage backend BerkeleyDB, Apache Cassandra, Scylla;
  • mixed indexes can be stored in Lucene, Elasticsearch, or Solr.

The JanusGraph authors write that it is suitable for both OLTP and OLAP.

I have worked with BerkeleyDB, Apache Cassandra, Scylla, and Elasticsearch, and these products are often used in our systems, so I was optimistic about testing this graph DBMS. It seemed strange to choose BerkeleyDB over RocksDB, but that probably has to do with transaction requirements. In any case, for scalable production use, a Cassandra or Scylla backend is recommended.

I did not consider Neo4j, because clustering requires the commercial version; that is, the product is not fully open.

Graph DBMS advocates say: "If something looks like a graph, treat it like a graph!" Beautiful!

First, I drew a graph schema designed according to the canons of graph DBMS modeling:

[Figure: initial graph schema with Zone, ZoneStep, Area, ZoneTrack, and Person vertices]

There is a Zone entity representing a region. If a ZoneStep belongs to a Zone, it has an edge pointing to it. Ignore the Area, ZoneTrack, and Person entities: they belong to the domain and are not part of this test. With such a graph structure, a chain-search query looks like this:

g.V().hasLabel('Zone').has('id',0).in_()
       .repeat(__.out()).until(__.out().hasLabel('Zone').has('id',19)).count().next()

In plain English: find the Zone with id=0, take all vertices with an edge into it (ZoneStep), walk forward without going back until you reach a ZoneStep that has an edge to the Zone with id=19, and count the number of such chains.

I do not pretend to know all the intricacies of graph search, but this query was written based on this book (https://kelvinlawrence.net/book/Gremlin-Graph-Guide.html).

I loaded 50 thousand tracks, from 3 to 20 points in length, into JanusGraph with the BerkeleyDB backend, and created indexes as described in the documentation.

The Python loading script:


from random import random
from time import time

# g and graph come from a local init module that opens the remote
# Gremlin connection (not shown in the article).
from init import g, graph

if __name__ == '__main__':

    max_zones = 19
    zcache = dict()
    # Create the Zone vertices once and cache them.
    for i in range(0, max_zones + 1):
        zcache[i] = g.addV('Zone').property('id', i).next()

    startZ = zcache[0]
    endZ = zcache[max_zones]

    for i in range(0, 10000):

        if not i % 100:
            print(i)

        # Every track starts in zone 0.
        start = g.addV('ZoneStep').property('time', int(time())).next()
        g.V(start).addE('belongs').to(startZ).iterate()

        while True:
            pt = g.addV('ZoneStep').property('time', int(time())).next()
            end_chain = random()
            if end_chain < 0.3:
                # ~30% chance to terminate the track in the final zone.
                g.V(pt).addE('belongs').to(endZ).iterate()
                g.V(start).addE('goes').to(pt).iterate()
                break
            else:
                # Otherwise step into a random intermediate zone.
                zone_id = int(random() * max_zones)
                g.V(pt).addE('belongs').to(zcache[zone_id]).iterate()
                g.V(start).addE('goes').to(pt).iterate()

            start = pt

    count = g.V().count().next()
    print(count)

I used a VM with 4 cores and 16 GB of RAM on an SSD. JanusGraph was deployed with the following command:

docker run --name janusgraph -p8182:8182 janusgraph/janusgraph:latest

In this configuration, the data and the indexes used for exact-match lookups are stored in BerkeleyDB. Executing the query given earlier took several tens of seconds.

By running four instances of the above script in parallel, I managed to turn the DBMS into a pumpkin, with a merry stream of Java stack traces (and we all love reading Java stack traces) in the Docker logs.

On reflection, I decided to simplify the graph schema to the following:

[Figure: simplified graph schema with only ZoneStep vertices, each carrying a zone id attribute]

I decided that lookups by vertex attribute would be faster than edge traversals. As a result, my query turned into the following:

g.V().hasLabel('ZoneStep').has('id',0).repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',19)).count().next()

In plain English: find a ZoneStep with id=0, walk forward without revisiting vertices until you find a ZoneStep with id=19, and count the number of such chains.
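The logic of this traversal can be mirrored in plain Python to see what the engine has to do. A toy sketch (the graph and ids are made up), counting simple paths with a DFS:

```python
# A pure-Python analogue of the Gremlin traversal above: count all simple
# (non-revisiting) paths from the node with id 0 to the node with id 19.
def count_simple_paths(adj, start, target):
    """adj: dict node -> list of successor nodes."""
    count = 0
    stack = [(start, {start})]
    while stack:
        node, visited = stack.pop()
        if node == target:
            count += 1
            continue
        for nxt in adj.get(node, ()):
            if nxt not in visited:
                # A fresh visited set per branch, like simplePath() does.
                stack.append((nxt, visited | {nxt}))
    return count

adj = {0: [1, 2], 1: [2, 19], 2: [19]}
print(count_simple_paths(adj, 0, 19))  # paths: 0-1-19, 0-1-2-19, 0-2-19 -> 3
```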

I also simplified the loading script above accordingly, limiting myself to attributes so as not to create unnecessary edges.

The query still ran for several seconds, which was completely unacceptable for our task: for ad-hoc queries of arbitrary shape this would not do at all.

I also tried deploying JanusGraph with Scylla, as the fastest Cassandra implementation, but that did not make any significant performance difference either.

So even though "it looks like a graph," I was unable to get the graph DBMS to process it quickly. I fully admit there may be something I do not know, and that it may be possible to make JanusGraph perform this search in a split second; however, I did not manage to.

Since I still needed to solve the problem, I started thinking about JOINs and pivot tables, which did not inspire optimism in terms of elegance, but could be quite a viable option in practice.

Our project already uses ClickHouse, so I decided to test my research on this analytical DBMS.

I deployed ClickHouse with a simple recipe:

sudo docker run -d --name clickhouse_1 \
     --ulimit nofile=262144:262144 \
     -v /opt/clickhouse/log:/var/log/clickhouse-server \
     -v /opt/clickhouse/data:/var/lib/clickhouse \
     yandex/clickhouse-server

In it, I created a database and a table of the following form:

CREATE TABLE 
db.steps (`area` Int64, `when` DateTime64(1, 'Europe/Moscow') DEFAULT now64(), `zone` Int64, `person` Int64) 
ENGINE = MergeTree() ORDER BY (area, zone, person) SETTINGS index_granularity = 8192

I filled it with data using the following script:

from time import time

from clickhouse_driver import Client
from random import random

client = Client('vm-12c2c34c-df68-4a98-b1e5-a4d1cef1acff.domain',
                database='db',
                password='secret')

max_zones = 20  # exclusive upper bound on zone ids

for r in range(0, 100000):

    if r % 1000 == 0:
        print("CNT: {}, TS: {}".format(r, time()))

    # Every track starts in zone 0.
    data = [{
            'area': 0,
            'zone': 0,
            'person': r
        }]

    while True:
        # ~30% chance to terminate the track, mirroring the JanusGraph script.
        if random() < 0.3:
            break

        data.append({
                'area': 0,
                'zone': int(random() * (max_zones - 2)) + 1,
                'person': r
            })

    # Every track ends in the last zone.
    data.append({
            'area': 0,
            'zone': max_zones - 1,
            'person': r
        })

    # One batched INSERT per track; `when` is filled by its DEFAULT now64().
    client.execute(
        'INSERT INTO steps (area, zone, person) VALUES',
        data
    )

Since the inserts come in batches, loading was much faster than for JanusGraph.

I designed two queries using JOINs. To go from point A to point B:

SELECT s1.person AS person,
       s1.zone,
       s1.when,
       s2.zone,
       s2.when
FROM
  (SELECT *
   FROM steps
   WHERE (area = 0)
     AND (zone = 0)) AS s1 ANY INNER JOIN
  (SELECT *
   FROM steps AS s2
   WHERE (area = 0)
     AND (zone = 19)) AS s2 USING person
WHERE s1.when <= s2.when
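To sanity-check the join logic, here is a rough plain-Python analogue of the query above (illustrative only, with made-up data; ANY INNER JOIN keeps at most one matching pair per left-hand row):

```python
# Rough analogue of the A -> B query: for each person, pair a step in the
# start zone with any later step in the end zone.
def persons_from_a_to_b(steps, start_zone, end_zone):
    """steps: list of (person, zone, when) tuples. Returns matching persons."""
    result = set()
    for p1, z1, w1 in steps:
        if z1 != start_zone:
            continue
        for p2, z2, w2 in steps:
            # Join USING person, with the s1.when <= s2.when filter.
            if p2 == p1 and z2 == end_zone and w1 <= w2:
                result.add(p1)
                break  # ANY semantics: one match is enough
    return result

steps = [
    (1, 0, 10), (1, 5, 20), (1, 19, 30),   # person 1: 0 -> 5 -> 19
    (2, 0, 10), (2, 7, 20),                # person 2 never reached zone 19
    (3, 19, 5), (3, 0, 10),                # person 3 visited 19 *before* 0
]
print(sorted(persons_from_a_to_b(steps, 0, 19)))  # -> [1]
```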

To go through 3 points:

SELECT s3.person,
       s1z,
       s1w,
       s2z,
       s2w,
       s3.zone,
       s3.when
FROM
  (SELECT s1.person AS person,
          s1.zone AS s1z,
          s1.when AS s1w,
          s2.zone AS s2z,
          s2.when AS s2w
   FROM
     (SELECT *
      FROM steps
      WHERE (area = 0)
        AND (zone = 0)) AS s1 ANY INNER JOIN
     (SELECT *
      FROM steps AS s2
      WHERE (area = 0)
        AND (zone = 3)) AS s2 USING person
   WHERE s1.when <= s2.when) p ANY INNER JOIN
  (SELECT *
   FROM steps
   WHERE (area = 0)
     AND (zone = 19)) AS s3 USING person
WHERE p.s2w <= s3.when
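The same pattern generalizes to a chain of any length. A hedged sketch of the logic in Python (this is the kind of thing the query generator would produce, not production code):

```python
def persons_through_chain(steps, zones):
    """steps: list of (person, zone, when). Count persons that visited the
    given zones in order, with non-decreasing timestamps."""
    # Group each person's step timestamps by zone for quick lookup.
    by_person = {}
    for p, z, w in steps:
        by_person.setdefault(p, {}).setdefault(z, []).append(w)

    def can_chain(zmap, zones, after):
        # Try to place the next zone of the chain at some time >= `after`.
        if not zones:
            return True
        times = [w for w in zmap.get(zones[0], []) if w >= after]
        return any(can_chain(zmap, zones[1:], w) for w in times)

    return sum(1 for zmap in by_person.values() if can_chain(zmap, zones, 0))

steps = [
    (1, 0, 1), (1, 3, 2), (1, 19, 3),  # 0 -> 3 -> 19: matches
    (2, 0, 1), (2, 19, 2), (2, 3, 3),  # visited 3 *after* 19: no match
]
print(persons_through_chain(steps, [0, 3, 19]))  # -> 1
```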

The queries, of course, look pretty scary; for real use, you would need to generate them programmatically. However, they work, and they work quickly: both the first and the second query complete in less than 0.1 s. Here is an example of the execution time of a count(*) query through 3 points:

SELECT count(*)
FROM 
(
    SELECT 
        s1.person AS person, 
        s1.zone AS s1z, 
        s1.when AS s1w, 
        s2.zone AS s2z, 
        s2.when AS s2w
    FROM 
    (
        SELECT *
        FROM steps
        WHERE (area = 0) AND (zone = 0)
    ) AS s1
    ANY INNER JOIN 
    (
        SELECT *
        FROM steps AS s2
        WHERE (area = 0) AND (zone = 3)
    ) AS s2 USING (person)
    WHERE s1.when <= s2.when
) AS p
ANY INNER JOIN 
(
    SELECT *
    FROM steps
    WHERE (area = 0) AND (zone = 19)
) AS s3 USING (person)
WHERE p.s2w <= s3.when

β”Œβ”€count()─┐
β”‚   11592 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1 rows in set. Elapsed: 0.068 sec. Processed 250.03 thousand rows, 8.00 MB (3.69 million rows/s., 117.98 MB/s.)

A note about IOPS: while loading data, JanusGraph generated a fairly high number of IOPS (1000-1300 for four loading threads), and IOWAIT was quite high. At the same time, ClickHouse put minimal load on the disk subsystem.

Conclusion

We decided to use ClickHouse to serve queries of this type. We can always optimize further, using materialized views and parallelization, and pre-processing the event stream with Apache Flink before loading it into ClickHouse.

The performance is so good that we probably won't even have to think about pivoting tables programmatically. Previously, we had to pivot data retrieved from Vertica by exporting it to Apache Parquet.

Unfortunately, yet another attempt to use a graph DBMS was unsuccessful. I did not find JanusGraph to have a friendly ecosystem that lets you get up to speed with the product quickly. At the same time, the server is configured in the traditional Java way, which will make people unfamiliar with Java cry tears of blood:

host: 0.0.0.0
port: 8182
threadPoolWorker: 1
gremlinPool: 8
scriptEvaluationTimeout: 30000
channelizer: org.janusgraph.channelizers.JanusGraphWsAndHttpChannelizer

graphManager: org.janusgraph.graphdb.management.JanusGraphManager
graphs: {
  ConfigurationManagementGraph: conf/janusgraph-cql-configurationgraph.properties,
  airlines: conf/airlines.properties
}

scriptEngines: {
  gremlin-groovy: {
    plugins: { org.janusgraph.graphdb.tinkerpop.plugin.JanusGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.server.jsr223.GremlinServerGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.tinkergraph.jsr223.TinkerGraphGremlinPlugin: {},
               org.apache.tinkerpop.gremlin.jsr223.ImportGremlinPlugin: {classImports: [java.lang.Math], methodImports: [java.lang.Math#*]},
               org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin: {files: [scripts/airline-sample.groovy]}}}}

serializers:
# GraphBinary is here to replace Gryo and Graphson
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphBinaryMessageSerializerV1, config: { serializeResultToString: true }}
  # Gryo and Graphson, latest versions
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  # Older serialization versions for backwards compatibility:
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0, config: { serializeResultToString: true }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GryoLiteMessageSerializerV1d0, config: {ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV2d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerGremlinV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}
  - { className: org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistryV1d0] }}

processors:
  - { className: org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor, config: { sessionTimeout: 28800000 }}
  - { className: org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor, config: { cacheExpirationTime: 600000, cacheMaxSize: 1000 }}

metrics: {
  consoleReporter: {enabled: false, interval: 180000},
  csvReporter: {enabled: false, interval: 180000, fileName: /tmp/gremlin-server-metrics.csv},
  jmxReporter: {enabled: false},
  slf4jReporter: {enabled: true, interval: 180000},
  gangliaReporter: {enabled: false, interval: 180000, addressingMode: MULTICAST},
  graphiteReporter: {enabled: false, interval: 180000}}
threadPoolBoss: 1
maxInitialLineLength: 4096
maxHeaderSize: 8192
maxChunkSize: 8192
maxContentLength: 65536
maxAccumulationBufferComponents: 1024
resultIterationBatchSize: 64
writeBufferLowWaterMark: 32768
writeBufferHighWaterMark: 65536
ssl: {
  enabled: false}

I managed to accidentally bring down the BerkeleyDB version of JanusGraph.

The documentation is rather rough where indexes are concerned: managing them requires some rather strange shamanism in Groovy. For example, an index is created by writing code in the Gremlin console (which, by the way, does not work out of the box). From the official JanusGraph documentation:

graph.tx().rollback() //Never create new indexes while a transaction is active
mgmt = graph.openManagement()
name = mgmt.getPropertyKey('name')
age = mgmt.getPropertyKey('age')
mgmt.buildIndex('byNameComposite', Vertex.class).addKey(name).buildCompositeIndex()
mgmt.buildIndex('byNameAndAgeComposite', Vertex.class).addKey(name).addKey(age).buildCompositeIndex()
mgmt.commit()

//Wait for the index to become available
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameComposite').call()
ManagementSystem.awaitGraphIndexStatus(graph, 'byNameAndAgeComposite').call()
//Reindex the existing data
mgmt = graph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("byNameComposite"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("byNameAndAgeComposite"), SchemaAction.REINDEX).get()
mgmt.commit()

Afterword

In a sense, the experiment above compares apples to oranges: a graph DBMS performs different operations to arrive at the same results. However, as part of the tests, I also experimented with a query like:

g.V().hasLabel('ZoneStep').has('id',0)
    .repeat(__.out().simplePath()).until(__.hasLabel('ZoneStep').has('id',1)).count().next()

which looks for paths to a directly adjacent zone. Yet even on such data, the graph DBMS took more than a few seconds... This is, of course, because there were also paths of the form 0 -> X -> Y ... -> 1, which the graph engine checked as well.
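This slowdown is easy to reproduce: in a dense graph, the number of simple paths between two vertices grows combinatorially, and a repeat()/until() traversal has to enumerate all of them. A small illustration:

```python
# Exhaustive DFS path counting, which is effectively what the traversal does.
def count_paths_dfs(adj, start, target):
    def dfs(node, visited):
        if node == target:
            return 1
        return sum(dfs(n, visited | {n}) for n in adj[node] if n not in visited)
    return dfs(start, {start})

def complete_digraph(n):
    """Every vertex has an edge to every other vertex."""
    return {i: [j for j in range(n) if j != i] for i in range(n)}

# Every ordered subset of the n-2 intermediate vertices yields a distinct
# simple path, so the count explodes with graph size:
for n in (4, 6, 8):
    print(n, count_paths_dfs(complete_digraph(n), 0, n - 1))
# -> 4 5
#    6 65
#    8 1957
```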

Even for a query like:

g.V().hasLabel('ZoneStep').has('id',0).out().has('id',1).count().next()

I was unable to get a response in under a second.

The moral of the fable is that a beautiful idea and paradigmatic modeling do not guarantee the desired result; ClickHouse handles the same task with far greater efficiency. The use case presented in this article is a clear anti-pattern for graph DBMSs, even though it looks like a natural fit for their modeling paradigm.

Source: habr.com
