ప్రోహోస్టర్ > బ్లాగ్ > పరిపాలన > స్పార్క్ స్ట్రీమింగ్తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్
స్పార్క్ స్ట్రీమింగ్తో అపాచీ కాఫ్కా మరియు స్ట్రీమింగ్ డేటా ప్రాసెసింగ్
హలో, హబ్ర్! ఈ రోజు మనం స్పార్క్ స్ట్రీమింగ్ని ఉపయోగించి అపాచీ కాఫ్కా సందేశ ప్రసారాలను ప్రాసెస్ చేసే సిస్టమ్ను రూపొందిస్తాము మరియు ప్రాసెసింగ్ ఫలితాలను AWS RDS క్లౌడ్ డేటాబేస్కు వ్రాస్తాము.
ఒక నిర్దిష్ట క్రెడిట్ సంస్థ తన శాఖలన్నింటిలో "ఫ్లైలో" ఇన్కమింగ్ లావాదేవీలను ప్రాసెస్ చేసే పనిని సెట్ చేస్తుందని ఊహించుకుందాం. ట్రెజరీ కోసం ఓపెన్ కరెన్సీ స్థానం, పరిమితులు లేదా లావాదేవీల కోసం ఆర్థిక ఫలితాలు మొదలైనవాటిని తక్షణమే లెక్కించడం కోసం ఇది చేయవచ్చు.
మేజిక్ మరియు మాయా మంత్రాలను ఉపయోగించకుండా ఈ కేసును ఎలా అమలు చేయాలి - కట్ కింద చదవండి! వెళ్ళండి!
వాస్తవానికి, నిజ సమయంలో పెద్ద మొత్తంలో డేటాను ప్రాసెస్ చేయడం ఆధునిక వ్యవస్థలలో ఉపయోగించడానికి పుష్కల అవకాశాలను అందిస్తుంది. దీనికి అత్యంత ప్రజాదరణ పొందిన కలయికలలో ఒకటి అపాచీ కాఫ్కా మరియు స్పార్క్ స్ట్రీమింగ్ యొక్క టెన్డం, ఇక్కడ కాఫ్కా ఇన్కమింగ్ మెసేజ్ ప్యాకెట్ల స్ట్రీమ్ను సృష్టిస్తుంది మరియు స్పార్క్ స్ట్రీమింగ్ ఈ ప్యాకెట్లను ఇచ్చిన సమయ వ్యవధిలో ప్రాసెస్ చేస్తుంది.
అప్లికేషన్ యొక్క తప్పు సహనాన్ని పెంచడానికి, మేము తనిఖీ కేంద్రాలను ఉపయోగిస్తాము. ఈ మెకానిజంతో, స్పార్క్ స్ట్రీమింగ్ ఇంజిన్ కోల్పోయిన డేటాను తిరిగి పొందవలసి వచ్చినప్పుడు, అది చివరి చెక్పాయింట్కి తిరిగి వెళ్లి, అక్కడ నుండి గణనలను పునఃప్రారంభించాలి.
అభివృద్ధి చెందిన వ్యవస్థ యొక్క ఆర్కిటెక్చర్
ఉపయోగించిన భాగాలు:
అపాచీ కాఫ్కా పంపిణీ చేయబడిన ప్రచురణ-చందా సందేశ వ్యవస్థ. ఆఫ్లైన్ మరియు ఆన్లైన్ సందేశ వినియోగం రెండింటికీ అనుకూలం. డేటా నష్టాన్ని నివారించడానికి, కాఫ్కా సందేశాలు డిస్క్లో నిల్వ చేయబడతాయి మరియు క్లస్టర్లో ప్రతిరూపం చేయబడతాయి. కాఫ్కా సిస్టమ్ ZooKeeper సింక్రొనైజేషన్ సర్వీస్ పైన నిర్మించబడింది;
అపాచీ స్పార్క్ స్ట్రీమింగ్ - స్ట్రీమింగ్ డేటాను ప్రాసెస్ చేయడానికి స్పార్క్ భాగం. స్పార్క్ స్ట్రీమింగ్ మాడ్యూల్ మైక్రో-బ్యాచ్ ఆర్కిటెక్చర్ను ఉపయోగించి నిర్మించబడింది, ఇక్కడ డేటా స్ట్రీమ్ చిన్న డేటా ప్యాకెట్ల యొక్క నిరంతర క్రమం వలె వివరించబడుతుంది. స్పార్క్ స్ట్రీమింగ్ వివిధ మూలాల నుండి డేటాను తీసుకుంటుంది మరియు దానిని చిన్న ప్యాకేజీలుగా మిళితం చేస్తుంది. కొత్త ప్యాకేజీలు క్రమ వ్యవధిలో సృష్టించబడతాయి. ప్రతి సమయ విరామం ప్రారంభంలో, కొత్త ప్యాకెట్ సృష్టించబడుతుంది మరియు ఆ వ్యవధిలో అందుకున్న ఏదైనా డేటా ప్యాకెట్లో చేర్చబడుతుంది. విరామం ముగింపులో, ప్యాకెట్ పెరుగుదల ఆగిపోతుంది. విరామం యొక్క పరిమాణం బ్యాచ్ విరామం అని పిలువబడే పరామితి ద్వారా నిర్ణయించబడుతుంది;
అపాచీ స్పార్క్ SQL - స్పార్క్ ఫంక్షనల్ ప్రోగ్రామింగ్తో రిలేషనల్ ప్రాసెసింగ్ను మిళితం చేస్తుంది. స్ట్రక్చర్డ్ డేటా అంటే స్కీమా ఉన్న డేటా, అంటే అన్ని రికార్డ్ల కోసం ఒకే సెట్ ఫీల్డ్లు. Spark SQL వివిధ నిర్మాణాత్మక డేటా మూలాల నుండి ఇన్పుట్కు మద్దతు ఇస్తుంది మరియు స్కీమా సమాచారం యొక్క లభ్యతకు ధన్యవాదాలు, ఇది అవసరమైన రికార్డ్ల ఫీల్డ్లను మాత్రమే సమర్ధవంతంగా తిరిగి పొందగలదు మరియు డేటాఫ్రేమ్ APIలను కూడా అందిస్తుంది;
AWS RDS సాపేక్షంగా చవకైన క్లౌడ్-ఆధారిత రిలేషనల్ డేటాబేస్, సెటప్, ఆపరేషన్ మరియు స్కేలింగ్ను సులభతరం చేసే వెబ్ సేవ మరియు నేరుగా Amazon ద్వారా నిర్వహించబడుతుంది.
కాఫ్కా సర్వర్ను ఇన్స్టాల్ చేయడం మరియు అమలు చేయడం
కాఫ్కాను నేరుగా ఉపయోగించే ముందు, మీరు జావాను కలిగి ఉన్నారని నిర్ధారించుకోవాలి, ఎందుకంటే... JVM పని కోసం ఉపయోగించబడుతుంది:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
తదుపరి దశ ఐచ్ఛికం. వాస్తవం ఏమిటంటే డిఫాల్ట్ సెట్టింగ్లు అపాచీ కాఫ్కా యొక్క అన్ని లక్షణాలను పూర్తిగా ఉపయోగించడానికి మిమ్మల్ని అనుమతించవు. ఉదాహరణకు, సందేశాలను ప్రచురించగల అంశం, వర్గం, సమూహాన్ని తొలగించండి. దీన్ని మార్చడానికి, కాన్ఫిగరేషన్ ఫైల్ని ఎడిట్ చేద్దాం:
vim ~/kafka/config/server.properties
ఫైల్ చివర కింది వాటిని జోడించండి:
delete.topic.enable = true
కాఫ్కా సర్వర్ని ప్రారంభించే ముందు, మీరు ZooKeeper సర్వర్ని ప్రారంభించాలి; మేము కాఫ్కా పంపిణీతో వచ్చే సహాయక స్క్రిప్ట్ని ఉపయోగిస్తాము:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper విజయవంతంగా ప్రారంభించిన తర్వాత, కాఫ్కా సర్వర్ను ప్రత్యేక టెర్మినల్లో ప్రారంభించండి:
కొత్తగా సృష్టించబడిన అంశం కోసం నిర్మాత మరియు వినియోగదారుని పరీక్షించే క్షణాలను మిస్ చేద్దాం. మీరు సందేశాలను పంపడం మరియు స్వీకరించడం ఎలా పరీక్షించవచ్చనే దాని గురించి మరిన్ని వివరాలు అధికారిక డాక్యుమెంటేషన్లో వ్రాయబడ్డాయి - కొన్ని సందేశాలు పంపండి. సరే, మేము కాఫ్కాప్రొడ్యూసర్ APIని ఉపయోగించి పైథాన్లో ప్రొడ్యూసర్ని వ్రాయడం కొనసాగిస్తాము.
నిర్మాత రచన
నిర్మాత యాదృచ్ఛిక డేటాను రూపొందిస్తారు - ప్రతి సెకనుకు 100 సందేశాలు. యాదృచ్ఛిక డేటా ద్వారా మేము మూడు ఫీల్డ్లతో కూడిన నిఘంటువు అని అర్థం:
బ్రాంచ్ - క్రెడిట్ ఇన్స్టిట్యూషన్ పాయింట్ ఆఫ్ సేల్ పేరు;
కరెన్సీ - లావాదేవీ కరెన్సీ;
మొత్తం - లావాదేవి మొత్తం. బ్యాంక్ ద్వారా కరెన్సీని కొనుగోలు చేసినట్లయితే మొత్తం సానుకూల సంఖ్యగా ఉంటుంది మరియు అమ్మకం అయితే ప్రతికూల సంఖ్యగా ఉంటుంది.
తరువాత, పంపే పద్ధతిని ఉపయోగించి, మేము సర్వర్కు, మనకు అవసరమైన అంశానికి, JSON ఆకృతిలో సందేశాన్ని పంపుతాము:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
స్క్రిప్ట్ను అమలు చేస్తున్నప్పుడు, మేము టెర్మినల్లో క్రింది సందేశాలను అందుకుంటాము:
అంటే అన్నీ మనం కోరుకున్నట్లే పనిచేస్తాయని అర్థం - ప్రొడ్యూసర్ మనకు అవసరమైన టాపిక్కి మెసేజ్లను రూపొందించి పంపుతాడు.
తదుపరి దశ స్పార్క్ని ఇన్స్టాల్ చేసి, ఈ సందేశ ప్రసారాన్ని ప్రాసెస్ చేయడం.
Apache Sparkని ఇన్స్టాల్ చేస్తోంది
అపాచీ స్పార్క్ సార్వత్రిక మరియు అధిక-పనితీరు గల క్లస్టర్ కంప్యూటింగ్ ప్లాట్ఫారమ్.
ఇంటరాక్టివ్ ప్రశ్నలు మరియు స్ట్రీమ్ ప్రాసెసింగ్తో సహా విస్తృత శ్రేణి గణన రకాలకు మద్దతునిస్తూ MapReduce మోడల్ యొక్క జనాదరణ పొందిన అమలుల కంటే స్పార్క్ మెరుగ్గా పని చేస్తుంది. పెద్ద మొత్తంలో డేటాను ప్రాసెస్ చేసేటప్పుడు వేగం ఒక ముఖ్యమైన పాత్ర పోషిస్తుంది, ఎందుకంటే ఇది నిమిషాలు లేదా గంటలు వేచి ఉండకుండా ఇంటరాక్టివ్గా పని చేయడానికి మిమ్మల్ని అనుమతిస్తుంది. స్పార్క్ యొక్క అతిపెద్ద బలాల్లో ఒకటి మెమరీలో గణనలను నిర్వహించగల సామర్థ్యం.
ఈ ఫ్రేమ్వర్క్ స్కాలాలో వ్రాయబడింది, కాబట్టి మీరు దీన్ని ముందుగా ఇన్స్టాల్ చేయాలి:
sudo apt-get install scala
అధికారిక వెబ్సైట్ నుండి స్పార్క్ పంపిణీని డౌన్లోడ్ చేయండి:
ఎందుకంటే ఈ ఉదాహరణ విద్యా ప్రయోజనాల కోసం మాత్రమే; మేము "కనీసం" (ఉచిత టైర్) ఉచిత సర్వర్ని ఉపయోగిస్తాము:
తరువాత, మేము ఫ్రీ టైర్ బ్లాక్లో టిక్ను ఉంచాము మరియు ఆ తర్వాత మనకు స్వయంచాలకంగా t2.మైక్రో క్లాస్ యొక్క ఉదాహరణ అందించబడుతుంది - బలహీనంగా ఉన్నప్పటికీ, ఇది ఉచితం మరియు మా పనికి చాలా అనుకూలంగా ఉంటుంది:
తర్వాత చాలా ముఖ్యమైన విషయాలు వస్తాయి: డేటాబేస్ ఉదాహరణ పేరు, మాస్టర్ యూజర్ పేరు మరియు అతని పాస్వర్డ్. ఉదాహరణకి పేరు పెట్టండి: myHabrTest, మాస్టర్ యూజర్: హాబ్ర్, పాస్వర్డ్: habr12345 మరియు తదుపరి బటన్పై క్లిక్ చేయండి:
తరువాతి పేజీలో బయటి నుండి మా డేటాబేస్ సర్వర్ యొక్క యాక్సెసిబిలిటీకి (పబ్లిక్ యాక్సెసిబిలిటీ) మరియు పోర్ట్ లభ్యతకు బాధ్యత వహించే పారామితులు ఉన్నాయి:
VPC భద్రతా సమూహం కోసం కొత్త సెట్టింగ్ని క్రియేట్ చేద్దాం, ఇది పోర్ట్ 5432 (PostgreSQL) ద్వారా మా డేటాబేస్ సర్వర్కు బాహ్య ప్రాప్యతను అనుమతిస్తుంది.
VPC డాష్బోర్డ్ -> సెక్యూరిటీ గ్రూప్లు -> క్రియేట్ సెక్యూరిటీ గ్రూప్ సెక్షన్కి ప్రత్యేక బ్రౌజర్ విండోలోని AWS కన్సోల్కి వెళ్దాం:
మేము భద్రతా సమూహానికి పేరును సెట్ చేసాము - PostgreSQL, ఒక వివరణ, ఈ సమూహం ఏ VPCతో అనుబంధించబడాలో సూచించండి మరియు సృష్టించు బటన్ను క్లిక్ చేయండి:
దిగువ చిత్రంలో చూపిన విధంగా, కొత్తగా సృష్టించబడిన సమూహం కోసం పోర్ట్ 5432 కోసం ఇన్బౌండ్ నియమాలను పూరించండి. మీరు పోర్ట్ను మాన్యువల్గా పేర్కొనలేరు, కానీ టైప్ డ్రాప్-డౌన్ జాబితా నుండి PostgreSQLని ఎంచుకోండి.
ఖచ్చితంగా చెప్పాలంటే, విలువ ::/0 అంటే ప్రపంచం నలుమూలల నుండి సర్వర్కి ఇన్కమింగ్ ట్రాఫిక్ లభ్యత, ఇది కానానికల్గా పూర్తిగా నిజం కాదు, కానీ ఉదాహరణను విశ్లేషించడానికి, ఈ విధానాన్ని ఉపయోగించడానికి మనల్ని మనం అనుమతించుకుందాం:
మేము బ్రౌజర్ పేజీకి తిరిగి వస్తాము, ఇక్కడ "అధునాతన సెట్టింగ్లను కాన్ఫిగర్ చేయండి" తెరిచి, VPC భద్రతా సమూహాల విభాగంలో ఎంచుకోండి -> ఇప్పటికే ఉన్న VPC భద్రతా సమూహాలను ఎంచుకోండి -> PostgreSQL:
తరువాత, డేటాబేస్ ఎంపికలలో -> డేటాబేస్ పేరు -> పేరును సెట్ చేయండి - habrDB.
మేము డిఫాల్ట్గా బ్యాకప్ (బ్యాకప్ నిలుపుదల వ్యవధి - 0 రోజులు), పర్యవేక్షణ మరియు పనితీరు అంతర్దృష్టులను నిలిపివేయడం మినహా మిగిలిన పారామితులను వదిలివేయవచ్చు. బటన్ పై క్లిక్ చేయండి డేటాబేస్ సృష్టించండి:
థ్రెడ్ హ్యాండ్లర్
చివరి దశ స్పార్క్ జాబ్ అభివృద్ధి అవుతుంది, ఇది ప్రతి రెండు సెకన్లకు కాఫ్కా నుండి వచ్చే కొత్త డేటాను ప్రాసెస్ చేస్తుంది మరియు ఫలితాన్ని డేటాబేస్లోకి నమోదు చేస్తుంది.
పైన పేర్కొన్నట్లుగా, చెక్పాయింట్లు స్పార్క్స్ట్రీమింగ్లో ఒక ప్రధాన మెకానిజం, ఇది తప్పును సహించడాన్ని నిర్ధారించడానికి తప్పనిసరిగా కాన్ఫిగర్ చేయబడాలి. మేము చెక్పాయింట్లను ఉపయోగిస్తాము మరియు ప్రక్రియ విఫలమైతే, స్పార్క్ స్ట్రీమింగ్ మాడ్యూల్ చివరి చెక్పాయింట్కు తిరిగి వెళ్లి, కోల్పోయిన డేటాను పునరుద్ధరించడానికి దాని నుండి గణనలను పునఃప్రారంభించవలసి ఉంటుంది.
చెక్పాయింట్ సమాచారం నిల్వ చేయబడే తప్పు-తట్టుకునే, విశ్వసనీయ ఫైల్ సిస్టమ్ (HDFS, S3, మొదలైనవి)పై డైరెక్టరీని సెట్ చేయడం ద్వారా చెక్పాయింటింగ్ ప్రారంభించబడుతుంది. ఇది ఉపయోగించి చేయబడుతుంది, ఉదాహరణకు:
streamingContext.checkpoint(checkpointDirectory)
మా ఉదాహరణలో, మేము ఈ క్రింది విధానాన్ని ఉపయోగిస్తాము, అవి చెక్పాయింట్ డైరెక్టరీ ఉంటే, ఆ సందర్భం చెక్పాయింట్ డేటా నుండి మళ్లీ సృష్టించబడుతుంది. డైరెక్టరీ ఉనికిలో లేకుంటే (అనగా మొదటి సారి అమలు చేయబడింది), అప్పుడు కొత్త సందర్భాన్ని సృష్టించడానికి మరియు DStreams కాన్ఫిగర్ చేయడానికి functionToCreateContext అంటారు:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
KafkaUtils లైబ్రరీ యొక్క createDirectStream పద్ధతిని ఉపయోగించి "లావాదేవీ" అంశానికి కనెక్ట్ చేయడానికి మేము డైరెక్ట్స్ట్రీమ్ ఆబ్జెక్ట్ను సృష్టిస్తాము:
Spark SQLని ఉపయోగించి, మేము ఒక సాధారణ సమూహాన్ని చేస్తాము మరియు కన్సోల్లో ఫలితాన్ని ప్రదర్శిస్తాము:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
ప్రశ్న వచనాన్ని పొందడం మరియు స్పార్క్ SQL ద్వారా దాన్ని అమలు చేయడం:
ఆపై మేము ఫలితంగా సమగ్ర డేటాను AWS RDSలోని పట్టికలో సేవ్ చేస్తాము. అగ్రిగేషన్ ఫలితాలను డేటాబేస్ పట్టికలో సేవ్ చేయడానికి, మేము డేటాఫ్రేమ్ ఆబ్జెక్ట్ యొక్క వ్రాత పద్ధతిని ఉపయోగిస్తాము:
AWS RDSకి కనెక్షన్ని సెటప్ చేయడం గురించి కొన్ని మాటలు. "AWS PostgreSQLని అమలు చేస్తోంది" దశలో మేము దాని కోసం వినియోగదారు మరియు పాస్వర్డ్ను సృష్టించాము. మీరు ఎండ్పాయింట్ని డేటాబేస్ సర్వర్ urlగా ఉపయోగించాలి, ఇది కనెక్టివిటీ & సెక్యూరిటీ విభాగంలో ప్రదర్శించబడుతుంది:
స్పార్క్ మరియు కాఫ్కాను సరిగ్గా కనెక్ట్ చేయడానికి, మీరు ఆర్టిఫ్యాక్ట్ని ఉపయోగించి స్మార్క్-సబ్మిట్ ద్వారా జాబ్ని అమలు చేయాలి స్పార్క్-స్ట్రీమింగ్-కాఫ్కా-0-8_2.11. అదనంగా, మేము PostgreSQL డేటాబేస్తో పరస్పర చర్య చేయడానికి ఒక కళాఖండాన్ని కూడా ఉపయోగిస్తాము; మేము వాటిని --packages ద్వారా బదిలీ చేస్తాము.
స్క్రిప్ట్ యొక్క సౌలభ్యం కోసం, మేము సందేశ సర్వర్ పేరు మరియు మేము డేటాను స్వీకరించాలనుకుంటున్న అంశాన్ని కూడా ఇన్పుట్ పారామీటర్లుగా చేర్చుతాము.
కాబట్టి, సిస్టమ్ యొక్క కార్యాచరణను ప్రారంభించి, తనిఖీ చేయడానికి ఇది సమయం:
అంతా పని చేసింది! మీరు దిగువ చిత్రంలో చూడగలిగినట్లుగా, అప్లికేషన్ రన్ అవుతున్నప్పుడు, కొత్త అగ్రిగేషన్ ఫలితాలు ప్రతి 2 సెకన్లకు అవుట్పుట్ చేయబడతాయి, ఎందుకంటే మేము StreamingContext ఆబ్జెక్ట్ని సృష్టించినప్పుడు బ్యాచింగ్ విరామాన్ని 2 సెకన్లకు సెట్ చేసాము:
తరువాత, పట్టికలో రికార్డుల ఉనికిని తనిఖీ చేయడానికి మేము డేటాబేస్కు ఒక సాధారణ ప్రశ్న చేస్తాము లావాదేవీ_ప్రవాహం:
తీర్మానం
ఈ కథనం Apache Kafka మరియు PostgreSQLతో కలిపి స్పార్క్ స్ట్రీమింగ్ని ఉపయోగించి సమాచార ప్రసార ప్రాసెసింగ్ యొక్క ఉదాహరణను చూసింది. వివిధ మూలాల నుండి డేటా పెరుగుదలతో, స్ట్రీమింగ్ మరియు రియల్ టైమ్ అప్లికేషన్లను రూపొందించడానికి స్పార్క్ స్ట్రీమింగ్ యొక్క ఆచరణాత్మక విలువను అతిగా అంచనా వేయడం కష్టం.
మీరు నా రిపోజిటరీలో పూర్తి సోర్స్ కోడ్ని కనుగొనవచ్చు గ్యాలరీలు.
నేను ఈ కథనాన్ని చర్చించడానికి సంతోషంగా ఉన్నాను, నేను మీ వ్యాఖ్యల కోసం ఎదురు చూస్తున్నాను మరియు శ్రద్ధగల పాఠకులందరి నుండి నిర్మాణాత్మక విమర్శలను కూడా ఆశిస్తున్నాను.
మీరు విజయం సాధించాలని కోరుకుంటున్నాను!
కీర్త. మొదట్లో ఇది స్థానిక PostgreSQL డేటాబేస్ను ఉపయోగించాలని ప్రణాళిక చేయబడింది, కానీ AWS పట్ల నాకున్న ప్రేమను బట్టి, నేను డేటాబేస్ను క్లౌడ్కి తరలించాలని నిర్ణయించుకున్నాను. ఈ అంశంపై తదుపరి కథనంలో, AWS కినిసిస్ మరియు AWS EMR ఉపయోగించి AWSలో పైన వివరించిన మొత్తం సిస్టమ్ను ఎలా అమలు చేయాలో నేను చూపుతాను. వార్తలను అనుసరించండి!