Un exemplu de aplicație bazată pe evenimente bazate pe webhook-uri în stocarea obiectelor S3 Mail.ru Cloud Solutions

Un exemplu de aplicație bazată pe evenimente bazate pe webhook-uri în stocarea obiectelor S3 Mail.ru Cloud Solutions
aparat de cafea Rube Goldberg

Arhitectura bazată pe evenimente mărește eficiența costurilor resurselor utilizate deoarece acestea sunt utilizate doar în momentul în care sunt necesare. Există multe opțiuni despre cum să implementați acest lucru și să nu creați entități cloud suplimentare ca aplicații de lucru. Și astăzi voi vorbi nu despre FaaS, ci despre webhook-uri. Voi arăta un exemplu tutorial de gestionare a evenimentelor folosind webhook-uri de stocare a obiectelor.

Câteva cuvinte despre stocarea obiectelor și webhook-uri. Stocarea obiectelor vă permite să stocați orice date în cloud sub formă de obiecte, accesibile prin S3 sau alt API (în funcție de implementare) prin HTTP/HTTPS. Webhook-urile sunt, în general, apeluri HTTP personalizate. Acestea sunt de obicei declanșate de un eveniment, cum ar fi transmiterea unui cod într-un depozit sau postarea unui comentariu pe un blog. Când are loc un eveniment, site-ul de origine trimite o solicitare HTTP la adresa URL specificată pentru webhook. Ca rezultat, puteți face evenimente pe un site să declanșeze acțiuni pe altul (Wiki). În cazul în care site-ul sursă este un obiect de stocare, evenimentele acționează ca modificări ale conținutului său.

Exemple de cazuri simple în care poate fi utilizată o astfel de automatizare:

  1. Crearea de copii ale tuturor obiectelor într-un alt spațiu de stocare în cloud. Copiile trebuie create din mers ori de câte ori sunt adăugate sau modificate fișiere.
  2. Crearea automată a unei serii de miniaturi de fișiere grafice, adăugarea de filigrane la fotografii și alte modificări ale imaginii.
  3. Notificare despre sosirea de noi documente (de exemplu, un serviciu de contabilitate distribuit încarcă rapoarte în cloud, iar monitorizarea financiară primește notificări despre noi rapoarte, le verifică și le analizează).
  4. Cazurile puțin mai complexe implică, de exemplu, generarea unei cereri către Kubernetes, care creează un pod cu containerele necesare, îi transmite parametrii sarcinii și, după procesare, restrânge containerul.

De exemplu, vom face o variantă a sarcinii 1, atunci când modificările din compartimentul de stocare a obiectelor Mail.ru Cloud Solutions (MCS) sunt sincronizate în stocarea obiectelor AWS folosind webhook-uri. Într-un caz real încărcat, munca asincronă ar trebui asigurată prin înregistrarea webhook-urilor într-o coadă, dar pentru sarcina de antrenament vom face implementarea fără aceasta.

Schema de lucru

Protocolul de interacțiune este descris în detaliu în Ghid pentru webhook-urile S3 pe MCS. Schema de lucru conține următoarele elemente:

  • Serviciul de editare, care se află pe partea de stocare S3 și publică solicitări HTTP atunci când webnhook-ul este declanșat.
  • Server de recepție Webhook, care ascultă solicitările de la serviciul de publicare HTTP și efectuează acțiuni adecvate. Serverul poate fi scris în orice limbă; în exemplul nostru, vom scrie serverul în Go.

O caracteristică specială a implementării webhook-urilor în API-ul S3 este înregistrarea serverului de recepție webhook pe serviciul de publicare. În special, serverul de primire webhook trebuie să confirme abonamentul la mesajele de la serviciul de publicare (în alte implementări webhook, confirmarea abonamentului nu este de obicei necesară).

În consecință, serverul de recepție webhook trebuie să accepte două operațiuni principale:

  • să răspundă la cererea serviciului de publicare de a confirma înregistrarea,
  • procesează evenimentele primite.

Instalarea unui server de recepție webhook

Pentru a rula serverul de recepție webhook, aveți nevoie de un server Linux. În acest articol, ca exemplu, folosim o instanță virtuală pe care o implementăm pe MCS.

Să instalăm software-ul necesar și să lansăm serverul de primire webhook.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Clonează folderul cu serverul de primire webhook:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Să pornim serverul:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Abonați-vă la serviciul de publicare

Vă puteți înregistra serverul de recepție webhook prin intermediul API-ului sau al interfeței web. Pentru simplitate, ne vom înregistra prin interfața web:

  1. Să mergem la secțiunea găleți în camera de control.
  2. Mergeți la găleata pentru care vom configura webhook-uri și faceți clic pe roată:

Un exemplu de aplicație bazată pe evenimente bazate pe webhook-uri în stocarea obiectelor S3 Mail.ru Cloud Solutions

Accesați fila Webhooks și faceți clic pe Adăugați:

Un exemplu de aplicație bazată pe evenimente bazate pe webhook-uri în stocarea obiectelor S3 Mail.ru Cloud Solutions
Completați câmpurile:

Un exemplu de aplicație bazată pe evenimente bazate pe webhook-uri în stocarea obiectelor S3 Mail.ru Cloud Solutions

ID — numele webhook-ului.

Eveniment - ce evenimente să transmită. Am setat transmiterea tuturor evenimentelor care apar atunci când lucrăm cu fișiere (adăugarea și ștergerea).

URL — webhook care primește adresa serverului.

Prefixul/sufixul filtrului este un filtru care vă permite să generați webhook-uri numai pentru obiectele ale căror nume se potrivesc cu anumite reguli. De exemplu, pentru ca webhook-ul să declanșeze numai fișiere cu extensia .png, în Sufixul de filtrare trebuie să scrieți „png”.

În prezent, doar porturile 80 și 443 sunt acceptate pentru accesarea serverului de recepție webhook.

Hai să facem clic Adăugați cârlig si vom vedea urmatoarele:

Un exemplu de aplicație bazată pe evenimente bazate pe webhook-uri în stocarea obiectelor S3 Mail.ru Cloud Solutions
Hook a adăugat.

Serverul de primire webhook arată în jurnalele sale progresul procesului de înregistrare a hook:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Înregistrarea este finalizată. În secțiunea următoare, vom arunca o privire mai atentă asupra algoritmului de funcționare al serverului de recepție webhook.

Descrierea serverului de primire webhook

În exemplul nostru, serverul este scris în Go. Să ne uităm la principiile de bază ale funcționării sale.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Luați în considerare principalele funcții:

  • Ping() - o rută care răspunde prin URL/ping, cea mai simplă implementare a unei probe de liveness.
  • Webhook() - ruta principală, handler URL/webhook:
    • confirmă înregistrarea la serviciul de publicare (accesați funcția SubscriptionConfirmation),
    • procesează webhook-urile primite (funcția Gorecords).
  • Funcțiile HmacSha256 și HmacSha256hex sunt implementări ale algoritmilor de criptare HMAC-SHA256 și HMAC-SHA256 cu ieșire ca un șir de numere hexazecimale pentru calcularea semnăturii.
  • main este funcția principală, procesează parametrii liniei de comandă și înregistrează handlerele URL.

Parametrii liniei de comandă acceptați de server:

  • -port este portul pe care serverul va asculta.
  • -address - adresa IP pe care serverul o va asculta.
  • -script este un program extern care este apelat pentru fiecare hook de intrare.

Să aruncăm o privire mai atentă asupra unora dintre funcții:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Această funcție determină dacă a sosit o cerere de confirmare a înregistrării sau un webhook. După cum rezultă din documentație, dacă înregistrarea este confirmată, următoarea structură Json este primită în cererea Post:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

La această întrebare trebuie să se răspundă:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

În cazul în care semnătura este calculată ca:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Dacă sosește un webhook, structura solicitării Post arată astfel:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

În consecință, în funcție de cerere, trebuie să înțelegeți cum să procesați datele. Am ales intrarea ca indicator "Type":"SubscriptionConfirmation", deoarece este prezent în cererea de confirmare a abonamentului și nu este prezent în webhook. Pe baza prezenței/absenței acestei intrări în cererea POST, execuția ulterioară a programului se duce fie la funcție SubscriptionConfirmation, sau în funcție GotRecords.

Nu vom lua în considerare funcția SubscriptionConfirmation în detaliu; aceasta este implementată conform principiilor stabilite în documentație. Puteți vizualiza codul sursă pentru această funcție la arhivele de proiect git.

Funcția GotRecords analizează o cerere de intrare și pentru fiecare obiect Record apelează un script extern (al cărui nume a fost trecut în parametrul -script) cu parametrii:

  • numele găleții
  • cheie obiect
  • acțiune:
    • copie - dacă în cererea originală EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete - dacă în cererea originală EventName = ObjectRemoved | DeleteObject

Astfel, dacă sosește un cârlig cu o cerere Post, așa cum este descris mai sus, și parametrul -script=script.sh, apoi scriptul va fi apelat după cum urmează:

script.sh  bucketA some-file-to-bucket copy

Trebuie înțeles că acest server de recepție webhook nu este o soluție completă de producție, ci un exemplu simplificat al unei posibile implementări.

Exemplu de lucru

Să sincronizăm fișierele din compartimentul principal din MCS cu compartimentul de rezervă din AWS. Bucket-ul principal se numește myfiles-ash, cel de rezervă se numește myfiles-backup (configurarea compartimentului în AWS depășește domeniul de aplicare al acestui articol). În consecință, atunci când un fișier este plasat în compartimentul principal, copia acestuia ar trebui să apară în cel de rezervă, iar atunci când este șters din cel principal, ar trebui să fie ștearsă în cel de rezervă.

Vom lucra cu găleți folosind utilitarul awscli, care este compatibil atât cu stocarea în cloud MCS, cât și cu stocarea în cloud AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Să configuram accesul la API-ul S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Să configuram accesul la API-ul AWS S3:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Să verificăm accesele:

Către AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Pentru MCS, atunci când rulați comanda, trebuie să adăugați —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Accesat.

Acum să scriem un script pentru procesarea cârligului de intrare, să-l numim s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Să pornim serverul:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Să vedem cum funcționează. Prin Interfață web MCS adăugați fișierul test.txt în găleata myfiles-ash. Jurnalele consolei arată că a fost făcută o solicitare către serverul webhook:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Să verificăm conținutul compartimentului myfiles-backup din AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Acum, prin interfața web, vom șterge fișierul din bucket-ul myfiles-ash.

Jurnalele serverului:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Conținutul găleții:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Fișierul este șters, problema este rezolvată.

Concluzie și ToDo

Tot codul folosit în acest articol este în depozitul meu. Există, de asemenea, exemple de scripturi și exemple de numărare a semnăturilor pentru înregistrarea webhook-urilor.

Acest cod nu este altceva decât un exemplu despre cum puteți utiliza webhook-urile S3 în activitățile dvs. După cum am spus la început, dacă intenționați să utilizați un astfel de server în producție, trebuie să rescrieți cel puțin serverul pentru lucrul asincron: înregistrați webhook-urile primite într-o coadă (RabbitMQ sau NATS) și de acolo să le analizați și să le procesați cu aplicații pentru lucrători. În caz contrar, atunci când webhook-urile ajung masiv, este posibil să întâmpinați o lipsă de resurse de server pentru a finaliza sarcinile. Prezența cozilor vă permite să distribuiți serverul și lucrătorii, precum și să rezolvați problemele cu sarcini repetate în caz de eșecuri. De asemenea, este recomandabil să schimbați înregistrarea cu una mai detaliată și mai standardizată.

Succes!

Mai multe lecturi pe tema:

Sursa: www.habr.com

Adauga un comentariu