Un exemple d'una aplicació basada en esdeveniments basada en webhooks a l'emmagatzematge d'objectes S3 Mail.ru Cloud Solutions

Un exemple d'una aplicació basada en esdeveniments basada en webhooks a l'emmagatzematge d'objectes S3 Mail.ru Cloud Solutions
Màquina de cafè Rube Goldberg

L'arquitectura basada en esdeveniments augmenta la rendibilitat dels recursos utilitzats perquè només s'utilitzen en el moment en què es necessiten. Hi ha moltes opcions sobre com implementar-ho i no crear entitats al núvol addicionals com a aplicacions de treball. I avui no parlaré de FaaS, sinó de webhooks. Mostraré un exemple tutorial de gestió d'esdeveniments mitjançant webhooks d'emmagatzematge d'objectes.

Unes quantes paraules sobre l'emmagatzematge d'objectes i els webhooks. L'emmagatzematge d'objectes permet emmagatzemar qualsevol dada al núvol en forma d'objectes, accessibles mitjançant S3 o una altra API (segons la implementació) mitjançant HTTP/HTTPS. Els webhooks són generalment devolucions de trucada HTTP personalitzades. Normalment es desencadenen per un esdeveniment, com ara l'enviament de codi a un repositori o un comentari publicat en un bloc. Quan es produeix un esdeveniment, el lloc d'origen envia una sol·licitud HTTP a l'URL especificat per al webhook. Com a resultat, podeu fer que els esdeveniments en un lloc desencadenin accions en un altre (wiki). En el cas en què el lloc d'origen és un emmagatzematge d'objectes, els esdeveniments actuen com a canvis en el seu contingut.

Exemples de casos senzills en què es pot utilitzar aquesta automatització:

  1. Creació de còpies de tots els objectes en un altre emmagatzematge al núvol. Les còpies s'han de crear sobre la marxa sempre que s'afegeixin o es modifiquin fitxers.
  2. Creació automàtica d'una sèrie de miniatures de fitxers gràfics, afegint filigranes a les fotografies i altres modificacions d'imatge.
  3. Notificació sobre l'arribada de nous documents (per exemple, un servei de comptabilitat distribuït carrega informes al núvol i el seguiment financer rep notificacions sobre nous informes, els comprova i els analitza).
  4. Els casos una mica més complexos impliquen, per exemple, generar una sol·licitud a Kubernetes, que crea un pod amb els contenidors necessaris, li passa els paràmetres de la tasca i, després del processament, col·lapsa el contenidor.

Com a exemple, farem una variant de la tasca 1, quan els canvis al cub d'emmagatzematge d'objectes de Mail.ru Cloud Solutions (MCS) es sincronitzin a l'emmagatzematge d'objectes AWS mitjançant webhooks. En un cas real de càrrega, el treball asíncron s'hauria de proporcionar registrant webhooks en una cua, però per a la tasca d'entrenament farem la implementació sense això.

Esquema de treball

El protocol d'interacció es descriu detalladament a Guia dels webhooks S3 a MCS. El pla de treball conté els elements següents:

  • Servei de publicacions, que es troba al costat d'emmagatzematge S3 i publica sol·licituds HTTP quan s'activa el webnhook.
  • Servidor receptor webhook, que escolta les sol·licituds del servei de publicació HTTP i realitza les accions adequades. El servidor es pot escriure en qualsevol idioma; en el nostre exemple, escriurem el servidor a Go.

Una característica especial de la implementació de webhooks a l'API S3 és el registre del servidor de recepció de webhooks al servei de publicació. En particular, el servidor receptor de webhook ha de confirmar la subscripció als missatges del servei de publicació (en altres implementacions de webhook, normalment no cal confirmar la subscripció).

En conseqüència, el servidor de recepció de webhook ha de suportar dues operacions principals:

  • respondre a la sol·licitud del servei de publicació per confirmar el registre,
  • processar esdeveniments entrants.

Instal·lació d'un servidor de recepció de webhook

Per executar el servidor de recepció del webhook, necessiteu un servidor Linux. En aquest article, com a exemple, fem servir una instància virtual que despleguem a MCS.

Instal·lem el programari necessari i iniciem el servidor de recepció del webhook.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Clonar la carpeta amb el servidor de recepció del webhook:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Comencem el servidor:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Subscriu-te al servei de publicacions

Podeu registrar el vostre servidor de recepció de webhook mitjançant l'API o la interfície web. Per simplificar, registrarem a través de la interfície web:

  1. Anem a la secció de cubs a la sala de control.
  2. Aneu al bucket per al qual configurarem els webhooks i feu clic a l'engranatge:

Un exemple d'una aplicació basada en esdeveniments basada en webhooks a l'emmagatzematge d'objectes S3 Mail.ru Cloud Solutions

Aneu a la pestanya Webhooks i feu clic a Afegeix:

Un exemple d'una aplicació basada en esdeveniments basada en webhooks a l'emmagatzematge d'objectes S3 Mail.ru Cloud Solutions
Ompliu els camps:

Un exemple d'una aplicació basada en esdeveniments basada en webhooks a l'emmagatzematge d'objectes S3 Mail.ru Cloud Solutions

ID: el nom del webhook.

Esdeveniment: quins esdeveniments cal transmetre. Hem configurat la transmissió de tots els esdeveniments que es produeixen quan es treballa amb fitxers (afegir i eliminar).

URL: webhook que rep l'adreça del servidor.

El prefix/sufix del filtre és un filtre que us permet generar webhooks només per a objectes els noms dels quals coincideixen amb determinades regles. Per exemple, per tal que el webhook només activi fitxers amb l'extensió .png, a Sufix de filtre heu d'escriure "png".

Actualment, només els ports 80 i 443 són compatibles per accedir al servidor de recepció del webhook.

Fem clic Afegiu ganxo i veurem el següent:

Un exemple d'una aplicació basada en esdeveniments basada en webhooks a l'emmagatzematge d'objectes S3 Mail.ru Cloud Solutions
Hook afegit.

El servidor de recepció del webhook mostra als seus registres el progrés del procés de registre del ganxo:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

La inscripció s'ha completat. A la següent secció, veurem més de prop l'algoritme de funcionament del servidor receptor webhook.

Descripció del servidor de recepció del webhook

En el nostre exemple, el servidor està escrit a Go. Vegem els principis bàsics del seu funcionament.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Considereu les funcions principals:

  • Ping() - una ruta que respon mitjançant URL/ping, la implementació més senzilla d'una sonda de vida.
  • Webhook() - ruta principal, gestor d'URL/webhook:
    • confirma el registre al servei de publicació (anar a la funció SubscriptionConfirmation),
    • processa els webhooks entrants (funció Gorecords).
  • Les funcions HmacSha256 i HmacSha256hex són implementacions dels algorismes de xifratge HMAC-SHA256 i HMAC-SHA256 amb sortida com una cadena de números hexadecimals per calcular la signatura.
  • main és la funció principal, processa els paràmetres de la línia d'ordres i registra els controladors d'URL.

Paràmetres de la línia d'ordres acceptats pel servidor:

  • -port és el port on el servidor escoltarà.
  • -address: adreça IP que el servidor escoltarà.
  • -script és un programa extern que es crida per a cada ganxo entrant.

Vegem més de prop algunes de les funcions:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Aquesta funció determina si ha arribat una sol·licitud per confirmar el registre o un webhook. Com es desprèn de documentació, si es confirma el registre, es rep la següent estructura Json a la sol·licitud de publicació:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

Aquesta consulta s'ha de respondre:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Quan la signatura es calcula com:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Si arriba un webhook, l'estructura de la sol·licitud de publicació és la següent:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

En conseqüència, depenent de la sol·licitud, cal entendre com processar les dades. Vaig triar l'entrada com a indicador "Type":"SubscriptionConfirmation", ja que està present a la sol·licitud de confirmació de la subscripció i no està present al webhook. En funció de la presència/absència d'aquesta entrada a la sol·licitud POST, l'execució posterior del programa passa a la funció SubscriptionConfirmation, o a la funció GotRecords.

No considerarem detalladament la funció SubscriptionConfirmation; s'implementa d'acord amb els principis establerts a documentació. Podeu veure el codi font d'aquesta funció a repositoris git del projecte.

La funció GotRecords analitza una sol·licitud entrant i per a cada objecte Record crida un script extern (el nom del qual s'ha passat al paràmetre -script) amb els paràmetres:

  • nom de la galleda
  • clau d'objecte
  • acció:
    • còpia - si a la sol·licitud original EventName = ObjectCreated | PutObject | PutObjectCopy
    • suprimir - si a la sol·licitud original EventName = ObjectRemoved | DeleteObject

Així, si arriba un ganxo amb una sol·licitud de publicació, tal com es descriu dalt, i el paràmetre -script=script.sh, l'script s'anomenarà de la següent manera:

script.sh  bucketA some-file-to-bucket copy

Cal entendre que aquest servidor receptor webhook no és una solució de producció completa, sinó un exemple simplificat d'una possible implementació.

Exemple de treball

Sincronitzem els fitxers des del cub principal a MCS al cub de còpia de seguretat a AWS. El cub principal s'anomena myfiles-ash, el de còpia de seguretat s'anomena myfiles-backup (la configuració del cub a AWS està fora de l'abast d'aquest article). En conseqüència, quan es col·loca un fitxer a la cubeta principal, la seva còpia hauria d'aparèixer a la còpia de seguretat i, quan s'elimina de la principal, s'hauria d'eliminar a la còpia de seguretat.

Treballarem amb cubs mitjançant la utilitat awscli, que és compatible tant amb l'emmagatzematge en núvol MCS com amb l'emmagatzematge en núvol AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Configurem l'accés a l'API S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Configurem l'accés a l'API AWS S3:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Comprovem els accessos:

A AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Per a MCS, quan executeu l'ordre, heu d'afegir —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Accés.

Ara escrivim un script per processar el ganxo entrant, anomenem-lo s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Comencem el servidor:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Vegem com funciona. A través de Interfície web MCS afegiu el fitxer test.txt al cub myfiles-ash. Els registres de la consola mostren que s'ha fet una sol·licitud al servidor webhook:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Comprovem el contingut del bucket myfiles-backup a AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Ara, a través de la interfície web, suprimirem el fitxer del cub myfiles-ash.

Registres del servidor:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Contingut de la galleda:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

El fitxer s'esborra, el problema s'ha resolt.

Conclusió i pendents

Tot el codi utilitzat en aquest article és al meu repositori. També hi ha exemples d'scripts i exemples de recompte de signatures per registrar webhooks.

Aquest codi no és més que un exemple de com podeu utilitzar els webhooks S3 a les vostres activitats. Com he dit al principi, si teniu previst utilitzar aquest servidor en producció, haureu de reescriure almenys el servidor per a un treball asíncron: registreu els webhooks entrants en una cua (RabbitMQ o NATS) i a partir d'aquí analitzeu-los i processeu-los. amb aplicacions dels treballadors. En cas contrari, quan els webhooks arriben massivament, és possible que trobeu una manca de recursos del servidor per completar les tasques. La presència de cues permet distribuir el servidor i els treballadors, així com resoldre problemes amb la repetició de tasques en cas de fallades. També és recomanable canviar el registre per un de més detallat i estandarditzat.

Bona sort!

Més lectura sobre el tema:

Font: www.habr.com

Afegeix comentari