Un exemple d'application basée sur des événements basée sur des webhooks dans le stockage d'objets S3 Mail.ru Cloud Solutions

Un exemple d'application basée sur des événements basée sur des webhooks dans le stockage d'objets S3 Mail.ru Cloud Solutions
Machine à café Rube Goldberg

L'architecture événementielle augmente la rentabilité des ressources utilisées car elles ne sont utilisées qu'au moment où elles sont nécessaires. Il existe de nombreuses options pour mettre en œuvre cela sans créer d'entités cloud supplémentaires en tant qu'applications de travail. Et aujourd'hui, je ne parlerai pas de FaaS, mais de webhooks. Je vais montrer un exemple de didacticiel de gestion d'événements à l'aide de webhooks de stockage d'objets.

Quelques mots sur le stockage objet et les webhooks. Le stockage objet permet de stocker n'importe quelle donnée dans le cloud sous forme d'objets, accessibles via S3 ou une autre API (selon implémentation) via HTTP/HTTPS. Les webhooks sont généralement des rappels HTTP personnalisés. Ils sont généralement déclenchés par un événement, tel que le transfert de code vers un référentiel ou la publication d'un commentaire sur un blog. Lorsqu'un événement se produit, le site d'origine envoie une requête HTTP à l'URL spécifiée pour le webhook. Par conséquent, vous pouvez faire en sorte que des événements sur un site déclenchent des actions sur un autre (wiki). Dans le cas où le site source est un stockage objet, les événements agissent comme des modifications de son contenu.

Exemples de cas simples où une telle automatisation peut être utilisée :

  1. Création de copies de tous les objets dans un autre stockage cloud. Des copies doivent être créées à la volée chaque fois que des fichiers sont ajoutés ou modifiés.
  2. Création automatique d'une série de vignettes de fichiers graphiques, ajout de filigranes aux photographies et autres modifications d'image.
  3. Notification de l'arrivée de nouveaux documents (par exemple, un service de comptabilité distribué télécharge des rapports sur le cloud et la surveillance financière reçoit des notifications sur les nouveaux rapports, les vérifie et les analyse).
  4. Des cas légèrement plus complexes impliquent, par exemple, la génération d'une requête vers Kubernetes, qui crée un pod avec les conteneurs nécessaires, lui transmet les paramètres de la tâche et, après le traitement, réduit le conteneur.

À titre d'exemple, nous créerons une variante de la tâche 1, lorsque les modifications apportées au compartiment de stockage d'objets Mail.ru Cloud Solutions (MCS) sont synchronisées dans le stockage d'objets AWS à l'aide de webhooks. Dans un cas réel chargé, le travail asynchrone devrait être assuré en enregistrant les webhooks dans une file d'attente, mais pour la tâche de formation, nous effectuerons l'implémentation sans cela.

Façon de travailler

Le protocole d'interaction est décrit en détail dans Guide des webhooks S3 sur MCS. Le plan de travail contient les éléments suivants :

  • Service de publication, qui se trouve du côté du stockage S3 et publie les requêtes HTTP lorsque le webnhook est déclenché.
  • Serveur de réception de webhook, qui écoute les requêtes du service de publication HTTP et exécute les actions appropriées. Le serveur peut être écrit dans n'importe quelle langue ; dans notre exemple, nous écrirons le serveur en Go.

Une particularité de l'implémentation des webhooks dans l'API S3 est l'enregistrement du serveur de réception du webhook sur le service de publication. En particulier, le serveur de réception du webhook doit confirmer l'abonnement aux messages du service de publication (dans d'autres implémentations de webhook, la confirmation de l'abonnement n'est généralement pas requise).

En conséquence, le serveur de réception du webhook doit prendre en charge deux opérations principales :

  • répondre à la demande de confirmation d’inscription du service édition,
  • traiter les événements entrants.

Installation d'un serveur de réception de webhook

Pour exécuter le serveur de réception du webhook, vous avez besoin d'un serveur Linux. Dans cet article, à titre d'exemple, nous utilisons une instance virtuelle que nous déployons sur MCS.

Installons le logiciel nécessaire et lançons le serveur de réception du webhook.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Clonez le dossier avec le serveur de réception du webhook :

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Démarrons le serveur :

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Abonnez-vous au service de publication

Vous pouvez enregistrer votre serveur de réception de webhook via l'API ou l'interface Web. Pour plus de simplicité, nous nous inscrireons via l'interface web :

  1. Passons à la section seaux dans la salle de contrôle.
  2. Accédez au bucket pour lequel nous allons configurer les webhooks et cliquez sur l'engrenage :

Un exemple d'application basée sur des événements basée sur des webhooks dans le stockage d'objets S3 Mail.ru Cloud Solutions

Accédez à l'onglet Webhooks et cliquez sur Ajouter :

Un exemple d'application basée sur des événements basée sur des webhooks dans le stockage d'objets S3 Mail.ru Cloud Solutions
Remplissez les champs :

Un exemple d'application basée sur des événements basée sur des webhooks dans le stockage d'objets S3 Mail.ru Cloud Solutions

ID : le nom du webhook.

Événement - quels événements transmettre. Nous avons défini la transmission de tous les événements qui se produisent lors du travail avec des fichiers (ajout et suppression).

URL : adresse du serveur de réception du webhook.

Le préfixe/suffixe de filtre est un filtre qui vous permet de générer des webhooks uniquement pour les objets dont les noms correspondent à certaines règles. Par exemple, pour que le webhook déclenche uniquement les fichiers portant l'extension .png, dans Suffixe de filtre vous devez écrire « png ».

Actuellement, seuls les ports 80 et 443 sont pris en charge pour accéder au serveur de réception du webhook.

Cliquons Ajouter un crochet et nous verrons ce qui suit :

Un exemple d'application basée sur des événements basée sur des webhooks dans le stockage d'objets S3 Mail.ru Cloud Solutions
Crochet ajouté.

Le serveur de réception du webhook affiche dans ses journaux la progression du processus d'enregistrement du hook :

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

L'inscription est terminée. Dans la section suivante, nous examinerons de plus près l'algorithme de fonctionnement du serveur de réception du webhook.

Description du serveur de réception du webhook

Dans notre exemple, le serveur est écrit en Go. Regardons les principes de base de son fonctionnement.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Considérez les fonctions principales:

  • Ping() - une route qui répond via URL/ping, l'implémentation la plus simple d'une sonde d'activité.
  • Webhook() - route principale, gestionnaire d'URL/webhook :
    • confirme l'inscription sur le service de publication (se rendre dans la fonction SubscriptionConfirmation),
    • traite les webhooks entrants (fonction Gorecords).
  • Les fonctions HmacSha256 et HmacSha256hex sont des implémentations des algorithmes de chiffrement HMAC-SHA256 et HMAC-SHA256 avec sortie sous forme de chaîne de nombres hexadécimaux pour calculer la signature.
  • main est la fonction principale, traite les paramètres de ligne de commande et enregistre les gestionnaires d'URL.

Paramètres de ligne de commande acceptés par le serveur :

  • -port est le port sur lequel le serveur écoutera.
  • -address - Adresse IP que le serveur écoutera.
  • -script est un programme externe appelé pour chaque hook entrant.

Examinons de plus près certaines des fonctions :

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Cette fonction détermine si une demande de confirmation d'enregistrement ou un webhook est arrivé. Comme suit de documentation, si l'inscription est confirmée, la structure Json suivante est reçue dans la requête Post :

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

Il faut répondre à cette question :

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Où la signature est calculée comme suit :

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Si un webhook arrive, la structure de la requête Post ressemble à ceci :

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Ainsi, en fonction de la demande, vous devez comprendre comment traiter les données. J'ai choisi l'entrée comme indicateur "Type":"SubscriptionConfirmation", puisqu'il est présent dans la demande de confirmation d'abonnement et n'est pas présent dans le webhook. En fonction de la présence/absence de cette entrée dans la requête POST, la poursuite de l'exécution du programme passe soit par la fonction SubscriptionConfirmation, ou dans une fonction GotRecords.

Nous ne détaillerons pas la fonction SubscriptionConfirmation, elle est implémentée selon les principes énoncés dans documentation. Vous pouvez consulter le code source de cette fonction sur dépôts git du projet.

La fonction GotRecords analyse une requête entrante et pour chaque objet Record appelle un script externe (dont le nom a été passé dans le paramètre -script) avec les paramètres :

  • nom du compartiment
  • clé d'objet
  • action:
    • copier - si dans la demande d'origine EventName = ObjectCreated | PutObject | PutObjectCopie
    • delete - si dans la demande d'origine EventName = ObjectRemoved | Supprimer l'objet

Ainsi, si un hook arrive avec une requête Post, comme décrit au-dessus, et le paramètre -script=script.sh alors le script sera appelé comme suit :

script.sh  bucketA some-file-to-bucket copy

Il faut bien comprendre que ce serveur de réception de webhook n'est pas une solution de production complète, mais un exemple simplifié d'implémentation possible.

Exemple de travail

Synchronisons les fichiers du compartiment principal dans MCS avec le compartiment de sauvegarde dans AWS. Le bucket principal s'appelle myfiles-ash, celui de sauvegarde s'appelle myfiles-backup (la configuration du bucket dans AWS dépasse le cadre de cet article). Par conséquent, lorsqu'un fichier est placé dans le compartiment principal, sa copie doit apparaître dans celui de sauvegarde, et lorsqu'il est supprimé du compartiment principal, il doit être supprimé dans celui de sauvegarde.

Nous travaillerons avec des buckets à l'aide de l'utilitaire awscli, qui est compatible à la fois avec le stockage cloud MCS et le stockage cloud AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Configurons l'accès à l'API S3 MCS :

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Configurons l'accès à l'API AWS S3 :

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Vérifions les accès :

Vers AWS :

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Pour MCS, lors de l'exécution de la commande, vous devez ajouter —endpoint-url :

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Accédé.

Écrivons maintenant un script pour traiter le hook entrant, appelons-le s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Démarrons le serveur :

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Voyons voir comment ça fonctionne. À travers Interface Web MCS ajoutez le fichier test.txt au compartiment myfiles-ash. Les journaux de la console montrent qu'une requête a été adressée au serveur webhook :

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Vérifions le contenu du bucket myfiles-backup dans AWS :

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Maintenant, via l'interface Web, nous allons supprimer le fichier du compartiment myfiles-ash.

Journaux du serveur :

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Contenu du seau :

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Le fichier est supprimé, le problème est résolu.

Conclusion et ToDo

Tout le code utilisé dans cet article est dans mon dépôt. Il existe également des exemples de scripts et des exemples de comptage de signatures pour l'enregistrement de webhooks.

Ce code n'est rien de plus qu'un exemple de la façon dont vous pouvez utiliser les webhooks S3 dans vos activités. Comme je l'ai dit au début, si vous envisagez d'utiliser un tel serveur en production, vous devez au moins réécrire le serveur pour un travail asynchrone : enregistrer les webhooks entrants dans une file d'attente (RabbitMQ ou NATS), puis les analyser et les traiter. avec les applications des travailleurs. Sinon, lorsque les webhooks arrivent massivement, vous risquez de rencontrer un manque de ressources serveur pour accomplir les tâches. La présence de files d'attente vous permet de répartir le serveur et les travailleurs, ainsi que de résoudre les problèmes de tâches répétitives en cas de panne. Il est également conseillé de modifier la journalisation pour une journalisation plus détaillée et plus standardisée.

Bonne chance!

Plus de lectures sur le sujet :

Source: habr.com

Ajouter un commentaire