వ్యాసం యొక్క అనువాదం కోర్సు యొక్క విద్యార్థుల కోసం ప్రత్యేకంగా తయారు చేయబడింది
గత కొన్ని వారాలుగా, మేము
నిలువు వరుస డేటాను ప్రసారం చేస్తోంది
పెద్ద ట్యాబులర్ డేటాసెట్లను అడ్డు వరుస లేదా రికార్డ్-ఆధారిత ఫార్మాట్ నుండి కాలమ్ ఫార్మాట్కి తరలించడం వల్ల ఎక్కువ ఖర్చు అవుతుందనేది బాణం వినియోగదారుల నుండి నాకు వచ్చే సాధారణ ప్రశ్న. బహుళ-గిగాబైట్ డేటాసెట్ల కోసం, మెమరీలో లేదా డిస్క్లో ట్రాన్స్పోజ్ చేయడం చాలా ఎక్కువ.
స్ట్రీమింగ్ డేటా కోసం, సోర్స్ డేటా అడ్డు వరుస లేదా నిలువు వరుస అయినా, ఒక ఎంపిక ఏమిటంటే, అడ్డు వరుసల చిన్న బ్యాచ్లను పంపడం, ప్రతి ఒక్కటి అంతర్గతంగా నిలువు లేఅవుట్ను కలిగి ఉంటుంది.
అపాచీ బాణంలో, టేబుల్ చంక్ను సూచించే ఇన్-మెమరీ స్తంభ శ్రేణుల సేకరణను రికార్డ్ బ్యాచ్ అంటారు. తార్కిక పట్టిక యొక్క ఒకే డేటా నిర్మాణాన్ని సూచించడానికి, మీరు రికార్డుల యొక్క అనేక ప్యాకేజీలను సేకరించవచ్చు.
ఇప్పటికే ఉన్న "రాండమ్ యాక్సెస్" ఫైల్ ఫార్మాట్లో, మేము ఫైల్ చివరిలో టేబుల్ స్కీమా మరియు బ్లాక్ లేఅవుట్ని కలిగి ఉన్న మెటాడేటాను వ్రాస్తాము, ఇది డేటాసెట్ నుండి ఏదైనా బ్యాచ్ రికార్డ్లను లేదా ఏదైనా కాలమ్ను చాలా చౌకగా ఎంచుకోవడానికి మిమ్మల్ని అనుమతిస్తుంది. స్ట్రీమింగ్ ఫార్మాట్లో, మేము సందేశాల శ్రేణిని పంపుతాము: ఒక స్కీమా, ఆపై ఒకటి లేదా అంతకంటే ఎక్కువ బ్యాచ్ల రికార్డ్లు.
విభిన్న ఫార్మాట్లు ఈ చిత్రం వలె కనిపిస్తాయి:
PyArrowలో డేటా స్ట్రీమింగ్: అప్లికేషన్
ఇది ఎలా పని చేస్తుందో మీకు చూపించడానికి, నేను ఒకే స్ట్రీమ్ భాగంను సూచించే ఉదాహరణ డేటాసెట్ను సృష్టిస్తాను:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
ఇప్పుడు, మనం 1 GB డేటాను వ్రాయాలనుకుంటున్నాము, ఇందులో ఒక్కొక్కటి 1 MB భాగాలు, మొత్తం 1024 భాగాలు ఉంటాయి. ముందుగా, 1 నిలువు వరుసలతో మొదటి 16MB డేటా ఫ్రేమ్ని క్రియేట్ చేద్దాం:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
అప్పుడు నేను వాటిని మార్చుకుంటాను pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
ఇప్పుడు నేను RAMకి వ్రాసి సృష్టించే అవుట్పుట్ స్ట్రీమ్ను సృష్టిస్తాను StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
అప్పుడు మేము 1024 భాగాలను వ్రాస్తాము, ఇది చివరికి 1GB డేటాసెట్ను రూపొందిస్తుంది:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
మేము RAMలో వ్రాసినందున, మేము మొత్తం స్ట్రీమ్ను ఒక బఫర్లో పొందవచ్చు:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
ఈ డేటా మెమరీలో ఉన్నందున, బాణం రికార్డుల బ్యాచ్ల రీడింగ్ సున్నా-కాపీ ఆపరేషన్ ద్వారా పొందబడుతుంది. నేను StreamReaderని తెరుస్తాను, డేటాను చదివాను pyarrow.Table
ఆపై వాటిని మార్చండి DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
ఇవన్నీ, వాస్తవానికి, మంచివి, కానీ మీకు ప్రశ్నలు ఉండవచ్చు. ఇది ఎంత వేగంగా జరుగుతుంది? పాండాస్ డేటాఫ్రేమ్ రిట్రీవల్ పనితీరును భాగం పరిమాణం ఎలా ప్రభావితం చేస్తుంది?
స్ట్రీమింగ్ పనితీరు
స్ట్రీమింగ్ భాగం పరిమాణం తగ్గుతున్నందున, అసమర్థమైన కాష్ యాక్సెస్ స్కీమ్ల కారణంగా పాండాలలో ఒక పక్కనే ఉన్న స్తంభాల డేటాఫ్రేమ్ని పునర్నిర్మించే ఖర్చు పెరుగుతుంది. C++ డేటా స్ట్రక్చర్లు మరియు శ్రేణులు మరియు వాటి మెమరీ బఫర్లతో పని చేయడం వల్ల కొంత ఓవర్హెడ్ కూడా ఉంది.
పైన పేర్కొన్న విధంగా 1 MB కోసం, నా ల్యాప్టాప్లో (క్వాడ్-కోర్ జియాన్ E3-1505M) ఇది ఇలా మారుతుంది:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
7.75 1 MB భాగాల నుండి 1024 GB డేటాఫ్రేమ్ని పునరుద్ధరించడానికి సమర్థవంతమైన నిర్గమాంశ 1 Gb / s అని తేలింది. మేము పెద్ద లేదా చిన్న భాగాలను ఉపయోగిస్తే ఏమి జరుగుతుంది? మీరు పొందే ఫలితాలు ఇక్కడ ఉన్నాయి:
పనితీరు 256K నుండి 64K భాగాలకు గణనీయంగా పడిపోతుంది. 1MB భాగాలు కంటే 16MB భాగాలు వేగంగా ప్రాసెస్ చేయబడినందుకు నేను ఆశ్చర్యపోయాను. ఇది మరింత క్షుణ్ణంగా అధ్యయనం చేయడం మరియు ఇది సాధారణ పంపిణీ లేదా మరేదైనా ప్రమేయం ఉందా అని అర్థం చేసుకోవడం విలువ.
ఫార్మాట్ యొక్క ప్రస్తుత అమలులో, డేటా సూత్రప్రాయంగా కంప్రెస్ చేయబడదు, కాబట్టి మెమరీలో పరిమాణం మరియు "వైర్లో" సుమారుగా ఒకే విధంగా ఉంటుంది. భవిష్యత్తులో కుదింపు ఒక ఎంపికగా మారవచ్చు.
ఫలితం
స్ట్రీమింగ్ కాలమ్ డేటా పెద్ద డేటాసెట్లను చిన్న భాగాలలో పాండాలు వంటి కాలమ్ అనలిటిక్స్ సాధనాలకు బదిలీ చేయడానికి సమర్థవంతమైన మార్గం. వరుస-ఆధారిత నిల్వను ఉపయోగించే డేటా సేవలు మీ ప్రాసెసర్ యొక్క L2 మరియు L3 కాష్కి మరింత సౌకర్యవంతంగా ఉండే చిన్న చిన్న డేటా భాగాలను బదిలీ చేయగలవు మరియు బదిలీ చేయగలవు.
పూర్తి కోడ్
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
మూలం: www.habr.com