Un exemplo dunha aplicación dirixida por eventos baseada en webhooks no almacenamento de obxectos S3 Mail.ru Cloud Solutions

Un exemplo dunha aplicación dirixida por eventos baseada en webhooks no almacenamento de obxectos S3 Mail.ru Cloud Solutions
Máquina de café Rube Goldberg

A arquitectura impulsada por eventos aumenta a rendibilidade dos recursos empregados porque só se utilizan no momento en que son necesarios. Hai moitas opcións sobre como implementar isto e non crear entidades na nube adicionais como aplicacións de traballo. E hoxe non falarei de FaaS, senón de webhooks. Mostrarei un exemplo de titorial de manexo de eventos usando webhooks de almacenamento de obxectos.

Algunhas palabras sobre o almacenamento de obxectos e os webhooks. O almacenamento de obxectos permite almacenar calquera dato na nube en forma de obxectos, accesible a través de S3 ou outra API (dependendo da implementación) a través de HTTP/HTTPS. Os webhooks son xeralmente devolucións de chamada HTTP personalizadas. Normalmente desencadean un evento, como o código que se envía a un repositorio ou un comentario que se publica nun blog. Cando se produce un evento, o sitio de orixe envía unha solicitude HTTP ao URL especificado para o webhook. Como resultado, podes facer que eventos nun sitio desencadeen accións noutro (wiki). No caso de que o sitio de orixe sexa un almacenamento de obxectos, os eventos actúan como cambios no seu contido.

Exemplos de casos sinxelos nos que se pode utilizar esta automatización:

  1. Creando copias de todos os obxectos noutro almacenamento na nube. As copias deben crearse sobre a marcha sempre que se engadan ou cambien ficheiros.
  2. Creación automática dunha serie de miniaturas de ficheiros gráficos, engadindo marcas de auga ás fotografías e outras modificacións de imaxe.
  3. Notificación sobre a chegada de novos documentos (por exemplo, un servizo de contabilidade distribuída carga informes na nube e o seguimento financeiro recibe notificacións sobre novos informes, comproba e analízaos).
  4. Os casos un pouco máis complexos implican, por exemplo, xerar unha solicitude a Kubernetes, que crea un pod cos contedores necesarios, páselle os parámetros da tarefa e, tras o procesamento, colapsa o contenedor.

Como exemplo, faremos unha variante da tarefa 1, cando os cambios no depósito de almacenamento de obxectos de Mail.ru Cloud Solutions (MCS) se sincronicen no almacenamento de obxectos de AWS mediante webhooks. Nun caso real cargado, o traballo asíncrono debería proporcionarse rexistrando webhooks nunha cola, pero para a tarefa de adestramento faremos a implementación sen isto.

Réxime de traballo

O protocolo de interacción descríbese en detalle en Guía de webhooks S3 en MCS. O esquema de traballo contén os seguintes elementos:

  • Servizo de publicación, que está no lado do almacenamento S3 e publica solicitudes HTTP cando se activa o webnhook.
  • Servidor de recepción webhook, que escoita as solicitudes do servizo de publicación HTTP e realiza as accións adecuadas. O servidor pódese escribir en calquera idioma; no noso exemplo, escribiremos o servidor en Go.

Unha característica especial da implementación de webhooks na API S3 é o rexistro do servidor de recepción de webhooks no servizo de publicación. En particular, o servidor de recepción do webhook debe confirmar a subscrición ás mensaxes do servizo de publicación (noutras implementacións de webhook, a confirmación da subscrición normalmente non é necesaria).

En consecuencia, o servidor de recepción de webhook debe admitir dúas operacións principais:

  • responder á solicitude do servizo de publicacións para confirmar o rexistro,
  • procesar eventos entrantes.

Instalación dun servidor de recepción webhook

Para executar o servidor de recepción de webhook, necesitas un servidor Linux. Neste artigo, como exemplo, usamos unha instancia virtual que implementamos en MCS.

Instalemos o software necesario e lancemos o servidor de recepción webhook.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Clona o cartafol co servidor de recepción do webhook:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Imos iniciar o servidor:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Subscríbete ao servizo de publicacións

Podes rexistrar o teu servidor de recepción de webhook a través da API ou da interface web. Para simplificar, rexistrarémonos a través da interface web:

  1. Imos á sección de baldes na sala de control.
  2. Vai ao balde para o que configuraremos webhooks e fai clic na engrenaxe:

Un exemplo dunha aplicación dirixida por eventos baseada en webhooks no almacenamento de obxectos S3 Mail.ru Cloud Solutions

Vaia á pestana Webhooks e fai clic en Engadir:

Un exemplo dunha aplicación dirixida por eventos baseada en webhooks no almacenamento de obxectos S3 Mail.ru Cloud Solutions
Completa os campos:

Un exemplo dunha aplicación dirixida por eventos baseada en webhooks no almacenamento de obxectos S3 Mail.ru Cloud Solutions

ID — o nome do webhook.

Evento - que eventos transmitir. Establecemos a transmisión de todos os eventos que ocorren ao traballar con ficheiros (engadir e eliminar).

URL — webhook que recibe o enderezo do servidor.

O prefixo/sufixo de filtro é un filtro que che permite xerar webhooks só para obxectos cuxos nomes coinciden con determinadas regras. Por exemplo, para que o webhook active só ficheiros coa extensión .png, en Sufixo de filtro cómpre escribir "png".

Actualmente, só se admiten os portos 80 e 443 para acceder ao servidor de recepción de webhook.

Prememos Engadir gancho e veremos o seguinte:

Un exemplo dunha aplicación dirixida por eventos baseada en webhooks no almacenamento de obxectos S3 Mail.ru Cloud Solutions
Hook engadiu.

O servidor de recepción do webhook mostra nos seus rexistros o progreso do proceso de rexistro do gancho:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

O rexistro está rematado. Na seguinte sección, analizaremos o algoritmo de funcionamento do servidor receptor webhook.

Descrición do servidor de recepción de webhook

No noso exemplo, o servidor está escrito en Go. Vexamos os principios básicos do seu funcionamento.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Considere as principais funcións:

  • Ping() - unha ruta que responde mediante URL/ping, a implementación máis sinxela dunha sonda de animación.
  • Webhook() - ruta principal, controlador de URL/webhook:
    • confirma o rexistro no servizo de publicación (vai á función SubscriptionConfirmation),
    • procesa webhooks entrantes (función Gorecords).
  • As funcións HmacSha256 e HmacSha256hex son implementacións dos algoritmos de cifrado HMAC-SHA256 e HMAC-SHA256 coa saída como unha cadea de números hexadecimais para calcular a sinatura.
  • main é a función principal, procesa os parámetros da liña de comandos e rexistra os controladores de URL.

Parámetros da liña de comandos aceptados polo servidor:

  • -port é o porto no que o servidor escoitará.
  • -address: enderezo IP que escoitará o servidor.
  • -script é un programa externo que se chama para cada gancho entrante.

Vexamos con máis detalle algunhas das funcións:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Esta función determina se chegou unha solicitude para confirmar o rexistro ou un webhook. Como se desprende de documentación, se se confirma o rexistro, recibe a seguinte estrutura Json na solicitude de publicación:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

Hai que responder a esta pregunta:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Cando a sinatura se calcula como:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Se chega un webhook, a estrutura da solicitude de publicación é a seguinte:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

En consecuencia, dependendo da solicitude, cómpre comprender como procesar os datos. Escollín a entrada como indicador "Type":"SubscriptionConfirmation", xa que está presente na solicitude de confirmación da subscrición e non está presente no webhook. En función da presenza/ausencia desta entrada na solicitude POST, a execución posterior do programa vaise á función SubscriptionConfirmation, ou na función GotRecords.

Non consideraremos a función SubscriptionConfirmation en detalle; impléntanse segundo os principios establecidos en documentación. Podes ver o código fonte desta función en Repositorios git do proxecto.

A función GotRecords analiza a solicitude entrante e para cada obxecto Record chama a un script externo (cuxo nome se pasou no parámetro -script) cos parámetros:

  • nome do balde
  • chave de obxecto
  • acción:
    • copia - se na solicitude orixinal EventName = ObjectCreated | PutObject | PutObjectCopy
    • eliminar - se na solicitude orixinal EventName = ObjectRemoved | Eliminar obxecto

Así, se chega un gancho cunha solicitude de publicación, como se describe anterior, e o parámetro -script=script.sh entón o script chamarase do seguinte xeito:

script.sh  bucketA some-file-to-bucket copy

Debe entenderse que este servidor de recepción de webhook non é unha solución de produción completa, senón un exemplo simplificado dunha posible implementación.

Exemplo de traballo

Sincronicemos os ficheiros desde o depósito principal en MCS ata o depósito de seguranza en AWS. O depósito principal chámase myfiles-ash, o de copia de seguridade chámase myfiles-backup (a configuración do cubo en AWS está fóra do alcance deste artigo). En consecuencia, cando se coloca un ficheiro no depósito principal, a súa copia debería aparecer no de copia de seguranza e, cando se elimina do principal, debería eliminarse no de copia de seguridade.

Traballaremos con baldes mediante a utilidade awscli, que é compatible tanto co almacenamento na nube MCS como co almacenamento na nube AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Imos configurar o acceso á API de S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Configuramos o acceso á API de AWS S3:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Comprobamos os accesos:

Para AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Para MCS, ao executar o comando cómpre engadir —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Acceso.

Agora imos escribir un script para procesar o gancho entrante, chamémoslle s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Imos iniciar o servidor:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

A ver como funciona. A través Interface web MCS engade o ficheiro test.txt ao depósito myfiles-ash. Os rexistros da consola mostran que se realizou unha solicitude ao servidor webhook:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Comprobamos o contido do depósito de copia de seguranza de myfiles en AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Agora, a través da interface web, eliminaremos o ficheiro do cubo myfiles-ash.

Rexistros do servidor:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Contido do cubo:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Elimínase o ficheiro, solucionouse o problema.

Conclusión e tarefas pendentes

Todo o código usado neste artigo é no meu repositorio. Tamén hai exemplos de scripts e exemplos de conta de sinaturas para rexistrar webhooks.

Este código non é máis que un exemplo de como podes usar os webhooks S3 nas túas actividades. Como dixen ao principio, se pensas usar un servidor deste tipo en produción, debes polo menos reescribir o servidor para o traballo asíncrono: rexistrar os webhooks entrantes nunha cola (RabbitMQ ou NATS) e a partir de aí analizalos e procesalos. con aplicacións dos traballadores. En caso contrario, cando os webhooks chegan masivamente, podes atopar a falta de recursos do servidor para completar tarefas. A presenza de colas permítelle distribuír o servidor e os traballadores, así como resolver problemas coa repetición de tarefas en caso de fallos. Tamén é recomendable cambiar o rexistro por outro máis detallado e estandarizado.

Boa sorte!

Máis lecturas sobre o tema:

Fonte: www.habr.com

Engadir un comentario