అపాచీ బాణంతో కాలమ్ డేటాను ప్రసారం చేస్తోంది

వ్యాసం యొక్క అనువాదం కోర్సు యొక్క విద్యార్థుల కోసం ప్రత్యేకంగా తయారు చేయబడింది డేటా ఇంజనీర్.

అపాచీ బాణంతో కాలమ్ డేటాను ప్రసారం చేస్తోంది

గత కొన్ని వారాలుగా, మేము నాంగ్ లి కు జోడించబడింది అపాచీ బాణం బైనరీ స్ట్రీమ్ ఫార్మాట్, ఇప్పటికే ఉన్న యాదృచ్ఛిక యాక్సెస్/IPC ఫైల్ ఫార్మాట్‌కు అనుబంధంగా. మాకు జావా మరియు C++ అమలులు మరియు పైథాన్ బైండింగ్‌లు ఉన్నాయి. ఈ ఆర్టికల్‌లో, ఫార్మాట్ ఎలా పనిచేస్తుందో వివరిస్తాను మరియు మీరు పాండాస్ డేటాఫ్రేమ్ కోసం చాలా ఎక్కువ డేటా నిర్గమాంశను ఎలా సాధించవచ్చో చూపిస్తాను.

నిలువు వరుస డేటాను ప్రసారం చేస్తోంది

పెద్ద ట్యాబులర్ డేటాసెట్‌లను అడ్డు వరుస లేదా రికార్డ్-ఆధారిత ఫార్మాట్ నుండి కాలమ్ ఫార్మాట్‌కి తరలించడం వల్ల ఎక్కువ ఖర్చు అవుతుందనేది బాణం వినియోగదారుల నుండి నాకు వచ్చే సాధారణ ప్రశ్న. బహుళ-గిగాబైట్ డేటాసెట్‌ల కోసం, మెమరీలో లేదా డిస్క్‌లో ట్రాన్స్‌పోజ్ చేయడం చాలా ఎక్కువ.

స్ట్రీమింగ్ డేటా కోసం, సోర్స్ డేటా అడ్డు వరుస లేదా నిలువు వరుస అయినా, ఒక ఎంపిక ఏమిటంటే, అడ్డు వరుసల చిన్న బ్యాచ్‌లను పంపడం, ప్రతి ఒక్కటి అంతర్గతంగా నిలువు లేఅవుట్‌ను కలిగి ఉంటుంది.

అపాచీ బాణంలో, టేబుల్ చంక్‌ను సూచించే ఇన్-మెమరీ స్తంభ శ్రేణుల సేకరణను రికార్డ్ బ్యాచ్ అంటారు. తార్కిక పట్టిక యొక్క ఒకే డేటా నిర్మాణాన్ని సూచించడానికి, మీరు రికార్డుల యొక్క అనేక ప్యాకేజీలను సేకరించవచ్చు.

ఇప్పటికే ఉన్న "రాండమ్ యాక్సెస్" ఫైల్ ఫార్మాట్‌లో, మేము ఫైల్ చివరిలో టేబుల్ స్కీమా మరియు బ్లాక్ లేఅవుట్‌ని కలిగి ఉన్న మెటాడేటాను వ్రాస్తాము, ఇది డేటాసెట్ నుండి ఏదైనా బ్యాచ్ రికార్డ్‌లను లేదా ఏదైనా కాలమ్‌ను చాలా చౌకగా ఎంచుకోవడానికి మిమ్మల్ని అనుమతిస్తుంది. స్ట్రీమింగ్ ఫార్మాట్‌లో, మేము సందేశాల శ్రేణిని పంపుతాము: ఒక స్కీమా, ఆపై ఒకటి లేదా అంతకంటే ఎక్కువ బ్యాచ్‌ల రికార్డ్‌లు.

విభిన్న ఫార్మాట్‌లు ఈ చిత్రం వలె కనిపిస్తాయి:

అపాచీ బాణంతో కాలమ్ డేటాను ప్రసారం చేస్తోంది

PyArrowలో డేటా స్ట్రీమింగ్: అప్లికేషన్

ఇది ఎలా పని చేస్తుందో మీకు చూపించడానికి, నేను ఒకే స్ట్రీమ్ భాగంను సూచించే ఉదాహరణ డేటాసెట్‌ను సృష్టిస్తాను:

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

ఇప్పుడు, మనం 1 GB డేటాను వ్రాయాలనుకుంటున్నాము, ఇందులో ఒక్కొక్కటి 1 MB భాగాలు, మొత్తం 1024 భాగాలు ఉంటాయి. ముందుగా, 1 నిలువు వరుసలతో మొదటి 16MB డేటా ఫ్రేమ్‌ని క్రియేట్ చేద్దాం:

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

అప్పుడు నేను వాటిని మార్చుకుంటాను pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

ఇప్పుడు నేను RAMకి వ్రాసి సృష్టించే అవుట్‌పుట్ స్ట్రీమ్‌ను సృష్టిస్తాను StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

అప్పుడు మేము 1024 భాగాలను వ్రాస్తాము, ఇది చివరికి 1GB డేటాసెట్‌ను రూపొందిస్తుంది:

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

మేము RAMలో వ్రాసినందున, మేము మొత్తం స్ట్రీమ్‌ను ఒక బఫర్‌లో పొందవచ్చు:

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

ఈ డేటా మెమరీలో ఉన్నందున, బాణం రికార్డుల బ్యాచ్‌ల రీడింగ్ సున్నా-కాపీ ఆపరేషన్ ద్వారా పొందబడుతుంది. నేను StreamReaderని తెరుస్తాను, డేటాను చదివాను pyarrow.Tableఆపై వాటిని మార్చండి DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

ఇవన్నీ, వాస్తవానికి, మంచివి, కానీ మీకు ప్రశ్నలు ఉండవచ్చు. ఇది ఎంత వేగంగా జరుగుతుంది? పాండాస్ డేటాఫ్రేమ్ రిట్రీవల్ పనితీరును భాగం పరిమాణం ఎలా ప్రభావితం చేస్తుంది?

స్ట్రీమింగ్ పనితీరు

స్ట్రీమింగ్ భాగం పరిమాణం తగ్గుతున్నందున, అసమర్థమైన కాష్ యాక్సెస్ స్కీమ్‌ల కారణంగా పాండాలలో ఒక పక్కనే ఉన్న స్తంభాల డేటాఫ్రేమ్‌ని పునర్నిర్మించే ఖర్చు పెరుగుతుంది. C++ డేటా స్ట్రక్చర్‌లు మరియు శ్రేణులు మరియు వాటి మెమరీ బఫర్‌లతో పని చేయడం వల్ల కొంత ఓవర్‌హెడ్ కూడా ఉంది.

పైన పేర్కొన్న విధంగా 1 MB కోసం, నా ల్యాప్‌టాప్‌లో (క్వాడ్-కోర్ జియాన్ E3-1505M) ఇది ఇలా మారుతుంది:

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

7.75 1 MB భాగాల నుండి 1024 GB డేటాఫ్రేమ్‌ని పునరుద్ధరించడానికి సమర్థవంతమైన నిర్గమాంశ 1 Gb / s అని తేలింది. మేము పెద్ద లేదా చిన్న భాగాలను ఉపయోగిస్తే ఏమి జరుగుతుంది? మీరు పొందే ఫలితాలు ఇక్కడ ఉన్నాయి:

అపాచీ బాణంతో కాలమ్ డేటాను ప్రసారం చేస్తోంది

పనితీరు 256K నుండి 64K భాగాలకు గణనీయంగా పడిపోతుంది. 1MB భాగాలు కంటే 16MB భాగాలు వేగంగా ప్రాసెస్ చేయబడినందుకు నేను ఆశ్చర్యపోయాను. ఇది మరింత క్షుణ్ణంగా అధ్యయనం చేయడం మరియు ఇది సాధారణ పంపిణీ లేదా మరేదైనా ప్రమేయం ఉందా అని అర్థం చేసుకోవడం విలువ.

ఫార్మాట్ యొక్క ప్రస్తుత అమలులో, డేటా సూత్రప్రాయంగా కంప్రెస్ చేయబడదు, కాబట్టి మెమరీలో పరిమాణం మరియు "వైర్లో" సుమారుగా ఒకే విధంగా ఉంటుంది. భవిష్యత్తులో కుదింపు ఒక ఎంపికగా మారవచ్చు.

ఫలితం

స్ట్రీమింగ్ కాలమ్ డేటా పెద్ద డేటాసెట్‌లను చిన్న భాగాలలో పాండాలు వంటి కాలమ్ అనలిటిక్స్ సాధనాలకు బదిలీ చేయడానికి సమర్థవంతమైన మార్గం. వరుస-ఆధారిత నిల్వను ఉపయోగించే డేటా సేవలు మీ ప్రాసెసర్ యొక్క L2 మరియు L3 కాష్‌కి మరింత సౌకర్యవంతంగా ఉండే చిన్న చిన్న డేటా భాగాలను బదిలీ చేయగలవు మరియు బదిలీ చేయగలవు.

పూర్తి కోడ్

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

మూలం: www.habr.com

ఒక వ్యాఖ్యను జోడించండి