āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2

āĻšāĻžāĻ‡ āĻ¸āĻŦ. āĻ†āĻŽāĻ°āĻž āĻ¨āĻŋāĻŦāĻ¨ā§āĻ§ā§‡āĻ° āĻšā§‚āĻĄāĻŧāĻžāĻ¨ā§āĻ¤ āĻ…āĻ‚āĻļā§‡āĻ° āĻ…āĻ¨ā§āĻŦāĻžāĻĻ āĻļā§‡āĻ¯āĻŧāĻžāĻ° āĻ•āĻ°āĻ›āĻŋ, āĻ¯āĻž āĻŦāĻŋāĻļā§‡āĻˇāĻ­āĻžāĻŦā§‡ āĻ•ā§‹āĻ°ā§āĻ¸ā§‡āĻ° āĻļāĻŋāĻ•ā§āĻˇāĻžāĻ°ā§āĻĨā§€āĻĻā§‡āĻ° āĻœāĻ¨ā§āĻ¯ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻž āĻšāĻ¯āĻŧā§‡āĻ›ā§‡āĨ¤ āĻĄā§‡āĻŸāĻž āĻ‡āĻžā§āĻœāĻŋāĻ¨āĻŋāĻ¯āĻŧāĻžāĻ°. āĻ†āĻĒāĻ¨āĻŋ āĻĒā§āĻ°āĻĨāĻŽ āĻ…āĻ‚āĻļ āĻĒāĻĄāĻŧāĻ¤ā§‡ āĻĒāĻžāĻ°ā§‡āĻ¨ āĻāĻ–āĻžāĻ¨ā§‡.

āĻ°āĻŋāĻ¯āĻŧā§‡āĻ˛-āĻŸāĻžāĻ‡āĻŽ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ā§‡āĻ° āĻœāĻ¨ā§āĻ¯ Apache Beam āĻāĻŦāĻ‚ DataFlow

āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2

Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻ¸ā§‡āĻŸ āĻ†āĻĒ āĻ•āĻ°āĻž āĻšāĻšā§āĻ›ā§‡

āĻĻā§āĻ°āĻˇā§āĻŸāĻŦā§āĻ¯: āĻ†āĻŽāĻŋ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻšāĻžāĻ˛āĻžāĻ¤ā§‡ āĻāĻŦāĻ‚ āĻ•āĻžāĻ¸ā§āĻŸāĻŽ āĻ˛āĻ— āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ•āĻžāĻļ āĻ•āĻ°āĻ¤ā§‡ Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻļā§‡āĻ˛ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡āĻ›āĻŋ āĻ•āĻžāĻ°āĻŖ āĻĒāĻžāĻ‡āĻĨāĻ¨ 3-āĻ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻšāĻžāĻ˛āĻžāĻ¤ā§‡ āĻ†āĻŽāĻžāĻ° āĻ¸āĻŽāĻ¸ā§āĻ¯āĻž āĻšāĻšā§āĻ›āĻŋāĻ˛ā§ˇ Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻļā§‡āĻ˛ āĻĒāĻžāĻ‡āĻĨāĻ¨ 2 āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡, āĻ¯āĻž Apache Beam-āĻāĻ° āĻ¸āĻžāĻĨā§‡ āĻ†āĻ°āĻ“ āĻ¸āĻžāĻŽāĻžā§āĻœāĻ¸ā§āĻ¯āĻĒā§‚āĻ°ā§āĻŖā§ˇ

āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻļā§āĻ°ā§ āĻ•āĻ°āĻ¤ā§‡, āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ¸ā§‡āĻŸāĻŋāĻ‚āĻ¸ā§‡ āĻāĻ•āĻŸā§ āĻ–āĻ¨āĻ¨ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡āĨ¤ āĻ†āĻĒāĻ¨āĻžāĻ°āĻž āĻ¯āĻžāĻ°āĻž āĻ†āĻ—ā§‡ GCP āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡āĻ¨āĻ¨āĻŋ, āĻ¤āĻžāĻĻā§‡āĻ° āĻœāĻ¨ā§āĻ¯ āĻ†āĻĒāĻ¨āĻžāĻ•ā§‡ āĻāĻ¤ā§‡ āĻ‰āĻ˛ā§āĻ˛ā§‡āĻ–āĻŋāĻ¤ āĻ¨āĻŋāĻŽā§āĻ¨āĻ˛āĻŋāĻ–āĻŋāĻ¤ 6āĻŸāĻŋ āĻ§āĻžāĻĒ āĻ…āĻ¨ā§āĻ¸āĻ°āĻŖ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡ āĻĒā§ƒāĻˇā§āĻ āĻž.

āĻāĻ° āĻĒāĻ°ā§‡, āĻ†āĻŽāĻžāĻĻā§‡āĻ° Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻ¸ā§āĻŸā§‹āĻ°ā§‡āĻœā§‡ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ¸ā§āĻ•ā§āĻ°āĻŋāĻĒā§āĻŸāĻ—ā§āĻ˛āĻŋ āĻ†āĻĒāĻ˛ā§‹āĻĄ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡ āĻāĻŦāĻ‚ āĻ¸ā§‡āĻ—ā§āĻ˛āĻŋāĻ•ā§‡ āĻ†āĻŽāĻžāĻĻā§‡āĻ° Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻļā§‡āĻ˛ā§‡ āĻ…āĻ¨ā§āĻ˛āĻŋāĻĒāĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡āĨ¤ āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻ¸ā§āĻŸā§‹āĻ°ā§‡āĻœā§‡ āĻ†āĻĒāĻ˛ā§‹āĻĄ āĻ•āĻ°āĻž āĻŦā§‡āĻļ āĻ¤ā§āĻšā§āĻ› (āĻāĻ•āĻŸāĻŋ āĻŦāĻ°ā§āĻŖāĻ¨āĻž āĻĒāĻžāĻ“āĻ¯āĻŧāĻž āĻ¯āĻžāĻŦā§‡ āĻāĻ–āĻžāĻ¨ā§‡) āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĢāĻžāĻ‡āĻ˛ āĻ•āĻĒāĻŋ āĻ•āĻ°āĻ¤ā§‡, āĻ†āĻŽāĻ°āĻž āĻŸā§āĻ˛āĻŦāĻžāĻ° āĻĨā§‡āĻ•ā§‡ Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻļā§‡āĻ˛ āĻ–ā§āĻ˛āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ āĻ¨ā§€āĻšā§‡āĻ° āĻšāĻŋāĻ¤ā§āĻ° 2-āĻ āĻŦāĻžāĻŽ āĻĻāĻŋāĻ•ā§‡āĻ° āĻĒā§āĻ°āĻĨāĻŽ āĻ†āĻ‡āĻ•āĻ¨ā§‡ āĻ•ā§āĻ˛āĻŋāĻ• āĻ•āĻ°ā§‡āĨ¤

āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2
2 āĻšāĻŋāĻ¤ā§āĻ°

āĻĢāĻžāĻ‡āĻ˛āĻ—ā§āĻ˛āĻŋ āĻ…āĻ¨ā§āĻ˛āĻŋāĻĒāĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻāĻŦāĻ‚ āĻĒā§āĻ°āĻ¯āĻŧā§‹āĻœāĻ¨ā§€āĻ¯āĻŧ āĻ˛āĻžāĻ‡āĻŦā§āĻ°ā§‡āĻ°āĻŋāĻ—ā§āĻ˛āĻŋ āĻ‡āĻ¨āĻ¸ā§āĻŸāĻ˛ āĻ•āĻ°āĻžāĻ° āĻœāĻ¨ā§āĻ¯ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ¯ā§‡ āĻ•āĻŽāĻžāĻ¨ā§āĻĄāĻ—ā§āĻ˛āĻŋ āĻĒā§āĻ°āĻ¯āĻŧā§‹āĻœāĻ¨ āĻ¤āĻž āĻ¨ā§€āĻšā§‡ āĻ¤āĻžāĻ˛āĻŋāĻ•āĻžāĻ­ā§āĻ•ā§āĻ¤ āĻ•āĻ°āĻž āĻšāĻ¯āĻŧā§‡āĻ›ā§‡āĨ¤

# Copy file from cloud storage
gsutil cp gs://<YOUR-BUCKET>/ * .
sudo pip install apache-beam[gcp] oauth2client==3.0.0
sudo pip install -U pip
sudo pip install Faker==1.0.2
# Environment variables
BUCKET=<YOUR-BUCKET>
PROJECT=<YOUR-PROJECT>

āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĄāĻžāĻŸāĻžāĻŦā§‡āĻ¸ āĻāĻŦāĻ‚ āĻŸā§‡āĻŦāĻŋāĻ˛ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻž āĻšāĻšā§āĻ›ā§‡

āĻāĻ•āĻŦāĻžāĻ° āĻ†āĻŽāĻ°āĻž āĻ¸ā§‡āĻŸāĻ†āĻĒ āĻ¸āĻŽā§āĻĒāĻ°ā§āĻ•āĻŋāĻ¤ āĻ¸āĻŽāĻ¸ā§āĻ¤ āĻĒāĻĻāĻ•ā§āĻˇā§‡āĻĒāĻ—ā§āĻ˛āĻŋ āĻ¸āĻŽā§āĻĒāĻ¨ā§āĻ¨ āĻ•āĻ°āĻžāĻ° āĻĒāĻ°ā§‡, āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒāĻ°āĻŦāĻ°ā§āĻ¤ā§€ āĻœāĻŋāĻ¨āĻŋāĻ¸āĻŸāĻŋ BigQuery-āĻ āĻāĻ•āĻŸāĻŋ āĻĄā§‡āĻŸāĻžāĻ¸ā§‡āĻŸ āĻāĻŦāĻ‚ āĻŸā§‡āĻŦāĻŋāĻ˛ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡ā§ˇ āĻāĻŸāĻŋ āĻ•āĻ°āĻžāĻ° āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āĻ¨ āĻ‰āĻĒāĻžāĻ¯āĻŧ āĻ°āĻ¯āĻŧā§‡āĻ›ā§‡, āĻ¤āĻŦā§‡ āĻ¸āĻŦāĻšā§‡āĻ¯āĻŧā§‡ āĻ¸āĻšāĻœ āĻšāĻ˛ āĻĒā§āĻ°āĻĨāĻŽā§‡ āĻāĻ•āĻŸāĻŋ āĻĄā§‡āĻŸāĻžāĻ¸ā§‡āĻŸ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°ā§‡ Google āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻ•āĻ¨āĻ¸ā§‹āĻ˛ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°āĻžā§ˇ āĻ†āĻĒāĻ¨āĻŋ āĻ¨ā§€āĻšā§‡āĻ° āĻĒāĻĻāĻ•ā§āĻˇā§‡āĻĒāĻ—ā§āĻ˛āĻŋ āĻ…āĻ¨ā§āĻ¸āĻ°āĻŖ āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°ā§‡āĻ¨ āĻ˛āĻŋāĻ‚āĻ•āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻ•āĻŋāĻŽāĻž āĻ¸āĻš āĻāĻ•āĻŸāĻŋ āĻŸā§‡āĻŦāĻŋāĻ˛ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡āĨ¤ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻŸā§‡āĻŦāĻŋāĻ˛ āĻĨāĻžāĻ•āĻŦā§‡ 7āĻŸāĻŋ āĻ•āĻ˛āĻžāĻŽ, āĻĒā§āĻ°āĻ¤āĻŋāĻŸāĻŋ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ°āĻ•āĻžāĻ°ā§€ āĻ˛āĻ—ā§‡āĻ° āĻ‰āĻĒāĻžāĻĻāĻžāĻ¨ā§‡āĻ° āĻ¸āĻžāĻĨā§‡ āĻ¸āĻ‚āĻļā§āĻ˛āĻŋāĻˇā§āĻŸāĨ¤ āĻ¸ā§āĻŦāĻŋāĻ§āĻžāĻ° āĻœāĻ¨ā§āĻ¯, āĻ†āĻŽāĻ°āĻž āĻŸāĻžāĻ‡āĻŽāĻ˛ā§‹āĻ•āĻžāĻ˛ āĻ­ā§‡āĻ°āĻŋāĻ¯āĻŧā§‡āĻŦāĻ˛ āĻŦā§āĻ¯āĻ¤ā§€āĻ¤ āĻ¸āĻŽāĻ¸ā§āĻ¤ āĻ•āĻ˛āĻžāĻŽāĻ•ā§‡ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻ‚ āĻšāĻŋāĻ¸āĻžāĻŦā§‡ āĻ¸āĻ‚āĻœā§āĻžāĻžāĻ¯āĻŧāĻŋāĻ¤ āĻ•āĻ°āĻŦ āĻāĻŦāĻ‚ āĻ†āĻŽāĻ°āĻž āĻ†āĻ—ā§‡ āĻ¯ā§‡ āĻ­ā§‡āĻ°āĻŋāĻ¯āĻŧā§‡āĻŦāĻ˛ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°ā§‡āĻ›āĻŋ āĻ¸ā§‡ āĻ…āĻ¨ā§āĻ¸āĻžāĻ°ā§‡ āĻ¤āĻžāĻĻā§‡āĻ° āĻ¨āĻžāĻŽ āĻĻā§‡āĻŦāĨ¤ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻŸā§‡āĻŦāĻŋāĻ˛ā§‡āĻ° āĻŦāĻŋāĻ¨ā§āĻ¯āĻžāĻ¸ āĻšāĻŋāĻ¤ā§āĻ° 3 āĻāĻ° āĻŽāĻ¤ āĻšāĻ“āĻ¯āĻŧāĻž āĻ‰āĻšāĻŋāĻ¤āĨ¤

āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2
āĻšāĻŋāĻ¤ā§āĻ° 3. āĻŸā§‡āĻŦāĻŋāĻ˛ āĻ˛ā§‡āĻ†āĻ‰āĻŸ

āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ°āĻ•āĻžāĻ°ā§€āĻ° āĻ˛āĻ— āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ•āĻžāĻļ āĻ•āĻ°āĻž āĻšāĻšā§āĻ›ā§‡

āĻĒāĻžāĻŦ/āĻ¸āĻžāĻŦ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ā§‡āĻ° āĻāĻ•āĻŸāĻŋ āĻ—ā§āĻ°ā§āĻ¤ā§āĻŦāĻĒā§‚āĻ°ā§āĻŖ āĻ‰āĻĒāĻžāĻĻāĻžāĻ¨ āĻ•āĻžāĻ°āĻŖ āĻāĻŸāĻŋ āĻāĻ•āĻžāĻ§āĻŋāĻ• āĻ¸ā§āĻŦāĻžāĻ§ā§€āĻ¨ āĻ…ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻ•ā§‡āĻļāĻ¨āĻ•ā§‡ āĻāĻ•ā§‡ āĻ…āĻĒāĻ°ā§‡āĻ° āĻ¸āĻžāĻĨā§‡ āĻ¯ā§‹āĻ—āĻžāĻ¯ā§‹āĻ— āĻ•āĻ°āĻ¤ā§‡ āĻĻā§‡āĻ¯āĻŧāĨ¤ āĻŦāĻŋāĻļā§‡āĻˇāĻ¤, āĻāĻŸāĻŋ āĻāĻ•āĻŸāĻŋ āĻŽāĻ§ā§āĻ¯āĻ¸ā§āĻĨāĻ¤āĻžāĻ•āĻžāĻ°ā§€ āĻšāĻŋāĻ¸āĻžāĻŦā§‡ āĻ•āĻžāĻœ āĻ•āĻ°ā§‡ āĻ¯āĻž āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ…ā§āĻ¯āĻžāĻĒā§āĻ˛āĻŋāĻ•ā§‡āĻļāĻ¨āĻ—ā§āĻ˛āĻŋāĻ° āĻŽāĻ§ā§āĻ¯ā§‡ āĻŦāĻžāĻ°ā§āĻ¤āĻž āĻĒāĻžāĻ āĻžāĻ¤ā§‡ āĻāĻŦāĻ‚ āĻ—ā§āĻ°āĻšāĻŖ āĻ•āĻ°āĻ¤ā§‡ āĻĻā§‡āĻ¯āĻŧā§ˇ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒā§āĻ°āĻĨāĻŽā§‡ āĻāĻ•āĻŸāĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡āĨ¤ āĻ•āĻ¨āĻ¸ā§‹āĻ˛ā§‡ āĻļā§āĻ§ā§ āĻĒāĻžāĻŦ/āĻ¸āĻžāĻŦ-āĻ āĻ¯āĻžāĻ¨ āĻāĻŦāĻ‚ āĻŸāĻĒāĻŋāĻ• āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°ā§āĻ¨ āĻ•ā§āĻ˛āĻŋāĻ• āĻ•āĻ°ā§āĻ¨āĨ¤

āĻ¨ā§€āĻšā§‡āĻ° āĻ•ā§‹āĻĄāĻŸāĻŋ āĻ‰āĻĒāĻ°ā§‡ āĻ¸āĻ‚āĻœā§āĻžāĻžāĻ¯āĻŧāĻŋāĻ¤ āĻ˛āĻ— āĻĄā§‡āĻŸāĻž āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ¸ā§āĻ•ā§āĻ°āĻŋāĻĒā§āĻŸāĻ•ā§‡ āĻ•āĻ˛ āĻ•āĻ°ā§‡ āĻāĻŦāĻ‚ āĻ¤āĻžāĻ°āĻĒāĻ° āĻ˛āĻ—āĻ—ā§āĻ˛āĻŋāĻ•ā§‡ Pub/Sub-āĻ āĻ¸āĻ‚āĻ¯ā§āĻ•ā§āĻ¤ āĻ•āĻ°ā§‡ āĻĒāĻžāĻ āĻžāĻ¯āĻŧāĨ¤ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ¯āĻž āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡ āĻ¤āĻž āĻšāĻ˛ āĻāĻ•āĻŸāĻŋ āĻŦāĻ¸ā§āĻ¤ā§ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻž āĻĒā§āĻ°āĻ•āĻžāĻļāĻ• āĻ•ā§āĻ˛āĻžāĻ¯āĻŧā§‡āĻ¨ā§āĻŸ, āĻĒāĻĻā§āĻ§āĻ¤āĻŋ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ āĻŦāĻŋāĻˇāĻ¯āĻŧā§‡āĻ° āĻĒāĻĨ āĻ¨āĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āĻŸ āĻ•āĻ°ā§āĻ¨ topic_path āĻāĻŦāĻ‚ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻ•āĻ˛ āĻ•āĻ°ā§āĻ¨ publish Ņ topic_path āĻāĻŦāĻ‚ āĻĄā§‡āĻŸāĻžāĨ¤ āĻĻāĻ¯āĻŧāĻž āĻ•āĻ°ā§‡ āĻ¨ā§‹āĻŸ āĻ•āĻ°ā§āĻ¨ āĻ¯ā§‡ āĻ†āĻŽāĻ°āĻž āĻ†āĻŽāĻĻāĻžāĻ¨āĻŋ āĻ•āĻ°āĻŋ generate_log_line āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ¸ā§āĻ•ā§āĻ°āĻŋāĻĒā§āĻŸ āĻĨā§‡āĻ•ā§‡ stream_logs, āĻ¤āĻžāĻ‡ āĻ¨āĻŋāĻļā§āĻšāĻŋāĻ¤ āĻ•āĻ°ā§āĻ¨ āĻ¯ā§‡ āĻāĻ‡ āĻĢāĻžāĻ‡āĻ˛āĻ—ā§āĻ˛āĻŋ āĻāĻ•āĻ‡ āĻĢā§‹āĻ˛ā§āĻĄāĻžāĻ°ā§‡ āĻ†āĻ›ā§‡, āĻ…āĻ¨ā§āĻ¯āĻĨāĻžāĻ¯āĻŧ āĻ†āĻĒāĻ¨āĻŋ āĻāĻ•āĻŸāĻŋ āĻ†āĻŽāĻĻāĻžāĻ¨āĻŋ āĻ¤ā§āĻ°ā§āĻŸāĻŋ āĻĒāĻžāĻŦā§‡āĻ¨āĨ¤ āĻ¤āĻžāĻ°āĻĒāĻ°ā§‡ āĻ†āĻŽāĻ°āĻž āĻāĻŸāĻŋ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ—ā§āĻ—āĻ˛ āĻ•āĻ¨āĻ¸ā§‹āĻ˛ā§‡āĻ° āĻŽāĻžāĻ§ā§āĻ¯āĻŽā§‡ āĻāĻŸāĻŋ āĻšāĻžāĻ˛āĻžāĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ:

python publish.py

from stream_logs import generate_log_line
import logging
from google.cloud import pubsub_v1
import random
import time


PROJECT_ID="user-logs-237110"
TOPIC = "userlogs"


publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, TOPIC)

def publish(publisher, topic, message):
    data = message.encode('utf-8')
    return publisher.publish(topic_path, data = data)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())


if __name__ == '__main__':

    while True:
        line = generate_log_line()
        print(line)
        message_future = publish(publisher, topic_path, line)
        message_future.add_done_callback(callback)

        sleep_time = random.choice(range(1, 3, 1))
        time.sleep(sleep_time)

āĻĢāĻžāĻ‡āĻ˛āĻŸāĻŋ āĻšāĻžāĻ˛āĻžāĻ¨ā§‹āĻ° āĻ¸āĻžāĻĨā§‡ āĻ¸āĻžāĻĨā§‡, āĻ†āĻŽāĻ°āĻž āĻ•āĻ¨āĻ¸ā§‹āĻ˛ā§‡ āĻ˛āĻ— āĻĄā§‡āĻŸāĻžāĻ° āĻ†āĻ‰āĻŸāĻĒā§āĻŸ āĻĻā§‡āĻ–āĻ¤ā§‡ āĻ¸āĻ•ā§āĻˇāĻŽ āĻšāĻŦ, āĻ¯ā§‡āĻŽāĻ¨āĻŸāĻŋ āĻ¨ā§€āĻšā§‡āĻ° āĻšāĻŋāĻ¤ā§āĻ°ā§‡ āĻĻā§‡āĻ–āĻžāĻ¨ā§‹ āĻšāĻ¯āĻŧā§‡āĻ›ā§‡āĨ¤ āĻāĻ‡ āĻ¸ā§āĻ•ā§āĻ°āĻŋāĻĒā§āĻŸāĻŸāĻŋ āĻ•āĻžāĻœ āĻ•āĻ°āĻŦā§‡ āĻ¯āĻ¤āĻ•ā§āĻˇāĻŖ āĻ¨āĻž āĻ†āĻŽāĻ°āĻž āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°āĻŋ āĻāĻŦāĻžāĻ° CTRL + āĻ¸āĻŋāĻāĻŸāĻž āĻ¸āĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ•āĻ°āĻ¤ā§‡

āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2
āĻšāĻŋāĻ¤ā§āĻ° 4. āĻ†āĻ‰āĻŸāĻĒā§āĻŸ publish_logs.py

āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ•ā§‹āĻĄ āĻ˛ā§‡āĻ–āĻž

āĻāĻ–āĻ¨ āĻ¯ā§‡āĻšā§‡āĻ¤ā§ āĻ†āĻŽāĻ°āĻž āĻ¸āĻŦāĻ•āĻŋāĻ›ā§ āĻĒā§āĻ°āĻ¸ā§āĻ¤ā§āĻ¤ āĻ•āĻ°ā§‡āĻ›āĻŋ, āĻ†āĻŽāĻ°āĻž āĻŽāĻœāĻžāĻ° āĻ…āĻ‚āĻļ āĻļā§āĻ°ā§ āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ - āĻŦāĻŋāĻŽ āĻāĻŦāĻ‚ āĻĒāĻžāĻ‡āĻĨāĻ¨ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ•ā§‹āĻĄāĻŋāĻ‚ āĻ•āĻ°āĻžāĨ¤ āĻāĻ•āĻŸāĻŋ āĻŦāĻŋāĻŽ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡, āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻāĻ•āĻŸāĻŋ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ…āĻŦāĻœā§‡āĻ•ā§āĻŸ (āĻĒāĻŋ) āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡āĨ¤ āĻāĻ•āĻŦāĻžāĻ° āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ…āĻŦāĻœā§‡āĻ•ā§āĻŸ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ˛ā§‡, āĻ†āĻŽāĻ°āĻž āĻ…āĻĒāĻžāĻ°ā§‡āĻŸāĻ° āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ āĻāĻ•ā§‡āĻ° āĻĒāĻ° āĻāĻ• āĻāĻ•āĻžāĻ§āĻŋāĻ• āĻĢāĻžāĻ‚āĻļāĻ¨ āĻĒā§āĻ°āĻ¯āĻŧā§‹āĻ— āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ pipe (|). āĻ¸āĻžāĻ§āĻžāĻ°āĻŖāĻ­āĻžāĻŦā§‡, āĻ•āĻ°ā§āĻŽāĻĒā§āĻ°āĻŦāĻžāĻšāĻŸāĻŋ āĻ¨ā§€āĻšā§‡āĻ° āĻšāĻŋāĻ¤ā§āĻ°ā§‡āĻ° āĻŽāĻ¤ā§‹ āĻĻā§‡āĻ–āĻžāĻ¯āĻŧāĨ¤

[Final Output PCollection] = ([Initial Input PCollection] | [First Transform]
             | [Second Transform]
             | [Third Transform])

āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ•ā§‹āĻĄā§‡, āĻ†āĻŽāĻ°āĻž āĻĻā§āĻŸāĻŋ āĻ•āĻžāĻ¸ā§āĻŸāĻŽ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŦāĨ¤ āĻĢāĻžāĻ‚āĻļāĻ¨ regex_clean, āĻ¯āĻž āĻĄā§‡āĻŸāĻž āĻ¸ā§āĻ•ā§āĻ¯āĻžāĻ¨ āĻ•āĻ°ā§‡ āĻāĻŦāĻ‚ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ PATTERNS āĻ¤āĻžāĻ˛āĻŋāĻ•āĻžāĻ° āĻ‰āĻĒāĻ° āĻ­āĻŋāĻ¤ā§āĻ¤āĻŋ āĻ•āĻ°ā§‡ āĻ¸āĻ‚āĻļā§āĻ˛āĻŋāĻˇā§āĻŸ āĻ¸āĻžāĻ°āĻŋ āĻĒā§āĻ¨āĻ°ā§āĻĻā§āĻ§āĻžāĻ° āĻ•āĻ°ā§‡ re.search. āĻĢāĻžāĻ‚āĻļāĻ¨āĻŸāĻŋ āĻāĻ•āĻŸāĻŋ āĻ•āĻŽāĻž āĻŦāĻŋāĻ­āĻ•ā§āĻ¤ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻ‚ āĻĒā§āĻ°āĻĻāĻžāĻ¨ āĻ•āĻ°ā§‡āĨ¤ āĻ†āĻĒāĻ¨āĻŋ āĻ¯āĻĻāĻŋ āĻ¨āĻŋāĻ¯āĻŧāĻŽāĻŋāĻ¤ āĻāĻ•ā§āĻ¸āĻĒā§āĻ°ā§‡āĻļāĻ¨ āĻŦāĻŋāĻļā§‡āĻˇāĻœā§āĻž āĻ¨āĻž āĻšāĻ¨ āĻ¤āĻŦā§‡ āĻ†āĻŽāĻŋ āĻāĻŸāĻŋ āĻĒāĻ°ā§€āĻ•ā§āĻˇāĻž āĻ•āĻ°āĻžāĻ° āĻĒāĻ°āĻžāĻŽāĻ°ā§āĻļ āĻĻāĻŋāĻ‡ āĻŸāĻŋāĻ‰āĻŸā§‹āĻ°āĻŋāĻ¯āĻŧāĻžāĻ˛ āĻāĻŦāĻ‚ āĻ•ā§‹āĻĄ āĻšā§‡āĻ• āĻ•āĻ°āĻ¤ā§‡ āĻāĻ•āĻŸāĻŋ āĻ¨ā§‹āĻŸāĻĒā§āĻ¯āĻžāĻĄā§‡ āĻ…āĻ¨ā§āĻļā§€āĻ˛āĻ¨ āĻ•āĻ°ā§āĻ¨āĨ¤ āĻāĻ° āĻĒāĻ°ā§‡ āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ•āĻžāĻ¸ā§āĻŸāĻŽ ParDo āĻĢāĻžāĻ‚āĻļāĻ¨ āĻ¸āĻ‚āĻœā§āĻžāĻžāĻ¯āĻŧāĻŋāĻ¤ āĻ•āĻ°āĻŋ āĻ¯āĻžāĻ•ā§‡ āĻŦāĻ˛āĻž āĻšāĻ¯āĻŧ āĻŦāĻŋāĻ­āĻ•ā§āĻ¤ āĻ•āĻ°āĻž, āĻ¯āĻž āĻ¸āĻŽāĻžāĻ¨ā§āĻ¤āĻ°āĻžāĻ˛ āĻĒā§āĻ°āĻ•ā§āĻ°āĻŋāĻ¯āĻŧāĻžāĻ•āĻ°āĻŖā§‡āĻ° āĻœāĻ¨ā§āĻ¯ āĻŽāĻ°ā§€āĻšāĻŋ āĻ°ā§‚āĻĒāĻžāĻ¨ā§āĻ¤āĻ°ā§‡āĻ° āĻāĻ•āĻŸāĻŋ āĻĒāĻ°āĻŋāĻŦāĻ°ā§āĻ¤āĻ¨āĨ¤ āĻĒāĻžāĻ‡āĻĨāĻ¨ā§‡, āĻāĻŸāĻŋ āĻāĻ•āĻŸāĻŋ āĻŦāĻŋāĻļā§‡āĻˇ āĻ‰āĻĒāĻžāĻ¯āĻŧā§‡ āĻ•āĻ°āĻž āĻšāĻ¯āĻŧ - āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ…āĻŦāĻļā§āĻ¯āĻ‡ āĻāĻ•āĻŸāĻŋ āĻ•ā§āĻ˛āĻžāĻ¸ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡ āĻ¯āĻž DoFn āĻŦāĻŋāĻŽ āĻ•ā§āĻ˛āĻžāĻ¸ āĻĨā§‡āĻ•ā§‡ āĻ‰āĻ¤ā§āĻ¤āĻ°āĻžāĻ§āĻŋāĻ•āĻžāĻ°āĻ¸ā§‚āĻ¤ā§āĻ°ā§‡ āĻĒāĻžāĻ“āĻ¯āĻŧāĻž āĻ¯āĻžāĻ¯āĻŧāĨ¤ āĻ¸ā§āĻĒā§āĻ˛āĻŋāĻŸ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻĒā§‚āĻ°ā§āĻŦāĻŦāĻ°ā§āĻ¤ā§€ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻĨā§‡āĻ•ā§‡ āĻĒāĻžāĻ°ā§āĻ¸ āĻ•āĻ°āĻž āĻ¸āĻžāĻ°āĻŋāĻŸāĻŋ āĻ¨ā§‡āĻ¯āĻŧ āĻāĻŦāĻ‚ āĻ†āĻŽāĻžāĻĻā§‡āĻ° BigQuery āĻŸā§‡āĻŦāĻŋāĻ˛ā§‡āĻ° āĻ•āĻ˛āĻžāĻŽā§‡āĻ° āĻ¨āĻžāĻŽā§‡āĻ° āĻ¸āĻžāĻĨā§‡ āĻ¸āĻŽā§āĻĒāĻ°ā§āĻ•āĻŋāĻ¤ āĻ•ā§€ āĻ¸āĻš āĻ…āĻ­āĻŋāĻ§āĻžāĻ¨ā§‡āĻ° āĻāĻ•āĻŸāĻŋ āĻ¤āĻžāĻ˛āĻŋāĻ•āĻž āĻĒā§āĻ°āĻĻāĻžāĻ¨ āĻ•āĻ°ā§‡āĨ¤ āĻāĻ‡ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻ¸āĻŽā§āĻĒāĻ°ā§āĻ•ā§‡ āĻ¨ā§‹āĻŸ āĻ•āĻ°āĻžāĻ° āĻ•āĻŋāĻ›ā§ āĻ†āĻ›ā§‡: āĻ†āĻŽāĻžāĻ•ā§‡ āĻ†āĻŽāĻĻāĻžāĻ¨āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻ¯āĻŧā§‡āĻ›āĻŋāĻ˛ datetime āĻāĻŸāĻŋ āĻ•āĻžāĻœ āĻ•āĻ°āĻ¤ā§‡ āĻāĻ•āĻŸāĻŋ āĻĢāĻžāĻ‚āĻļāĻ¨ āĻ­āĻŋāĻ¤āĻ°ā§‡. āĻ†āĻŽāĻŋ āĻĢāĻžāĻ‡āĻ˛ā§‡āĻ° āĻļā§āĻ°ā§āĻ¤ā§‡ āĻāĻ•āĻŸāĻŋ āĻ†āĻŽāĻĻāĻžāĻ¨āĻŋ āĻ¤ā§āĻ°ā§āĻŸāĻŋ āĻĒā§‡āĻ¯āĻŧā§‡āĻ›āĻŋāĻ˛āĻžāĻŽ, āĻ¯āĻž āĻ…āĻĻā§āĻ­ā§āĻ¤ āĻ›āĻŋāĻ˛āĨ¤ āĻāĻ‡ āĻ¤āĻžāĻ˛āĻŋāĻ•āĻž āĻ¤āĻžāĻ°āĻĒāĻ° āĻĢāĻžāĻ‚āĻļāĻ¨ āĻĒāĻžāĻ¸ āĻ•āĻ°āĻž āĻšāĻ¯āĻŧ āĻ˛āĻŋāĻ–ā§āĻ¨ToBigQuery, āĻ¯āĻž āĻ¸āĻšāĻœāĻ­āĻžāĻŦā§‡ āĻŸā§‡āĻŦāĻŋāĻ˛ā§‡ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĄā§‡āĻŸāĻž āĻ¯ā§‹āĻ— āĻ•āĻ°ā§‡āĨ¤ āĻŦā§āĻ¯āĻžāĻš āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻœāĻŦ āĻāĻŦāĻ‚ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽāĻŋāĻ‚ āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻ•āĻžāĻœā§‡āĻ° āĻ•ā§‹āĻĄ āĻ¨ā§€āĻšā§‡ āĻĻā§‡āĻ“āĻ¯āĻŧāĻž āĻšāĻ˛āĨ¤ āĻŦā§āĻ¯āĻžāĻš āĻāĻŦāĻ‚ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽāĻŋāĻ‚ āĻ•ā§‹āĻĄā§‡āĻ° āĻŽāĻ§ā§āĻ¯ā§‡ āĻāĻ•āĻŽāĻžāĻ¤ā§āĻ° āĻĒāĻžāĻ°ā§āĻĨāĻ•ā§āĻ¯ āĻšāĻ˛ āĻ¯ā§‡ āĻŦā§āĻ¯āĻžāĻš āĻĨā§‡āĻ•ā§‡ āĻ†āĻŽāĻ°āĻž CSV āĻĒāĻĄāĻŧāĻŋ src_pathāĻĢāĻžāĻ‚āĻļāĻ¨ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ ReadFromText āĻŽāĻ°ā§€āĻšāĻŋ āĻĨā§‡āĻ•ā§‡āĨ¤

āĻŦā§āĻ¯āĻžāĻš āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻœāĻŦ (āĻŦā§āĻ¯āĻžāĻš āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery
import re
import logging
import sys

PROJECT='user-logs-237110'
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'


src_path = "user_log_fileC.txt"

def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")

        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'status': element[3],
            'body_bytes_sent': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main():

   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
      | "clean address" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )

   p.run()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽāĻŋāĻ‚ āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻœāĻŦ (āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚)

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="user-logs-237110"
schema = 'remote_addr:STRING, timelocal:STRING, request_type:STRING, status:STRING, body_bytes_sent:STRING, http_referer:STRING, http_user_agent:STRING'
TOPIC = "projects/user-logs-237110/topics/userlogs"


def regex_clean(data):

    PATTERNS =  [r'(^S+.[S+.]+S+)s',r'(?<=[).+?(?=])',
           r'"(S+)s(S+)s*(S*)"',r's(d+)s',r"(?<=[).d+(?=])",
           r'"[A-Z][a-z]+', r'"(http|https)://[a-z]+.[a-z]+.[a-z]+']
    result = []
    for match in PATTERNS:
      try:
        reg_match = re.search(match, data).group()
        if reg_match:
          result.append(reg_match)
        else:
          result.append(" ")
      except:
        print("There was an error with the regex search")
    result = [x.strip() for x in result]
    result = [x.replace('"', "") for x in result]
    res = ','.join(result)
    return res


class Split(beam.DoFn):

    def process(self, element):
        from datetime import datetime
        element = element.split(",")
        d = datetime.strptime(element[1], "%d/%b/%Y:%H:%M:%S")
        date_string = d.strftime("%Y-%m-%d %H:%M:%S")
        
        return [{ 
            'remote_addr': element[0],
            'timelocal': date_string,
            'request_type': element[2],
            'body_bytes_sent': element[3],
            'status': element[4],
            'http_referer': element[5],
            'http_user_agent': element[6]
    
        }]

def main(argv=None):

   parser = argparse.ArgumentParser()
   parser.add_argument("--input_topic")
   parser.add_argument("--output")
   known_args = parser.parse_known_args(argv)


   p = beam.Pipeline(options=PipelineOptions())

   (p
      | 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
      | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
      | "Clean Data" >> beam.Map(regex_clean)
      | 'ParseCSV' >> beam.ParDo(Split())
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:userlogs.logdata'.format(PROJECT), schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   )
   result = p.run()
   result.wait_until_finish()

if __name__ == '__main__':
  logger = logging.getLogger().setLevel(logging.INFO)
  main()

āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻšāĻžāĻ˛āĻžāĻšā§āĻ›ā§‡

āĻ†āĻŽāĻ°āĻž āĻŦāĻŋāĻ­āĻŋāĻ¨ā§āĻ¨ āĻ‰āĻĒāĻžāĻ¯āĻŧā§‡ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻšāĻžāĻ˛āĻžāĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤ āĻ†āĻŽāĻ°āĻž āĻšāĻžāĻ‡āĻ˛ā§‡, āĻĻā§‚āĻ°āĻŦāĻ°ā§āĻ¤ā§€āĻ­āĻžāĻŦā§‡ GCP-āĻ āĻ˛āĻ—āĻ‡āĻ¨ āĻ•āĻ°āĻžāĻ° āĻ¸āĻŽāĻ¯āĻŧ āĻ†āĻŽāĻ°āĻž āĻāĻŸāĻŋāĻ•ā§‡ āĻ¸ā§āĻĨāĻžāĻ¨ā§€āĻ¯āĻŧāĻ­āĻžāĻŦā§‡ āĻŸāĻžāĻ°ā§āĻŽāĻŋāĻ¨āĻžāĻ˛ āĻĨā§‡āĻ•ā§‡ āĻšāĻžāĻ˛āĻžāĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤

python -m main_pipeline_stream.py 
 --input_topic "projects/user-logs-237110/topics/userlogs" 
 --streaming

āĻ¯āĻžāĻ‡āĻšā§‹āĻ•, āĻ†āĻŽāĻ°āĻž DataFlow āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ āĻāĻŸāĻŋ āĻšāĻžāĻ˛āĻžāĻ¤ā§‡ āĻ¯āĻžāĻšā§āĻ›āĻŋāĨ¤ āĻ†āĻŽāĻ°āĻž āĻ¨āĻŋāĻŽā§āĻ¨āĻ˛āĻŋāĻ–āĻŋāĻ¤ āĻĒā§āĻ°āĻ¯āĻŧā§‹āĻœāĻ¨ā§€āĻ¯āĻŧ āĻĒā§āĻ¯āĻžāĻ°āĻžāĻŽāĻŋāĻŸāĻžāĻ° āĻ¸ā§‡āĻŸ āĻ•āĻ°ā§‡ āĻ¨ā§€āĻšā§‡āĻ° āĻ•āĻŽāĻžāĻ¨ā§āĻĄ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡ āĻāĻŸāĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤

  • project — āĻ†āĻĒāĻ¨āĻžāĻ° GCP āĻĒā§āĻ°āĻ•āĻ˛ā§āĻĒā§‡āĻ° āĻ†āĻ‡āĻĄāĻŋāĨ¤
  • runner āĻāĻ•āĻŸāĻŋ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ°āĻžāĻ¨āĻžāĻ° āĻ¯āĻž āĻ†āĻĒāĻ¨āĻžāĻ° āĻĒā§āĻ°ā§‹āĻ—ā§āĻ°āĻžāĻŽ āĻŦāĻŋāĻļā§āĻ˛ā§‡āĻˇāĻŖ āĻ•āĻ°āĻŦā§‡ āĻāĻŦāĻ‚ āĻ†āĻĒāĻ¨āĻžāĻ° āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¨āĻŋāĻ°ā§āĻŽāĻžāĻŖ āĻ•āĻ°āĻŦā§‡āĨ¤ āĻ•ā§āĻ˛āĻžāĻ‰āĻĄā§‡ āĻšāĻžāĻ˛āĻžāĻ¨ā§‹āĻ° āĻœāĻ¨ā§āĻ¯, āĻ†āĻĒāĻ¨āĻžāĻ•ā§‡ āĻ…āĻŦāĻļā§āĻ¯āĻ‡ āĻāĻ•āĻŸāĻŋ DataflowRunner āĻ¨āĻŋāĻ°ā§āĻĻāĻŋāĻˇā§āĻŸ āĻ•āĻ°āĻ¤ā§‡ āĻšāĻŦā§‡āĨ¤
  • staging_location — āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻ¸ā§āĻŸā§‹āĻ°ā§‡āĻœā§‡āĻ° āĻĒāĻžāĻĨ āĻ‡āĻ¨āĻĄā§‡āĻ•ā§āĻ¸āĻŋāĻ‚ āĻ•ā§‹āĻĄ āĻĒā§āĻ¯āĻžāĻ•ā§‡āĻœāĻ—ā§āĻ˛āĻŋāĻ° āĻœāĻ¨ā§āĻ¯ āĻĒā§āĻ°āĻ¯āĻŧā§‹āĻœāĻ¨ā§€āĻ¯āĻŧ āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻ°āĻ°āĻž āĻ•āĻžāĻœ āĻ¸āĻŽā§āĻĒāĻžāĻĻāĻ¨ āĻ•āĻ°ā§‡āĨ¤
  • temp_location — āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻšāĻ˛āĻžāĻ•āĻžāĻ˛ā§€āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻž āĻ…āĻ¸ā§āĻĨāĻžāĻ¯āĻŧā§€ āĻ•āĻžāĻœā§‡āĻ° āĻĢāĻžāĻ‡āĻ˛ āĻ¸āĻ‚āĻ°āĻ•ā§āĻˇāĻŖā§‡āĻ° āĻœāĻ¨ā§āĻ¯ āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻ•ā§āĻ˛āĻžāĻ‰āĻĄ āĻ¸ā§āĻŸā§‹āĻ°ā§‡āĻœā§‡āĻ° āĻĒāĻĨāĨ¤
  • streaming

python main_pipeline_stream.py 
--runner DataFlow 
--project $PROJECT 
--temp_location $BUCKET/tmp 
--staging_location $BUCKET/staging
--streaming

āĻāĻ‡ āĻ•āĻŽāĻžāĻ¨ā§āĻĄāĻŸāĻŋ āĻšāĻ˛āĻžāĻ•āĻžāĻ˛ā§€āĻ¨, āĻ†āĻŽāĻ°āĻž āĻ—ā§āĻ—āĻ˛ āĻ•āĻ¨āĻ¸ā§‹āĻ˛ā§‡ āĻĄā§‡āĻŸāĻžāĻĢā§āĻ˛ā§‹ āĻŸā§āĻ¯āĻžāĻŦā§‡ āĻ¯ā§‡āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ āĻāĻŦāĻ‚ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻĻā§‡āĻ–āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤ āĻ¯āĻ–āĻ¨ āĻ†āĻŽāĻ°āĻž āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ā§‡ āĻ•ā§āĻ˛āĻŋāĻ• āĻ•āĻ°āĻŋ, āĻ¤āĻ–āĻ¨ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻšāĻŋāĻ¤ā§āĻ° 4-āĻāĻ° āĻŽāĻ¤ā§‹ āĻ•āĻŋāĻ›ā§ āĻĻā§‡āĻ–āĻ¤ā§‡ āĻĒāĻžāĻ“āĻ¯āĻŧāĻž āĻ‰āĻšāĻŋāĻ¤āĨ¤ āĻĄāĻŋāĻŦāĻžāĻ—āĻŋāĻ‚āĻ¯āĻŧā§‡āĻ° āĻ‰āĻĻā§āĻĻā§‡āĻļā§āĻ¯ā§‡, āĻŦāĻŋāĻļāĻĻ āĻ˛āĻ—āĻ—ā§āĻ˛āĻŋ āĻĻā§‡āĻ–āĻ¤ā§‡ āĻ˛āĻ— āĻāĻŦāĻ‚ āĻ¤āĻžāĻ°āĻĒāĻ° āĻ¸ā§āĻŸā§āĻ¯āĻžāĻ•āĻĄā§āĻ°āĻžāĻ‡āĻ­āĻžāĻ°ā§‡ āĻ¯āĻžāĻ“āĻ¯āĻŧāĻž āĻ–ā§āĻŦ āĻ¸āĻšāĻžāĻ¯āĻŧāĻ• āĻšāĻ¤ā§‡ āĻĒāĻžāĻ°ā§‡āĨ¤ āĻāĻŸāĻŋ āĻ†āĻŽāĻžāĻ•ā§‡ āĻŦā§‡āĻļ āĻ•āĻ¯āĻŧā§‡āĻ•āĻŸāĻŋ āĻ•ā§āĻˇā§‡āĻ¤ā§āĻ°ā§‡ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ā§‡āĻ° āĻ¸āĻŽāĻ¸ā§āĻ¯āĻž āĻ¸āĻŽāĻžāĻ§āĻžāĻ¨ āĻ•āĻ°āĻ¤ā§‡ āĻ¸āĻžāĻšāĻžāĻ¯ā§āĻ¯ āĻ•āĻ°ā§‡āĻ›ā§‡āĨ¤

āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2
āĻšāĻŋāĻ¤ā§āĻ° 4: āĻŽāĻ°ā§€āĻšāĻŋ āĻĒāĻ°āĻŋāĻŦāĻžāĻšāĻ•

BigQuery-āĻ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĄā§‡āĻŸāĻž āĻ…ā§āĻ¯āĻžāĻ•ā§āĻ¸ā§‡āĻ¸ āĻ•āĻ°ā§āĻ¨

āĻ¸ā§āĻ¤āĻ°āĻžāĻ‚, āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ‡āĻ¤āĻŋāĻŽāĻ§ā§āĻ¯ā§‡āĻ‡ āĻāĻ•āĻŸāĻŋ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻĨāĻžāĻ•āĻž āĻ‰āĻšāĻŋāĻ¤ āĻ¯āĻžāĻ¤ā§‡ āĻĄā§‡āĻŸāĻž āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻŸā§‡āĻŦāĻŋāĻ˛ā§‡ āĻĒā§āĻ°āĻŦāĻžāĻšāĻŋāĻ¤ āĻšāĻ¯āĻŧāĨ¤ āĻāĻŸāĻŋ āĻĒāĻ°ā§€āĻ•ā§āĻˇāĻž āĻ•āĻ°āĻžāĻ° āĻœāĻ¨ā§āĻ¯, āĻ†āĻŽāĻ°āĻž BigQuery-āĻ āĻ¯ā§‡āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ āĻāĻŦāĻ‚ āĻĄā§‡āĻŸāĻž āĻĻā§‡āĻ–āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤ āĻ¨ā§€āĻšā§‡āĻ° āĻ•āĻŽāĻžāĻ¨ā§āĻĄāĻŸāĻŋ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°āĻžāĻ° āĻĒāĻ°ā§‡ āĻ†āĻĒāĻ¨āĻŋ āĻĄā§‡āĻŸāĻžāĻ¸ā§‡āĻŸā§‡āĻ° āĻĒā§āĻ°āĻĨāĻŽ āĻ•āĻ¯āĻŧā§‡āĻ•āĻŸāĻŋ āĻ¸āĻžāĻ°āĻŋ āĻĻā§‡āĻ–āĻ¤ā§‡ āĻĒāĻžāĻŦā§‡āĻ¨āĨ¤ āĻāĻ–āĻ¨ āĻ¯ā§‡āĻšā§‡āĻ¤ā§ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ•āĻžāĻ›ā§‡ BigQuery-āĻ āĻĄā§‡āĻŸāĻž āĻ¸āĻ‚āĻ°āĻ•ā§āĻˇāĻŋāĻ¤ āĻ†āĻ›ā§‡, āĻ†āĻŽāĻ°āĻž āĻ†āĻ°āĻ“ āĻŦāĻŋāĻļā§āĻ˛ā§‡āĻˇāĻŖ āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ, āĻ¸ā§‡āĻ‡āĻ¸āĻžāĻĨā§‡ āĻ¸āĻšāĻ•āĻ°ā§āĻŽā§€āĻĻā§‡āĻ° āĻ¸āĻžāĻĨā§‡ āĻĄā§‡āĻŸāĻž āĻļā§‡āĻ¯āĻŧāĻžāĻ° āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ āĻāĻŦāĻ‚ āĻŦā§āĻ¯āĻŦāĻ¸āĻžāĻ¯āĻŧāĻŋāĻ• āĻĒā§āĻ°āĻļā§āĻ¨ā§‡āĻ° āĻ‰āĻ¤ā§āĻ¤āĻ° āĻĻā§‡āĻ“āĻ¯āĻŧāĻž āĻļā§āĻ°ā§ āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤

SELECT * FROM `user-logs-237110.userlogs.logdata` LIMIT 10;

āĻ†āĻŽāĻ°āĻž āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽ āĻĄā§‡āĻŸāĻž āĻĒā§āĻ°āĻ¸ā§‡āĻ¸āĻŋāĻ‚ āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻŋāĨ¤ āĻ…āĻ‚āĻļ 2
āĻšāĻŋāĻ¤ā§āĻ° 5: BigQuery

āĻ‰āĻĒāĻ¸āĻ‚āĻšāĻžāĻ°

āĻ†āĻŽāĻ°āĻž āĻ†āĻļāĻž āĻ•āĻ°āĻŋ āĻāĻ‡ āĻĒā§‹āĻ¸ā§āĻŸāĻŸāĻŋ āĻāĻ•āĻŸāĻŋ āĻ¸ā§āĻŸā§āĻ°āĻŋāĻŽāĻŋāĻ‚ āĻĄā§‡āĻŸāĻž āĻĒāĻžāĻ‡āĻĒāĻ˛āĻžāĻ‡āĻ¨ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻžāĻ° āĻĒāĻžāĻļāĻžāĻĒāĻžāĻļāĻŋ āĻĄā§‡āĻŸāĻž āĻ†āĻ°āĻ“ āĻ…ā§āĻ¯āĻžāĻ•ā§āĻ¸ā§‡āĻ¸āĻ¯ā§‹āĻ—ā§āĻ¯ āĻ•āĻ°āĻžāĻ° āĻ‰āĻĒāĻžāĻ¯āĻŧāĻ—ā§āĻ˛āĻŋ āĻ–ā§āĻāĻœā§‡ āĻŦā§‡āĻ° āĻ•āĻ°āĻžāĻ° āĻāĻ•āĻŸāĻŋ āĻĻāĻ°āĻ•āĻžāĻ°ā§€ āĻ‰āĻĻāĻžāĻšāĻ°āĻŖ āĻšāĻŋāĻ¸āĻžāĻŦā§‡ āĻ•āĻžāĻœ āĻ•āĻ°ā§‡ā§ˇ āĻāĻ‡ āĻŦāĻŋāĻ¨ā§āĻ¯āĻžāĻ¸ā§‡ āĻĄā§‡āĻŸāĻž āĻ¸āĻ‚āĻ°āĻ•ā§āĻˇāĻŖ āĻ•āĻ°āĻž āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻ…āĻ¨ā§‡āĻ• āĻ¸ā§āĻŦāĻŋāĻ§āĻž āĻĻā§‡āĻ¯āĻŧāĨ¤ āĻāĻ–āĻ¨ āĻ†āĻŽāĻ°āĻž āĻ—ā§āĻ°ā§āĻ¤ā§āĻŦāĻĒā§‚āĻ°ā§āĻŖ āĻĒā§āĻ°āĻļā§āĻ¨ā§‡āĻ° āĻ‰āĻ¤ā§āĻ¤āĻ° āĻĻā§‡āĻ“āĻ¯āĻŧāĻž āĻļā§āĻ°ā§ āĻ•āĻ°āĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋ āĻ¯ā§‡āĻŽāĻ¨ āĻ•āĻ¤āĻœāĻ¨ āĻŽāĻžāĻ¨ā§āĻˇ āĻ†āĻŽāĻžāĻĻā§‡āĻ° āĻĒāĻŖā§āĻ¯ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°āĻ›ā§‡? āĻ†āĻĒāĻ¨āĻžāĻ° āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ°āĻ•āĻžāĻ°ā§€ āĻŦā§‡āĻ¸ āĻ¸āĻŽāĻ¯āĻŧā§‡āĻ° āĻ¸āĻžāĻĨā§‡ āĻŦāĻžāĻĄāĻŧāĻ›ā§‡? āĻĒāĻŖā§āĻ¯ā§‡āĻ° āĻ•ā§‹āĻ¨ āĻĻāĻŋāĻ•āĻ—ā§āĻ˛āĻŋāĻ° āĻ¸āĻžāĻĨā§‡ āĻ˛ā§‹āĻ•ā§‡āĻ°āĻž āĻ¸āĻŦāĻšā§‡āĻ¯āĻŧā§‡ āĻŦā§‡āĻļāĻŋ āĻ¯ā§‹āĻ—āĻžāĻ¯ā§‹āĻ— āĻ•āĻ°ā§‡? āĻāĻŦāĻ‚ āĻ¸ā§‡āĻ–āĻžāĻ¨ā§‡ āĻ¤ā§āĻ°ā§āĻŸāĻŋ āĻ†āĻ›ā§‡ āĻ¯ā§‡āĻ–āĻžāĻ¨ā§‡ āĻĨāĻžāĻ•āĻž āĻ‰āĻšāĻŋāĻ¤ āĻ¨āĻ¯āĻŧ? āĻāĻ‡ āĻĒā§āĻ°āĻļā§āĻ¨āĻ—ā§āĻ˛ā§‹ āĻĒā§āĻ°āĻ¤āĻŋāĻˇā§āĻ āĻžāĻ¨ā§‡āĻ° āĻ¸ā§āĻŦāĻžāĻ°ā§āĻĨā§‡āĻ° āĻšāĻŦā§‡āĨ¤ āĻāĻ‡ āĻĒā§āĻ°āĻļā§āĻ¨āĻ—ā§āĻ˛āĻŋāĻ° āĻ‰āĻ¤ā§āĻ¤āĻ° āĻĨā§‡āĻ•ā§‡ āĻ‰āĻĻā§āĻ­ā§‚āĻ¤ āĻ…āĻ¨ā§āĻ¤āĻ°ā§āĻĻā§ƒāĻˇā§āĻŸāĻŋāĻ—ā§āĻ˛āĻŋāĻ° āĻ‰āĻĒāĻ° āĻ­āĻŋāĻ¤ā§āĻ¤āĻŋ āĻ•āĻ°ā§‡, āĻ†āĻŽāĻ°āĻž āĻĒāĻŖā§āĻ¯āĻŸāĻŋāĻ•ā§‡ āĻ‰āĻ¨ā§āĻ¨āĻ¤ āĻ•āĻ°āĻ¤ā§‡ āĻāĻŦāĻ‚ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ°āĻ•āĻžāĻ°ā§€āĻ° āĻŦā§āĻ¯āĻ¸ā§āĻ¤āĻ¤āĻž āĻŦāĻžāĻĄāĻŧāĻžāĻ¤ā§‡ āĻĒāĻžāĻ°āĻŋāĨ¤

āĻ°āĻļā§āĻŽāĻŋ āĻāĻ‡ āĻ§āĻ°āĻ¨ā§‡āĻ° āĻŦā§āĻ¯āĻžāĻ¯āĻŧāĻžāĻŽā§‡āĻ° āĻœāĻ¨ā§āĻ¯ āĻ¸āĻ¤ā§āĻ¯āĻŋāĻ‡ āĻ‰āĻĒāĻ¯ā§‹āĻ—ā§€ āĻāĻŦāĻ‚ āĻ…āĻ¨ā§āĻ¯āĻžāĻ¨ā§āĻ¯ āĻ†āĻ•āĻ°ā§āĻˇāĻŖā§€āĻ¯āĻŧ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ°ā§‡āĻ° āĻ•ā§āĻˇā§‡āĻ¤ā§āĻ°ā§‡āĻ“ āĻ°āĻ¯āĻŧā§‡āĻ›ā§‡āĨ¤ āĻ‰āĻĻāĻžāĻšāĻ°āĻŖāĻ¸ā§āĻŦāĻ°ā§‚āĻĒ, āĻ†āĻĒāĻ¨āĻŋ āĻ°āĻŋāĻ¯āĻŧā§‡āĻ˛ āĻŸāĻžāĻ‡āĻŽā§‡ āĻ¸ā§āĻŸāĻ• āĻŸāĻŋāĻ• āĻĄā§‡āĻŸāĻž āĻŦāĻŋāĻļā§āĻ˛ā§‡āĻˇāĻŖ āĻ•āĻ°āĻ¤ā§‡ āĻāĻŦāĻ‚ āĻŦāĻŋāĻļā§āĻ˛ā§‡āĻˇāĻŖā§‡āĻ° āĻ‰āĻĒāĻ° āĻ­āĻŋāĻ¤ā§āĻ¤āĻŋ āĻ•āĻ°ā§‡ āĻŦā§āĻ¯āĻŦāĻ¸āĻž āĻ•āĻ°āĻ¤ā§‡ āĻšāĻžāĻ‡āĻ¤ā§‡ āĻĒāĻžāĻ°ā§‡āĻ¨, āĻ¸āĻŽā§āĻ­āĻŦāĻ¤ āĻ†āĻĒāĻ¨āĻžāĻ° āĻ•āĻžāĻ›ā§‡ āĻ¯āĻžāĻ¨āĻŦāĻžāĻšāĻ¨ āĻĨā§‡āĻ•ā§‡ āĻ¸ā§‡āĻ¨ā§āĻ¸āĻ° āĻĄā§‡āĻŸāĻž āĻ†āĻ¸āĻ›ā§‡ āĻāĻŦāĻ‚ āĻ†āĻĒāĻ¨āĻŋ āĻŸā§āĻ°ā§āĻ¯āĻžāĻĢāĻŋāĻ• āĻ¸ā§āĻ¤āĻ°ā§‡āĻ° āĻ—āĻŖāĻ¨āĻž āĻ•āĻ°āĻ¤ā§‡ āĻšāĻžāĻ¨āĨ¤ āĻ‰āĻĻāĻžāĻšāĻ°āĻŖāĻ¸ā§āĻŦāĻ°ā§‚āĻĒ, āĻ†āĻĒāĻ¨āĻŋ āĻāĻ•āĻŸāĻŋ āĻ—ā§‡āĻŽāĻŋāĻ‚ āĻ•ā§‹āĻŽā§āĻĒāĻžāĻ¨āĻŋ āĻšāĻ¤ā§‡ āĻĒāĻžāĻ°ā§‡āĻ¨ āĻ¯āĻž āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ°āĻ•āĻžāĻ°ā§€āĻ° āĻĄā§‡āĻŸāĻž āĻ¸āĻ‚āĻ—ā§āĻ°āĻš āĻ•āĻ°ā§‡ āĻāĻŦāĻ‚ āĻŽā§‚āĻ˛ āĻŽā§‡āĻŸā§āĻ°āĻŋāĻ•ā§āĻ¸ āĻŸā§āĻ°ā§āĻ¯āĻžāĻ• āĻ•āĻ°āĻ¤ā§‡ āĻĄā§āĻ¯āĻžāĻļāĻŦā§‹āĻ°ā§āĻĄ āĻ¤ā§ˆāĻ°āĻŋ āĻ•āĻ°āĻ¤ā§‡ āĻāĻŸāĻŋ āĻŦā§āĻ¯āĻŦāĻšāĻžāĻ° āĻ•āĻ°ā§‡āĨ¤ āĻ āĻŋāĻ• āĻ†āĻ›ā§‡, āĻ­āĻĻā§āĻ°āĻ˛ā§‹āĻ•, āĻāĻŸāĻŋ āĻ…āĻ¨ā§āĻ¯ āĻĒā§‹āĻ¸ā§āĻŸā§‡āĻ° āĻœāĻ¨ā§āĻ¯ āĻāĻ•āĻŸāĻŋ āĻŦāĻŋāĻˇāĻ¯āĻŧ, āĻĒāĻĄāĻŧāĻžāĻ° āĻœāĻ¨ā§āĻ¯ āĻ§āĻ¨ā§āĻ¯āĻŦāĻžāĻĻ, āĻāĻŦāĻ‚ āĻ¯āĻžāĻ°āĻž āĻ¸āĻŽā§āĻĒā§‚āĻ°ā§āĻŖ āĻ•ā§‹āĻĄ āĻĻā§‡āĻ–āĻ¤ā§‡ āĻšāĻžāĻ¨ āĻ¤āĻžāĻĻā§‡āĻ° āĻœāĻ¨ā§āĻ¯, āĻ¨ā§€āĻšā§‡ āĻ†āĻŽāĻžāĻ° āĻ—āĻŋāĻŸāĻšāĻžāĻŦā§‡āĻ° āĻ˛āĻŋāĻ™ā§āĻ• āĻ°āĻ¯āĻŧā§‡āĻ›ā§‡āĨ¤

https://github.com/DFoly/User_log_pipeline

āĻāĻ–āĻžāĻ¨ā§‡āĻ‡ āĻļā§‡āĻˇ. āĻĒā§āĻ°āĻĨāĻŽ āĻ…āĻ‚āĻļ āĻĒāĻĄāĻŧā§āĻ¨.

āĻ‰āĻ¤ā§āĻ¸: www.habr.com

āĻāĻ•āĻŸāĻŋ āĻŽāĻ¨ā§āĻ¤āĻŦā§āĻ¯ āĻœā§āĻĄāĻŧā§āĻ¨