Một ví dụ về ứng dụng hướng sự kiện dựa trên webhooks trong bộ lưu trữ đối tượng S3 Giải pháp đám mây Mail.ru

Một ví dụ về ứng dụng hướng sự kiện dựa trên webhooks trong bộ lưu trữ đối tượng S3 Giải pháp đám mây Mail.ru
Rube Goldberg coffee machine

Event-driven architecture повышает ценовую эффективность используемых ресурсов, потому что они задействуются только в тот момент, когда они нужны. Существует масса вариантов, как это реализовать и не создавать дополнительные облачные сущности в качестве worker-приложений. И сегодня я расскажу не про FaaS, а про вебхуки. Я покажу учебный пример обработки событий с помощью вебхуков объектного хранилища.

Пара слов об объектном хранилище и о вебхуках. Объектные хранилища позволяют хранить любые данные в облаке в виде объектов, доступных по S3 или другому API (в зависимости от реализации) через HTTP/HTTPS. Вебхуки (webhooks) в общем случае — это пользовательские обратные вызовы по HTTP. Обычно они запускаются событием, например, отправкой кода в репозиторий или комментарием, публикуемым в блоге. Когда происходит событие, исходный сайт отправляет HTTP-запрос на URL-адрес, указанный для вебхука. В результате можно сделать так, чтобы события на одном сайте вызывали действия на другом (wiki). В случае, когда исходным сайтом выступает объектное хранилище, в роли событий выступают изменения его содержимого.

Примеры простых кейсов, когда можно использовать такую автоматизацию:

  1. Создание копий всех объектов в другом облачном хранилище. Копии должны создаваться «на лету», при любом добавлении или изменении файлов.
  2. Автоматическое создание серий миниатюр графических файлов, добавление водяных знаков к фотографиям, другие модификации изображений.
  3. Оповещение о приходе новых документов (к примеру, распределенная бухгалтерская служба выкладывает в облако отчеты, а финмониторинг получает оповещения о новых отчетах, проверяет и анализирует их).
  4. Чуть более сложные кейсы подразумевают, к примеру, формирование запроса к Kubernetes, который создает под с нужными контейнерами, передает в него параметры задачи и после обработки сворачивает контейнер.

В качестве примера мы сделаем вариант задачи 1, когда изменения в бакете объектного хранилища Mail.ru Cloud Solutions (MCS) с помощью вебхуков синхронизируются в объектном хранилище AWS. В реальном нагруженном кейсе следует предусмотреть асинхронную работу за счет регистрации вебхуков в очереди, но для учебной задачи мы сделаем реализацию без этого.

Đề án làm việc

Протокол взаимодействия детально описан в руководстве по S3-вебхукам на MCS. В схеме работы есть следующие элементы:

  • Сервис публикации, который находится на стороне S3-хранилища и публикует HTTP-запросы при срабатывании webnhook.
  • Сервер приема вебхуков, который слушает обращения сервиса публикации по HTTP и выполняет соответствующие действия. Сервер может быть написан на любом языке, в нашем примере мы напишем сервер на Go.

Особенность реализации вебхуков в S3 API — регистрация сервера приема вебхуков на сервисе публикации. В частности, сервер приема вебхуков должен подтвердить подписку на сообщения сервиса публикации (в других реализациях вебхуков обычно подтверждать подписку не требуется).

Соответственно, сервер приема вебхуков должен поддерживать две основных операции:

  • отвечать на запрос сервиса публикации о подтверждении регистрации,
  • обрабатывать приходящие события.

Установка сервера приема вебхуков

Для запуска сервера приема вебхуков нужен Linux-сервер. В данной статье для примера используем виртуальный инстанс, который развертываем на MCS.

Установим необходимое ПО и запустим сервер приема вебхуков.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Клонируем папку с сервером приема вебхуков:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Запустим сервер:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Подписка на сервис публикации

Зарегистрировать свой сервер приема вебхуков можно через API либо web-интерфейс. Для простоты будем регистрировать через web-интерфейс:

  1. Идем в раздел бакетов в кабинете управления.
  2. Заходим в бакет, для которого будем настраивать wеbhooks, и нажимаем на шестеренку:

Một ví dụ về ứng dụng hướng sự kiện dựa trên webhooks trong bộ lưu trữ đối tượng S3 Giải pháp đám mây Mail.ru

Переходим на вкладку Webhooks и нажимаем Добавить:

Một ví dụ về ứng dụng hướng sự kiện dựa trên webhooks trong bộ lưu trữ đối tượng S3 Giải pháp đám mây Mail.ru
Заполняем поля:

Một ví dụ về ứng dụng hướng sự kiện dựa trên webhooks trong bộ lưu trữ đối tượng S3 Giải pháp đám mây Mail.ru

ID — название вебхука.

Event — какие события передавать. Мы задали передачу всех событий, которые происходят при работе с файлами (добавление и удаление).

URL — адрес сервера приема вебхуков.

Filter prefix/suffix — фильтр, который позволяет генерировать вебхуки только на объекты, названия которых соответствуют определенным правилам. Например, чтобы вебхук срабатывал только файлы с расширением .png, в Filter suffix надо написать «png».

В текущий момент поддерживаются для обращения к серверу приема вебхуков только порты 80 и 443.

Нажмем Добавить hook и увидим следующее:

Một ví dụ về ứng dụng hướng sự kiện dựa trên webhooks trong bộ lưu trữ đối tượng S3 Giải pháp đám mây Mail.ru
Hook добавлен.

Сервер приема вебхуков в логах показывает прохождение процесса регистрации хука:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Регистрация закончена. В следующем разделе более детально рассмотрим алгоритм работы сервера приема вебхуков.

Описание сервера приема вебхуков

В нашем примере сервер написан на Go. Разберем основные принципы его работы.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Hãy xem xét các chức năng chính:

  • Ping() — роут, отвечающий по URL/ping, простейшая реализация liveness probe.
  • Webhook() — основной роут, обработчик URL/вебхука:
    • подтверждает регистрацию на сервисе публикации (переход в функцию SubscriptionConfirmation),
    • обрабатывает приходящие вебхуки (функция Gotrecords).
  • Функции HmacSha256 и HmacSha256hex — реализации алгоритмов шифрования HMAC-SHA256 и HMAC-SHA256 с выводом в виде строки 16-ричных чисел для подстчета сигнатуры.
  • main — главная функция, обрабатывает параметры командной строки и регистрирует обработчики URL.

Параметры командной строки, принимаемые сервером:

  • -port — порт, на котором сервер будет слушать.
  • -address — IP-адрес, который будет слушать сервер.
  • -script — внешняя программа, которая вызывается на каждый пришедший хук.

Рассмотрим подробнее некоторые функции:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Эта функция определяет, что пришло — запрос на подтверждение регистрации либо вебхук. Как следует из tài liệu, в случае подтверждения регистрации приходит следующая структура Json в запросе Post:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

На этот запрос нужно ответить:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Где сигнатура вычисляется как:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Если же приходит вебхук, то структура Post-запроса выглядит так:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Соответственно, в зависимости от запроса нужно понять, как обрабатывать данные. Я выбрал в качестве индикатора запись "Type":"SubscriptionConfirmation", поскольку она присутствует в запросе на подтверждение подписки и не присутствует в вебхуке. Исходя из наличия/отсутствия этой записи в POST-запросе, дальнейшее выполнение программы переходит либо в функцию SubscriptionConfirmation, либо в функцию GotRecords.

Функцию SubscriptionConfirmation детально рассматривать не будем, она реализована по принципам, изложенным в tài liệu. Изучить исходный код этой функции можно в git-репозитории проекта.

Функция GotRecords разбирает приходящий запрос и для каждого объекта Record вызывает внешний скрипт (имя которого было передано в параметре -script) с параметрами:

  • имя бакета
  • ключ объекта
  • hoạt động:
    • copy — если в исходном запросе EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete — если в исходном запросе EventName = ObjectRemoved | DeleteObject

Таким образом, если прилетел хук c Post-запросом, как описано trên đây, и параметр -script=script.sh то скрипт будет вызван следующим образом:

script.sh  bucketA some-file-to-bucket copy

Следует понимать, что данный сервер приема вебхуков — это не законченное production-решение, а упрощенный пример возможной реализации.

Ví dụ công việc

Сделаем синхронизацию файлов основного бакета в MCS в резервный бакет в AWS. Основной бакет называется myfiles-ash, резервный — myfiles-backup (конфигурация бакета в AWS выходит за пределы данной статьи). Соответственно, когда файл кладется в основной бакет, его копия должна появиться в резервном, когда удаляется из основного — удаляться в резервном.

Работать с бакетами будем утилитой awscli, с которой совместимо как облачное хранилище MCS, так и облачное хранилище AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Сконфигурируем доступ к API S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Сконфигурируем доступ к API S3 AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Проверим доступы:

К AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Для MCS, при работе команды надо добавлять —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Доступ получен.

Теперь напишем скрипт обработки приходящего хука, назовем его s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Запускаем сервер:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Проверяем, как это сработает. Через веб-интерфейс MCS добавим файл test.txt в бакет myfiles-ash. В логах в консоли видно, что был сделан запрос на сервер вебхуков:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Проверим содержимое бакета myfiles-backup в AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Теперь через веб-интерфейс удалим файл из бакета myfiles-ash.

Логи сервера:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Содержимое бакета:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Файл удален, задача решена.

Заключение и ToDo

Весь код, используемый в этой статье, лежит trong kho lưu trữ của tôi. Там же лежат примеры скриптов и примеры подсчета сигнатур для регистрации вебхуков.

Данный код — не более чем пример того, как можно использовать S3-вебхуки в своей деятельности. Как я сказал в начале, если планировать использование такого сервера в продуктиве, необходимо как минимум переписывать сервер под асинхронную работу: приходящие вебхуки регистрировать в очереди (RabbitMQ или NATS), а оттуда их разбирать и обрабатывать worker-приложениями. Иначе при массированном приходе вебхуков можно столкнуться с нехваткой ресурсов сервера для выполнения задач. Наличие же очередей позволяет разносить сервер и workers, а также решать вопросы с повтором задач в случае сбоев. Так же желательно менять логирование на более подробное и более стандартизованное.

Chúc may mắn!

Еще почитать по теме:

Nguồn: www.habr.com

Thêm một lời nhận xét