Приклад event-driven програми на основі веб-хуків в об'єктному S3-сховищі Mail.ru Cloud Solutions

Приклад event-driven програми на основі веб-хуків в об'єктному S3-сховищі Mail.ru Cloud Solutions
Rube Goldberg coffee machine

Event-driven architecture підвищує цінову ефективність використовуваних ресурсів, оскільки вони задіяні лише тоді, коли вони потрібні. Існує безліч варіантів, як це реалізувати і не створювати додаткові хмарні сутності як worker-додатків. І сьогодні я розповім не про FaaS, а про веб-хуки. Я покажу навчальний приклад обробки подій за допомогою веб-хуків об'єктного сховища.

Пара слів про об'єктне сховище та про вебхуки. Об'єктні сховища дозволяють зберігати будь-які дані в хмарі у вигляді об'єктів, доступних S3 або іншому API (залежно від реалізації) через HTTP/HTTPS. Вебхуки (webhooks) в загальному випадку - це зворотні дзвінки користувача по HTTP. Зазвичай вони запускаються подією, наприклад, відправкою коду до репозиторію або коментарем, що публікується в блозі. Коли відбувається подія, вихідний сайт відправляє HTTP-запит на URL-адресу, вказану для вебхука. В результаті можна зробити так, щоб події на одному сайті викликали дії на іншому (вики). У випадку, коли вихідним сайтом є об'єктне сховище, в ролі подій виступають зміни його вмісту.

Приклади простих кейсів, коли можна використовувати таку автоматизацію:

  1. Створення копій усіх об'єктів в іншому хмарному сховищі. Копії повинні створюватися на льоту, при будь-якому додаванні або зміні файлів.
  2. Автоматичне створення мініатюр графічних файлів, додавання водяних знаків до фотографій, інші модифікації зображень.
  3. Оповіщення про надходження нових документів (наприклад, розподілена бухгалтерська служба викладає у хмару звіти, а фінмоніторинг отримує оповіщення про нові звіти, перевіряє та аналізує їх).
  4. Трохи більш складні кейси мають на увазі, наприклад, формування запиту до Kubernetes, який створює під потрібними контейнерами, передає в нього параметри завдання і після обробки згортає контейнер.

Як приклад, ми зробимо варіант задачі 1, коли зміни в бакеті об'єктного сховища Mail.ru Cloud Solutions (MCS) за допомогою вебхуків синхронізуються в об'єктному сховищі AWS. У реальному навантаженому кейсі слід передбачити асинхронну роботу за рахунок реєстрації веб-хуків у черзі, але для навчального завдання ми зробимо реалізацію без цього.

Схема роботи

Протокол взаємодії детально описаний у посібнику з S3-вебхукам на MCS. У схемі роботи є такі елементи:

  • Сервіс публікації, що знаходиться на стороні S3-сховища і публікує HTTP-запити при спрацюванні webnhook.
  • Сервер прийому веб-хуків, який слухає звернення сервісу публікації з HTTP та виконує відповідні дії. Сервер може бути написаний будь-якою мовою, у прикладі ми напишемо сервер на Go.

Особливість реалізації веб-хуків в S3 API - реєстрація сервера прийому веб-хуків на сервісі публікації. Зокрема, сервер прийому веб-хуків повинен підтвердити підписку на повідомлення сервісу публікації (в інших реалізаціях веб-хуків зазвичай підтверджувати передплату не потрібно).

Відповідно, сервер прийому веб-хуків повинен підтримувати дві основні операції:

  • відповідати на запит сервісу публікації про підтвердження реєстрації,
  • обробляти події, що приходять.

Встановлення сервера прийому веб-хуків

Для запуску сервера прийому веб-хуків потрібен Linux-сервер. У цій статті приклад використовуємо віртуальний інстанс, який розгортаємо на MCS.

Встановимо необхідне ПЗ та запустимо сервер прийому веб-хуків.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Клонуємо папку із сервером прийому веб-хуків:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Запустимо сервер:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Передплата сервісу публікації

Зареєструвати свій сервер прийому веб-хуків можна через API або web-інтерфейс. Для простоти реєструватимемо через web-інтерфейс:

  1. Ідемо до розділу бакетів у кабінеті управління.
  2. Заходимо в бакет, для якого налаштовуватимемо wеbhooks, і натискаємо на шестерню:

Приклад event-driven програми на основі веб-хуків в об'єктному S3-сховищі Mail.ru Cloud Solutions

Переходимо на вкладку Webhooks та натискаємо Додати:

Приклад event-driven програми на основі веб-хуків в об'єктному S3-сховищі Mail.ru Cloud Solutions
Заповнюємо поля:

Приклад event-driven програми на основі веб-хуків в об'єктному S3-сховищі Mail.ru Cloud Solutions

ID — назва веб-хука.

Event – ​​які події передавати. Ми задали передачу всіх подій, які відбуваються під час роботи з файлами (додавання та видалення).

URL - адреса сервера прийому веб-хуків.

Filter prefix/suffix — фільтр, який дозволяє генерувати веб-хуки лише на об'єкти, назви яких відповідають певним правилам. Наприклад, щоб вебхук спрацьовував тільки файли з розширенням .png, в Filter suffix треба написати "png".

В даний момент підтримуються для звернення до сервера прийому веб-хуків тільки порти 80 та 443.

Натиснемо Додати hook і побачимо таке:

Приклад event-driven програми на основі веб-хуків в об'єктному S3-сховищі Mail.ru Cloud Solutions
Hook додано.

Сервер прийому веб-хуків у логах показує проходження процесу реєстрації хука:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Реєстрація закінчено. У наступному розділі детальніше розглянемо алгоритм роботи сервера прийому веб-хуків.

Опис сервера прийому веб-хуків

У прикладі сервер написаний на Go. Розберемо основні засади його роботи.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Розглянемо основні функції:

  • Ping() — роут, що відповідає URL/ping, найпростіша реалізація liveness probe.
  • Webhook() — основний роут, обробник URL/вебхуку:
    • підтверджує реєстрацію на сервісі публікації (перехід у функцію SubscriptionConfirmation),
    • обробляє вебхуки, що приходять (функція Gotrecords).
  • Функції HmacSha256 та HmacSha256hex — реалізації алгоритмів шифрування HMAC-SHA256 та HMAC-SHA256 з виведенням у вигляді рядка 16-річних чисел для підрахунку сигнатури.
  • main — головна функція, що обробляє параметри командного рядка та реєструє обробники URL.

Параметри командного рядка, які приймає сервер:

  • -port — порт, на якому слухатиме сервер.
  • -address — IP-адреса, яка слухатиме сервер.
  • -script - Зовнішня програма, яка викликається на кожен хук, що прийшов.

Розглянемо докладніше деякі функції:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Ця функція визначає, що надійшло - запит на підтвердження реєстрації або вебхук. Як випливає з документації, у разі підтвердження реєстрації надходить наступна структура Json у запиті Post:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

На цей запит потрібно відповісти:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Де сигнатура обчислюється як:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Якщо ж приходить вебхук, то структура Post-запиту має такий вигляд:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Відповідно, залежно від запиту, потрібно зрозуміти, як обробляти дані. Я вибрав як індикатор запис "Type":"SubscriptionConfirmation", оскільки вона присутня у запиті на підтвердження підписки і не є у вебхуку. Виходячи з наявності/відсутності цього запису в POST-запиті, подальше виконання програми переходить або в функцію SubscriptionConfirmation, або функцію GotRecords.

Функцію SubscriptionConfirmation детально розглядати не будемо, вона реалізована за принципами, викладеними в документації. Вивчити вихідний код цієї функції можна в git-репозиторії проекту.

Функція GotRecords розбирає запит і для кожного об'єкта Record викликає зовнішній скрипт (ім'я якого було передано в параметрі -script) з параметрами:

  • ім'я бакета
  • ключ об'єкта
  • дія:
    • copy — якщо вихідний запит EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete - якщо у вихідному запиті EventName = ObjectRemoved | DeleteObject

Таким чином, якщо прилетів хук з Post-запитом, як описано вище, і параметр -script=script.sh скрипт буде викликаний наступним чином:

script.sh  bucketA some-file-to-bucket copy

Слід розуміти, що даний сервер прийому веб-хуків - це не закінчене production-рішення, а спрощений приклад можливої ​​реалізації.

приклад роботи

Зробимо синхронізацію файлів основного бакета в MCS резервного бакету в AWS. Основний бакет називається myfiles-ash, резервний - myfiles-backup (конфігурація бакета в AWS виходить за межі цієї статті). Відповідно, коли файл кладеться в основний бакет, його копія повинна з'явитися в резервному, коли видаляється з основного - видалятися в резервному.

Працюватимемо з бакетами будемо утилітою awscli, з якою сумісне як хмарне сховище MCS, так і хмарне сховище AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Конфігуруємо доступ до API S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Конфігуруємо доступ до API S3 AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Перевіримо доступи:

До AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Для MCS, під час роботи команди треба додавати -endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Доступ отримано.

Тепер напишемо скрипт обробки хука, що приходить, назвемо його s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Запускаємо сервер:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Перевіряємо, як це спрацює. Через веб-інтерфейс MCS додамо файл test.txt в бакет myfiles-ash. У логах консолі видно, що був зроблений запит на сервер вебхуків:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Перевіримо вміст бакета myfiles-backup в AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Тепер через веб-інтерфейс видалимо файл із бакета myfiles-ash.

Логи сервера:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Вміст бакета:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Файл видалено, завдання вирішено.

Висновок та ToDo

Весь код, який використовується у цій статті, лежить у моєму репозиторії. Там же лежать приклади скриптів та приклади підрахунку сигнатур для реєстрації веб-хуків.

Цей код — не більше, ніж приклад того, як можна використовувати S3-вебхуки у своїй діяльності. Як я сказав на початку, якщо планувати використання такого сервера в продуктивності, необхідно як мінімум переписувати сервер під асинхронну роботу: вебхуки, що приходять, реєструвати в черзі (RabbitMQ або NATS), а звідти їх розбирати і обробляти worker-додатками. Інакше при масованому приході вебхуків можна зіткнутися з нестачею ресурсів сервера для виконання завдань. Наявність черг дозволяє розносити сервер і workers, а також вирішувати питання з повтором завдань у разі збоїв. Також бажано змінювати логування більш докладне і стандартизоване.

Успіхів!

Ще почитати на тему:

Джерело: habr.com

Додати коментар або відгук