αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2

αžŸαž½αžŸαŸ’αžαžΈβ€‹αž’αŸ’αž“αž€β€‹αž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž‚αŸ’αž“αžΆαŸ” αž™αžΎαž„αž€αŸ†αž–αž»αž„αž…αŸ‚αž€αžšαŸ†αž›αŸ‚αž€αž€αžΆαžšαž”αž€αž”αŸ’αžšαŸ‚αž“αŸƒαž•αŸ’αž“αŸ‚αž€αž…αž»αž„αž€αŸ’αžšαŸ„αž™αž“αŸƒαž’αžαŸ’αžαž”αž‘ αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž”αžΆαž“αžšαŸ€αž”αž…αŸ†αž‡αžΆαž–αž·αžŸαŸαžŸαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžŸαž·αžŸαŸ’αžŸαž“αŸƒαžœαž‚αŸ’αž‚αžŸαž·αž€αŸ’αžŸαžΆαŸ” "αžœαž·αžŸαŸ’αžœαž€αžšαž‘αž·αž“αŸ’αž“αž“αŸαž™". αž’αŸ’αž“αž€αž’αžΆαž…αž’αžΆαž“αž•αŸ’αž“αŸ‚αž€αžŠαŸ†αž”αžΌαž„ αž“αŸ…αž‘αžΈαž“αŸαŸ‡.

Apache Beam αž“αž·αž„ DataFlow αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αž–αŸαž›αžœαŸαž›αžΆαž–αž·αž

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2

αž€αžΆαžšαžŠαŸ†αž‘αžΎαž„ Google Cloud

αž…αŸ†αžŽαžΆαŸ†αŸ– αžαŸ’αž‰αž»αŸ†αž”αžΆαž“αž”αŸ’αžšαžΎ Google Cloud Shell αžŠαžΎαž˜αŸ’αž”αžΈαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„ αž“αž·αž„αž”αŸ„αŸ‡αž•αŸ’αžŸαžΆαž™αž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“ αžŠαŸ„αž™αžŸαžΆαžšαžαŸ’αž‰αž»αŸ†αž˜αžΆαž“αž”αž‰αŸ’αž αžΆαž€αŸ’αž“αž»αž„αž€αžΆαžšαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αž“αŸ…αž€αŸ’αž“αž»αž„ Python 3αŸ” Google Cloud Shell αž”αŸ’αžšαžΎ Python 2 αžŠαŸ‚αž›αžŸαž˜αžŸαŸ’αžšαž”αž‡αžΆαž„αž‡αžΆαž˜αž½αž™ Apache Beam αŸ”

αžŠαžΎαž˜αŸ’αž”αžΈαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αž™αžΎαž„αžαŸ’αžšαžΌαžœαž‡αžΈαž€αž”αž“αŸ’αžαž·αž…αž‘αŸ…αž€αŸ’αž“αž»αž„αž€αžΆαžšαž€αŸ†αžŽαžαŸ‹αŸ” αžŸαž˜αŸ’αžšαžΆαž”αŸ‹β€‹αž’αŸ’αž“αž€β€‹αžŠαŸ‚αž›β€‹αž˜αž·αž“β€‹αž’αŸ’αž›αžΆαž”αŸ‹β€‹αž”αŸ’αžšαžΎ GCP αž–αžΈαž˜αž»αž“ αž’αŸ’αž“αž€β€‹αž“αžΉαž„β€‹αžαŸ’αžšαžΌαžœβ€‹αž’αŸ’αžœαžΎβ€‹αžαžΆαž˜β€‹αž‡αŸ†αž αžΆαž“ 6 αžαžΆαž„αž€αŸ’αžšαŸ„αž˜β€‹αžŠαŸ‚αž›β€‹αž”αžΆαž“β€‹αžšαŸ€αž”αžšαžΆαž”αŸ‹β€‹αž“αŸ…αž€αŸ’αž“αž»αž„β€‹αž“αŸαŸ‡αŸ” αž‘αŸ†αž–αŸαžš.

αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαž“αŸαŸ‡ αž™αžΎαž„αž“αžΉαž„αžαŸ’αžšαžΌαžœαž”αž„αŸ’αž αŸ„αŸ‡αžŸαŸ’αž‚αŸ’αžšαžΈαž”αžšαž”αžŸαŸ‹αž™αžΎαž„αž‘αŸ… Google Cloud Storage αž αžΎαž™αž…αž˜αŸ’αž›αž„αžœαžΆαž‘αŸ… Google Cloud Shel αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αž€αžΆαžšβ€‹αž•αŸ’αž‘αž»αž€β€‹αž‘αžΎαž„β€‹αž‘αŸ…β€‹αž€αž“αŸ’αž›αŸ‚αž„β€‹αž•αŸ’αž‘αž»αž€β€‹αž–αž–αž€β€‹αž‚αžΊβ€‹αž‡αžΆβ€‹αžšαžΏαž„β€‹αžαžΌαž…αžαžΆαž…β€‹αžŽαžΆαžŸαŸ‹ (αž’αžΆαž…β€‹αžšαž€β€‹αžƒαžΎαž‰β€‹αž€αžΆαžšβ€‹αž–αž·αž–αžŽαŸŒαž“αžΆ αž“αŸ…αž‘αžΈαž“αŸαŸ‡) αžŠαžΎαž˜αŸ’αž”αžΈαž…αž˜αŸ’αž›αž„αž―αž€αžŸαžΆαžšαžšαž”αžŸαŸ‹αž™αžΎαž„ αž™αžΎαž„αž’αžΆαž…αž”αžΎαž€ Google Cloud Shel αž–αžΈαžšαž”αžΆαžšαž§αž”αž€αžšαžŽαŸαžŠαŸ„αž™αž…αž»αž…αž›αžΎαžšαžΌαž”αžαŸ†αžŽαžΆαž„αž‘αžΈαž˜αž½αž™αž“αŸ…αžαžΆαž„αž†αŸ’αžœαŸαž„αž€αŸ’αž“αž»αž„αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 2 αžαžΆαž„αž€αŸ’αžšαŸ„αž˜αŸ”

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2
αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 2 αŸ”

αž–αžΆαž€αŸ’αž™β€‹αž”αž‰αŸ’αž‡αžΆβ€‹αžŠαŸ‚αž›β€‹αž™αžΎαž„β€‹αžαŸ’αžšαžΌαžœβ€‹αž€αžΆαžšβ€‹αž…αž˜αŸ’αž›αž„β€‹αž―αž€αžŸαžΆαžš αž“αž·αž„β€‹αžŠαŸ†αž‘αžΎαž„β€‹αž”αžŽαŸ’αžŽαžΆαž›αŸαž™β€‹αžŠαŸ‚αž›β€‹αžαŸ’αžšαžΌαžœβ€‹αž€αžΆαžšβ€‹αžαŸ’αžšαžΌαžœβ€‹αž”αžΆαž“β€‹αžšαžΆαž™β€‹αž”αž‰αŸ’αž‡αžΈβ€‹αžαžΆαž„β€‹αž€αŸ’αžšαŸ„αž˜αŸ”

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

αž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™ αž“αž·αž„αžαžΆαžšαžΆαž„αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ”

αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αž™αžΎαž„αž”αžΆαž“αž”αž‰αŸ’αž…αž”αŸ‹αžšαžΆαž›αŸ‹αž‡αŸ†αž αžΆαž“αžŠαŸ‚αž›αž‘αžΆαž€αŸ‹αž‘αž„αž“αžΉαž„αž€αžΆαžšαžŠαŸ†αž‘αžΎαž„αž“αŸ„αŸ‡ αžšαžΏαž„αž”αž“αŸ’αž‘αžΆαž”αŸ‹αžŠαŸ‚αž›αž™αžΎαž„αžαŸ’αžšαžΌαžœαž’αŸ’αžœαžΎαž‚αžΊαž”αž„αŸ’αž€αžΎαžαžŸαŸ†αžŽαž»αŸ†αž‘αž·αž“αŸ’αž“αž“αŸαž™ αž“αž·αž„αžαžΆαžšαžΆαž„αž“αŸ…αž€αŸ’αž“αž»αž„ BigQuery αŸ” αž˜αžΆαž“αžœαž·αž’αžΈαž‡αžΆαž…αŸ’αžšαžΎαž“αžŠαžΎαž˜αŸ’αž”αžΈαž’αŸ’αžœαžΎαžœαžΆ αž”αŸ‰αž»αž“αŸ’αžαŸ‚αžŸαžΆαž˜αž‰αŸ’αž‰αž”αŸ†αž•αž»αžαž‚αžΊαž”αŸ’αžšαžΎαž€αž»αž„αžŸαžΌαž› Google Cloud αžŠαŸ„αž™αž”αž„αŸ’αž€αžΎαžαžŸαŸ†αžŽαž»αŸ†αž‘αž·αž“αŸ’αž“αž“αŸαž™αž‡αžΆαž˜αž»αž“αžŸαž·αž“αŸ” αž’αŸ’αž“αž€αž’αžΆαž…αž’αž“αž»αžœαžαŸ’αžαžαžΆαž˜αž‡αŸ†αž αžΆαž“αžαžΆαž„αž€αŸ’αžšαŸ„αž˜ αžαŸ†αžŽαž—αŸ’αž‡αžΆαž”αŸ‹αžŠαžΎαž˜αŸ’αž”αžΈαž”αž„αŸ’αž€αžΎαžαžαžΆαžšαžΆαž„αž‡αžΆαž˜αž½αž™αž‚αŸ’αžšαŸ„αž„αž€αžΆαžšαžŽαŸαŸ” αžαž»αžšαž”αžŸαŸ‹αž™αžΎαž„αž“αžΉαž„αž˜αžΆαž“ 7 αž‡αž½αžšαžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž‚αŸ’αž“αžΆαž‘αŸ…αž“αžΉαž„αžŸαž˜αžΆαžŸαž’αžΆαžαž»αž“αŸƒαž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αž“αžΈαž˜αž½αž™αŸ—αŸ” αžŠαžΎαž˜αŸ’αž”αžΈαž—αžΆαž–αž„αžΆαž™αžŸαŸ’αžšαž½αž› αž™αžΎαž„αž“αžΉαž„αž€αŸ†αžŽαžαŸ‹αž‡αž½αžšαžˆαžšαž‘αžΆαŸ†αž„αž’αžŸαŸ‹αž‡αžΆαžαŸ’αžŸαŸ‚αž’αž€αŸ’αžŸαžš αž›αžΎαž€αž›αŸ‚αž„αžαŸ‚αž’αžαŸαžšαžαžΆαž˜αž€αžΆαž›αž€αŸ†αžŽαžαŸ‹ αž αžΎαž™αžŠαžΆαž€αŸ‹αžˆαŸ’αž˜αŸ„αŸ‡αž–αž½αž€αžœαžΆαžαžΆαž˜αž’αžαŸαžšαžŠαŸ‚αž›αž™αžΎαž„αž”αžΆαž“αž”αž„αŸ’αž€αžΎαžαž˜αž»αž“αž“αŸαŸ‡αŸ” αž”αŸ’αž›αž„αŸ‹αž“αŸƒαžαžΆαžšαžΆαž„αžšαž”αžŸαŸ‹αž™αžΎαž„αž‚αž½αžšαžαŸ‚αž˜αžΎαž›αž‘αŸ…αžŠαžΌαž…αž€αŸ’αž“αž»αž„αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 3 αŸ”

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2
αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 3. αž”αŸ’αž›αž„αŸ‹αžαžΆαžšαžΆαž„

αž€αžΆαžšαž”αŸ„αŸ‡αž–αž»αž˜αŸ’αž–αž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹

Pub/Sub αž‚αžΊαž‡αžΆαž’αžΆαžαž»αž•αŸ’αžŸαŸ†αžŠαŸαžŸαŸ†αžαžΆαž“αŸ‹αž“αŸƒαž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αžšαž”αžŸαŸ‹αž™αžΎαž„ αž–αŸ’αžšαŸ„αŸ‡αžœαžΆαž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαž―αž€αžšαžΆαž‡αŸ’αž™αž‡αžΆαž…αŸ’αžšαžΎαž“αž‘αžΆαž€αŸ‹αž‘αž„αž‚αŸ’αž“αžΆαž‘αŸ…αžœαž·αž‰αž‘αŸ…αž˜αž€αŸ” αž‡αžΆαž–αž·αžŸαŸαžŸ αžœαžΆαž’αŸ’αžœαžΎαž€αžΆαžšαž‡αžΆαž’αž“αŸ’αžαžšαž€αžΆαžšαžΈαžŠαŸ‚αž›αž’αž“αž»αž‰αŸ’αž‰αžΆαžαž±αŸ’αž™αž™αžΎαž„αž•αŸ’αž‰αžΎ αž“αž·αž„αž‘αž‘αž½αž›αžŸαžΆαžšαžšαžœαžΆαž„αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαŸ” αžšαžΏαž„αžŠαŸ†αž”αžΌαž„αžŠαŸ‚αž›αž™αžΎαž„αžαŸ’αžšαžΌαžœαž’αŸ’αžœαžΎαž‚αžΊαž”αž„αŸ’αž€αžΎαžαž”αŸ’αžšαž’αžΆαž“αž”αž‘αŸ” αž‚αŸ’αžšαžΆαž“αŸ‹αžαŸ‚αž…αžΌαž›αž‘αŸ…αž€αžΆαž“αŸ‹ Pub/Sub αž“αŸ…αž€αŸ’αž“αž»αž„αž€αž»αž„αžŸαžΌαž› αž αžΎαž™αž…αž»αž… αž”αž„αŸ’αž€αžΎαžαž”αŸ’αžšαž’αžΆαž“αž”αž‘αŸ”

αž€αžΌαžŠαžαžΆαž„αž€αŸ’αžšαŸ„αž˜αž αŸ…αžŸαŸ’αž‚αŸ’αžšαžΈαž”αžšαž”αžŸαŸ‹αž™αžΎαž„αžŠαžΎαž˜αŸ’αž”αžΈαž”αž„αŸ’αž€αžΎαžαž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αžŠαŸ‚αž›αž”αžΆαž“αž€αŸ†αžŽαžαŸ‹αžαžΆαž„αž›αžΎ αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž—αŸ’αž‡αžΆαž”αŸ‹ αž“αž·αž„αž•αŸ’αž‰αžΎαž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αž‘αŸ… Pub/Sub αŸ” αžšαžΏαž„αžαŸ‚αž˜αž½αž™αž‚αžαŸ‹αžŠαŸ‚αž›αž™αžΎαž„αžαŸ’αžšαžΌαžœαž’αŸ’αžœαžΎαž‚αžΊαž”αž„αŸ’αž€αžΎαžαžœαžαŸ’αžαž»αž˜αž½αž™αŸ” αž’αŸ’αž“αž€αž”αŸ„αŸ‡αž–αž»αž˜αŸ’αž–αž•αŸ’αžŸαžΆαž™αž”αž‰αŸ’αž‡αžΆαž€αŸ‹αž•αŸ’αž›αžΌαžœαž‘αŸ…αž€αžΆαž“αŸ‹αž”αŸ’αžšαž’αžΆαž“αž”αž‘αžŠαŸ„αž™αž”αŸ’αžšαžΎαžœαž·αž’αžΈαžŸαžΆαžŸαŸ’αžαŸ’αžš topic_path αž αžΎαž™αž αŸ…αž˜αž»αžαž„αžΆαžš publish с topic_path αž“αž·αž„αž‘αž·αž“αŸ’αž“αž“αŸαž™αŸ” αžŸαžΌαž˜αž…αŸ†αžŽαžΆαŸ†αžαžΆαž™αžΎαž„αž“αžΆαŸ†αž…αžΌαž› generate_log_line αž–αžΈαžŸαŸ’αž‚αŸ’αžšαžΈαž”αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” stream_logsαžŠαžΌαž…αŸ’αž“αŸαŸ‡αžαŸ’αžšαžΌαžœαž”αŸ’αžšαžΆαž€αžŠαžαžΆαž―αž€αžŸαžΆαžšαž‘αžΆαŸ†αž„αž“αŸαŸ‡αžŸαŸ’αžαž·αžαž“αŸ…αž€αŸ’αž“αž»αž„αžαžαžαŸ‚αž˜αž½αž™ αž”αžΎαž˜αž·αž“αžŠαžΌαž…αŸ’αž“αŸαŸ‡αž‘αŸ αž’αŸ’αž“αž€αž“αžΉαž„αž‘αž‘αž½αž›αž”αžΆαž“αž€αŸ†αž αž»αžŸαž€αŸ’αž“αž»αž„αž€αžΆαžšαž“αžΆαŸ†αž…αžΌαž›αŸ” αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž™αžΎαž„αž’αžΆαž…αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžœαžΆαžαžΆαž˜αžšαž™αŸˆαž€αž»αž„αžŸαžΌαž› Google αžšαž”αžŸαŸ‹αž™αžΎαž„αžŠαŸ„αž™αž”αŸ’αžšαžΎαŸ–

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

αžŠαžšαžΆαž”αžŽαžΆαž―αž€αžŸαžΆαžšαžŠαŸ†αžŽαžΎαžšαž€αžΆαžš αž™αžΎαž„αž“αžΉαž„αž’αžΆαž…αžƒαžΎαž‰αž›αž‘αŸ’αž’αž•αž›αž“αŸƒαž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αž‘αŸ…αž€αžΆαž“αŸ‹αž€αž»αž„αžŸαžΌαž› αžŠαžΌαž…αž”αž„αŸ’αž αžΆαž‰αž€αŸ’αž“αž»αž„αžšαžΌαž”αžαžΆαž„αž€αŸ’αžšαŸ„αž˜αŸ” αžŸαŸ’αž‚αŸ’αžšαžΈαž”αž“αŸαŸ‡αž“αžΉαž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžŠαžšαžΆαž”αžŽαžΆαž™αžΎαž„αž˜αž·αž“αž”αŸ’αžšαžΎ CTRL + CαžŠαžΎαž˜αŸ’αž”αžΈαž”αŸ†αž–αŸαž‰αžœαžΆαŸ”

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2
αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 4. αž‘αž·αž“αŸ’αž“αž•αž› publish_logs.py

αž€αžΆαžšαžŸαžšαžŸαŸαžšαž€αžΌαžŠαž”αŸ†αž–αž„αŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ”

αž₯αž‘αžΌαžœαž“αŸαŸ‡ αž™αžΎαž„αž”αžΆαž“αžšαŸ€αž”αž…αŸ†αž’αŸ’αžœαžΈαŸ—αž‚αŸ’αžšαž”αŸ‹αž™αŸ‰αžΆαž„αž αžΎαž™ αž™αžΎαž„αž’αžΆαž…αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž•αŸ’αž“αŸ‚αž€αžŠαŸαžšαžΈαž€αžšαžΆαž™ αž–αŸ„αž›αž‚αžΊαž€αžΆαžšαžŸαžšαžŸαŸαžšαž€αžΌαžŠαž”αŸ†αž–αž„αŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„αžŠαŸ„αž™αž”αŸ’αžšαžΎ Beam αž“αž·αž„ Python αŸ” αžŠαžΎαž˜αŸ’αž”αžΈαž”αž„αŸ’αž€αžΎαž Beam pipeline αž™αžΎαž„αžαŸ’αžšαžΌαžœαž”αž„αŸ’αž€αžΎαž pipeline object (p)αŸ” αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αž™αžΎαž„αž”αžΆαž“αž”αž„αŸ’αž€αžΎαžαžœαžαŸ’αžαž»αž”αŸ†αž–αž„αŸ‹αž˜αž½αž™ αž™αžΎαž„αž’αžΆαž…αž’αž“αž»αžœαžαŸ’αžαž˜αž»αžαž„αžΆαžšαž…αŸ’αžšαžΎαž“αž˜αž½αž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαž˜αž½αž™αž•αŸ’αžŸαŸαž„αž‘αŸ€αžαžŠαŸ„αž™αž”αŸ’αžšαžΎαž”αŸ’αžšαžαž·αž”αžαŸ’αžαž·αž€αžš pipe (|). αž‡αžΆαž‘αžΌαž‘αŸ…αž›αŸ†αž αžΌαžšαž€αžΆαžšαž„αžΆαžšαž˜αžΎαž›αž‘αŸ…αžŠαžΌαž…αžšαžΌαž”αž—αžΆαž–αžαžΆαž„αž€αŸ’αžšαŸ„αž˜αŸ”

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžΌαžŠαžšαž”αžŸαŸ‹αž™αžΎαž„ αž™αžΎαž„αž“αžΉαž„αž”αž„αŸ’αž€αžΎαžαž˜αž»αžαž„αžΆαžšαž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“αž–αžΈαžšαŸ” αž˜αž»αžαž„αžΆαžš regex_cleanαžŠαŸ‚αž›αžŸαŸ’αž€αŸ‚αž“αž‘αž·αž“αŸ’αž“αž“αŸαž™ αž“αž·αž„αž‘αžΆαž‰αž™αž€αž‡αž½αžšαžŠαŸαž€αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž‚αŸ’αž“αžΆαžŠαŸ„αž™αž•αŸ’αž’αŸ‚αž€αž›αžΎαž”αž‰αŸ’αž‡αžΈ PATTERNS αžŠαŸ„αž™αž”αŸ’αžšαžΎαž˜αž»αžαž„αžΆαžš re.search. αž’αž“αž»αž‚αž˜αž“αŸβ€‹αžαŸ’αžšαž‘αž”αŸ‹β€‹αžαŸ’αžŸαŸ‚αž’αž€αŸ’αžŸαžšβ€‹αžŠαŸ‚αž›αž”αŸ†αž”αŸ‚αž€αžŠαŸ„αž™β€‹αžŸαž‰αŸ’αž‰αžΆαž€αŸ’αž”αŸ€αžŸαŸ” αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž’αŸ’αž“αž€αž˜αž·αž“αž˜αŸ‚αž“αž‡αžΆαž’αŸ’αž“αž€αž‡αŸ†αž“αžΆαž‰αžαžΆαž„αž€αžΆαžšαž”αž‰αŸ’αž…αŸαž‰αž˜αžαž·αž’αž˜αŸ’αž˜αžαžΆαž‘αŸ αžαŸ’αž‰αž»αŸ†αžŸαžΌαž˜αžŽαŸ‚αž“αžΆαŸ†αž±αŸ’αž™αž–αž·αž“αž·αžαŸ’αž™αž˜αžΎαž›αžšαžΏαž„αž“αŸαŸ‡ αž€αžΆαžšαž”αž„αŸ’αžšαŸ€αž“ αž αžΎαž™αž’αž“αž»αžœαžαŸ’αžαž“αŸ…αž€αŸ’αž“αž»αž„ notepad αžŠαžΎαž˜αŸ’αž”αžΈαž–αž·αž“αž·αžαŸ’αž™αž˜αžΎαž›αž€αžΌαžŠαŸ” αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαž“αŸαŸ‡αž™αžΎαž„αž€αŸ†αžŽαžαŸ‹αž˜αž»αžαž„αžΆαžš ParDo αž•αŸ’αž‘αžΆαž›αŸ‹αžαŸ’αž›αž½αž“αž αŸ…αžαžΆ αž–αž»αŸ‡αžŠαŸ‚αž›αž‡αžΆαž”αŸ†αžšαŸ‚αž”αŸ†αžšαž½αž›αž“αŸƒαž€αžΆαžšαž•αŸ’αž›αžΆαžŸαŸ‹αž”αŸ’αžαžΌαžš Beam αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αŸ‰αžΆαžšαŸ‰αžΆαž‘αŸ‚αž›αŸ” αž“αŸ…αž€αŸ’αž“αž»αž„ Python αž“αŸαŸ‡αžαŸ’αžšαžΌαžœαž”αžΆαž“αž’αŸ’αžœαžΎαžαžΆαž˜αžšαž”αŸ€αž”αž–αž·αžŸαŸαžŸαž˜αž½αž™ - αž™αžΎαž„αžαŸ’αžšαžΌαžœαž”αž„αŸ’αž€αžΎαž class αžŠαŸ‚αž›αž‘αž‘αž½αž›αž˜αžšαžαž€αž–αžΈ DoFn Beam class αŸ” αž˜αž»αžαž„αžΆαžšαž”αŸ†αž”αŸ‚αž€αž™αž€αž‡αž½αžšαžŠαŸ‚αž›αž”αžΆαž“αž‰αŸ‚αž€αž…αŸαž‰αž–αžΈαž˜αž»αžαž„αžΆαžšαž˜αž»αž“ αž αžΎαž™αžαŸ’αžšαž‘αž”αŸ‹αž”αž‰αŸ’αž‡αžΈαžœαž…αž“αžΆαž“αž»αž€αŸ’αžšαž˜αžŠαŸ‚αž›αž˜αžΆαž“αž‚αŸ’αžšαžΆαž”αŸ‹αž…αž»αž…αžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž‚αŸ’αž“αžΆαž“αžΉαž„αžˆαŸ’αž˜αŸ„αŸ‡αž‡αž½αžšαžˆαžšαž“αŸ…αž€αŸ’αž“αž»αž„αžαžΆαžšαžΆαž„ BigQuery αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αž˜αžΆαž“αž’αŸ’αžœαžΈαžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž€αžαŸ‹αžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž’αŸ†αž–αžΈαž˜αž»αžαž„αžΆαžšαž“αŸαŸ‡αŸ– αžαŸ’αž‰αž»αŸ†αžαŸ’αžšαžΌαžœαž“αžΆαŸ†αž…αžΌαž› datetime αž“αŸ…αžαžΆαž„αž€αŸ’αž“αž»αž„αž˜αž»αžαž„αžΆαžšαžŠαžΎαž˜αŸ’αž”αžΈαž’αŸ’αžœαžΎαž±αŸ’αž™αžœαžΆαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαŸ” αžαŸ’αž‰αž»αŸ†β€‹αž‘αž‘αž½αž›β€‹αž”αžΆαž“β€‹αž€αŸ†αž αž»αžŸβ€‹αž€αžΆαžšβ€‹αž“αžΆαŸ†β€‹αž…αžΌαž›β€‹αž“αŸ…β€‹αžŠαžΎαž˜β€‹αž―αž€αžŸαžΆαžšβ€‹αžŠαŸ‚αž›β€‹αž…αž˜αŸ’αž›αŸ‚αž€αŸ” αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž”αž‰αŸ’αž‡αžΈαž“αŸαŸ‡αžαŸ’αžšαžΌαžœαž”αžΆαž“αž”αž‰αŸ’αž‡αžΌαž“αž‘αŸ…αž˜αž»αžαž„αžΆαžš WriteToBigQueryαžŠαŸ‚αž›αž‚αŸ’αžšαžΆαž“αŸ‹αžαŸ‚αž”αž“αŸ’αžαŸ‚αž˜αž‘αž·αž“αŸ’αž“αž“αŸαž™αžšαž”αžŸαŸ‹αž™αžΎαž„αž‘αŸ…αž€αŸ’αž“αž»αž„αžαžΆαžšαžΆαž„αŸ” αž€αžΌαžŠαžŸαž˜αŸ’αžšαžΆαž”αŸ‹ Batch DataFlow Job αž“αž·αž„ Streaming DataFlow Job αžαŸ’αžšαžΌαžœαž”αžΆαž“αž•αŸ’αžαž›αŸ‹αž±αŸ’αž™αžαžΆαž„αž€αŸ’αžšαŸ„αž˜αŸ” αž—αžΆαž–αžαž»αžŸαž‚αŸ’αž“αžΆαžαŸ‚αž˜αž½αž™αž‚αžαŸ‹αžšαžœαžΆαž„ batch αž“αž·αž„ streaming code αž‚αžΊαžαžΆαž“αŸ…αž€αŸ’αž“αž»αž„ batch αž™αžΎαž„αž’αžΆαž“ CSV αž–αžΈ src_pathαžŠαŸ„αž™αž”αŸ’αžšαžΎαž˜αž»αžαž„αžΆαžš ReadFromText αž–αžΈ Beam αŸ”

Batch DataFlow Job (αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‡αžΆαž”αžΆαž…αŸ‹)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

αž€αžΆαžšαž„αžΆαžšαžŸαŸ’αž‘αŸ’αžšαžΈαž˜ DataFlow (αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžŸαŸ’αž‘αŸ’αžšαžΈαž˜)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

αž€αžΆαžšαž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž§αž”αž€αžšαžŽαŸαž”αž‰αŸ’αž‡αžΌαž“

αž™αžΎαž„αž’αžΆαž…αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž”αŸ†αž–αž„αŸ‹αžαžΆαž˜αžœαž·αž’αžΈαž•αŸ’αžŸαŸαž„αŸ—αž‚αŸ’αž“αžΆαž‡αžΆαž…αŸ’αžšαžΎαž“αŸ” αž”αŸ’αžšαžŸαž·αž“αž”αžΎαž™αžΎαž„αž…αž„αŸ‹αž”αžΆαž“ αž™αžΎαž„αž’αžΆαž…αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžœαžΆαž“αŸ…αž€αŸ’αž“αž»αž„αž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž–αžΈαžŸαŸ’αžαžΆαž“αžΈαž™αž˜αž½αž™ αžαžŽαŸˆαž–αŸαž›αžŠαŸ‚αž›αž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„ GCP αž–αžΈαž…αž˜αŸ’αž„αžΆαž™αŸ”

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

αž‘αŸ„αŸ‡αž™αŸ‰αžΆαž„αžŽαžΆαž€αŸαžŠαŸ„αž™ αž™αžΎαž„αž“αžΉαž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαžœαžΆαžŠαŸ„αž™αž”αŸ’αžšαžΎ DataFlow αŸ” αž™αžΎαž„αž’αžΆαž…αž’αŸ’αžœαžΎαžœαžΆαžŠαŸ„αž™αž”αŸ’αžšαžΎαž–αžΆαž€αŸ’αž™αž”αž‰αŸ’αž‡αžΆαžαžΆαž„αž€αŸ’αžšαŸ„αž˜αžŠαŸ„αž™αž€αŸ†αžŽαžαŸ‹αž”αŸ‰αžΆαžšαŸ‰αžΆαž˜αŸ‰αŸ‚αžαŸ’αžšαžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž€αžΆαžšαžŠαžΌαž…αžαžΆαž„αž€αŸ’αžšαŸ„αž˜αŸ”

  • project - αž›αŸαžαžŸαž˜αŸ’αž‚αžΆαž›αŸ‹αž‚αž˜αŸ’αžšαŸ„αž„ GCP αžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αŸ”
  • runner αž‚αžΊαž‡αžΆαž’αŸ’αž“αž€αžšαžαŸ‹αž”αŸ†αž–αž„αŸ‹αžŠαŸ‚αž›αž“αžΉαž„αžœαž·αž—αžΆαž‚αž€αž˜αŸ’αž˜αžœαž·αž’αžΈαžšαž”αžŸαŸ‹αž’αŸ’αž“αž€ αž“αž·αž„αžŸαžΆαž„αžŸαž„αŸ‹αž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αŸ” αžŠαžΎαž˜αŸ’αž”αžΈαžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž€αŸ’αž“αž»αž„αž–αž–αž€ αž’αŸ’αž“αž€αžαŸ’αžšαžΌαžœαžαŸ‚αž”αž‰αŸ’αž‡αžΆαž€αŸ‹ DataflowRunner αŸ”
  • staging_location - αž•αŸ’αž›αžΌαžœαž‘αŸ…αž€αžΆαž“αŸ‹ Cloud Dataflow cloud storage αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž”αž„αŸ’αž€αžΎαžαž›αž·αž”αž·αž€αŸ’αžšαž˜αž€αž‰αŸ’αž…αž”αŸ‹αž€αžΌαžŠαžŠαŸ‚αž›αžαŸ’αžšαžΌαžœαž€αžΆαžšαžŠαŸ„αž™ processors αžŠαŸ‚αž›αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž€αžΆαžšαž„αžΆαžšαŸ”
  • temp_location β€” αž•αŸ’αž›αžΌαžœαž‘αŸ…αž€αžΆαž“αŸ‹ Cloud Dataflow cloud storage αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αžšαž€αŸ’αžŸαžΆαž‘αž»αž€αž―αž€αžŸαžΆαžšαž€αžΆαžšαž„αžΆαžšαž”αžŽαŸ’αžŠαŸ„αŸ‡αž’αžΆαžŸαž“αŸ’αž“αžŠαŸ‚αž›αž”αžΆαž“αž”αž„αŸ’αž€αžΎαžαžαžŽαŸˆαž–αŸαž›αžŠαŸ‚αž›αž”αŸ†αž–αž„αŸ‹αž€αŸ†αž–αž»αž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαŸ”
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

αžαžŽαŸˆαž–αŸαž›αžŠαŸ‚αž›αž–αžΆαž€αŸ’αž™αž”αž‰αŸ’αž‡αžΆαž“αŸαŸ‡αž€αŸ†αž–αž»αž„αžŠαŸ†αžŽαžΎαžšαž€αžΆαžš αž™αžΎαž„αž’αžΆαž…αž…αžΌαž›αž‘αŸ…αž€αžΆαž“αŸ‹αž•αŸ’αž‘αžΆαŸ†αž„ DataFlow αž“αŸ…αž€αŸ’αž“αž»αž„αž€αž»αž„αžŸαžΌαž› Google αž αžΎαž™αž˜αžΎαž›αž”αŸ†αž–αž„αŸ‹αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αž“αŸ…αž–αŸαž›αžŠαŸ‚αž›αž™αžΎαž„αž…αž»αž…αž›αžΎαž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„ αž™αžΎαž„αž‚αž½αžšαžαŸ‚αžƒαžΎαž‰αž’αŸ’αžœαžΈαžŠαŸ‚αž›αžŸαŸ’αžšαžŠαŸ€αž„αž“αžΉαž„αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 4αŸ” αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž‚αŸ„αž›αž”αŸ†αžŽαž„αž”αŸ†αž”αžΆαžαŸ‹αž€αŸ†αž αž»αžŸ αžœαžΆαž’αžΆαž…αž˜αžΆαž“αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαžαŸ’αž›αžΆαŸ†αž„αžŽαžΆαžŸαŸ‹αž€αŸ’αž“αž»αž„αž€αžΆαžšαž…αžΌαž›αž‘αŸ…αž€αžΆαž“αŸ‹ Logs αž αžΎαž™αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž˜αž€αž‘αŸ…αž€αžΆαž“αŸ‹ Stackdriver αžŠαžΎαž˜αŸ’αž”αžΈαž˜αžΎαž›αž€αŸ†αžŽαžαŸ‹αž αŸαžαž»αž›αž˜αŸ’αž’αž·αžαŸ” αž“αŸαŸ‡αž”αžΆαž“αž‡αž½αž™αžαŸ’αž‰αž»αŸ†αžŠαŸ„αŸ‡αžŸαŸ’αžšαžΆαž™αž”αž‰αŸ’αž αžΆαž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αž“αŸ…αž€αŸ’αž“αž»αž„αž€αžšαžŽαžΈαž˜αž½αž™αž…αŸ†αž“αž½αž“αŸ”

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2
αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 4: αž§αž”αž€αžšαžŽαŸαž”αž‰αŸ’αž‡αžΌαž“αž’αŸ’αž“αžΉαž˜

αž…αžΌαž›αž”αŸ’αžšαžΎαž‘αž·αž“αŸ’αž“αž“αŸαž™αžšαž”αžŸαŸ‹αž™αžΎαž„αž“αŸ…αž€αŸ’αž“αž»αž„ BigQuery

αžŠαžΌαž…αŸ’αž“αŸαŸ‡ αž™αžΎαž„αž‚αž½αžšαžαŸ‚αž˜αžΆαž“αž”αŸ†αž–αž„αŸ‹αž”αž„αŸ’αž αžΌαžšαž”αŸ’αžšαŸαž„αžŠαŸ‚αž›αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‡αžΆαž˜αž½αž™αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ‚αž›αž αžΌαžšαž…αžΌαž›αž‘αŸ…αž€αŸ’αž“αž»αž„αžαžΆαžšαžΆαž„αžšαž”αžŸαŸ‹αž™αžΎαž„αŸ” αžŠαžΎαž˜αŸ’αž”αžΈαžŸαžΆαž€αž›αŸ’αž”αž„αž“αŸαŸ‡ αž™αžΎαž„αž’αžΆαž…αž…αžΌαž›αž‘αŸ…αž€αžΆαž“αŸ‹ BigQuery αž αžΎαž™αž˜αžΎαž›αž‘αž·αž“αŸ’αž“αž“αŸαž™αŸ” αž”αž“αŸ’αž‘αžΆαž”αŸ‹αž–αžΈαž”αŸ’αžšαžΎαž–αžΆαž€αŸ’αž™αž”αž‰αŸ’αž‡αžΆαžαžΆαž„αž€αŸ’αžšαŸ„αž˜ αž’αŸ’αž“αž€αž‚αž½αžšαžαŸ‚αžƒαžΎαž‰αž‡αž½αžšαž–αžΈαžšαž”αžΈαžŠαŸ†αž”αžΌαž„αž“αŸƒαžŸαŸ†αžŽαž»αŸ†αž‘αž·αž“αŸ’αž“αž“αŸαž™αŸ” αž₯αž‘αžΌαžœαž“αŸαŸ‡αž™αžΎαž„αž˜αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŠαŸ‚αž›αž”αžΆαž“αžšαž€αŸ’αžŸαžΆαž‘αž»αž€αž“αŸ…αž€αŸ’αž“αž»αž„ BigQuery αž™αžΎαž„αž’αžΆαž…αž’αŸ’αžœαžΎαž€αžΆαžšαžœαž·αž—αžΆαž‚αž”αž“αŸ’αžαŸ‚αž˜ αž€αŸαžŠαžΌαž…αž‡αžΆαž…αŸ‚αž€αžšαŸ†αž›αŸ‚αž€αž‘αž·αž“αŸ’αž“αž“αŸαž™αž‡αžΆαž˜αž½αž™αžŸαž αžŸαŸαžœαž·αž€ αž αžΎαž™αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž†αŸ’αž›αžΎαž™αžŸαŸ†αžŽαž½αžšαž’αžΆαž‡αžΈαžœαž€αž˜αŸ’αž˜αŸ”

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

αž™αžΎαž„αž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αžŠαŸ†αžŽαžΎαžšαž€αžΆαžšαž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαŸ’αž‘αŸ’αžšαžΈαž˜αŸ” αž•αŸ’αž“αŸ‚αž€αž‘αžΈ 2
αžšαžΌαž”αž—αžΆαž–αž‘αžΈ 5αŸ– BigQuery

αžŸαŸαž…αž€αŸ’αžαžΈαžŸαž“αŸ’αž“αž·αžŠαŸ’αž‹αžΆαž“

αž™αžΎαž„αžŸαž„αŸ’αžƒαžΉαž˜αžαžΆαž€αžΆαžšαž”αž„αŸ’αž αŸ„αŸ‡αž“αŸαŸ‡αžŠαžΎαžšαžαž½αž‡αžΆαž§αž‘αžΆαž αžšαžŽαŸαžŠαŸαž˜αžΆαž“αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαž“αŸƒαž€αžΆαžšαž”αž„αŸ’αž€αžΎαžαž”αŸ†αž–αž„αŸ‹αž”αž‰αŸ’αž‡αžΌαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™ αž€αŸαžŠαžΌαž…αž‡αžΆαž€αžΆαžšαžŸαŸ’αžœαŸ‚αž„αžšαž€αžœαž·αž’αžΈαž’αŸ’αžœαžΎαž±αŸ’αž™αž‘αž·αž“αŸ’αž“αž“αŸαž™αž’αžΆαž…αž…αžΌαž›αž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αž”αžΆαž“αž€αžΆαž“αŸ‹αžαŸ‚αž…αŸ’αžšαžΎαž“αŸ” αž€αžΆαžšαžšαž€αŸ’αžŸαžΆαž‘αž»αž€αž‘αž·αž“αŸ’αž“αž“αŸαž™αž€αŸ’αž“αž»αž„αž‘αž˜αŸ’αžšαž„αŸ‹αž“αŸαŸ‡αž•αŸ’αžαž›αŸ‹αž±αŸ’αž™αž™αžΎαž„αž“αžΌαžœαž’αžαŸ’αžαž”αŸ’αžšαž™αŸ„αž‡αž“αŸαž‡αžΆαž…αŸ’αžšαžΎαž“αŸ” αž₯αž‘αžΌαžœαž“αŸαŸ‡αž™αžΎαž„αž’αžΆαž…αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž†αŸ’αž›αžΎαž™αžŸαŸ†αžŽαž½αžšαžŸαŸ†αžαžΆαž“αŸ‹αŸ—αžŠαžΌαž…αž‡αžΆαžαžΎαž˜αžΆαž“αž˜αž“αž»αžŸαŸ’αžŸαž”αŸ‰αž»αž“αŸ’αž˜αžΆαž“αž“αžΆαž€αŸ‹αžŠαŸ‚αž›αž”αŸ’αžšαžΎαž•αž›αž·αžαž•αž›αžšαž”αžŸαŸ‹αž™αžΎαž„? αžαžΎαž˜αžΌαž›αžŠαŸ’αž‹αžΆαž“αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž€αžΎαž“αž‘αžΎαž„αžαžΆαž˜αž–αŸαž›αžœαŸαž›αžΆαžŠαŸ‚αžšαž¬αž‘αŸ? αžαžΎαž‘αž·αžŠαŸ’αž‹αž—αžΆαž–αž’αŸ’αžœαžΈαžαŸ’αž›αŸ‡αž“αŸƒαž•αž›αž·αžαž•αž›αžŠαŸ‚αž›αž˜αž“αž»αžŸαŸ’αžŸαž‘αžΆαž€αŸ‹αž‘αž„αž‚αŸ’αž“αžΆαž…αŸ’αžšαžΎαž“αž”αŸ†αž•αž»αž? αž αžΎαž™αž˜αžΆαž“αž€αŸ†αž αž»αžŸαžαŸ’αžšαž„αŸ‹αžŽαžΆαžŠαŸ‚αž›αž˜αž·αž“αž‚αž½αžšαž˜αžΆαž“? αž‘αžΆαŸ†αž„αž“αŸαŸ‡αž‚αžΊαž‡αžΆαžŸαŸ†αžŽαž½αžšαžŠαŸ‚αž›αž“αžΉαž„αž…αžΆαž”αŸ‹αž’αžΆαžšαž˜αŸ’αž˜αžŽαŸαž…αŸ†αž–αŸ„αŸ‡αž’αž„αŸ’αž‚αž€αžΆαžšαŸ” αžŠαŸ„αž™αž•αŸ’αž’αŸ‚αž€αž›αžΎαž€αžΆαžšαž™αž›αŸ‹αžŠαžΉαž„αžŠαŸ‚αž›αž•αž»αžŸαž…αŸαž‰αž–αžΈαž…αž˜αŸ’αž›αžΎαž™αž…αŸ†αž–αŸ„αŸ‡αžŸαŸ†αžŽαž½αžšαž‘αžΆαŸ†αž„αž“αŸαŸ‡ αž™αžΎαž„αž’αžΆαž…αž€αŸ‚αž›αž˜αŸ’αž’αž•αž›αž·αžαž•αž› αž“αž·αž„αž”αž„αŸ’αž€αžΎαž“αž€αžΆαžšαž…αžΌαž›αžšαž½αž˜αžšαž”αžŸαŸ‹αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αŸ”

Beam αž–αž·αžαž‡αžΆαž˜αžΆαž“αž”αŸ’αžšαž™αŸ„αž‡αž“αŸαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž›αŸ†αž αžΆαžαŸ‹αž”αŸ’αžšαž—αŸαž‘αž“αŸαŸ‡ αž αžΎαž™αž˜αžΆαž“αž€αžšαžŽαžΈαž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹αž‚αž½αžšαž±αŸ’αž™αž…αžΆαž”αŸ‹αž’αžΆαžšαž˜αŸ’αž˜αžŽαŸαž˜αž½αž™αž…αŸ†αž“αž½αž“αž‘αŸ€αžαž•αž„αžŠαŸ‚αžšαŸ” αž§αž‘αžΆαž αžšαžŽαŸ αž’αŸ’αž“αž€αž”αŸ’αžšαž αŸ‚αž›αž‡αžΆαž…αž„αŸ‹αžœαž·αž—αžΆαž‚αž‘αž·αž“αŸ’αž“αž“αŸαž™αžŸαž‰αŸ’αž‰αžΆαž’αžΈαž€αž€αŸ’αž“αž»αž„αž–αŸαž›αžœαŸαž›αžΆαž‡αžΆαž€αŸ‹αžŸαŸ’αžαŸ‚αž„ αž“αž·αž„αž’αŸ’αžœαžΎαž–αžΆαžŽαž·αž‡αŸ’αž‡αž€αž˜αŸ’αž˜αžŠαŸ„αž™αž•αŸ’αž’αŸ‚αž€αž›αžΎαž€αžΆαžšαžœαž·αž—αžΆαž‚ αž”αŸ’αžšαž αŸ‚αž›αž‡αžΆαž’αŸ’αž“αž€αž˜αžΆαž“αž‘αž·αž“αŸ’αž“αž“αŸαž™αž§αž”αž€αžšαžŽαŸαž…αžΆαž”αŸ‹αžŸαž‰αŸ’αž‰αžΆαž˜αž€αž–αžΈαž™αžΆαž“αž™αž“αŸ’αž αž αžΎαž™αž…αž„αŸ‹αž‚αžŽαž“αžΆαž€αžΆαžšαž‚αžŽαž“αžΆαž€αž˜αŸ’αžšαž·αžαž…αžšαžΆαž…αžšαžŽαŸαŸ” αž§αž‘αžΆαž αžšαžŽαŸ αž’αŸ’αž“αž€αž€αŸαž’αžΆαž…αž‡αžΆαž€αŸ’αžšαž»αž˜αž αŸŠαž»αž“αž αŸ’αž‚αŸαž˜αžŠαŸ‚αž›αž”αŸ’αžšαž˜αžΌαž›αž‘αž·αž“αŸ’αž“αž“αŸαž™αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαž”αŸ’αžšαžΆαžŸαŸ‹ αž αžΎαž™αž”αŸ’αžšαžΎαžœαžΆαžŠαžΎαž˜αŸ’αž”αžΈαž”αž„αŸ’αž€αžΎαžαž•αŸ’αž‘αžΆαŸ†αž„αž‚αŸ’αžšαž”αŸ‹αž‚αŸ’αžšαž„αžŠαžΎαž˜αŸ’αž”αžΈαžαžΆαž˜αžŠαžΆαž“αž˜αŸ‰αŸ‚αžαŸ’αžšαžŸαŸ†αžαžΆαž“αŸ‹αŸ—αŸ” ធូខេ αžŸαž»αž—αžΆαž–αž“αžΆαžšαžΈ αž“αŸαŸ‡αž‡αžΆαž”αŸ’αžšαž’αžΆαž“αž”αž‘αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž”αŸ’αžšαž€αžΆαžŸαž˜αž½αž™αž‘αŸ€αž αž’αžšαž‚αž»αžŽαžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž€αžΆαžšαž’αžΆαž“ αž αžΎαž™αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž’αŸ’αž“αž€αž…αž„αŸ‹αž˜αžΎαž›αž€αžΌαžŠαž–αŸαž‰ αžαžΆαž„αž€αŸ’αžšαŸ„αž˜αž“αŸαŸ‡αž‡αžΆαžαŸ†αžŽαž—αŸ’αž‡αžΆαž”αŸ‹αž‘αŸ…αž€αžΆαž“αŸ‹ GitHub αžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ†αŸ”

https://github.com/DFoly/User_log_pipeline

αž’αžŸαŸ‹αž αžΎαž™αŸ” αž’αžΆαž“αž—αžΆαž‚αž˜αž½αž™αŸ”.

αž”αŸ’αžšαž—αž–: www.habr.com

αž”αž“αŸ’αžαŸ‚αž˜αž˜αžαž·αž™αŸ„αž”αž›αŸ‹