เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2

เดŽเดฒเตเดฒเดพเดตเตผเด•เตเด•เตเด‚ เดนเดพเดฏเต. เด•เต‹เดดเตโ€Œเดธเดฟเดฒเต† เดตเดฟเดฆเตเดฏเดพเตผเดคเตเดฅเดฟเด•เตพเด•เตเด•เดพเดฏเดฟ เดชเตเดฐเดคเตเดฏเต‡เด•เด‚ เดคเดฏเตเดฏเดพเดฑเดพเด•เตเด•เดฟเดฏ เดฒเต‡เด–เดจเดคเตเดคเดฟเดจเตเดฑเต† เด…เดตเดธเดพเดจ เดญเดพเด—เดคเตเดคเดฟเดจเตเดฑเต† เดตเดฟเดตเตผเดคเตเดคเดจเด‚ เดžเด™เตเด™เตพ เดชเด™เตเด•เดฟเดŸเตเดจเตเดจเต. "เดกเดพเดฑเตเดฑ เดŽเดžเตเดšเดฟเดจเต€เดฏเตผ". เด†เดฆเตเดฏเดญเดพเด—เด‚ เดตเดพเดฏเดฟเด•เตเด•เดพเด‚ เด‡เดตเดฟเดŸเต†.

เดคเดคเตเดธเดฎเดฏ เดชเตˆเดชเตเดชเตเดฒเตˆเดจเตเด•เตพเด•เตเด•เดพเดฏเดฟ เด…เดชเตเดชเดพเดšเตเดšเต† เดฌเต€เดฎเตเด‚ เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต‹เดฏเตเด‚

เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2

Google เด•เตเดฒเต—เดกเต เดธเดœเตเดœเต€เด•เดฐเดฟเด•เตเด•เตเดจเตเดจเต

เดถเตเดฐเดฆเตเดงเดฟเด•เตเด•เตเด•: เดชเตˆเดคเตเดคเตบ 3-เตฝ เดชเตˆเดชเตเดชเตโ€Œเดฒเตˆเตป เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเตฝ เดŽเดจเดฟเด•เตเด•เต เดชเตเดฐเดถเตโ€ŒเดจเดฎเตเดฃเตเดŸเดพเดฏเดคเดฟเดจเดพเตฝ เดชเตˆเดชเตเดชเตโ€Œเดฒเตˆเตป เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเด‚ เด‡เดทเตโ€ŒเดŸเดพเดจเตเดธเตƒเดค เดฒเต‹เด—เต เดกเดพเดฑเตเดฑ เดชเตเดฐเดธเดฟเดฆเตเดงเต€เด•เดฐเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเด‚ เดžเดพเตป Google เด•เตเดฒเต—เดกเต เดทเต†เตฝ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต.

เดชเตˆเดชเตเดชเตเดฒเตˆเตป เด†เดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต, เดžเด™เตเด™เตพ เด•เตเดฐเดฎเต€เด•เดฐเดฃเด™เตเด™เดณเดฟเตฝ เด…เตฝเดชเตเดชเด‚ เด•เตเดดเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต. เดจเดฟเด™เตเด™เดณเดฟเตฝ เดฎเตเดฎเตเดชเต GCP เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเดคเตเดคเดตเตผเด•เตเด•เดพเดฏเดฟ, เด‡เดคเดฟเตฝ เดตเดฟเดตเดฐเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจ 6 เด˜เดŸเตเดŸเด™เตเด™เตพ เดจเดฟเด™เตเด™เตพ เดชเดฟเดจเตเดคเตเดŸเดฐเต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต เดชเต‡เดœเต.

เด‡เดคเดฟเดจเตเดถเต‡เดทเด‚, เดžเด™เตเด™เดณเตเดŸเต† เดธเตโ€Œเด•เตเดฐเดฟเดชเตเดฑเตเดฑเตเด•เตพ Google เด•เตเดฒเต—เดกเต เดธเตเดฑเตเดฑเต‹เดฑเต‡เดœเดฟเดฒเต‡เด•เตเด•เต เด…เดชเตโ€Œเดฒเต‹เดกเต เดšเต†เดฏเตเดฏเตเด•เดฏเตเด‚ เด…เดต เดžเด™เตเด™เดณเตเดŸเต† Google เด•เตเดฒเต—เดกเต เดทเต†เดฒเตเดฒเดฟเดฒเต‡เด•เตเด•เต เดชเด•เตผเดคเตเดคเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต. เด•เตเดฒเต—เดกเต เดธเตเดฑเตเดฑเต‹เดฑเต‡เดœเดฟเดฒเต‡เด•เตเด•เต เด…เดชเตโ€Œเดฒเต‹เดกเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต เดตเดณเดฐเต† เดจเดฟเดธเตเดธเดพเดฐเดฎเดพเดฃเต (เด’เดฐเต เดตเดฟเดตเดฐเดฃเด‚ เด•เดฃเตเดŸเต†เดคเตเดคเดพเดจเดพเด•เตเด‚ เด‡เดตเดฟเดŸเต†). เดžเด™เตเด™เดณเตเดŸเต† เดซเดฏเดฒเตเด•เตพ เดชเด•เตผเดคเตเดคเดพเตป, เดšเตเดตเดŸเต†เดฏเตเดณเตเดณ เดšเดฟเดคเตเดฐเด‚ 2-เตฝ เด‡เดŸเดคเตเดตเดถเดคเตเดคเตเดณเตเดณ เด†เดฆเตเดฏ เดเด•เตเด•เดฃเดฟเตฝ เด•เตเดฒเดฟเด•เตเด•เตเดšเต†เดฏเตเดคเต เดŸเต‚เตพเดฌเดพเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต Google เด•เตเดฒเต—เดกเต เดทเต†เตฝ เดคเตเดฑเด•เตเด•เดพเด‚.

เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2
2 เดšเดฟเดคเตเดฐเด‚

เดจเดฎเตเด•เตเด•เต เดซเดฏเดฒเตเด•เตพ เดชเด•เตผเดคเตเดคเดพเดจเตเด‚ เด†เดตเดถเตเดฏเดฎเดพเดฏ เดฒเตˆเดฌเตเดฐเดฑเดฟเด•เตพ เด‡เตปเดธเตเดฑเตเดฑเดพเตพ เดšเต†เดฏเตเดฏเดพเดจเตเด‚ เด†เดตเดถเตเดฏเดฎเดพเดฏ เด•เดฎเดพเตปเดกเตเด•เตพ เดšเตเดตเดŸเต† เดชเดŸเตเดŸเดฟเด•เดชเตเดชเต†เดŸเตเดคเตเดคเดฟเดฏเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต.

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

เดžเด™เตเด™เดณเตเดŸเต† เดกเดพเดฑเตเดฑเดพเดฌเต‡เดธเตเด‚ เดชเดŸเตเดŸเดฟเด•เดฏเตเด‚ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต

เดธเดœเตเดœเต€เด•เดฐเดฃเดตเตเดฎเดพเดฏเดฟ เดฌเดจเตเดงเดชเตเดชเต†เดŸเตเดŸ เดŽเดฒเตเดฒเดพ เด˜เดŸเตเดŸเด™เตเด™เดณเตเด‚ เดžเด™เตเด™เตพ เดชเต‚เตผเดคเตเดคเดฟเดฏเดพเด•เตเด•เดฟเด•เตเด•เดดเดฟเดžเตเดžเดพเตฝ, เด…เดŸเตเดคเตเดคเดคเดพเดฏเดฟ เดจเดฎเตเดฎเตพ เดšเต†เดฏเตเดฏเต‡เดฃเตเดŸเดคเต BigQuery-เดฏเดฟเตฝ เด’เดฐเต เดกเดพเดฑเตเดฑเดพเดธเต†เดฑเตเดฑเตเด‚ เดชเดŸเตเดŸเดฟเด•เดฏเตเด‚ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต. เด‡เดคเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเต เดจเดฟเดฐเดตเดงเดฟ เดฎเดพเตผเด—เด™เตเด™เดณเตเดฃเตเดŸเต, เดŽเดจเตเดจเดพเตฝ เดเดฑเตเดฑเดตเตเด‚ เดฒเดณเดฟเดคเดฎเดพเดฏเดคเต เด†เดฆเตเดฏเด‚ เด’เดฐเต เดกเดพเดฑเตเดฑเดพเดธเต†เดฑเตเดฑเต เดธเตƒเดทเตเดŸเดฟเดšเตเดšเต Google เด•เตเดฒเต—เดกเต เด•เตบเดธเต‹เตพ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต. เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดšเตเดตเดŸเต†เดฏเตเดณเตเดณ เด˜เดŸเตเดŸเด™เตเด™เตพ เดชเดฟเดจเตเดคเตเดŸเดฐเดพเด‚ เดฒเดฟเด™เตเด•เตเด’เดฐเต เดธเตเด•เต€เดฎ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เด’เดฐเต เดชเดŸเตเดŸเดฟเด• เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดพเตป. เดžเด™เตเด™เดณเตเดŸเต† เดฎเต‡เดถ เด‰เดฃเตเดŸเดพเด•เตเด‚ 7 เดจเดฟเดฐเด•เตพ, เด“เดฐเต‹ เด‰เดชเดฏเต‹เด•เตเดคเตƒ เดฒเต‹เด—เดฟเดจเตเดฑเต†เดฏเตเด‚ เด˜เดŸเด•เด™เตเด™เดณเตเดฎเดพเดฏเดฟ เดชเตŠเดฐเตเดคเตเดคเดชเตเดชเต†เดŸเตเดจเตเดจเต. เดธเต—เด•เดฐเตเดฏเดพเตผเดคเตเดฅเด‚, เดŸเตˆเด‚เดฒเต‹เด•เตเด•เตฝ เดตเต‡เดฐเดฟเดฏเดฌเดฟเตพ เด’เดดเดฟเด•เต†เดฏเตเดณเตเดณ เดŽเดฒเตเดฒเดพ เดจเดฟเดฐเด•เดณเต†เดฏเตเด‚ เดžเด™เตเด™เตพ เดธเตเดŸเตเดฐเดฟเด‚เด—เตเด•เดณเดพเดฏเดฟ เดจเดฟเตผเดตเดšเดฟเด•เตเด•เตเด‚, เด•เต‚เดŸเดพเดคเต† เดžเด™เตเด™เตพ เดฎเตเดฎเตเดชเต เดธเตƒเดทเตเดŸเดฟเดšเตเดš เดตเต‡เดฐเดฟเดฏเดฌเดฟเดณเตเด•เตพเด•เตเด•เดจเตเดธเดฐเดฟเดšเตเดšเต เด…เดตเดฏเตเด•เตเด•เต เดชเต‡เดฐเดฟเดŸเตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเด‚. เดžเด™เตเด™เดณเตเดŸเต† เดชเดŸเตเดŸเดฟเด•เดฏเตเดŸเต† เดฒเต‡เด”เดŸเตเดŸเต เดšเดฟเดคเตเดฐเด‚ 3-เตฝ เด‰เดณเตเดณเดคเตเดชเต‹เดฒเต† เด†เดฏเดฟเดฐเดฟเด•เตเด•เดฃเด‚.

เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2
เดšเดฟเดคเตเดฐเด‚ 3. เดชเดŸเตเดŸเดฟเด• เดฒเต‡เด”เดŸเตเดŸเต

เด‰เดชเดฏเต‹เด•เตเดคเตƒ เดฒเต‹เด—เต เดกเดพเดฑเตเดฑ เดชเตเดฐเดธเดฟเดฆเตเดงเต€เด•เดฐเดฟเด•เตเด•เตเดจเตเดจเต

เดชเดฌเต/เดธเดฌเต เดžเด™เตเด™เดณเตเดŸเต† เดชเตˆเดชเตเดชเตเดฒเตˆเดจเดฟเดจเตเดฑเต† เด’เดฐเต เดจเดฟเตผเดฃเดพเดฏเด• เด˜เดŸเด•เดฎเดพเดฃเต, เด•เดพเดฐเดฃเด‚ เด‡เดคเต เด’เดจเตเดจเดฟเดฒเดงเดฟเด•เด‚ เดธเตเดตเดคเดจเตเดคเตเดฐ เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเดจเตเด•เดณเต† เดชเดฐเดธเตเดชเดฐเด‚ เด†เดถเดฏเดตเดฟเดจเดฟเดฎเดฏเด‚ เดจเดŸเดคเตเดคเดพเตป เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเดจเตเดจเต. เดชเตเดฐเดคเตเดฏเต‡เด•เดฟเดšเตเดšเตเด‚, เด†เดชเตเดฒเดฟเด•เตเด•เต‡เดทเดจเตเด•เตพเด•เตเด•เดฟเดŸเดฏเดฟเตฝ เดธเดจเตเดฆเต‡เดถเด™เตเด™เตพ เด…เดฏเดฏเตเด•เตเด•เดพเดจเตเด‚ เดธเตเดตเต€เด•เดฐเดฟเด•เตเด•เดพเดจเตเด‚ เดžเด™เตเด™เดณเต† เด…เดจเตเดตเดฆเดฟเด•เตเด•เตเดจเตเดจ เด’เดฐเต เด‡เดŸเดจเดฟเดฒเด•เตเด•เดพเดฐเดจเดพเดฏเดฟ เด‡เดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจเต. เดจเดฎเตเดฎเตพ เด†เดฆเตเดฏเด‚ เดšเต†เดฏเตเดฏเต‡เดฃเตเดŸเดคเต เด’เดฐเต เดตเดฟเดทเดฏเด‚ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต. เด•เตบเดธเต‹เดณเดฟเดฒเต† เดชเดฌเต/เดธเดฌเต เดŽเดจเตเดจเดคเดฟเดฒเต‡เด•เตเด•เต เดชเต‹เดฏเดฟ เดตเดฟเดทเดฏเด‚ เดธเตƒเดทเตโ€ŒเดŸเดฟเด•เตเด•เตเด• เด•เตเดฒเดฟเด•เตเด•เต เดšเต†เดฏเตเดฏเตเด•.

เดฎเตเด•เดณเดฟเตฝ เดจเดฟเตผเดตเดšเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจ เดฒเต‹เด—เต เดกเดพเดฑเตเดฑ เดธเตƒเดทเตโ€ŒเดŸเดฟเด•เตเด•เดพเตป เดšเตเดตเดŸเต†เดฏเตเดณเตเดณ เด•เต‹เดกเต เดžเด™เตเด™เดณเตเดŸเต† เดธเตโ€Œเด•เตเดฐเดฟเดชเตโ€Œเดฑเตเดฑเดฟเดจเต† เดตเดฟเดณเดฟเด•เตเด•เตเดจเตเดจเต, เดคเตเดŸเตผเดจเตเดจเต เดฒเต‹เด—เตเด•เตพ เดชเดฌเต/เดธเดฌเดฟเดฒเต‡เด•เตเด•เต เด•เดฃเด•เตโ€Œเดฑเตเดฑเต เดšเต†เดฏเตโ€Œเดคเต เด…เดฏเดฏเตโ€Œเด•เตเด•เตเดจเตเดจเต. เดจเดฎเตเดฎเตพ เดšเต†เดฏเตเดฏเต‡เดฃเตเดŸเดคเต เด’เดฐเต เดตเดธเตเดคเตเดตเดฟเดจเต† เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเด• เดŽเดจเตเดจเดคเดพเดฃเต เดชเตเดฐเดธเดพเดงเด•เตป, เดฐเต€เดคเดฟ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดตเดฟเดทเดฏเดคเตเดคเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดชเดพเดค เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เตเด• topic_path เด’เดชเตเดชเด‚ เดซเด‚เด—เตโ€Œเดทเตป เดตเดฟเดณเดฟเด•เตเด•เตเด• publish ั topic_path เดกเดพเดฑเตเดฑเดฏเตเด‚. เดžเด™เตเด™เตพ เด‡เดฑเด•เตเด•เตเดฎเดคเดฟ เดšเต†เดฏเตเดฏเตเดจเตเดจ เด•เดพเดฐเตเดฏเด‚ เดถเตเดฐเดฆเตเดงเดฟเด•เตเด•เตเด• generate_log_line เดžเด™เตเด™เดณเตเดŸเต† เดธเตเด•เตเดฐเดฟเดชเตเดฑเตเดฑเดฟเตฝ เดจเดฟเดจเตเดจเต stream_logs, เด…เดคเดฟเดจเดพเตฝ เดˆ เดซเดฏเดฒเตเด•เตพ เด’เดฐเต‡ เดซเต‹เตพเดกเดฑเดฟเดฒเดพเดฃเต†เดจเตเดจเต เด‰เดฑเดชเตเดชเดพเด•เตเด•เตเด•, เด…เดฒเตเดฒเดพเดคเตเดคเดชเด•เตเดทเด‚ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด’เดฐเต เด‡เดฑเด•เตเด•เตเดฎเดคเดฟ เดชเดฟเดถเด•เต เดฒเดญเดฟเด•เตเด•เตเด‚. เดชเดฟเดจเตเดจเต€เดŸเต เด‡เดคเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดžเด™เตเด™เดณเตเดŸเต† เด—เต‚เด—เดฟเตพ เด•เตบเดธเต‹เดณเดฟเดฒเต‚เดŸเต† เด‡เดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เดพเด‚:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

เดซเดฏเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจ เด‰เดŸเตป, เดคเดพเดดเต†เดฏเตเดณเตเดณ เดšเดฟเดคเตเดฐเดคเตเดคเดฟเตฝ เด•เดพเดฃเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเตเดชเต‹เดฒเต†, เด•เตบเดธเต‹เดณเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดฒเต‹เด—เต เดกเดพเดฑเตเดฑเดฏเตเดŸเต† เด”เดŸเตเดŸเตเดชเตเดŸเตเดŸเต เดจเดฎเตเด•เตเด•เต เด•เดพเดฃเดพเตป เด•เดดเดฟเดฏเตเด‚. เดจเดฎเตเดฎเตพ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เดพเดคเตเดคเดฟเดŸเดคเตเดคเต‹เดณเด‚ เด•เดพเดฒเด‚ เดˆ เดธเตเด•เตเดฐเดฟเดชเตเดฑเตเดฑเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเด‚ CTRL + Cเด…เดคเต เดชเต‚เตผเดคเตเดคเดฟเดฏเดพเด•เตเด•เดพเตป.

เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2
เดšเดฟเดคเตเดฐเด‚ 4. เด”เดŸเตเดŸเตเดชเตเดŸเตเดŸเต publish_logs.py

เดžเด™เตเด™เดณเตเดŸเต† เดชเตˆเดชเตเดชเตเดฒเตˆเตป เด•เต‹เดกเต เดŽเดดเตเดคเตเดจเตเดจเต

เด‡เดชเตเดชเต‹เตพ เดžเด™เตเด™เตพ เดŽเดฒเตเดฒเดพเด‚ เดคเดฏเตเดฏเดพเดฑเดพเด•เตเด•เดฟเดฏเดฟเดŸเตเดŸเตเดฃเตเดŸเต, เดจเดฎเตเด•เตเด•เต เดฐเดธเด•เดฐเดฎเดพเดฏ เดญเดพเด—เด‚ เด†เดฐเด‚เดญเดฟเด•เตเด•เดพเด‚ - เดฌเต€เด‚, เดชเตˆเดคเตเดคเตบ เดŽเดจเตเดจเดฟเดต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดžเด™เตเด™เดณเตเดŸเต† เดชเตˆเดชเตเดชเตเดฒเตˆเตป เด•เต‹เดกเดฟเด‚เด—เต. เด’เดฐเต เดฌเต€เด‚ เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต, เดžเด™เตเด™เตพ เด’เดฐเต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เด’เดฌเตเดœเด•เตเดฑเตเดฑเต (p) เดธเตƒเดทเตเดŸเดฟเด•เตเด•เต‡เดฃเตเดŸเดคเตเดฃเตเดŸเต. เด’เดฐเต เดชเตˆเดชเตเดชเต เดฒเตˆเตป เด’เดฌเตโ€Œเดœเด•เตโ€Œเดฑเตเดฑเต เดธเตƒเดทเตโ€ŒเดŸเดฟเดšเตเดšเตเด•เดดเดฟเดžเตเดžเดพเตฝ, เด“เดชเตเดชเดฑเต‡เดฑเตเดฑเตผ เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดจเดฎเตเด•เตเด•เต เด’เดจเตเดจเดฟเดฒเดงเดฟเด•เด‚ เดซเด‚เด—เตโ€Œเดทเดจเตเด•เตพ เดชเตเดฐเดฏเต‹เด—เดฟเด•เตเด•เดพเตป เด•เดดเดฟเดฏเตเด‚ pipe (|). เดชเตŠเดคเตเดตเต‡, เดตเตผเด•เตเด•เตเดซเตเดฒเต‹ เดšเตเดตเดŸเต†เดฏเตเดณเตเดณ เดšเดฟเดคเตเดฐเด‚ เดชเต‹เดฒเต† เด•เดพเดฃเดชเตเดชเต†เดŸเตเดจเตเดจเต.

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

เดžเด™เตเด™เดณเตเดŸเต† เด•เต‹เดกเดฟเตฝ, เดžเด™เตเด™เตพ เดฐเดฃเตเดŸเต เด‡เดทเตโ€ŒเดŸเดพเดจเตเดธเตƒเดค เดชเตเดฐเดตเตผเดคเตเดคเดจเด™เตเด™เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเด‚. เดซเด‚เด—เตเดทเตป regex_clean, เด…เดคเต เดกเดพเดฑเตเดฑ เดธเตเด•เดพเตป เดšเต†เดฏเตเดฏเตเด•เดฏเตเด‚ เดซเด‚เด—เตเดทเตป เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดชเดพเดฑเตเดฑเต‡เตบเดธเต เดฒเดฟเดธเตเดฑเตเดฑเดฟเดจเต† เด…เดŸเดฟเดธเตเดฅเดพเดจเดฎเดพเด•เตเด•เดฟ เด…เดจเตเดฌเดจเตเดง เดตเดฐเดฟ เดตเต€เดฃเตเดŸเต†เดŸเตเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต re.search. เดซเด‚เด—เตเดทเตป เด•เต‹เดฎเดฏเดพเตฝ เดตเต‡เตผเดคเดฟเดฐเดฟเดšเตเดš เด’เดฐเต เดธเตเดŸเตเดฐเดฟเด‚เด—เต เดจเตฝเด•เตเดจเตเดจเต. เดจเดฟเด™เตเด™เตพ เด’เดฐเต เดธเดพเดงเดพเดฐเดฃ เดŽเด•เตเดธเตเดชเตเดฐเดทเตป เดตเดฟเดฆเด—เตเดฆเตเดงเดจเดฒเตเดฒเต†เด™เตเด•เดฟเตฝ, เด‡เดคเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เดพเตป เดžเดพเตป เดถเตเดชเดพเตผเดถ เดšเต†เดฏเตเดฏเตเดจเตเดจเต เดŸเตเดฏเต‚เดŸเตเดŸเต‹เดฑเดฟเดฏเตฝ เด•เต‹เดกเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เดพเตป เด’เดฐเต เดจเต‹เดŸเตเดŸเตเดชเดพเดกเดฟเตฝ เดชเดฐเดฟเดถเต€เดฒเดฟเด•เตเด•เตเด•. เด‡เดคเดฟเดจเตเดถเต‡เดทเด‚ เดžเด™เตเด™เตพ เด’เดฐเต เด‡เดทเตโ€ŒเดŸเดพเดจเตเดธเตƒเดค ParDo เดซเด‚เด—เตโ€Œเดทเตป เดจเดฟเตผเดตเดšเดฟเด•เตเด•เตเดจเตเดจเต เดฐเดฃเตเดŸเดพเดฏเดฟ เดชเดฟเดฐเดฟเดฏเตเด•, เดธเดฎเดพเดจเตเดคเดฐ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เดฟเดจเตเดณเตเดณ เดฌเต€เด‚ เดชเดฐเดฟเดตเตผเดคเตเดคเดจเดคเตเดคเดฟเดจเตเดฑเต† เด’เดฐเต เดตเตเดฏเดคเดฟเดฏเดพเดจเดฎเดพเดฃเดฟเดคเต. เดชเตˆเดคเตเดคเดฃเดฟเตฝ, เด‡เดคเต เด’เดฐเต เดชเตเดฐเดคเตเดฏเต‡เด• เดฐเต€เดคเดฟเดฏเดฟเดฒเดพเดฃเต เดšเต†เดฏเตเดฏเตเดจเตเดจเดคเต - DoFn เดฌเต€เด‚ เด•เตเดฒเดพเดธเดฟเตฝ เดจเดฟเดจเตเดจเต เดชเดพเดฐเดฎเตเดชเดฐเตเดฏเดฎเดพเดฏเดฟ เดฒเดญเดฟเด•เตเด•เตเดจเตเดจ เด’เดฐเต เด•เตเดฒเดพเดธเต เดžเด™เตเด™เตพ เดธเตƒเดทเตเดŸเดฟเด•เตเด•เดฃเด‚. เดธเตเดชเตเดฒเดฟเดฑเตเดฑเต เดซเด‚เด—เตโ€Œเดทเตป เดฎเตเดฎเตเดชเดคเตเดคเต† เดซเด‚เด—เตโ€Œเดทเดจเดฟเตฝ เดจเดฟเดจเตเดจเต เดชเดพเดดเตโ€Œเดธเต เดšเต†เดฏเตโ€Œเดค เดตเดฐเดฟ เดŽเดŸเตเด•เตเด•เตเด•เดฏเตเด‚ เดžเด™เตเด™เดณเตเดŸเต† BigQuery เดชเดŸเตเดŸเดฟเด•เดฏเดฟเดฒเต† เด•เต‹เดณเด‚ เดชเต‡เดฐเตเด•เตพเด•เตเด•เต เด…เดจเตเดฏเต‹เดœเตเดฏเดฎเดพเดฏ เด•เต€เด•เดณเตเดณเตเดณ เดจเดฟเด˜เดฃเตเดŸเตเด•เตเด•เดณเตเดŸเต† เด’เดฐเต เดฒเดฟเดธเตเดฑเตเดฑเต เดจเตฝเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจเต. เดˆ เดซเด‚เด—เตโ€Œเดทเดจเดฟเตฝ เดถเตเดฐเดฆเตเดงเดฟเด•เตเด•เต‡เดฃเตเดŸ เดšเดฟเดฒเดคเตเดฃเตเดŸเต: เดŽเดจเดฟเด•เตเด•เต เด‡เดฑเด•เตเด•เตเดฎเดคเดฟ เดšเต†เดฏเตเดฏเต‡เดฃเตเดŸเดฟเดตเดจเตเดจเต datetime เด’เดฐเต เดซเด‚เด—เตโ€Œเดทเดจเตเดฑเต† เด‰เดณเตเดณเดฟเตฝ เด…เดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดจเตเดจเต. เดซเดฏเดฒเดฟเดจเตเดฑเต† เดคเตเดŸเด•เตเด•เดคเตเดคเดฟเตฝ เดŽเดจเดฟเด•เตเด•เต เด’เดฐเต เด‡เดฑเด•เตเด•เตเดฎเดคเดฟ เดชเดฟเดถเด•เต เดฒเดญเดฟเด•เตเด•เตเดจเตเดจเต, เด…เดคเต เดตเดฟเดšเดฟเดคเตเดฐเดฎเดพเดฏเดฟเดฐเตเดจเตเดจเต. เดˆ เดฒเดฟเดธเตเดฑเตเดฑเต เดชเดฟเดจเตเดจเต€เดŸเต เดซเด‚เด—เตเดทเดจเดฟเดฒเต‡เด•เตเด•เต เด•เตˆเดฎเดพเดฑเตเด‚ WriteToBigQuery, เด‡เดคเต เดžเด™เตเด™เดณเตเดŸเต† เดกเดพเดฑเตเดฑเดฏเต† เดชเดŸเตเดŸเดฟเด•เดฏเดฟเดฒเต‡เด•เตเด•เต เดšเต‡เตผเด•เตเด•เตเดจเตเดจเต. Batch DataFlow Job, Streaming DataFlow Job เดŽเดจเตเดจเดฟเดตเดฏเตเดŸเต† เด•เต‹เดกเต เดšเตเดตเดŸเต† เดจเตฝเด•เดฟเดฏเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเต. เดฌเดพเดšเตเดšเตเด‚ เดธเตเดŸเตเดฐเต€เดฎเดฟเด‚เด—เต เด•เต‹เดกเตเด‚ เดคเดฎเตเดฎเดฟเดฒเตเดณเตเดณ เด’เดฐเต‡เดฏเตŠเดฐเต เดตเตเดฏเดคเตเดฏเดพเดธเด‚ เดฌเดพเดšเตเดšเดฟเตฝ เดžเด™เตเด™เตพ CSV เดตเดพเดฏเดฟเด•เตเด•เตเดจเตเดจเต เดŽเดจเตเดจเดคเดพเดฃเต src_pathเดซเด‚เด—เตเดทเตป เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต ReadFromText เดฌเต€เดฎเดฟเตฝ เดจเดฟเดจเตเดจเต.

เดฌเดพเดšเตเดšเต เดกเดพเดฑเตเดฑเดซเตเดฒเต‹ เดœเต‹เดฒเดฟ (เดฌเดพเดšเตเดšเต เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

เดธเตเดŸเตเดฐเต€เดฎเดฟเด‚เด—เต เดกเดพเดฑเตเดฑเดซเตเดฒเต‹ เดœเต‹เดฒเดฟ (เดธเตเดŸเตเดฐเต€เด‚ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

เด•เตบเดตเต†เดฏเตผ เด†เดฐเด‚เดญเดฟเด•เตเด•เตเดจเตเดจเต

เดชเตˆเดชเตเดชเต เดฒเตˆเตป เดจเดฎเตเด•เตเด•เต เดชเดฒ เดคเดฐเดคเตเดคเดฟเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เดพเด‚. เดžเด™เตเด™เตพเด•เตเด•เต เดตเต‡เดฃเดฎเต†เด™เตเด•เดฟเตฝ, เดœเดฟเดธเดฟเดชเดฟเดฏเดฟเดฒเต‡เด•เตเด•เต เดตเดฟเดฆเต‚เดฐเดฎเดพเดฏเดฟ เดฒเต‹เด—เดฟเตป เดšเต†เดฏเตเดฏเตเดฎเตเดชเต‹เตพ เดŸเต†เตผเดฎเดฟเดจเดฒเดฟเตฝ เดจเดฟเดจเตเดจเต เดชเตเดฐเดพเดฆเต‡เดถเดฟเด•เดฎเดพเดฏเดฟ เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เดพเด‚.

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

เดŽเดจเตเดจเดฟเดฐเตเดจเตเดจเดพเดฒเตเด‚, DataFlow เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดžเด™เตเด™เตพ เด‡เดคเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเดชเตเดชเดฟเด•เตเด•เดพเตป เดชเต‹เด•เตเดจเตเดจเต. เด‡เดจเดฟเดชเตเดชเดฑเดฏเตเดจเตเดจ เด†เดตเดถเตเดฏเดฎเดพเดฏ เดชเดพเดฐเดพเดฎเต€เดฑเตเดฑเดฑเตเด•เตพ เดธเดœเตเดœเต€เด•เดฐเดฟเดšเตเดšเต เดšเตเดตเดŸเต†เดฏเตเดณเตเดณ เด•เดฎเดพเตปเดกเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเต เดจเดฎเตเด•เตเด•เต เด‡เดคเต เดšเต†เดฏเตเดฏเดพเตป เด•เดดเดฟเดฏเตเด‚.

  • project โ€” เดจเดฟเด™เตเด™เดณเตเดŸเต† GCP เดชเตเดฐเต‹เดœเด•เตเดฑเตเดฑเดฟเดจเตเดฑเต† เดเดกเดฟ.
  • runner เดจเดฟเด™เตเด™เดณเตเดŸเต† เดชเตเดฐเต‹เด—เตเดฐเดพเด‚ เดตเดฟเดถเด•เดฒเดจเด‚ เดšเต†เดฏเตเดฏเตเด•เดฏเตเด‚ เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดจเดฟเตผเดฎเตเดฎเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจ เด’เดฐเต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดฑเดฃเตเดฃเดฑเดพเดฃเต. เด•เตเดฒเต—เดกเดฟเตฝ เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เดพเตป, เดจเดฟเด™เตเด™เตพ เด’เดฐเต DataflowRunner เดตเตเดฏเด•เตเดคเดฎเดพเด•เตเด•เดฃเด‚.
  • staging_location โ€” เดœเต‹เดฒเดฟ เดจเดฟเตผเดตเดนเดฟเด•เตเด•เตเดจเตเดจ เดชเตเดฐเต‹เดธเดธเตเดธเดฑเตเด•เตพเด•เตเด•เต เด†เดตเดถเตเดฏเดฎเดพเดฏ เด•เต‹เดกเต เดชเดพเด•เตเด•เต‡เดœเตเด•เตพ เดธเต‚เดšเดฟเด•เดฏเดฟเดฒเดพเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เด•เตเดฒเต—เดกเต เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต‹ เด•เตเดฒเต—เดกเต เดธเตเดฑเตเดฑเต‹เดฑเต‡เดœเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดชเดพเดค.
  • temp_location โ€” เดชเตˆเดชเตเดชเตโ€Œเดฒเตˆเตป เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ เดธเตƒเดทเตโ€ŒเดŸเดฟเดšเตเดš เดคเดพเตฝเด•เตเด•เดพเดฒเดฟเด• เดœเต‹เดฒเดฟ เดซเดฏเดฒเตเด•เตพ เดธเด‚เดญเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเดณเตเดณ เด•เตเดฒเต—เดกเต เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต‹ เด•เตเดฒเต—เดกเต เดธเด‚เดญเดฐเดฃเดคเตเดคเดฟเดฒเต‡เด•เตเด•เตเดณเตเดณ เดชเดพเดค.
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

เดˆ เด•เดฎเดพเตปเดกเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดฎเตเดชเต‹เตพ, เดจเดฎเตเด•เตเด•เต เด—เต‚เด—เดฟเตพ เด•เตบเดธเต‹เดณเดฟเดฒเต† เดกเดพเดฑเตเดฑเดพเดซเตเดฒเต‹ เดŸเดพเดฌเดฟเดฒเต‡เด•เตเด•เต เดชเต‹เดฏเดฟ เดžเด™เตเด™เดณเตเดŸเต† เดชเตˆเดชเตเดชเตเดฒเตˆเตป เด•เดพเดฃเดพเด‚. เดชเตˆเดชเตเดชเตโ€Œเดฒเตˆเดจเดฟเตฝ เด•เตเดฒเดฟเด•เตเด•เต เดšเต†เดฏเตเดฏเตเดฎเตเดชเต‹เตพ, เดšเดฟเดคเตเดฐเด‚ 4-เดจเต เดธเดฎเดพเดจเดฎเดพเดฏ เด’เดจเตเดจเต เดจเดฎเตเดฎเตพ เด•เดพเดฃเตเด‚. เดกเต€เดฌเด—เตเด—เดฟเด‚เด—เต เด†เดตเดถเตเดฏเด™เตเด™เตพเด•เตเด•เดพเดฏเดฟ, เดตเดฟเดถเดฆเดฎเดพเดฏ เดฒเต‹เด—เตเด•เตพ เด•เดพเดฃเตเดจเตเดจเดคเดฟเดจเต เดฒเต‹เด—เตเด•เดณเดฟเดฒเต‡เด•เตเด•เตเด‚ เดคเตเดŸเตผเดจเตเดจเต Stackdriver-เดฒเต‡เดฏเตเด•เตเด•เตเด‚ เดชเต‹เด•เตเดจเตเดจเดคเต เดตเดณเดฐเต† เดธเดนเดพเดฏเด•เดฐเดฎเดพเดฃเต. เดจเดฟเดฐเดตเดงเดฟ เด•เต‡เดธเตเด•เดณเดฟเดฒเต† เดชเตˆเดชเตเดชเต เดฒเตˆเตป เดชเตเดฐเดถเตเดจเด™เตเด™เตพ เดชเดฐเดฟเดนเดฐเดฟเด•เตเด•เดพเตป เด‡เดคเต เดŽเดจเตเดจเต† เดธเดนเดพเดฏเดฟเดšเตเดšเดฟเดŸเตเดŸเตเดฃเตเดŸเต.

เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2
เดšเดฟเดคเตเดฐเด‚ 4: เดฌเต€เด‚ เด•เตบเดตเต†เดฏเตผ

BigQuery-เตฝ เดžเด™เตเด™เดณเตเดŸเต† เดกเดพเดฑเตเดฑ เด†เด•เตโ€Œเดธเดธเต เดšเต†เดฏเตเดฏเตเด•

เด…เดคเดฟเดจเดพเตฝ, เดžเด™เตเด™เดณเตเดŸเต† เดŸเต‡เดฌเดฟเดณเดฟเดฒเต‡เด•เตเด•เต เดกเดพเดฑเตเดฑ เด’เดดเตเด•เตเดจเตเดจ เด’เดฐเต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เด‡เดคเดฟเดจเด•เด‚ เดคเดจเตเดจเต† เด‰เดฃเตเดŸเดพเดฏเดฟเดฐเดฟเด•เตเด•เดฃเด‚. เด‡เดคเต เดชเดฐเดฟเดถเต‹เดงเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเต, เดจเดฎเตเด•เตเด•เต BigQuery-เดฒเต‡เด•เตเด•เต เดชเต‹เดฏเดฟ เดกเดพเดฑเตเดฑ เดจเต‹เด•เตเด•เดพเด‚. เดšเตเดตเดŸเต†เดฏเตเดณเตเดณ เด•เดฎเดพเตปเดกเต เด‰เดชเดฏเต‹เด—เดฟเดšเตเดšเดคเดฟเดจเต เดถเต‡เดทเด‚ เดจเดฟเด™เตเด™เตพ เดกเดพเดฑเตเดฑเดพเดธเต†เดฑเตเดฑเดฟเดจเตเดฑเต† เด†เดฆเตเดฏ เด•เตเดฑเดšเตเดšเต เดตเดฐเดฟเด•เตพ เด•เดพเดฃเตเด‚. เด‡เดชเตเดชเต‹เตพ BigQuery-เตฝ เดกเดพเดฑเตเดฑ เดธเด‚เดญเดฐเดฟเดšเตเดšเดฟเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเดพเตฝ, เดžเด™เตเด™เตพเด•เตเด•เต เด•เต‚เดŸเตเดคเตฝ เดตเดฟเดถเด•เดฒเดจเด‚ เดจเดŸเดคเตเดคเดพเดจเตเด‚ เด…เดคเตเดชเต‹เดฒเต† เดธเดนเดชเตเดฐเดตเตผเดคเตเดคเด•เดฐเตเดฎเดพเดฏเดฟ เดกเดพเดฑเตเดฑ เดชเด™เตเด•เดฟเดŸเดพเดจเตเด‚ เดฌเดฟเดธเดฟเดจเดธเต เดšเต‹เดฆเตเดฏเด™เตเด™เตพเด•เตเด•เต เด‰เดคเตเดคเดฐเด‚ เดจเตฝเด•เดพเดจเตเด‚ เด•เดดเดฟเดฏเตเด‚.

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

เดžเด™เตเด™เตพ เด’เดฐเต เดธเตเดŸเตเดฐเต€เด‚ เดกเดพเดฑเตเดฑ เดชเตเดฐเต‹เดธเดธเตเดธเดฟเด‚เด—เต เดชเตˆเดชเตเดชเตเดฒเตˆเตป เดธเตƒเดทเตเดŸเดฟเด•เตเด•เตเดจเตเดจเต. เดญเดพเด—เด‚ 2
เดšเดฟเดคเตเดฐเด‚ 5: BigQuery

เดคเต€เดฐเตเดฎเดพเดจเด‚

เด’เดฐเต เดธเตเดŸเตเดฐเต€เดฎเดฟเด‚เด—เต เดกเดพเดฑเตเดฑเดพ เดชเตˆเดชเตเดชเตโ€Œเดฒเตˆเตป เดธเตƒเดทเตโ€ŒเดŸเดฟเด•เตเด•เตเดจเตเดจเดคเดฟเดจเตเด‚ เดกเดพเดฑเตเดฑ เด•เต‚เดŸเตเดคเตฝ เด†เด•เตโ€Œเดธเดธเต เดšเต†เดฏเตเดฏเดพเดจเตเดณเตเดณ เดตเดดเดฟเด•เตพ เด•เดฃเตเดŸเต†เดคเตเดคเตเดจเตเดจเดคเดฟเดจเตเดฎเตเดณเตเดณ เด‰เดชเดฏเต‹เด—เดชเตเดฐเดฆเดฎเดพเดฏ เด‰เดฆเดพเดนเดฐเดฃเดฎเดพเดฏเดฟ เดˆ เดชเต‹เดธเตเดฑเตเดฑเต เดชเตเดฐเดตเตผเดคเตเดคเดฟเด•เตเด•เตเดฎเต†เดจเตเดจเต เดžเด™เตเด™เตพ เดชเตเดฐเดคเต€เด•เตเดทเดฟเด•เตเด•เตเดจเตเดจเต. เดˆ เดซเต‹เตผเดฎเดพเดฑเตเดฑเดฟเตฝ เดกเดพเดฑเตเดฑ เดธเด‚เดญเดฐเดฟเด•เตเด•เตเดจเตเดจเดคเต เดจเดฎเตเด•เตเด•เต เดงเดพเดฐเดพเดณเด‚ เด—เตเดฃเด™เตเด™เตพ เดจเตฝเด•เตเดจเตเดจเต. เดžเด™เตเด™เดณเตเดŸเต† เด‰เตฝเดชเตเดชเดจเตเดจเด‚ เดŽเดคเตเดฐ เดชเต‡เตผ เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเดจเตเดจเต เดŽเดจเตเดจเดคเตเดชเต‹เดฒเตเดณเตเดณ เดชเตเดฐเดงเดพเดจเดชเตเดชเต†เดŸเตเดŸ เดšเต‹เดฆเตเดฏเด™เตเด™เตพเด•เตเด•เต เด‡เดชเตเดชเต‹เตพ เด‰เดคเตเดคเดฐเด‚ เดจเตฝเด•เดพเตป เดคเตเดŸเด™เตเด™เดพเด‚. เด•เดพเดฒเด•เตเดฐเดฎเต‡เดฃ เดจเดฟเด™เตเด™เดณเตเดŸเต† เด‰เดชเดฏเต‹เด•เตเดคเตƒ เด…เดŸเดฟเดคเตเดคเดฑ เดตเดณเดฐเตเด•เดฏเดพเดฃเต‹? เด‰เตฝเดชเตเดชเดจเตเดจเดคเตเดคเดฟเดจเตเดฑเต† เดเดคเต เดตเดถเด™เตเด™เดณเตเดฎเดพเดฏเดฟ เด†เดณเตเด•เตพ เด•เต‚เดŸเตเดคเตฝ เด‡เดŸเดชเดดเด•เตเดจเตเดจเต? เด•เต‚เดŸเดพเดคเต† เด‰เดฃเตเดŸเดพเด•เดพเตป เดชเดพเดŸเดฟเดฒเตเดฒเดพเดคเตเดคเดฟเดŸเดคเตเดคเต เดชเดฟเดดเดตเตเด•เดณเตเดฃเตเดŸเต‹? เดˆ เดšเต‹เดฆเตเดฏเด™เตเด™เดณเดพเดฃเต เดธเตเดฅเดพเดชเดจเดคเตเดคเดฟเดจเต เดคเดพเตฝเดชเตเดชเดฐเตเดฏเดฎเตเดณเตเดณเดคเต. เดˆ เดšเต‹เดฆเตเดฏเด™เตเด™เตพเด•เตเด•เตเดณเตเดณ เด‰เดคเตเดคเดฐเด™เตเด™เดณเดฟเตฝ เดจเดฟเดจเตเดจเต เด‰เดฏเตผเดจเตเดจเตเดตเดฐเตเดจเตเดจ เดธเตเดฅเดฟเดคเดฟเดตเดฟเดตเดฐเด•เตเด•เดฃเด•เตเด•เตเด•เตพ เด…เดŸเดฟเดธเตเดฅเดพเดจเดฎเดพเด•เตเด•เดฟ, เดžเด™เตเด™เตพเด•เตเด•เต เด‰เตฝเดชเตเดชเดจเตเดจเด‚ เดฎเต†เดšเตเดšเดชเตเดชเต†เดŸเตเดคเตเดคเดพเดจเตเด‚ เด‰เดชเดฏเต‹เด•เตเดคเตƒ เด‡เดŸเดชเดดเด•เตฝ เดตเตผเดฆเตเดงเดฟเดชเตเดชเดฟเด•เตเด•เดพเดจเตเด‚ เด•เดดเดฟเดฏเตเด‚.

เด‡เดคเตเดคเดฐเดคเตเดคเดฟเดฒเตเดณเตเดณ เดตเตเดฏเดพเดฏเดพเดฎเดคเตเดคเดฟเดจเต เดฌเต€เด‚ เดถเดฐเดฟเด•เตเด•เตเด‚ เด‰เดชเดฏเต‹เด—เดชเตเดฐเดฆเดฎเดพเดฃเต เด•เต‚เดŸเดพเดคเต† เดฎเดฑเตเดฑเต เดฐเดธเด•เดฐเดฎเดพเดฏ เด‰เดชเดฏเต‹เด— เด•เต‡เดธเตเด•เดณเตเด‚ เด‰เดฃเตเดŸเต. เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดธเตเดฑเตเดฑเต‹เด•เตเด•เต เดŸเดฟเด•เตเด•เต เดกเดพเดฑเตเดฑ เดคเดคเตเดธเดฎเดฏเด‚ เดตเดฟเดถเด•เดฒเดจเด‚ เดšเต†เดฏเตเดฏเดพเดจเตเด‚ เดตเดฟเดถเด•เดฒเดจเดคเตเดคเต† เด…เดŸเดฟเดธเตเดฅเดพเดจเดฎเดพเด•เตเด•เดฟ เดŸเตเดฐเต‡เดกเตเด•เตพ เดจเดŸเดคเตเดคเดพเดจเตเด‚ เดจเดฟเด™เตเด™เตพ เด†เด—เตเดฐเดนเดฟเดšเตเดšเต‡เด•เตเด•เดพเด‚, เด’เดฐเตเดชเด•เตเดทเต‡ เดจเดฟเด™เตเด™เตพเด•เตเด•เต เดตเดพเดนเดจเด™เตเด™เดณเดฟเตฝ เดจเดฟเดจเตเดจเตเดณเตเดณ เดธเต†เตปเดธเตผ เดกเดพเดฑเตเดฑเดฏเตเด‚ เดŸเตเดฐเดพเดซเดฟเด•เต เดฒเต†เดตเตฝ เด•เดฃเด•เตเด•เตเด•เต‚เดŸเตเดŸเดฒเตเด•เตพ เด•เดฃเด•เตเด•เดพเด•เตเด•เดพเดจเตเด‚ เดคเดพเตฝเดชเตเดชเดฐเตเดฏเดฎเตเดฃเตเดŸเดพเด•เดพเด‚. เด‰เดฆเดพเดนเดฐเดฃเดคเตเดคเดฟเดจเต, เดจเดฟเด™เตเด™เตพเด•เตเด•เต เด‰เดชเดฏเต‹เด•เตเดคเตƒ เดกเดพเดฑเตเดฑ เดถเต‡เด–เดฐเดฟเด•เตเด•เตเด•เดฏเตเด‚ เดชเตเดฐเดงเดพเดจ เด…เดณเดตเตเด•เตพ เดŸเตเดฐเดพเด•เตเด•เตเดšเต†เดฏเตเดฏเตเดจเตเดจเดคเดฟเดจเต เดกเดพเดทเตโ€Œเดฌเต‹เตผเดกเตเด•เตพ เดธเตƒเดทเตโ€ŒเดŸเดฟเด•เตเด•เดพเตป เด…เดคเต เด‰เดชเดฏเต‹เด—เดฟเด•เตเด•เตเด•เดฏเตเด‚ เดšเต†เดฏเตเดฏเตเดจเตเดจ เด’เดฐเต เด—เต†เดฏเดฟเดฎเดฟเด‚เด—เต เด•เดฎเตเดชเดจเดฟเดฏเดพเด•เดพเด‚. เดถเดฐเดฟ, เดฎเดพเดจเตเดฏเดฐเต‡, เด‡เดคเต เดฎเดฑเตเดฑเตŠเดฐเต เดชเต‹เดธเตเดฑเตเดฑเดฟเดจเตเดณเตเดณ เดตเดฟเดทเดฏเดฎเดพเดฃเต, เดตเดพเดฏเดฟเดšเตเดšเดคเดฟเดจเต เดจเดจเตเดฆเดฟ, เด•เต‚เดŸเดพเดคเต† เดฎเตเดดเตเดตเตป เด•เต‹เดกเตเด‚ เด•เดพเดฃเดพเตป เด†เด—เตเดฐเดนเดฟเด•เตเด•เตเดจเตเดจเดตเตผเด•เตเด•เต เดŽเดจเตเดฑเต† GitHub-เดฒเต‡เด•เตเด•เตเดณเตเดณ เดฒเดฟเด™เตเด•เต เดšเตเดตเดŸเต†เดฏเตเดฃเตเดŸเต.

https://github.com/DFoly/User_log_pipeline

เด…เดคเตเดฐเดฎเดพเดคเตเดฐเด‚. เด’เดจเตเดจเดพเด‚ เดญเดพเด—เด‚ เดตเดพเดฏเดฟเด•เตเด•เตเด•.

เด…เดตเดฒเด‚เดฌเด‚: www.habr.com

เด’เดฐเต เด…เดญเดฟเดชเตเดฐเดพเดฏเด‚ เดšเต‡เตผเด•เตเด•เตเด•