เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2

เจธเจพเจฐเจฟเจ†เจ‚ เจจเฉ‚เฉฐ เจธเจคเจฟ เจธเจผเฉเจฐเฉ€ เจ…เจ•เจพเจฒ. เจ…เจธเฉ€เจ‚ เจฒเฉ‡เจ– เจฆเฉ‡ เจ…เฉฐเจคเจฎ เจนเจฟเฉฑเจธเฉ‡ เจฆเจพ เจ…เจจเฉเจตเจพเจฆ เจธเจพเจ‚เจเจพ เจ•เจฐ เจฐเจนเฉ‡ เจนเจพเจ‚, เจ–เจพเจธ เจคเฉŒเจฐ 'เจคเฉ‡ เจ•เฉ‹เจฐเจธ เจฆเฉ‡ เจตเจฟเจฆเจฟเจ†เจฐเจฅเฉ€เจ†เจ‚ เจฒเจˆ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเจพ เจ—เจฟเจ† เจนเฉˆเฅค เจกเจพเจŸเจพ เจ‡เฉฐเจœเฉ€เจจเฉ€เจ…เจฐ. เจคเฉเจธเฉ€เจ‚ เจชเจนเจฟเจฒเจพ เจญเจพเจ— เจชเฉœเฉเจน เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจ‡เฉฑเจฅเฉ‡.

เจฐเฉ€เจ…เจฒ-เจŸเจพเจˆเจฎ เจชเจพเจˆเจชเจฒเจพเจˆเจจเจพเจ‚ เจฒเจˆ เจ…เจชเจพเจšเฉ‡ เจฌเฉ€เจฎ เจ…เจคเฉ‡ เจกเฉ‡เจŸเจพเจซเจฒเฉ‹

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2

Google เจ•เจฒเจพเจŠเจก เจธเฉˆเฉฑเจŸเจ…เฉฑเจช เจ•เฉ€เจคเจพ เจœเจพ เจฐเจฟเจนเจพ เจนเฉˆ

เจจเฉ‹เจŸ: เจฎเฉˆเจ‚ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจจเฉ‚เฉฐ เจšเจฒเจพเจ‰เจฃ เจ…เจคเฉ‡ เจ•เจธเจŸเจฎ เจฒเฉŒเจ— เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจชเฉเจฐเจ•เจพเจธเจผเจฟเจค เจ•เจฐเจจ เจฒเจˆ เจ—เฉ‚เจ—เจฒ เจ•เจฒเจพเจ‰เจก เจธเจผเฉˆเฉฑเจฒ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เฉ€เจคเฉ€ เจ•เจฟเจ‰เจ‚เจ•เจฟ เจฎเฉˆเจจเฉ‚เฉฐ เจชเจพเจˆเจฅเจจ 3 เจตเจฟเฉฑเจš เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจšเจฒเจพเจ‰เจฃ เจตเจฟเฉฑเจš เจฎเฉเจธเจผเจ•เจฒ เจ† เจฐเจนเฉ€ เจธเฉ€เฅค เจ—เฉ‚เจ—เจฒ เจ•เจฒเจพเจ‰เจก เจธเจผเฉˆเฉฑเจฒ เจชเจพเจˆเจฅเจจ 2 เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเจพ เจนเฉˆ, เจœเฉ‹ เจ•เจฟ เจ…เจชเจพเจšเฉ‡ เจฌเฉ€เจฎ เจจเจพเจฒ เจตเจงเฉ‡เจฐเฉ‡ เจ…เจจเฉเจ•เฉ‚เจฒ เจนเฉˆเฅค

เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจธเจผเฉเจฐเฉ‚ เจ•เจฐเจจ เจฒเจˆ, เจธเจพเจจเฉ‚เฉฐ เจธเฉˆเจŸเจฟเฉฐเจ—เจพเจ‚ เจตเจฟเฉฑเจš เจฅเฉ‹เฉœเจพ เจœเจฟเจนเจพ เจ–เฉ‹เจฆเจฃ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆเฅค เจคเฉเจนเจพเจกเฉ‡ เจตเจฟเฉฑเจšเฉ‹เจ‚ เจœเจฟเจจเฉเจนเจพเจ‚ เจจเฉ‡ เจชเจนเจฟเจฒเจพเจ‚ GCP เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจจเจนเฉ€เจ‚ เจ•เฉ€เจคเฉ€ เจนเฉˆ, เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‡เจธ เจตเจฟเฉฑเจš เจฆเฉฑเจธเฉ‡ เจ—เจ เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ‡ 6 เจ•เจฆเจฎเจพเจ‚ เจฆเฉ€ เจชเจพเจฒเจฃเจพ เจ•เจฐเจจ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉ‹เจตเฉ‡เจ—เฉ€ เจธเจซเจผเจพ.

เจ‡เจธ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ, เจธเจพเจจเฉ‚เฉฐ เจ†เจชเจฃเฉ€เจ†เจ‚ เจธเจ•เฉเจฐเจฟเจชเจŸเจพเจ‚ เจจเฉ‚เฉฐ เจ—เฉ‚เจ—เจฒ เจ•เจฒเจพเจ‰เจก เจธเจŸเฉ‹เจฐเฉ‡เจœ 'เจคเฉ‡ เจ…เจชเจฒเฉ‹เจก เจ•เจฐเจจ เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจธเจพเจกเฉ‡ เจ—เฉ‚เจ—เจฒ เจ•เจฒเจพเจ‰เจก เจธเจผเฉˆเจฒ 'เจคเฉ‡ เจ•เจพเจชเฉ€ เจ•เจฐเจจ เจฆเฉ€ เจœเจผเจฐเฉ‚เจฐเจค เจนเฉ‹เจเจ—เฉ€เฅค เจ•เจฒเจพเจ‰เจก เจธเจŸเฉ‹เจฐเฉ‡เจœ 'เจคเฉ‡ เจ…เจชเจฒเฉ‹เจก เจ•เจฐเจจเจพ เจฌเจนเฉเจค เจฎเจพเจฎเฉ‚เจฒเฉ€ เจนเฉˆ (เจ‡เฉฑเจ• เจตเฉ‡เจฐเจตเจพ เจชเจพเจ‡เจ† เจœเจพ เจธเจ•เจฆเจพ เจนเฉˆ เจ‡เฉฑเจฅเฉ‡). เจธเจพเจกเฉ€เจ†เจ‚ เจซเจพเจˆเจฒเจพเจ‚ เจฆเฉ€ เจจเจ•เจฒ เจ•เจฐเจจ เจฒเจˆ, เจ…เจธเฉ€เจ‚ เจนเฉ‡เจ เจพเจ‚ เจšเจฟเฉฑเจคเจฐ 2 เจตเจฟเฉฑเจš เจ–เฉฑเจฌเฉ‡ เจชเจพเจธเฉ‡ เจชเจนเจฟเจฒเฉ‡ เจ†เจˆเจ•เจจ 'เจคเฉ‡ เจ•เจฒเจฟเฉฑเจ• เจ•เจฐเจ•เฉ‡ เจŸเฉ‚เจฒเจฌเจพเจฐ เจคเฉ‹เจ‚ เจ—เฉ‚เจ—เจฒ เจ•เจฒเจพเจ‰เจก เจธเจผเฉˆเจฒ เจ–เฉ‹เจฒเฉเจน เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2
2 เจšเจฟเฉฑเจคเจฐ

เจซเจพเจˆเจฒเจพเจ‚ เจฆเฉ€ เจจเจ•เจฒ เจ•เจฐเจจ เจ…เจคเฉ‡ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ€เจ†เจ‚ เจฒเจพเจ‡เจฌเฉเจฐเฉ‡เจฐเฉ€เจ†เจ‚ เจจเฉ‚เฉฐ เจธเจฅเจพเจชเจฟเจค เจ•เจฐเจจ เจฒเจˆ เจธเจพเจจเฉ‚เฉฐ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ€เจ†เจ‚ เจ•เจฎเจพเจ‚เจกเจพเจ‚ เจนเฉ‡เจ เจพเจ‚ เจธเฉ‚เจšเฉ€เจฌเฉฑเจง เจนเจจเฅค

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

เจธเจพเจกเจพ เจกเจพเจŸเจพเจฌเฉ‡เจธ เจ…เจคเฉ‡ เจธเจพเจฐเจฃเฉ€ เจฌเจฃเจพเจ‰เจฃเจพ

เจ‡เฉฑเจ• เจตเจพเจฐ เจœเจฆเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ เจธเฉˆเฉฑเจŸเจ…เฉฑเจช เจธเฉฐเจฌเฉฐเจงเฉ€ เจธเจพเจฐเฉ‡ เจชเฉœเจพเจ… เจชเฉ‚เจฐเฉ‡ เจ•เจฐ เจฒเฉˆเจ‚เจฆเฉ‡ เจนเจพเจ‚, เจคเจพเจ‚ เจ…เจ—เจฒเฉ€ เจšเฉ€เจœเจผ เจœเฉ‹ เจธเจพเจจเฉ‚เฉฐ เจ•เจฐเจจ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆ เจ‰เจน เจนเฉˆ BigQuery เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจกเจพเจŸเจพเจธเฉˆเจŸ เจ…เจคเฉ‡ เจธเจพเจฐเจฃเฉ€ เจฌเจฃเจพเจ‰เจฃเจพเฅค เจ…เจœเจฟเจนเจพ เจ•เจฐเจจ เจฆเฉ‡ เจ•เจˆ เจคเจฐเฉ€เจ•เฉ‡ เจนเจจ, เจชเจฐ เจธเจญ เจคเฉ‹เจ‚ เจธเจฐเจฒ เจนเฉˆ เจชเจนเจฟเจฒเจพเจ‚ เจกเฉ‡เจŸเจพเจธเฉˆเจŸ เจฌเจฃเจพ เจ•เฉ‡ เจ—เฉ‚เจ—เจฒ เจ•เจฒเจพเจ‰เจก เจ•เฉฐเจธเฉ‹เจฒ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจจเจพเฅค เจคเฉเจธเฉ€เจ‚ เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ‡ เจ•เจฆเจฎเจพเจ‚ เจฆเฉ€ เจชเจพเจฒเจฃเจพ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจฒเจฟเฉฐเจ• เจจเฉ‚เฉฐเจ‡เฉฑเจ• เจธเจ•เฉ€เจฎเจพ เจจเจพเจฒ เจ‡เฉฑเจ• เจธเจพเจฐเจฃเฉ€ เจฌเจฃเจพเจ‰เจฃ เจฒเจˆ. เจธเจพเจกเฉ€ เจฎเฉ‡เจœเจผ เจนเฉ‹เจตเฉ‡เจ—เฉ€ 7 เจ•เจพเจฒเจฎ, เจนเจฐเฉ‡เจ• เจ‰เจชเจญเฉ‹เจ—เจคเจพ เจฒเฉŒเจ— เจฆเฉ‡ เจญเจพเจ—เจพเจ‚ เจฆเฉ‡ เจ…เจจเฉเจธเจพเจฐเฉ€เฅค เจธเจนเฉ‚เจฒเจค เจฒเจˆ, เจ…เจธเฉ€เจ‚ เจธเจฎเจพเจ‚-เจธเจฅเจพเจจเจ• เจตเฉ‡เจฐเฉ€เจเจฌเจฒ เจจเฉ‚เฉฐ เจ›เฉฑเจก เจ•เฉ‡, เจธเจพเจฐเฉ‡ เจ•เจพเจฒเจฎเจพเจ‚ เจจเฉ‚เฉฐ เจธเจคเจฐ เจตเจœเฉ‹เจ‚ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจ•เจฐเจพเจ‚เจ—เฉ‡, เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจจเฉ‚เฉฐ เจ‰เจนเจจเจพเจ‚ เจตเฉ‡เจฐเฉ€เจเจฌเจฒเจพเจ‚ เจฆเฉ‡ เจ…เจจเฉเจธเจพเจฐ เจจเจพเจฎ เจฆเฉ‡เจตเจพเจ‚เจ—เฉ‡ เจœเฉ‹ เจ…เจธเฉ€เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจคเจฟเจ†เจฐ เจ•เฉ€เจคเฉ‡ เจธเจจเฅค เจธเจพเจกเฉ€ เจธเจพเจฐเจฃเฉ€ เจฆเจพ เจ–เจพเจ•เจพ เจšเจฟเฉฑเจคเจฐ 3 เจตเจฐเจ—เจพ เจฆเจฟเจ–เจพเจˆ เจฆเฉ‡เจฃเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆเฅค

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2
เจšเจฟเฉฑเจคเจฐ 3. เจŸเฉ‡เจฌเจฒ เจฒเฉ‡เจ†เจ‰เจŸ

เจ‰เจชเจญเฉ‹เจ—เจคเจพ เจฒเฉŒเจ— เจกเฉ‡เจŸเจพ เจชเฉเจฐเจ•เจพเจธเจผเจฟเจค เจ•เฉ€เจคเจพ เจœเจพ เจฐเจฟเจนเจพ เจนเฉˆ

Pub/Sub เจธเจพเจกเฉ€ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฆเจพ เจ‡เฉฑเจ• เจฎเจนเฉฑเจคเจตเจชเฉ‚เจฐเจจ เจนเจฟเฉฑเจธเจพ เจนเฉˆ เจ•เจฟเจ‰เจ‚เจ•เจฟ เจ‡เจน เจ•เจˆ เจธเฉเจคเฉฐเจคเจฐ เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจเจพเจ‚ เจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจฆเฉ‚เจœเฉ‡ เจจเจพเจฒ เจธเฉฐเจšเจพเจฐ เจ•เจฐเจจ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉˆเฅค เจ–เจพเจธ เจคเฉŒเจฐ 'เจคเฉ‡, เจ‡เจน เจ‡เฉฑเจ• เจตเจฟเจšเฉ‹เจฒเฉ‡ เจตเจœเฉ‹เจ‚ เจ•เฉฐเจฎ เจ•เจฐเจฆเจพ เจนเฉˆ เจœเฉ‹ เจธเจพเจจเฉ‚เฉฐ เจเจชเจฒเฉ€เจ•เฉ‡เจธเจผเจจเจพเจ‚ เจตเจฟเจšเจ•เจพเจฐ เจธเฉฐเจฆเฉ‡เจธเจผ เจญเฉ‡เจœเจฃ เจ…เจคเฉ‡ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจจ เจฆเฉ€ เจ‡เจœเจพเจœเจผเจค เจฆเจฟเฉฐเจฆเจพ เจนเฉˆเฅค เจธเจญ เจคเฉ‹เจ‚ เจชเจนเจฟเจฒเจพเจ‚ เจธเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจตเจฟเจธเจผเจพ เจฌเจฃเจพเจ‰เจฃ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆเฅค เจฌเจธ เจ•เฉฐเจธเฉ‹เจฒ เจตเจฟเฉฑเจš เจชเจฌ/เจธเจฌ 'เจคเฉ‡ เจœเจพเจ“ เจ…เจคเฉ‡ เจตเจฟเจธเจผเจพ เจฌเจฃเจพเจ“ 'เจคเฉ‡ เจ•เจฒเจฟเฉฑเจ• เจ•เจฐเฉ‹เฅค

เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเจพ เจ•เฉ‹เจก เจ‰เฉฑเจชเจฐ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจฒเฉŒเจ— เจกเฉ‡เจŸเจพ เจฌเจฃเจพเจ‰เจฃ เจฒเจˆ เจธเจพเจกเฉ€ เจธเจ•เฉเจฐเจฟเจชเจŸ เจจเฉ‚เฉฐ เจ•เจพเจฒ เจ•เจฐเจฆเจพ เจนเฉˆ เจ…เจคเฉ‡ เจซเจฟเจฐ เจฒเฉŒเจ—เจธ เจจเฉ‚เฉฐ Pub/Sub เจจเฉ‚เฉฐ เจœเฉ‹เฉœเจฆเจพ เจนเฉˆ เจ…เจคเฉ‡ เจญเฉ‡เจœเจฆเจพ เจนเฉˆเฅค เจธเจพเจจเฉ‚เฉฐ เจธเจฟเจฐเจซ เจ‡เฉฑเจ• เจšเฉ€เจœเจผ เจฌเจฃเจพเจ‰เจฃ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆ เจชเฉเจฐเจ•เจพเจธเจผเจ• เจ•เจฒเจพเจ‡เฉฐเจŸ, เจตเจฟเจงเฉ€ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจตเจฟเจธเจผเฉ‡ เจฆเจพ เจฎเจพเจฐเจ— เจฆเจฟเจ“ topic_path เจ…เจคเฉ‡ เจซเฉฐเจ•เจธเจผเจจ เจจเฉ‚เฉฐ เจ•เจพเจฒ เจ•เจฐเฉ‹ publish ั topic_path เจ…เจคเฉ‡ เจกเจพเจŸเจพเฅค เจ•เจฟเจฐเจชเจพ เจ•เจฐเจ•เฉ‡ เจงเจฟเจ†เจจ เจฆเจฟเจ“ เจ•เจฟ เจ…เจธเฉ€เจ‚ เจ†เจฏเจพเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ generate_log_line เจธเจพเจกเฉ€ เจธเจ•เฉเจฐเจฟเจชเจŸ เจคเฉ‹เจ‚ stream_logs, เจ‡เจธ เจฒเจˆ เจฏเจ•เฉ€เจจเฉ€ เจฌเจฃเจพเจ“ เจ•เจฟ เจ‡เจน เจซเจพเจˆเจฒเจพเจ‚ เจ‡เฉฑเจ•เฉ‹ เจซเฉ‹เจฒเจกเจฐ เจตเจฟเฉฑเจš เจนเจจ, เจจเจนเฉ€เจ‚ เจคเจพเจ‚ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจ†เจฏเจพเจค เจ—เจฒเจคเฉ€ เจฎเจฟเจฒเฉ‡เจ—เฉ€เฅค เจซเจฟเจฐ เจ…เจธเฉ€เจ‚ เจ‡เจธเจจเฉ‚เฉฐ เจ†เจชเจฃเฉ‡ เจ—เฉ‚เจ—เจฒ เจ•เฉฐเจธเฉ‹เจฒ เจฆเฉเจ†เจฐเจพ เจตเจฐเจค เจ•เฉ‡ เจšเจฒเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

เจœเจฟเจตเฉ‡เจ‚ เจนเฉ€ เจซเจพเจˆเจฒ เจšเฉฑเจฒเจฆเฉ€ เจนเฉˆ, เจ…เจธเฉ€เจ‚ เจ•เฉฐเจธเฉ‹เจฒ เจตเจฟเฉฑเจš เจฒเฉŒเจ— เจกเฉ‡เจŸเจพ เจฆเฉ‡ เจ†เจ‰เจŸเจชเฉเฉฑเจŸ เจจเฉ‚เฉฐ เจตเฉ‡เจ–เจฃ เจฆเฉ‡ เจฏเฉ‹เจ— เจนเฉ‹เจตเจพเจ‚เจ—เฉ‡, เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจนเฉ‡เจ เจพเจ‚ เจšเจฟเฉฑเจคเจฐ เจตเจฟเฉฑเจš เจฆเจฟเจ–เจพเจ‡เจ† เจ—เจฟเจ† เจนเฉˆเฅค เจ‡เจน เจธเจ•เฉเจฐเจฟเจชเจŸ เจ‰เจฆเฉ‹เจ‚ เจคเฉฑเจ• เจ•เฉฐเจฎ เจ•เจฐเฉ‡เจ—เฉ€ เจœเจฆเฉ‹เจ‚ เจคเฉฑเจ• เจ…เจธเฉ€เจ‚ เจ‡เจธเจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจจเจนเฉ€เจ‚ เจ•เจฐเจฆเฉ‡ CTRL + Cเจ‡เจธ เจจเฉ‚เฉฐ เจชเฉ‚เจฐเจพ เจ•เจฐเจจ เจฒเจˆ.

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2
เจšเจฟเฉฑเจคเจฐ 4. เจ†เจ‰เจŸเจชเฉเฉฑเจŸ publish_logs.py

เจธเจพเจกเจพ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจ•เฉ‹เจก เจฒเจฟเจ–เจฃเจพ

เจนเฉเจฃ เจœเจฆเฉ‹เจ‚ เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจธเจญ เจ•เฉเจ เจคเจฟเจ†เจฐ เจนเฉˆ, เจ…เจธเฉ€เจ‚ เจฎเจœเจผเฉ‡เจฆเจพเจฐ เจนเจฟเฉฑเจธเจพ เจธเจผเฉเจฐเฉ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ - เจฌเฉ€เจฎ เจ…เจคเฉ‡ เจชเจพเจˆเจฅเจจ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจธเจพเจกเฉ€ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจจเฉ‚เฉฐ เจ•เฉ‹เจกเจฟเฉฐเจ— เจ•เจฐเจจเจพเฅค เจ‡เฉฑเจ• เจฌเฉ€เจฎ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจฃ เจฒเจˆ, เจธเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจ†เจฌเจœเฉˆเจ•เจŸ (เจชเฉ€) เจฌเจฃเจพเจ‰เจฃ เจฆเฉ€ เจฒเฉ‹เฉœ เจนเฉˆ. เจ‡เฉฑเจ• เจตเจพเจฐ เจœเจฆเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจ†เจฌเจœเฉˆเจ•เจŸ เจฌเจฃเจพ เจฒเฉˆเจ‚เจฆเฉ‡ เจนเจพเจ‚, เจคเจพเจ‚ เจ…เจธเฉ€เจ‚ เจ†เจชเจฐเฉ‡เจŸเจฐ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจ‡เฉฑเจ• เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ เจ‡เฉฑเจ• เจ•เจˆ เจซเฉฐเจ•เจธเจผเจจเจพเจ‚ เจจเฉ‚เฉฐ เจฒเจพเจ—เฉ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ pipe (|). เจ†เจฎ เจคเฉŒเจฐ 'เจคเฉ‡, เจตเจฐเจ•เจซเจฒเฉ‹ เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ€ เจคเจธเจตเฉ€เจฐ เจฆเฉ€ เจคเจฐเฉเจนเจพเจ‚ เจฆเจฟเจ–เจพเจˆ เจฆเจฟเฉฐเจฆเจพ เจนเฉˆเฅค

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

เจธเจพเจกเฉ‡ เจ•เฉ‹เจก เจตเจฟเฉฑเจš, เจ…เจธเฉ€เจ‚ เจฆเฉ‹ เจ•เจธเจŸเจฎ เจซเฉฐเจ•เจธเจผเจจ เจฌเจฃเจพเจตเจพเจ‚เจ—เฉ‡เฅค เจซเฉฐเจ•เจธเจผเจจ regex_clean, เจœเฉ‹ เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจธเจ•เฉˆเจจ เจ•เจฐเจฆเจพ เจนเฉˆ เจ…เจคเฉ‡ เจซเฉฐเจ•เจธเจผเจจ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเฉ‹เจ PATTERNS เจธเฉ‚เจšเฉ€ เจฆเฉ‡ เจ…เจงเจพเจฐ เจคเฉ‡ เจธเฉฐเจฌเฉฐเจงเจฟเจค เจ•เจคเจพเจฐ เจจเฉ‚เฉฐ เจฎเฉเฉœ เจชเฉเจฐเจพเจชเจค เจ•เจฐเจฆเจพ เจนเฉˆ re.search. เจซเฉฐเจ•เจธเจผเจจ เจ•เจพเจฎเฉ‡ เจจเจพเจฒ เจตเฉฑเจ– เจ•เฉ€เจคเฉ€ เจธเจคเจฐ เจตเจพเจชเจธ เจ•เจฐเจฆเจพ เจนเฉˆเฅค เจœเฉ‡ เจคเฉเจธเฉ€เจ‚ เจจเจฟเจฏเจฎเจค เจธเจฎเฉ€เจ•เจฐเจจ เจฎเจพเจนเจฐ เจจเจนเฉ€เจ‚ เจนเฉ‹, เจคเจพเจ‚ เจฎเฉˆเจ‚ เจ‡เจธเจฆเฉ€ เจœเจพเจ‚เจš เจ•เจฐเจจ เจฆเฉ€ เจธเจฟเจซเจผเจพเจฐเจฟเจธเจผ เจ•เจฐเจฆเจพ เจนเจพเจ‚ เจŸเจฟเจŠเจŸเฉ‹เจฐเจฟเจ…เจฒ เจ…เจคเฉ‡ เจ•เฉ‹เจก เจฆเฉ€ เจœเจพเจ‚เจš เจ•เจฐเจจ เจฒเจˆ เจ‡เฉฑเจ• เจจเฉ‹เจŸเจชเฉˆเจก เจตเจฟเฉฑเจš เจ…เจญเจฟเจ†เจธ เจ•เจฐเฉ‹เฅค เจ‡เจธ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจ•เจธเจŸเจฎ ParDo เจซเฉฐเจ•เจธเจผเจจ เจจเฉ‚เฉฐ เจชเจฐเจฟเจญเจพเจธเจผเจฟเจค เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ เจœเจฟเจธ เจจเฉ‚เฉฐ เจ•เจฟเจนเจพ เจœเจพเจ‚เจฆเจพ เจนเฉˆ เจตเฉฐเจก, เจœเฉ‹ เจ•เจฟ เจธเจฎเจพเจจเจพเจ‚เจคเจฐ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจฒเจˆ เจฌเฉ€เจฎ เจŸเฉเจฐเจพเจ‚เจธเจซเจพเจฐเจฎ เจฆเฉ€ เจ‡เฉฑเจ• เจชเจฐเจฟเจตเจฐเจคเจจ เจนเฉˆเฅค เจชเจพเจˆเจฅเจจ เจตเจฟเฉฑเจš, เจ‡เจน เจ‡เฉฑเจ• เจ–เจพเจธ เจคเจฐเฉ€เจ•เฉ‡ เจจเจพเจฒ เจ•เฉ€เจคเจพ เจœเจพเจ‚เจฆเจพ เจนเฉˆ - เจธเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• เจ…เจœเจฟเจนเฉ€ เจ•เจฒเจพเจธ เจฌเจฃเจพเจ‰เจฃเฉ€ เจšเจพเจนเฉ€เจฆเฉ€ เจนเฉˆ เจœเฉ‹ DoFn เจฌเฉ€เจฎ เจ•เจฒเจพเจธ เจคเฉ‹เจ‚ เจชเฉเจฐเจพเจชเจค เจนเฉเฉฐเจฆเฉ€ เจนเฉˆเฅค เจธเจชเจฒเจฟเจŸ เจซเฉฐเจ•เจธเจผเจจ เจชเจฟเจ›เจฒเฉ‡ เจซเฉฐเจ•เจธเจผเจจ เจคเฉ‹เจ‚ เจชเจพเจฐเจธ เจ•เฉ€เจคเฉ€ เจ•เจคเจพเจฐ เจจเฉ‚เฉฐ เจฒเฉˆเจ‚เจฆเจพ เจนเฉˆ เจ…เจคเฉ‡ เจธเจพเจกเฉ€ BigQuery เจธเจพเจฐเจฃเฉ€ เจตเจฟเฉฑเจš เจ•เจพเจฒเจฎ เจจเจพเจฎเจพเจ‚ เจจเจพเจฒ เจธเฉฐเจฌเฉฐเจงเจฟเจค เจ•เฉเฉฐเจœเฉ€เจ†เจ‚ เจฆเฉ‡ เจจเจพเจฒ เจธเจผเจฌเจฆเจ•เฉ‹เจธเจผเจพเจ‚ เจฆเฉ€ เจ‡เฉฑเจ• เจธเฉ‚เจšเฉ€ เจตเจพเจชเจธ เจ•เจฐเจฆเจพ เจนเฉˆเฅค เจ‡เจธ เจซเฉฐเจ•เจธเจผเจจ เจฌเจพเจฐเฉ‡ เจจเฉ‹เจŸ เจ•เจฐเจจ เจฒเจˆ เจ•เฉเจ เจนเฉˆ: เจฎเฉˆเจจเฉ‚เฉฐ เจ†เจฏเจพเจค เจ•เจฐเจจเจพ เจชเจฟเจ† datetime เจ‡เจธ เจจเฉ‚เฉฐ เจ•เฉฐเจฎ เจ•เจฐเจจ เจฒเจˆ เจ‡เฉฑเจ• เจซเฉฐเจ•เจธเจผเจจ เจฆเฉ‡ เจ…เฉฐเจฆเจฐ. เจฎเฉˆเจจเฉ‚เฉฐ เจซเจพเจˆเจฒ เจฆเฉ€ เจธเจผเฉเจฐเฉ‚เจ†เจค เจตเจฟเฉฑเจš เจ‡เฉฑเจ• เจ†เจฏเจพเจค เจ—เจฒเจคเฉ€ เจฎเจฟเจฒ เจฐเจนเฉ€ เจธเฉ€, เจœเฉ‹ เจ•เจฟ เจ…เจœเฉ€เจฌ เจธเฉ€. เจ‡เจน เจธเฉ‚เจšเฉ€ เจซเจฟเจฐ เจซเฉฐเจ•เจธเจผเจจ เจจเฉ‚เฉฐ เจชเจพเจธ เจ•เฉ€เจคเฉ€ เจœเจพเจ‚เจฆเฉ€ เจนเฉˆ เจฐเจพเจˆเจŸ เจŸเฉ‚ เจฌเจฟเจ—เจ•เฉเจ†เจฐเฉ€, เจœเฉ‹ เจธเจฟเจฐเจซเจผ เจธเจพเจกเฉ‡ เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจธเจพเจฐเจฃเฉ€ เจตเจฟเฉฑเจš เจœเฉ‹เฉœเจฆเจพ เจนเฉˆเฅค เจฌเฉˆเจš เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจœเฉŒเจฌ เจ…เจคเฉ‡ เจธเจŸเฉเจฐเฉ€เจฎเจฟเฉฐเจ— เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจœเฉŒเจฌ เจฒเจˆ เจ•เฉ‹เจก เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเจพ เจ—เจฟเจ† เจนเฉˆเฅค เจฌเฉˆเจš เจ…เจคเฉ‡ เจธเจŸเฉเจฐเฉ€เจฎเจฟเฉฐเจ— เจ•เฉ‹เจก เจตเจฟเฉฑเจš เจธเจฟเจฐเจซ เจซเจฐเจ• เจ‡เจน เจนเฉˆ เจ•เจฟ เจฌเฉˆเจš เจตเจฟเฉฑเจš เจ…เจธเฉ€เจ‚ CSV เจจเฉ‚เฉฐ เจชเฉœเฉเจนเจฆเฉ‡ เจนเจพเจ‚ src_pathเจซเฉฐเจ•เจธเจผเจจ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเฉ‹เจ ReadFromText เจฌเฉ€เจฎ เจคเฉ‹เจ‚เฅค

เจฌเฉˆเจš เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจœเฉŒเจฌ (เจฌเฉˆเจš เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ—)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

เจธเจŸเฉเจฐเฉ€เจฎเจฟเฉฐเจ— เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจœเฉŒเจฌ (เจธเจŸเฉเจฐเฉ€เจฎ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ—)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจšเจฒเจพเจˆ เจœเจพ เจฐเจนเฉ€ เจนเฉˆ

เจ…เจธเฉ€เจ‚ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจจเฉ‚เฉฐ เจ•เจˆ เจตเฉฑเจ–-เจตเฉฑเจ– เจคเจฐเฉ€เจ•เจฟเจ†เจ‚ เจจเจพเจฒ เจšเจฒเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค เจœเฉ‡เจ•เจฐ เจ…เจธเฉ€เจ‚ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจนเจพเจ‚, เจคเจพเจ‚ เจ…เจธเฉ€เจ‚ เจ‡เจธเจจเฉ‚เฉฐ เจธเจฟเจฐเจซเจผ GCP เจตเจฟเฉฑเจš เจฐเจฟเจฎเฉ‹เจŸเจฒเฉ€ เจฒเฉŒเจ—เจ‡เจจ เจ•เจฐเจฆเฉ‡ เจนเฉ‹เจ เจ‡เฉฑเจ• เจŸเจฐเจฎเฉ€เจจเจฒ เจคเฉ‹เจ‚ เจธเจฅเจพเจจเจ• เจคเฉŒเจฐ 'เจคเฉ‡ เจšเจฒเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

เจนเจพเจฒเจพเจ‚เจ•เจฟ, เจ…เจธเฉ€เจ‚ เจ‡เจธเจจเฉ‚เฉฐ เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจšเจฒเจพเจ‰เจฃ เจœเจพ เจฐเจนเฉ‡ เจนเจพเจ‚เฅค เจ…เจธเฉ€เจ‚ เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ‡ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ‡ เจชเฉˆเจฐเจพเจฎเฉ€เจŸเจฐเจพเจ‚ เจจเฉ‚เฉฐ เจธเฉˆเฉฑเจŸ เจ•เจฐเจ•เฉ‡ เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ€ เจ•เจฎเจพเจ‚เจก เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจ•เฉ‡ เจ…เจœเจฟเจนเจพ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค

  • project โ€” เจคเฉเจนเจพเจกเฉ‡ GCP เจชเฉเจฐเฉ‹เจœเฉˆเจ•เจŸ เจฆเฉ€ IDเฅค
  • runner เจ‡เฉฑเจ• เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฆเฉŒเฉœเจพเจ• เจนเฉˆ เจœเฉ‹ เจคเฉเจนเจพเจกเฉ‡ เจชเฉเจฐเฉ‹เจ—เจฐเจพเจฎ เจฆเจพ เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจฃ เจ•เจฐเฉ‡เจ—เจพ เจ…เจคเฉ‡ เจคเฉเจนเจพเจกเฉ€ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฆเจพ เจจเจฟเจฐเจฎเจพเจฃ เจ•เจฐเฉ‡เจ—เจพเฅค เจ•เจฒเจพเจ‰เจก เจตเจฟเฉฑเจš เจšเจฒเจพเจ‰เจฃ เจฒเจˆ, เจคเฉเจนเจพเจจเฉ‚เฉฐ เจ‡เฉฑเจ• DataflowRunner เจจเจฟเจฐเจงเจพเจฐเจฟเจค เจ•เจฐเจจเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆเฅค
  • staging_location โ€” เจ•เฉฐเจฎ เจ•เจฐเจจ เจตเจพเจฒเฉ‡ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฐเจพเจ‚ เจฆเฉเจ†เจฐเจพ เจฒเฉ‹เฉœเฉ€เจ‚เจฆเฉ‡ เจ•เฉ‹เจก เจชเฉˆเจ•เฉ‡เจœเจพเจ‚ เจจเฉ‚เฉฐ เจ‡เฉฐเจกเฉˆเจ•เจธ เจ•เจฐเจจ เจฒเจˆ เจ•เจฒเจพเจ‰เจก เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจ•เจฒเจพเจ‰เจก เจธเจŸเฉ‹เจฐเฉ‡เจœ เจฆเจพ เจฎเจพเจฐเจ—เฅค
  • temp_location โ€” เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฆเฉ‡ เจšเฉฑเจฒเจฃ เจฆเฉŒเจฐเจพเจจ เจฌเจฃเจพเจˆเจ†เจ‚ เจ—เจˆเจ†เจ‚ เจ…เจธเจฅเจพเจˆ เจจเฉŒเจ•เจฐเฉ€เจ†เจ‚ เจฆเฉ€เจ†เจ‚ เจซเจพเจˆเจฒเจพเจ‚ เจจเฉ‚เฉฐ เจธเจŸเฉ‹เจฐ เจ•เจฐเจจ เจฒเจˆ เจ•เจฒเจพเจ‰เจก เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจ•เจฒเจพเจ‰เจก เจธเจŸเฉ‹เจฐเฉ‡เจœ เจฆเจพ เจฎเจพเจฐเจ—เฅค
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

เจœเจฆเฉ‹เจ‚ เจ‡เจน เจ•เจฎเจพเจ‚เจก เจšเฉฑเจฒ เจฐเจนเฉ€ เจนเฉˆ, เจ…เจธเฉ€เจ‚ เจ—เฉ‚เจ—เจฒ เจ•เฉฐเจธเฉ‹เจฒ เจตเจฟเฉฑเจš เจกเฉ‡เจŸเจพเจซเจฒเฉ‹ เจŸเฉˆเจฌ 'เจคเฉ‡ เจœเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจธเจพเจกเฉ€ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฆเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค เจœเจฆเฉ‹เจ‚ เจ…เจธเฉ€เจ‚ เจชเจพเจˆเจชเจฒเจพเจˆเจจ 'เจคเฉ‡ เจ•เจฒเจฟเฉฑเจ• เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚, เจคเจพเจ‚ เจธเจพเจจเฉ‚เฉฐ เจšเจฟเฉฑเจคเจฐ 4 เจฆเฉ‡ เจธเจฎเจพเจจ เจ•เฉเจ เจฆเจฟเจ–เจพเจˆ เจฆเฉ‡เจฃเจพ เจšเจพเจนเฉ€เจฆเจพ เจนเฉˆเฅค เจกเฉ€เจฌเฉฑเจ—เจฟเฉฐเจ— เจฆเฉ‡ เจ‰เจฆเฉ‡เจธเจผเจพเจ‚ เจฒเจˆ, เจตเจฟเจธเจคเฉเจฐเจฟเจค เจฒเฉŒเจ—เจธ เจจเฉ‚เฉฐ เจฆเฉ‡เจ–เจฃ เจฒเจˆ เจฒเฉŒเจ—เจธ เจ…เจคเฉ‡ เจซเจฟเจฐ เจธเจŸเฉˆเจ•เจกเฉเจฐเจพเจˆเจตเจฐ 'เจคเฉ‡ เจœเจพเจฃเจพ เจฌเจนเฉเจค เจฎเจฆเจฆเจ—เจพเจฐ เจนเฉ‹ เจธเจ•เจฆเจพ เจนเฉˆเฅค เจ‡เจธ เจจเฉ‡ เจ•เจˆ เจฎเจพเจฎเจฒเจฟเจ†เจ‚ เจตเจฟเฉฑเจš เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฆเฉ‡ เจฎเฉเฉฑเจฆเจฟเจ†เจ‚ เจจเฉ‚เฉฐ เจนเฉฑเจฒ เจ•เจฐเจจ เจตเจฟเฉฑเจš เจฎเฉ‡เจฐเฉ€ เจฎเจฆเจฆ เจ•เฉ€เจคเฉ€ เจนเฉˆเฅค

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2
เจšเจฟเฉฑเจคเจฐ 4: เจฌเฉ€เจฎ เจ•เจจเจตเฉ‡เจ…เจฐ

BigQuery เจตเจฟเฉฑเจš เจธเจพเจกเฉ‡ เจกเฉ‡เจŸเจพ เจคเฉฑเจ• เจชเจนเฉเฉฐเจš เจ•เจฐเฉ‹

เจ‡เจธ เจฒเจˆ, เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจชเจนเจฟเจฒเจพเจ‚ เจนเฉ€ เจ‡เฉฑเจ• เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจนเฉ‹เจฃเฉ€ เจšเจพเจนเฉ€เจฆเฉ€ เจนเฉˆ เจœเจฟเจธ เจตเจฟเฉฑเจš เจกเฉ‡เจŸเจพ เจธเจพเจกเฉ‡ เจŸเฉ‡เจฌเจฒ เจตเจฟเฉฑเจš เจตเจนเจฟ เจฐเจฟเจนเจพ เจนเฉˆเฅค เจ‡เจธเจฆเฉ€ เจœเจพเจ‚เจš เจ•เจฐเจจ เจฒเจˆ, เจ…เจธเฉ€เจ‚ BigQuery 'เจคเฉ‡ เจœเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจฆเฉ‡เจ– เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค เจนเฉ‡เจ เจพเจ‚ เจฆเจฟเฉฑเจคเฉ€ เจ•เจฎเจพเจ‚เจก เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจจ เจคเฉ‹เจ‚ เจฌเจพเจ…เจฆ เจคเฉเจนเจพเจจเฉ‚เฉฐ เจกเฉ‡เจŸเจพเจธเฉˆเจŸ เจฆเฉ€เจ†เจ‚ เจชเจนเจฟเจฒเฉ€เจ†เจ‚ เจ•เฉเจ เจ•เจคเจพเจฐเจพเจ‚ เจฆเฉ‡เจ–เจฃเฉ€เจ†เจ‚ เจšเจพเจนเฉ€เจฆเฉ€เจ†เจ‚ เจนเจจเฅค เจนเฉเจฃ เจœเจฆเฉ‹เจ‚ เจธเจพเจกเฉ‡ เจ•เฉ‹เจฒ BigQuery เจตเจฟเฉฑเจš เจธเจŸเฉ‹เจฐ เจ•เฉ€เจคเจพ เจกเจพเจŸเจพ เจนเฉˆ, เจ…เจธเฉ€เจ‚ เจนเฉ‹เจฐ เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจฃ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚, เจจเจพเจฒ เจนเฉ€ เจธเจนเจฟเจฏเฉ‹เจ—เฉ€เจ†เจ‚ เจจเจพเจฒ เจกเจพเจŸเจพ เจธเจพเจ‚เจเจพ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจ•เจพเจฐเฉ‹เจฌเจพเจฐเฉ€ เจธเจตเจพเจฒเจพเจ‚ เจฆเฉ‡ เจœเจตเจพเจฌ เจฆเฉ‡เจฃเจพ เจธเจผเฉเจฐเฉ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

เจ…เจธเฉ€เจ‚ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎ เจกเฉ‡เจŸเจพ เจชเฉเจฐเฉ‹เจธเฉˆเจธเจฟเฉฐเจ— เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจ‚เจฆเฉ‡ เจนเจพเจ‚เฅค เจญเจพเจ— 2
เจšเจฟเฉฑเจคเจฐ 5: BigQuery

เจธเจฟเฉฑเจŸเจพ

เจ…เจธเฉ€เจ‚ เจ†เจธ เจ•เจฐเจฆเฉ‡ เจนเจพเจ‚ เจ•เจฟ เจ‡เจน เจชเฉ‹เจธเจŸ เจ‡เฉฑเจ• เจธเจŸเฉเจฐเฉ€เจฎเจฟเฉฐเจ— เจกเฉ‡เจŸเจพ เจชเจพเจˆเจชเจฒเจพเจˆเจจ เจฌเจฃเจพเจ‰เจฃ เจฆเฉ‡ เจจเจพเจฒ-เจจเจพเจฒ เจกเฉ‡เจŸเจพ เจจเฉ‚เฉฐ เจตเจงเฉ‡เจฐเฉ‡ เจชเจนเฉเฉฐเจšเจฏเฉ‹เจ— เจฌเจฃเจพเจ‰เจฃ เจฆเฉ‡ เจคเจฐเฉ€เจ•เฉ‡ เจฒเฉฑเจญเจฃ เจฆเฉ€ เจ‡เฉฑเจ• เจ‰เจชเจฏเฉ‹เจ—เฉ€ เจ‰เจฆเจพเจนเจฐเจฃ เจตเจœเฉ‹เจ‚ เจ•เฉฐเจฎ เจ•เจฐเฉ‡เจ—เฉ€เฅค เจ‡เจธ เจซเจพเจฐเจฎเฉˆเจŸ เจตเจฟเฉฑเจš เจกเจพเจŸเจพ เจธเจŸเฉ‹เจฐ เจ•เจฐเจจ เจจเจพเจฒ เจธเจพเจจเฉ‚เฉฐ เจฌเจนเฉเจค เจธเจพเจฐเฉ‡ เจซเจพเจ‡เจฆเฉ‡ เจฎเจฟเจฒเจฆเฉ‡ เจนเจจเฅค เจนเฉเจฃ เจ…เจธเฉ€เจ‚ เจฎเจนเฉฑเจคเจตเจชเฉ‚เจฐเจจ เจธเจตเจพเจฒเจพเจ‚ เจฆเฉ‡ เจœเจตเจพเจฌ เจฆเฉ‡เจฃเจพ เจธเจผเฉเจฐเฉ‚ เจ•เจฐ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจœเจฟเจตเฉ‡เจ‚ เจ•เจฟ เจ•เจฟเฉฐเจจเฉ‡ เจฒเฉ‹เจ• เจธเจพเจกเฉ‡ เจ‰เจคเจชเจพเจฆ เจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ‡ เจนเจจ? เจ•เฉ€ เจคเฉเจนเจพเจกเจพ เจ‰เจชเจญเฉ‹เจ—เจคเจพ เจ…เจงเจพเจฐ เจธเจฎเฉ‡เจ‚ เจฆเฉ‡ เจจเจพเจฒ เจตเจง เจฐเจฟเจนเจพ เจนเฉˆ? เจ‰เจคเจชเจพเจฆ เจฆเฉ‡ เจ•เจฟเจนเฉœเฉ‡ เจชเจนเจฟเจฒเฉ‚เจ†เจ‚ เจจเจพเจฒ เจฒเฉ‹เจ• เจธเจญ เจคเฉ‹เจ‚ เจตเฉฑเจง เจ—เฉฑเจฒเจฌเจพเจค เจ•เจฐเจฆเฉ‡ เจนเจจ? เจ…เจคเฉ‡ เจ•เฉ€ เจ…เจœเจฟเจนเฉ€เจ†เจ‚ เจ—เจฒเจคเฉ€เจ†เจ‚ เจนเจจ เจœเจฟเฉฑเจฅเฉ‡ เจจเจนเฉ€เจ‚ เจนเฉ‹เจฃเฉ€เจ†เจ‚ เจšเจพเจนเฉ€เจฆเฉ€เจ†เจ‚ เจนเจจ? เจ‡เจน เจ‰เจน เจธเจตเจพเจฒ เจนเจจ เจœเฉ‹ เจธเฉฐเจธเจฅเจพ เจฒเจˆ เจฆเจฟเจฒเจšเจธเจชเฉ€ เจฆเฉ‡ เจนเฉ‹เจฃเจ—เฉ‡. เจ‡เจนเจจเจพเจ‚ เจธเจตเจพเจฒเจพเจ‚ เจฆเฉ‡ เจœเจตเจพเจฌเจพเจ‚ เจคเฉ‹เจ‚ เจ‰เจญเจฐเจจ เจตเจพเจฒเฉ€เจ†เจ‚ เจธเฉ‚เจเจพเจ‚ เจฆเฉ‡ เจ†เจงเจพเจฐ 'เจคเฉ‡, เจ…เจธเฉ€เจ‚ เจ‰เจคเจชเจพเจฆ เจจเฉ‚เฉฐ เจฌเจฟเจนเจคเจฐ เจฌเจฃเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚ เจ…เจคเฉ‡ เจ‰เจชเจญเฉ‹เจ—เจคเจพ เจฆเฉ€ เจธเจผเจฎเฉ‚เจฒเฉ€เจ…เจค เจตเจงเจพ เจธเจ•เจฆเฉ‡ เจนเจพเจ‚เฅค

เจฌเฉ€เจฎ เจ‡เจธ เจ•เจฟเจธเจฎ เจฆเฉ€ เจ•เจธเจฐเจค เจฒเจˆ เจ…เจธเจฒ เจตเจฟเฉฑเจš เจฒเจพเจญเจฆเจพเจ‡เจ• เจนเฉˆ เจ…เจคเฉ‡ เจ‡เจธ เจฆเฉ‡ เจจเจพเจฒ-เจจเจพเจฒ เจ•เจˆ เจนเฉ‹เจฐ เจฆเจฟเจฒเจšเจธเจช เจตเจฐเจคเฉ‹เจ‚ เจฆเฉ‡ เจ•เฉ‡เจธ เจตเฉ€ เจนเจจเฅค เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจคเฉเจธเฉ€เจ‚ เจฐเฉ€เจ…เจฒ เจŸเจพเจˆเจฎ เจตเจฟเฉฑเจš เจธเจŸเจพเจ• เจŸเจฟเฉฑเจ• เจกเฉ‡เจŸเจพ เจฆเจพ เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจฃ เจ•เจฐเจจเจพ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจนเฉ‹ เจ…เจคเฉ‡ เจตเจฟเจธเจผเจฒเฉ‡เจธเจผเจฃ เจฆเฉ‡ เจ…เจงเจพเจฐ เจคเฉ‡ เจตเจชเจพเจฐ เจ•เจฐเจจเจพ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจนเฉ‹, เจธเจผเจพเจ‡เจฆ เจคเฉเจนเจพเจกเฉ‡ เจ•เฉ‹เจฒ เจตเจพเจนเจจเจพเจ‚ เจคเฉ‹เจ‚ เจ†เจ‰เจฃ เจตเจพเจฒเฉ‡ เจธเฉˆเจ‚เจธเจฐ เจกเฉ‡เจŸเจพ เจนเจจ เจ…เจคเฉ‡ เจŸเฉเจฐเฉˆเจซเจฟเจ• เจชเฉฑเจงเจฐ เจฆเฉ€ เจ—เจฃเจจเจพ เจ•เจฐเจจเจพ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจนเฉ‹. เจคเฉเจธเฉ€เจ‚, เจ‰เจฆเจพเจนเจฐเจจ เจฒเจˆ, เจ‡เฉฑเจ• เจ—เฉ‡เจฎเจฟเฉฐเจ— เจ•เฉฐเจชเจจเฉ€ เจตเฉ€ เจนเฉ‹ เจธเจ•เจฆเฉ‡ เจนเฉ‹ เจœเฉ‹ เจ‰เจชเจญเฉ‹เจ—เจคเจพ เจกเฉ‡เจŸเจพ เจ‡เจ•เฉฑเจ เจพ เจ•เจฐเจฆเฉ€ เจนเฉˆ เจ…เจคเฉ‡ เจฎเฉเฉฑเจ– เจฎเฉˆเจŸเฉเจฐเจฟเจ•เจธ เจจเฉ‚เฉฐ เจŸเจฐเฉˆเจ• เจ•เจฐเจจ เจฒเจˆ เจกเฉˆเจธเจผเจฌเฉ‹เจฐเจก เจฌเจฃเจพเจ‰เจฃ เจฒเจˆ เจ‡เจธเจฆเฉ€ เจตเจฐเจคเฉ‹เจ‚ เจ•เจฐเจฆเฉ€ เจนเฉˆเฅค เจ เฉ€เจ• เจนเฉˆ, เจธเฉฑเจœเจฃ, เจ‡เจน เจ‡เฉฑเจ• เจนเฉ‹เจฐ เจชเฉ‹เจธเจŸ เจฒเจˆ เจ‡เฉฑเจ• เจตเจฟเจธเจผเจพ เจนเฉˆ, เจชเฉœเฉเจนเจจ เจฒเจˆ เจงเฉฐเจจเจตเจพเจฆ, เจ…เจคเฉ‡ เจ‰เจนเจจเจพเจ‚ เจฒเจˆ เจœเฉ‹ เจชเฉ‚เจฐเจพ เจ•เฉ‹เจก เจฆเฉ‡เจ–เจฃเจพ เจšเจพเจนเฉเฉฐเจฆเฉ‡ เจนเจจ, เจนเฉ‡เจ เจพเจ‚ เจฎเฉ‡เจฐเฉ‡ GitHub เจฆเจพ เจฒเจฟเฉฐเจ• เจนเฉˆเฅค

https://github.com/DFoly/User_log_pipeline

เจ‡เจน เจธเจญ เจนเฉˆ. เจญเจพเจ— เจชเจนเจฟเจฒเจพ เจชเฉœเฉเจนเฉ‹.

เจธเจฐเฉ‹เจค: www.habr.com

เจ‡เฉฑเจ• เจŸเจฟเฉฑเจชเจฃเฉ€ เจœเฉ‹เฉœเฉ‹