የአምድ ውሂብ በ Apache ቀስት በዥረት መልቀቅ

የጽሁፉ ትርጉም የተዘጋጀው በተለይ ለትምህርቱ ተማሪዎች ነው። የውሂብ መሐንዲስ.

የአምድ ውሂብ በ Apache ቀስት በዥረት መልቀቅ

ባለፉት ጥቂት ሳምንታት ውስጥ አለን። ኖንግ ሊ ታክሏል Apache ቀስት የሁለትዮሽ ዥረት ቅርጸት፣ ያለውን የዘፈቀደ መዳረሻ/አይፒሲ ፋይል ቅርጸት ማሟላት። ጃቫ እና ሲ++ አተገባበር እና የፓይዘን ማሰሪያዎች አሉን። በዚህ ጽሑፍ ውስጥ, ቅርጸቱ እንዴት እንደሚሰራ እገልጻለሁ እና ለፓንዳስ ዳታ ፍሬም በጣም ከፍተኛ የውሂብ ፍሰት እንዴት ማግኘት እንደሚችሉ አሳይሻለሁ.

የዥረት አምድ ውሂብ

ከቀስት ተጠቃሚዎች የምቀበለው የተለመደ ጥያቄ ትላልቅ የሰንጠረዥ ውሂብ ስብስቦችን ከረድፍ ወይም መዝገብ-ተኮር ቅርጸት ወደ አምድ ተኮር ቅርጸት የማሸጋገር ከፍተኛ ወጪ ነው። ለብዙ ጊጋባይት ዳታሴቶች በማህደረ ትውስታ ወይም በዲስክ ላይ ማስተላለፍ በጣም ከባድ ስራ ሊሆን ይችላል።

መረጃን ለመልቀቅ፣ የምንጭ ውሂቡ ረድፍም ሆነ አምድ፣ አንዱ አማራጭ ትንንሽ የረድፎችን ስብስቦችን መላክ ነው፣ እያንዳንዱም በውስጡ የአዕማድ አቀማመጥ አለው።

በApache ቀስት ውስጥ፣ የማህደረ ትውስታ አምድ ድርድሮች የሰንጠረዥ ቁራጭን የሚወክሉ ስብስቦች የመዝገብ ባች ይባላሉ። የአመክንዮአዊ ሰንጠረዥ ነጠላ የውሂብ መዋቅርን ለመወከል፣ በርካታ የምዝግብ ማስታወሻዎች ሊሰበሰቡ ይችላሉ።

አሁን ባለው የ"ራንደም መዳረሻ" የፋይል ፎርማት የሠንጠረዡን እቅድ የያዘ ሜታዳታ እንመዘግባለን እና በፋይሉ መጨረሻ ላይ ያሉ ቦታዎችን እንገድባለን ይህም ከውሂብ ስብስብ ውስጥ ማንኛውንም መዝገብ ወይም ማንኛውንም አምድ በከፍተኛ ዋጋ እንዲመርጡ ያስችልዎታል። በዥረት ፎርማት፣ ተከታታይ መልዕክቶችን እንልካለን፡ ገለፃ እና ከዚያም አንድ ወይም ከዚያ በላይ የሆኑ መዝገቦች።

የተለያዩ ቅርጸቶች እንደዚህ ያለ ነገር ይመስላሉ-

የአምድ ውሂብ በ Apache ቀስት በዥረት መልቀቅ

የዥረት ዳታ በPyArrow፡ መተግበሪያ

ይህ እንዴት እንደሚሰራ ለእርስዎ ለማሳየት አንድ ነጠላ የዥረት ቁራጭን የሚወክል ምሳሌ ዳታ ስብስብ እፈጥራለሁ፡-

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = int(total_size / ncols / np.dtype('float64').itemsize)
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })	

አሁን 1 ጂቢ ውሂብ መጻፍ እንፈልጋለን እንበል ፣ እያንዳንዳቸው 1 ሜባ ቁርጥራጮችን ያቀፈ ፣ በድምሩ 1024። ለመጀመር፣ የመጀመሪያውን 1 ሜባ የውሂብ ፍሬም ከ16 አምዶች ጋር እንፍጠር፡

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

df = generate_data(MEGABYTE, NCOLS)

ከዚያም ወደ እነርሱ እቀይራለሁ pyarrow.RecordBatch:

batch = pa.RecordBatch.from_pandas(df)

አሁን ወደ RAM የሚጽፍ እና የሚፈጥር የውጤት ዥረት እፈጥራለሁ StreamWriter:

sink = pa.InMemoryOutputStream()
stream_writer = pa.StreamWriter(sink, batch.schema)

ከዚያ 1024 ቁርጥራጮችን እንጽፋለን ፣ ይህም በመጨረሻ ወደ 1 ጂቢ የውሂብ ስብስብ ይሆናል ።

for i in range(DATA_SIZE // MEGABYTE):
    stream_writer.write_batch(batch)

ለ RAM ስለጻፍን ሙሉውን ዥረት በአንድ ቋት ውስጥ ማግኘት እንችላለን፡-

In [13]: source = sink.get_result()

In [14]: source
Out[14]: <pyarrow.io.Buffer at 0x7f2df7118f80>

In [15]: source.size
Out[15]: 1074750744

ይህ መረጃ በማህደረ ትውስታ ውስጥ ስለሆነ የቀስት መዝገቦችን ባች ማንበብ ዜሮ ቅጂ ነው። StreamReader ን እከፍታለሁ፣ ውሂብን ወደ ውስጥ አነባለሁ። pyarrow.Table, እና ከዚያ ወደ እነርሱ ይቀይሯቸው DataFrame pandas:

In [16]: reader = pa.StreamReader(source)

In [17]: table = reader.read_all()

In [18]: table
Out[18]: <pyarrow.table.Table at 0x7fae8281f6f0>

In [19]: df = table.to_pandas()

In [20]: df.memory_usage().sum()
Out[20]: 1073741904

ይህ ሁሉ በእርግጥ ጥሩ ነው, ግን ጥያቄዎች ሊኖሩዎት ይችላሉ. ይህ ምን ያህል በፍጥነት ይከሰታል? የ chunk መጠን የፓንዳስ ዳታ ፍሬም ሰርስሮ አፈጻጸምን እንዴት ይጎዳል?

የዥረት አፈጻጸም

የዥረት ቻንክ መጠኑ እየቀነሰ ሲሄድ፣ በፓንዳዎች ውስጥ ቀጣይነት ያለው የአምድ ዳታ ፍሬም መልሶ የመገንባት ዋጋ ውጤታማ ባልሆነ የመሸጎጫ መዳረሻ ቅጦች ምክንያት ይጨምራል። ከC++ ዳታ አወቃቀሮች እና ድርድሮች እና የማስታወሻ ማቋቋሚያዎቻቸው ጋር አብሮ ለመስራት የተወሰነ ትርፍ አለ።

ለ 1 ሜባ ፣ ከላይ እንደተገለፀው ፣ በእኔ ላፕቶፕ (ኳድ-ኮር Xeon E3-1505M) ላይ ይወጣል

In [20]: %timeit pa.StreamReader(source).read_all().to_pandas()
10 loops, best of 3: 129 ms per loop

7.75GB DataFrame ከ 1 1024MB ቸንክ ወደነበረበት ለመመለስ ውጤታማው የውጤት መጠን 1 ጂቢ/ሰ ነው። ትላልቅ ወይም ትናንሽ ቁርጥራጮችን ከተጠቀምን ምን ይሆናል? ውጤቶቹ እነዚህ ናቸው፡-

የአምድ ውሂብ በ Apache ቀስት በዥረት መልቀቅ

አፈጻጸሙ ከ256K ወደ 64K ቁርጥራጮች በእጅጉ ይቀንሳል። 1 ሜጋ ባይት ከ16 ሜባ ክፍልፋዮች በበለጠ ፍጥነት መሰራታቸው አስገርሞኛል። ይህ የተለመደ ስርጭት መሆኑን ወይም ሌላ ነገር በጨዋታ ላይ እንዳለ ወይም አለመሆኑን የበለጠ ጥልቅ ጥናት ማካሄድ እና መረዳት ተገቢ ነው።

አሁን ባለው የቅርጸቱ አተገባበር ላይ መረጃው በመርህ ደረጃ አልተጨመቀም, ስለዚህ በማህደረ ትውስታ እና "በሽቦዎች" ውስጥ ያለው መጠን በግምት ተመሳሳይ ነው. ለወደፊቱ, መጨናነቅ ተጨማሪ አማራጭ ሊሆን ይችላል.

ውጤቱ

የአምድ ዳታ በዥረት የሚለቀቅበት ውጤታማ መንገድ ትላልቅ የውሂብ ስብስቦችን እንደ ፓንዳዎች በትንሽ ቁርጥራጮች ለመመገብ ውጤታማ መንገድ ሊሆን ይችላል። በረድፍ ተኮር ማከማቻ የሚጠቀሙ የውሂብ አገልግሎቶች ለአቀነባባሪዎ L2 እና L3 መሸጎጫዎች የበለጠ ምቹ የሆኑ ትናንሽ ቁርጥራጮችን ማስተላለፍ እና ማስተላለፍ ይችላሉ።

ሙሉ ኮድ

import time
import numpy as np
import pandas as pd
import pyarrow as pa

def generate_data(total_size, ncols):
    nrows = total_size / ncols / np.dtype('float64').itemsize
    return pd.DataFrame({
        'c' + str(i): np.random.randn(nrows)
        for i in range(ncols)
    })

KILOBYTE = 1 << 10
MEGABYTE = KILOBYTE * KILOBYTE
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16

def get_timing(f, niter):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    return (time.clock_gettime(time.CLOCK_REALTIME) - start) / NITER

def read_as_dataframe(klass, source):
    reader = klass(source)
    table = reader.read_all()
    return table.to_pandas()
NITER = 5
results = []

CHUNKSIZES = [16 * KILOBYTE, 64 * KILOBYTE, 256 * KILOBYTE, MEGABYTE, 16 * MEGABYTE]

for chunksize in CHUNKSIZES:
    nchunks = DATA_SIZE // chunksize
    batch = pa.RecordBatch.from_pandas(generate_data(chunksize, NCOLS))

    sink = pa.InMemoryOutputStream()
    stream_writer = pa.StreamWriter(sink, batch.schema)

    for i in range(nchunks):
        stream_writer.write_batch(batch)

    source = sink.get_result()

    elapsed = get_timing(lambda: read_as_dataframe(pa.StreamReader, source), NITER)

    result = (chunksize, elapsed)
    print(result)
    results.append(result)

ምንጭ: hab.com

አስተያየት ያክሉ