Um exemplo de aplicativo orientado a eventos baseado em webhooks no armazenamento de objetos S3 Mail.ru Cloud Solutions

Um exemplo de aplicativo orientado a eventos baseado em webhooks no armazenamento de objetos S3 Mail.ru Cloud Solutions
Máquina de café Rube Goldberg

A arquitetura orientada a eventos aumenta a eficiência de custos dos recursos utilizados porque eles são utilizados apenas no momento em que são necessários. Existem muitas opções sobre como implementar isso e não criar entidades de nuvem adicionais como aplicativos de trabalho. E hoje não falarei sobre FaaS, mas sobre webhooks. Mostrarei um exemplo de tutorial de manipulação de eventos usando webhooks de armazenamento de objetos.

Algumas palavras sobre armazenamento de objetos e webhooks. O armazenamento de objetos permite armazenar quaisquer dados na nuvem na forma de objetos, acessíveis via S3 ou outra API (dependendo da implementação) via HTTP/HTTPS. Webhooks geralmente são retornos de chamada HTTP personalizados. Eles normalmente são acionados por um evento, como um código sendo enviado para um repositório ou um comentário postado em um blog. Quando ocorre um evento, o site de origem envia uma solicitação HTTP para o URL especificado para o webhook. Como resultado, você pode fazer com que eventos em um site acionem ações em outro (wiki). No caso em que o site de origem é um armazenamento de objetos, os eventos atuam como alterações em seu conteúdo.

Exemplos de casos simples em que tal automação pode ser usada:

  1. Criando cópias de todos os objetos em outro armazenamento em nuvem. As cópias devem ser criadas instantaneamente sempre que arquivos são adicionados ou alterados.
  2. Criação automática de uma série de miniaturas de arquivos gráficos, adição de marcas d’água às fotografias e outras modificações de imagens.
  3. Notificação sobre a chegada de novos documentos (por exemplo, um serviço de contabilidade distribuída carrega relatórios para a nuvem e o monitoramento financeiro recebe notificações sobre novos relatórios, verifica-os e analisa-os).
  4. Casos um pouco mais complexos envolvem, por exemplo, a geração de uma solicitação ao Kubernetes, que cria um pod com os contêineres necessários, passa parâmetros de tarefa para ele e, após o processamento, recolhe o contêiner.

Como exemplo, faremos uma variante da tarefa 1, quando as alterações no bucket de armazenamento de objetos Mail.ru Cloud Solutions (MCS) são sincronizadas no armazenamento de objetos AWS usando webhooks. Em um caso real carregado, o trabalho assíncrono deve ser fornecido registrando webhooks em uma fila, mas para a tarefa de treinamento faremos a implementação sem isso.

Esquema de trabalho

O protocolo de interação é descrito em detalhes em Guia para webhooks S3 no MCS. O esquema de trabalho contém os seguintes elementos:

  • Serviço de publicação, que está no lado do armazenamento S3 e publica solicitações HTTP quando o webnhook é acionado.
  • Servidor de recebimento de webhook, que atende solicitações do serviço de publicação HTTP e executa ações apropriadas. O servidor pode ser escrito em qualquer linguagem; em nosso exemplo, escreveremos o servidor em Go.

Uma característica especial da implementação de webhooks na API S3 é o registro do servidor receptor de webhook no serviço de publicação. Em particular, o servidor receptor do webhook deve confirmar a assinatura das mensagens do serviço de publicação (em outras implementações do webhook, geralmente não é necessária a confirmação da assinatura).

Conseqüentemente, o servidor receptor do webhook deve suportar duas operações principais:

  • responder à solicitação do serviço de publicação para confirmar o registro,
  • processar eventos recebidos.

Instalando um servidor de recebimento de webhook

Para executar o servidor de recebimento de webhook, você precisa de um servidor Linux. Neste artigo, como exemplo, usamos uma instância virtual que implantamos no MCS.

Vamos instalar o software necessário e iniciar o servidor de recebimento do webhook.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Clone a pasta com o servidor de recebimento do webhook:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Vamos iniciar o servidor:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Assine o serviço de publicação

Você pode registrar seu servidor de recebimento de webhook por meio da API ou da interface da web. Para simplificar, registraremos através da interface web:

  1. Vamos para a seção de baldes na sala de controle.
  2. Vá até o bucket para o qual configuraremos webhooks e clique na engrenagem:

Um exemplo de aplicativo orientado a eventos baseado em webhooks no armazenamento de objetos S3 Mail.ru Cloud Solutions

Vá para a guia Webhooks e clique em Adicionar:

Um exemplo de aplicativo orientado a eventos baseado em webhooks no armazenamento de objetos S3 Mail.ru Cloud Solutions
Preencha os campos:

Um exemplo de aplicativo orientado a eventos baseado em webhooks no armazenamento de objetos S3 Mail.ru Cloud Solutions

ID – o nome do webhook.

Evento - quais eventos transmitir. Definimos a transmissão de todos os eventos que ocorrem ao trabalhar com arquivos (adicionar e excluir).

URL — endereço do servidor de recebimento do webhook.

Filtro prefixo/sufixo é um filtro que permite gerar webhooks apenas para objetos cujos nomes correspondam a determinadas regras. Por exemplo, para que o webhook acione apenas arquivos com extensão .png, em Sufixo de filtro você precisa escrever “png”.

Atualmente, apenas as portas 80 e 443 são suportadas para acessar o servidor de recebimento do webhook.

Vamos clicar Adicionar gancho e veremos o seguinte:

Um exemplo de aplicativo orientado a eventos baseado em webhooks no armazenamento de objetos S3 Mail.ru Cloud Solutions
Gancho adicionado.

O servidor receptor do webhook mostra em seus logs o andamento do processo de registro do hook:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

O registro foi concluído. Na próxima seção, daremos uma olhada mais de perto no algoritmo de operação do servidor receptor de webhook.

Descrição do servidor de recebimento do webhook

No nosso exemplo, o servidor está escrito em Go. Vejamos os princípios básicos de seu funcionamento.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Veja as principais funções:

  • Ping() - uma rota que responde via URL/ping, a implementação mais simples de uma sonda de atividade.
  • Webhook() - rota principal, manipulador de URL/webhook:
    • confirma o registro no serviço de publicação (vá para a função SubscriptionConfirmation),
    • processa webhooks recebidos (função Gorecords).
  • As funções HmacSha256 e HmacSha256hex são implementações dos algoritmos de criptografia HMAC-SHA256 e HMAC-SHA256 com saída como uma sequência de números hexadecimais para calcular a assinatura.
  • main é a função principal, processa parâmetros de linha de comando e registra manipuladores de URL.

Parâmetros de linha de comando aceitos pelo servidor:

  • -port é a porta na qual o servidor irá escutar.
  • -address - endereço IP que o servidor irá escutar.
  • -script é um programa externo chamado para cada gancho recebido.

Vamos dar uma olhada em algumas das funções:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Esta função determina se chegou uma solicitação de confirmação de registro ou um webhook. Como segue de documentação, caso o cadastro seja confirmado, a seguinte estrutura Json é recebida na requisição Post:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

Esta pergunta precisa ser respondida:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Onde a assinatura é calculada como:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Se um webhook chegar, a estrutura da solicitação Post será semelhante a esta:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Dessa forma, dependendo da solicitação, você precisa entender como processar os dados. Eu escolhi a entrada como indicador "Type":"SubscriptionConfirmation", pois está presente na solicitação de confirmação de assinatura e não no webhook. Com base na presença/ausência desta entrada na solicitação POST, a execução posterior do programa vai para a função SubscriptionConfirmation, ou em uma função GotRecords.

Não consideraremos a função SubscriptionConfirmation em detalhes; ela é implementada de acordo com os princípios estabelecidos em documentação. Você pode visualizar o código-fonte desta função em repositórios git do projeto.

A função GotRecords analisa uma solicitação recebida e para cada objeto Record chama um script externo (cujo nome foi passado no parâmetro -script) com os parâmetros:

  • nome do intervalo
  • chave do objeto
  • açao:
    • copiar - se estiver na solicitação original EventName = ObjectCreated | ColocarObjeto | PutObjectCopy
    • delete - se estiver na solicitação original EventName = ObjectRemoved | Excluir Objeto

Assim, se um gancho chegar com uma solicitação Post, conforme descrito acima, e o parâmetro -script=script.sh então o script será chamado da seguinte forma:

script.sh  bucketA some-file-to-bucket copy

Deve-se entender que este servidor receptor de webhook não é uma solução completa de produção, mas um exemplo simplificado de uma possível implementação.

Exemplo de trabalho

Vamos sincronizar os arquivos do bucket principal no MCS com o bucket de backup na AWS. O bucket principal é chamado myfiles-ash, o de backup é chamado myfiles-backup (a configuração do bucket na AWS está além do escopo deste artigo). Assim, quando um arquivo é colocado no bucket principal, sua cópia deve aparecer no de backup, e quando for excluído do principal, deve ser excluído no de backup.

Trabalharemos com buckets usando o utilitário awscli, que é compatível com armazenamento em nuvem MCS e armazenamento em nuvem AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Vamos configurar o acesso à API S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Vamos configurar o acesso à API AWS S3:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Vamos verificar os acessos:

Para AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Para MCS, ao executar o comando você precisa adicionar —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Acessado.

Agora vamos escrever um script para processar o gancho de entrada, vamos chamá-lo de s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Vamos iniciar o servidor:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Vamos ver como isso funciona. Através Interface web MCS adicione o arquivo test.txt ao bucket myfiles-ash. Os logs do console mostram que uma solicitação foi feita ao servidor webhook:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Vamos verificar o conteúdo do bucket myfiles-backup na AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Agora, por meio da interface web, excluiremos o arquivo do bucket myfiles-ash.

Registros do servidor:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Conteúdo do balde:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

O arquivo foi excluído, o problema foi resolvido.

Conclusão e tarefas pendentes

Todo o código usado neste artigo é no meu repositório. Há também exemplos de scripts e exemplos de contagem de assinaturas para registro de webhooks.

Este código nada mais é do que um exemplo de como você pode usar webhooks S3 em suas atividades. Como eu disse no início, se você planeja usar esse servidor em produção, você precisa pelo menos reescrever o servidor para trabalho assíncrono: registrar webhooks recebidos em uma fila (RabbitMQ ou NATS) e, a partir daí, analisá-los e processá-los com aplicativos de trabalho. Caso contrário, quando os webhooks chegarem em massa, você poderá encontrar falta de recursos do servidor para concluir as tarefas. A presença de filas permite distribuir o servidor e os trabalhadores, bem como resolver problemas de repetição de tarefas em caso de falhas. Também é aconselhável alterar o registro para um mais detalhado e padronizado.

Boa sorte!

Mais leituras sobre o tema:

Fonte: habr.com

Adicionar um comentário