рд▓реЗрдЦрд╛рдЪрд╛ рдЕрдиреБрд╡рд╛рдж рд╡рд┐рд╢реЗрд╖рддрдГ рдЕрднреНрдпрд╛рд╕рдХреНрд░рдорд╛рдЪреНрдпрд╛ рд╡рд┐рджреНрдпрд╛рд░реНрдереНрдпрд╛рдВрд╕рд╛рдареА рддрдпрд╛рд░ рдХреЗрд▓рд╛ рд╣реЛрддрд╛
рдЧреЗрд▓реНрдпрд╛ рдХрд╛рд╣реА рдЖрдард╡рдбреНрдпрд╛рдВрдд, рдЖрдореНрд╣реА
рд╕реНрддрдВрдн рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд┐рдд рдХрд░рдгреЗ
рдПрд░реЛ рд╡рд╛рдкрд░рдХрд░реНрддреНрдпрд╛рдВрдХрдбреВрди рдорд▓рд╛ рдкрдбрд▓реЗрд▓рд╛ рдПрдХ рд╕рд╛рдорд╛рдиреНрдп рдкреНрд░рд╢реНрди рдореНрд╣рдгрдЬреЗ рдореЛрдареНрдпрд╛ рд╕рд╛рд░рдгреА рдбреЗрдЯрд╛рд╕реЗрдЯрд▓рд╛ рдкрдВрдХреНрддреА- рдХрд┐рдВрд╡рд╛ рд░реЗрдХреЙрд░реНрдб-рдУрд░рд┐рдПрдВрдЯреЗрдб рдлреЙрд░рдореЕрдЯрдордзреВрди рдХреЙрд▓рдо рдлреЙрд░рдореЕрдЯрдордзреНрдпреЗ рд╕реНрдерд▓рд╛рдВрддрд░рд┐рдд рдХрд░рдгреНрдпрд╛рдЪреА рдЙрдЪреНрдЪ рдХрд┐рдВрдордд. рдорд▓реНрдЯреА-рдЧреАрдЧрд╛рдмрд╛рдЗрдЯ рдбреЗрдЯрд╛рд╕реЗрдЯрд╕рд╛рдареА, рдореЗрдорд░реАрдордзреНрдпреЗ рдХрд┐рдВрд╡рд╛ рдбрд┐рд╕реНрдХрд╡рд░ рдЯреНрд░рд╛рдиреНрд╕рдкреЛрдЬ рдХрд░рдгреЗ рдЬрдмрд░рджрд╕реНрдд рдЕрд╕реВ рд╢рдХрддреЗ.
рдкреНрд░рд╡рд╛рд╣рд┐рдд рдбреЗрдЯрд╛рд╕рд╛рдареА, рд╕реНрддреНрд░реЛрдд рдбреЗрдЯрд╛ рдкрдВрдХреНрддреА рдХрд┐рдВрд╡рд╛ рд╕реНрддрдВрдн рдЕрд╕реЛ, рдПрдХ рдкрд░реНрдпрд╛рдп рдореНрд╣рдгрдЬреЗ рдкрдВрдХреНрддреАрдВрдЪреНрдпрд╛ рд▓рд╣рд╛рди рдмреЕрдЪ рдкрд╛рдард╡рдгреЗ, рдкреНрд░рддреНрдпреЗрдХрд╛рдордзреНрдпреЗ рдЕрдВрддрд░реНрдЧрдд рд╕реНрддрдВрдн рд▓реЗрдЖрдЙрдЯ рдЖрд╣реЗ.
Apache Arrow рдордзреНрдпреЗ, рдЯреЗрдмрд▓рдЪреНрдпрд╛ рднрд╛рдЧрд╛рдЪреЗ рдкреНрд░рддрд┐рдирд┐рдзрд┐рддреНрд╡ рдХрд░рдгрд╛рд▒реНрдпрд╛ рдЗрди-рдореЗрдорд░реА рд╕реНрддрдВрднреАрдп рдЕреЕрд░реЗрдЪреНрдпрд╛ рд╕рдВрдЧреНрд░рд╣рд╛рд▓рд╛ рд░реЗрдХреЙрд░реНрдб рдмреЕрдЪ рдореНрд╣рдгрддрд╛рдд. рддрд╛рд░реНрдХрд┐рдХ рд╕рд╛рд░рдгреАрдЪреНрдпрд╛ рдПрдХрд▓ рдбреЗрдЯрд╛ рд╕реНрдЯреНрд░рдХреНрдЪрд░рдЪреЗ рдкреНрд░рддрд┐рдирд┐рдзрд┐рддреНрд╡ рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА, рддреБрдореНрд╣реА рд░реЗрдХреЙрд░реНрдбрдЪреЗ рдЕрдиреЗрдХ рдкреЕрдХреЗрдЬ рдЧреЛрд│рд╛ рдХрд░реВ рд╢рдХрддрд╛.
рд╡рд┐рджреНрдпрдорд╛рди "рд░рдБрдбрдо рдНрдХреНрд╕реЗрд╕" рдлрд╛рдЗрд▓ рдлреЙрд░рдореЕрдЯрдордзреНрдпреЗ, рдЖрдореНрд╣реА рдлрд╛рдЗрд▓рдЪреНрдпрд╛ рд╢реЗрд╡рдЯреА рдЯреЗрдмрд▓ рд╕реНрдХреАрдорд╛ рдЖрдгрд┐ рдмреНрд▓реЙрдХ рд▓реЗрдЖрдЙрдЯ рдЕрд╕рд▓реЗрд▓рд╛ рдореЗрдЯрд╛рдбреЗрдЯрд╛ рд▓рд┐рд╣рд┐рддреЛ, рдЬреНрдпрд╛рдореБрд│реЗ рддреБрдореНрд╣рд╛рд▓рд╛ рдбреЗрдЯрд╛рд╕реЗрдЯрдордзреВрди рдХреЛрдгрддрд╛рд╣реА рд░реЗрдХреЙрд░реНрдб рдХрд┐рдВрд╡рд╛ рдХреЙрд▓рдо рдЕрдЧрджреА рд╕реНрд╡рд╕реНрддрд╛рдд рдирд┐рд╡рдбрддрд╛ рдпреЗрддреЛ. рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рдлреЙрд░рдореЕрдЯрдордзреНрдпреЗ, рдЖрдореНрд╣реА рд╕рдВрджреЗрд╢рд╛рдВрдЪреА рдорд╛рд▓рд┐рдХрд╛ рдкрд╛рдард╡рддреЛ: рдПрдХ рд╕реНрдХреАрдорд╛ рдЖрдгрд┐ рдирдВрддрд░ рд░реЗрдХреЙрд░реНрдбрдЪреНрдпрд╛ рдПрдХ рдХрд┐рдВрд╡рд╛ рдЕрдзрд┐рдХ рдмреЕрдЪ.
рднрд┐рдиреНрди рд╕реНрд╡рд░реВрдкреЗ рдпрд╛ рдЪрд┐рддреНрд░рд╛рд╕рд╛рд░рдЦреЗ рдХрд╛рд╣реАрддрд░реА рджрд┐рд╕рддрд╛рдд:
PyArrow рдордзреАрд▓ рдбреЗрдЯрд╛ рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ: рдНрдкреНрд▓рд┐рдХреЗрд╢рди
рддреЗ рдХрд╕реЗ рдХрд╛рд░реНрдп рдХрд░рддреЗ рддреЗ рддреБрдореНрд╣рд╛рд▓рд╛ рджрд╛рдЦрд╡рдгреНрдпрд╛рд╕рд╛рдареА, рдореА рдПрдХрд▓ рдкреНрд░рд╡рд╛рд╣ рднрд╛рдЧ рджрд░реНрд╢рд╡рд┐рдгрд╛рд░рд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг рдбреЗрдЯрд╛рд╕реЗрдЯ рддрдпрд╛рд░ рдХрд░реЗрди:
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = int(total_size / ncols / np.dtype('float64').itemsize)
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
рдЖрддрд╛ рд╕рдордЬрд╛, рдПрдХреВрдг 1 рднрд╛рдЧрд╛рдВрд╕рд╛рдареА рдкреНрд░рддреНрдпреЗрдХреА 1 MB рднрд╛рдЧрд╛рдВрдЪрд╛ рд╕рдорд╛рд╡реЗрд╢ рдЕрд╕рд▓реЗрд▓рд╛ 1024 GB рдбреЗрдЯрд╛ рд▓рд┐рд╣рд╛рдпрдЪрд╛ рдЖрд╣реЗ. рдкреНрд░рдердо, 1 рд╕реНрддрдВрднрд╛рдВрд╕рд╣ рдкрд╣рд┐рд▓реА 16MB рдбреЗрдЯрд╛ рдлреНрд░реЗрдо рддрдпрд╛рд░ рдХрд░реВрдпрд╛:
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
df = generate_data(MEGABYTE, NCOLS)
рдордЧ рдореА рддреНрдпрд╛рдВрдирд╛ рд░реВрдкрд╛рдВрддрд░рд┐рдд рдХрд░рддреЛ pyarrow.RecordBatch
:
batch = pa.RecordBatch.from_pandas(df)
рдЖрддрд╛ рдореА рдЖрдЙрдЯрдкреБрдЯ рд╕реНрдЯреНрд░реАрдо рддрдпрд╛рд░ рдХрд░реЗрди рдЬреЛ RAM рд╡рд░ рд▓рд┐рд╣реВрди рддрдпрд╛рд░ рдХрд░реЗрд▓ StreamWriter
:
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
рдордЧ рдЖрдореНрд╣реА 1024 рднрд╛рдЧ рд▓рд┐рд╣реВ, рдЬреЗ рд╢реЗрд╡рдЯреА 1GB рдбреЗрдЯрд╛рд╕реЗрдЯ рдмрдирд╡реЗрд▓:
for i in range(DATA_SIZE // MEGABYTE):
stream_writer.write_batch(batch)
рдЖрдореНрд╣реА RAM рдордзреНрдпреЗ рд▓рд┐рд╣рд┐рд▓реЗ рдЕрд╕рд▓реНрдпрд╛рдиреЗ, рдЖрдореНрд╣реА рд╕рдВрдкреВрд░реНрдг рдкреНрд░рд╡рд╛рд╣ рдПрдХрд╛ рдмрдлрд░рдордзреНрдпреЗ рдорд┐рд│рд╡реВ рд╢рдХрддреЛ:
In [13]: source = sink.get_result()
In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>
In [15]: source.size
Out[15]: 1074750744
рд╣рд╛ рдбреЗрдЯрд╛ рдореЗрдорд░реАрдордзреНрдпреЗ рдЕрд╕рд▓реНрдпрд╛рдореБрд│реЗ, рдЕреЕрд░реЛ рд░реЗрдХреЙрд░реНрдбрдЪреЗ рдмреЕрдЪ рд╡рд╛рдЪрдгреЗ рд╢реВрдиреНрдп-рдХреЙрдкреА рдСрдкрд░реЗрд╢рдирджреНрд╡рд╛рд░реЗ рдкреНрд░рд╛рдкреНрдд рдХреЗрд▓реЗ рдЬрд╛рддреЗ. рдореА StreamReader рдЙрдШрдбрддреЛ, рддреНрдпрд╛рдд рдбреЗрдЯрд╛ рд╡рд╛рдЪрд╛ pyarrow.Table
рдЖрдгрд┐ рдирдВрддрд░ рддреНрдпрд╛рдВрдирд╛ рдордзреНрдпреЗ рд░реВрдкрд╛рдВрддрд░рд┐рдд рдХрд░рд╛ DataFrame pandas
:
In [16]: reader = pa.StreamReader(source)
In [17]: table = reader.read_all()
In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>
In [19]: df = table.to_pandas()
In [20]: df.memory_usage().sum()
Out[20]: 1073741904
рд╣реЗ рд╕рд░реНрд╡ рдирдХреНрдХреАрдЪ рдЪрд╛рдВрдЧрд▓реЗ рдЖрд╣реЗ, рдкрд░рдВрддреБ рддреБрдореНрд╣рд╛рд▓рд╛ рдкреНрд░рд╢реНрди рдЕрд╕реВ рд╢рдХрддрд╛рдд. рддреЗ рдХрд┐рддреА рд╡реЗрдЧрд╛рдиреЗ рдШрдбрддреЗ? рднрд╛рдЧ рдЖрдХрд╛рд░ рдкрд╛рдВрдбрд╛ рдбреЗрдЯрд╛рдлреНрд░реЗрдо рдкреБрдирд░реНрдкреНрд░рд╛рдкреНрддреА рдХрд╛рд░реНрдпрдХреНрд╖рдорддреЗрд╡рд░ рдХрд╕рд╛ рдкрд░рд┐рдгрд╛рдо рдХрд░рддреЛ?
рдкреНрд░рд╡рд╛рд╣рд┐рдд рдХрд╛рд░реНрдпрдкреНрд░рджрд░реНрд╢рди
рд╕реНрдЯреНрд░реАрдорд┐рдВрдЧ рднрд╛рдЧрд╛рдЪрд╛ рдЖрдХрд╛рд░ рдХрдореА рд╣реЛрдд рдЕрд╕рддрд╛рдирд╛, рдЕрдХрд╛рд░реНрдпрдХреНрд╖рдо рдХреЕрд╢реЗ рдНрдХреНрд╕реЗрд╕ рдпреЛрдЬрдирд╛рдВрдореБрд│реЗ рдкрд╛рдВрдбрд╛рдВрдордзреНрдпреЗ рд╕рдВрд▓рдЧреНрди рд╕реНрддрдВрднреАрдп рдбреЗрдЯрд╛рдлреНрд░реЗрдордЪреА рдкреБрдирд░реНрд░рдЪрдирд╛ рдХрд░рдгреНрдпрд╛рдЪреА рдХрд┐рдВрдордд рд╡рд╛рдврддреЗ. C++ рдбреЗрдЯрд╛ рд╕реНрдЯреНрд░рдХреНрдЪрд░реНрд╕ рдЖрдгрд┐ рдЕреЕрд░реЗ рдЖрдгрд┐ рддреНрдпрд╛рдВрдЪреНрдпрд╛ рдореЗрдорд░реА рдмрдлрд░рд╕рд╣ рдХрд╛рдо рдХрд░рдгреНрдпрд╛рдкрд╛рд╕реВрди рдХрд╛рд╣реА рдУрд╡реНрд╣рд░рд╣реЗрдб рджреЗрдЦреАрд▓ рдЖрд╣реЗ.
рд╡рд░реАрд▓рдкреНрд░рдорд╛рдгреЗ 1 MB рд╕рд╛рдареА, рдорд╛рдЭреНрдпрд╛ рд▓реЕрдкрдЯреЙрдкрд╡рд░ (рдХреНрд╡рд╛рдб-рдХреЛрд░ Xeon E3-1505M) рдЕрд╕реЗ рджрд┐рд╕реВрди рдЖрд▓реЗ:
In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop
рдЕрд╕реЗ рджрд┐рд╕реВрди рдЖрд▓реЗ рдХреА 7.75 1 MB рднрд╛рдЧрд╛рдВрдордзреВрди 1024 GB рдбреЗрдЯрд╛рдлреНрд░реЗрдо рдкреБрдирд░реНрд╕рдВрдЪрдпрд┐рдд рдХрд░рдгреНрдпрд╛рд╕рд╛рдареА рдкреНрд░рднрд╛рд╡реА рдереНрд░реВрдкреБрдЯ 1 Gb/s рдЖрд╣реЗ. рдЖрдкрдг рдореЛрдареЗ рдХрд┐рдВрд╡рд╛ рд▓рд╣рд╛рди рднрд╛рдЧ рд╡рд╛рдкрд░рд▓реНрдпрд╛рд╕ рдХрд╛рдп рд╣реЛрдИрд▓? рддреБрдореНрд╣рд╛рд▓рд╛ рдорд┐рд│рдгрд╛рд░реЗ рдкрд░рд┐рдгрд╛рдо рдпреЗрдереЗ рдЖрд╣реЗрдд:
рдХрд╛рд░реНрдпрдкреНрд░рджрд░реНрд╢рди 256K рддреЗ 64K рднрд╛рдЧрд╛рдВрдордзреНрдпреЗ рд▓рдХреНрд╖рдгреАрдпрд░реАрддреНрдпрд╛ рдШрд╕рд░рддреЗ. рдорд▓рд╛ рдЖрд╢реНрдЪрд░реНрдп рд╡рд╛рдЯрд▓реЗ рдХреА 1MB рднрд╛рдЧрд╛рдВрд╡рд░ 16MB рднрд╛рдЧрд╛рдВрдкреЗрдХреНрд╖рд╛ рдЬрд▓рдж рдкреНрд░рдХреНрд░рд┐рдпрд╛ рдХреЗрд▓реА рдЧреЗрд▓реА. рдЕрдзрд┐рдХ рд╕рдЦреЛрд▓ рдЕрднреНрдпрд╛рд╕ рдХрд░рдгреЗ рдЖрдгрд┐ рд╣реЗ рд╕рд╛рдорд╛рдиреНрдп рд╡рд┐рддрд░рдг рдЖрд╣реЗ рдХреА рдЖрдгрдЦреА рдХрд╛рд╣реАрддрд░реА рдЧреБрдВрддрд▓реЗрд▓реЗ рдЖрд╣реЗ рд╣реЗ рд╕рдордЬреВрди рдШреЗрдгреЗ рдпреЛрдЧреНрдп рдЖрд╣реЗ.
рд╕реНрд╡рд░реВрдкрд╛рдЪреНрдпрд╛ рд╕рдзреНрдпрд╛рдЪреНрдпрд╛ рдЕрдВрдорд▓рдмрдЬрд╛рд╡рдгреАрдордзреНрдпреЗ, рдбреЗрдЯрд╛ рддрддреНрддреНрд╡рддрдГ рд╕рдВрдХреБрдЪрд┐рдд рдХреЗрд▓рд╛ рдЬрд╛рдд рдирд╛рд╣реА, рдореНрд╣рдгреВрди рдореЗрдорд░реАрдордзреАрд▓ рдЖрдХрд╛рд░ рдЖрдгрд┐ "рд╡рд╛рдпрд░рд╡рд░" рдЕрдВрджрд╛рдЬреЗ рд╕рдорд╛рди рдЖрд╣реЗ. рднрд╡рд┐рд╖реНрдпрд╛рдд рдХреЙрдореНрдкреНрд░реЗрд╢рди рд╣рд╛ рдПрдХ рдкрд░реНрдпрд╛рдп рдмрдиреВ рд╢рдХрддреЛ.
рдкрд░рд┐рдгрд╛рдо
рд╕реНрддрдВрдн рдбреЗрдЯрд╛ рдкреНрд░рд╡рд╛рд╣рд┐рдд рдХрд░рдгреЗ рд╣рд╛ рдореЛрдареНрдпрд╛ рдбреЗрдЯрд╛рд╕реЗрдЯрд▓рд╛ рд╕реНрддрдВрдн рд╡рд┐рд╢реНрд▓реЗрд╖рдг рд╕рд╛рдзрдирд╛рдВрдордзреНрдпреЗ рд╣рд╕реНрддрд╛рдВрддрд░рд┐рдд рдХрд░рдгреНрдпрд╛рдЪрд╛ рдПрдХ рдкреНрд░рднрд╛рд╡реА рдорд╛рд░реНрдЧ рдЕрд╕реВ рд╢рдХрддреЛ рдЬрд╕реЗ рдХреА рд▓рд╣рд╛рди рднрд╛рдЧрд╛рдВрдордзреНрдпреЗ рдкрд╛рдВрдбрд╛. рдбреЗрдЯрд╛ рд╕реЗрд╡рд╛ рдЬреНрдпрд╛ рд░реЛ-рдУрд░рд┐рдПрдВрдЯреЗрдб рд╕реНрдЯреЛрд░реЗрдЬ рд╡рд╛рдкрд░рддрд╛рдд рддреНрдпрд╛ рддреБрдордЪреНрдпрд╛ рдкреНрд░реЛрд╕реЗрд╕рд░рдЪреНрдпрд╛ L2 рдЖрдгрд┐ L3 рдХреЕрд╢реЗрд╕рд╛рдареА рдЕрдзрд┐рдХ рд╕реЛрдпреАрд╕реНрдХрд░ рдЕрд╕рд▓реЗрд▓реНрдпрд╛ рдбреЗрдЯрд╛рдЪреЗ рдЫреЛрдЯреЗ рднрд╛рдЧ рд╣рд╕реНрддрд╛рдВрддрд░рд┐рдд рдЖрдгрд┐ рд╣рд╕реНрддрд╛рдВрддрд░рд┐рдд рдХрд░реВ рд╢рдХрддрд╛рдд.
рдкреВрд░реНрдг рдХреЛрдб
import time
import numpy as np
import pandas as pd
import pyarrow as pa
def generate_data(total_size, ncols):
nrows = total_size / ncols / np.dtype('float64').itemsize
return pd.DataFrame({
'c' + str(i): np.random.randn(nrows)
for i in range(ncols)
})
KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
def get_timing(f, niter):
start = time.clock_gettime(time.CLOCK_REALTIME)
for i in range(niter):
f()
return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER
def read_as_dataframe(klass, source):
reader = klass(source)
table = reader.read_all()
return table.to_pandas()
NITER = 5
results = []
CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]
for chunksize in CHUNKSIZES:
nchunks = DATA_SIZE // chunksize
batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))
sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)
for i in range(nchunks):
stream_writer.write_batch(batch)
source = sink.get_result()
elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)
result = (chunksize, elapsed)
print(result)
results.append(result)
рд╕реНрддреНрд░реЛрдд: www.habr.com