Прыклад event-driven прыкладанні на аснове вэбхукаў у аб'ектным S3-сховішча Mail.ru Cloud Solutions

Прыклад event-driven прыкладанні на аснове вэбхукаў у аб'ектным S3-сховішча Mail.ru Cloud Solutions
Rube Goldberg coffee machine

Event-driven architecture павялічвае коштавую эфектыўнасць выкарыстоўваных рэсурсаў, таму што яны задзейнічаюцца толькі ў той момант, калі яны патрэбныя. Існуе маса варыянтаў, як гэта рэалізаваць і не ствараць дадатковыя хмарныя сутнасці ў якасці worker-прыкладанняў. І сёння я раскажу не пра FaaS, а пра вэбхукі. Я пакажу навучальны прыклад апрацоўкі падзей з дапамогай вэбхукаў аб'ектнага сховішча.

Пары слоў аб аб'ектным сховішчы і аб вебхуках. Аб'ектныя сховішчы дазваляюць захоўваць любыя дадзеныя ў воблаку ў выглядзе аб'ектаў, даступных па S3 ці іншаму API (у залежнасці ад рэалізацыі) праз HTTP/HTTPS. Вэбхукі (webhooks) у агульным выпадку – гэта карыстацкія зваротныя выклікі па HTTP. Звычайна яны запускаюцца падзеяй, напрыклад, адпраўкай кода ў рэпазітар або каментаром, публікуемым у блогу. Калі адбываецца падзея, зыходны сайт адпраўляе HTTP-запыт на URL-адрас, паказаны для вэбхука. У выніку можна зрабіць так, каб падзеі на адным сайце выклікалі дзеянні на іншым (вікі). У выпадку, калі зыходным сайтам выступае аб'ектнае сховішча, у ролі падзеяў выступаюць змены яго змесціва.

Прыклады простых кейсаў, калі можна выкарыстоўваць такую ​​аўтаматызацыю:

  1. Стварэнне копій усіх аб'ектаў у іншым хмарным сховішчы. Копіі павінны стварацца "на лёце", пры любым даданні ці змене файлаў.
  2. Аўтаматычнае стварэнне серый мініяцюр графічных файлаў, даданне вадзяных знакаў да фатаграфій, іншыя мадыфікацыі малюнкаў.
  3. Абвестка аб прыходзе новых дакументаў (напрыклад, размеркаваная бухгалтарская служба выкладвае ў воблака справаздачы, а финмониторинг атрымлівае абвесткі аб новых справаздачах, правярае і аналізуе іх).
  4. Крыху больш складаныя кейсы мяркуюць, напрыклад, фармаванне запыту да Kubernetes, які стварае пад з патрэбнымі кантэйнерамі, перадае ў яго параметры задачы і пасля апрацоўкі згортвае кантэйнер.

У якасці прыкладу мы зробім варыянт задачы 1, калі змены ў бакеце аб'ектнага сховішча Mail.ru Cloud Solutions (MCS) з дапамогай вебхуков сінхранізуюцца ў аб'ектным сховішчы AWS. У рэальным нагружаным кейсе варта прадугледзець асінхронную працу за рахунак рэгістрацыі вебхуков у чарзе, але для вучэбнай задачы мы зробім рэалізацыю без гэтага.

схема працы

Пратакол узаемадзеяння дэталёва апісаны ў кіраўніцтве па S3-вэбхукам на MCS. У схеме працы ёсць наступныя элементы:

  • Сэрвіс публікацыі, які знаходзіцца на баку S3-сховішча і публікуе HTTP-запыты пры спрацоўванні webnhook.
  • Сервер прыёму вэбхукаў, які слухае звароты сэрвісу публікацыі па HTTP і выконвае адпаведныя дзеянні. Сервер можа быць напісаны на любой мове, у нашым прыкладзе мы напішам сервер на Go.

Асаблівасць рэалізацыі вебхуков у S3 API - рэгістрацыя сервера прыёму вебхуков на сэрвісе публікацыі. У прыватнасці, сервер прыёму вэбхукаў павінен пацвердзіць падпіску на паведамленні сэрвісу публікацыі (у іншых рэалізацыях вэбхукаў звычайна пацвярджаць падпіску не патрабуецца).

Адпаведна, сервер прыёму вебхуков павінен падтрымліваць дзве асноўныя аперацыі:

  • адказваць на запыт сэрвісу публікацыі аб пацвярджэнні рэгістрацыі,
  • апрацоўваць прыходзяць падзеі.

Усталяванне сервера прыёму вэбхукаў

Для запуску сервера прыёму вебхуков патрэбен Linux-сервер. У дадзеным артыкуле для прыкладу выкарыстоўваем віртуальны інстанс, які разгортваем на MCS.

Усталюем неабходнае ПА і запусцім сервер прыёму вебхуков.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Кланаваны тэчку з серверам прыёму вебхуков:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Запусцім сервер:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Падпіска на сэрвіс публікацыі

Зарэгістраваць свой сервер прыёму вэбхукаў можна праз API або web-інтэрфейс. Для прастаты будзем рэгістраваць праз web-інтэрфейс:

  1. Ідзем у раздзел бакетаў у кабінеце кіравання.
  2. Заходзім у бакет, для якога будзем наладжваць wеbhooks, і націскаем на шасцярэньку:

Прыклад event-driven прыкладанні на аснове вэбхукаў у аб'ектным S3-сховішча Mail.ru Cloud Solutions

Пераходзім на ўкладку Webhooks і націскаем Дадаць:

Прыклад event-driven прыкладанні на аснове вэбхукаў у аб'ектным S3-сховішча Mail.ru Cloud Solutions
Запаўняем палі:

Прыклад event-driven прыкладанні на аснове вэбхукаў у аб'ектным S3-сховішча Mail.ru Cloud Solutions

ID - назва вебхука.

Event - якія падзеі перадаваць. Мы задалі перадачу ўсіх падзей, якія адбываюцца пры працы з файламі (даданне і выдаленне).

URL - адрас сервера прыёму вебхуков.

Filter prefix/suffix - фільтр, які дазваляе генераваць вебхуки толькі на аб'екты, назвы якіх адпавядаюць вызначаным правілам. Напрыклад, каб вебхук спрацоўваў толькі файлы з пашырэннем .png, у Filter suffix трэба напісаць "png".

У бягучы момант падтрымліваюцца для звароту да сервера прыёму вебхуков толькі порты 80 і 443.

Націснем Дадаць hook і ўбачым наступнае:

Прыклад event-driven прыкладанні на аснове вэбхукаў у аб'ектным S3-сховішча Mail.ru Cloud Solutions
Hook дададзены.

Сервер прыёму вэбхукоў у логах паказвае праходжанне працэсу рэгістрацыі хука:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Рэгістрацыя скончана. У наступнай частцы больш дэталёва разгледзім алгарытм працы сервера прыёму вебхуков.

Апісанне сервера прыёму вэбхукаў

У нашым прыкладзе сервер напісаны на Go. Разбяром асноўныя прынцыпы яго працы.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Разгледзім асноўныя функцыі:

  • Ping() - роўт, які адказвае па URL/ping, найпростая рэалізацыя liveness probe.
  • Webhook() — асноўны роўт, апрацоўшчык URL/вэбхука:
    • пацвярджае рэгістрацыю на сэрвісе публікацыі (пераход у функцыю SubscriptionConfirmation),
    • апрацоўвае прыходныя вебхуки (функцыя Gotrecords).
  • Функцыі HmacSha256 і HmacSha256hex – рэалізацыі алгарытмаў шыфравання HMAC-SHA256 і HMAC-SHA256 з высновай у выглядзе радка 16-рычных лікаў для падліку сігнатуры.
  • main - галоўная функцыя, апрацоўвае параметры каманднага радка і рэгіструе апрацоўшчыкі URL.

Параметры каманднага радка, якія прымаюцца серверам:

  • -port - порт, на якім сервер будзе слухаць.
  • -address - IP-адрас, які будзе слухаць сервер.
  • -script - знешняя праграма, якая выклікаецца на кожны прыйшоў хук.

Разгледзім падрабязней некаторыя функцыі:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Гэтая функцыя вызначае, што нетутэйша – запыт на пацверджанне рэгістрацыі або вебхук. Як вынікае з дакументацыі, у выпадку пацверджання рэгістрацыі прыходзіць наступная структура Json у запыце Post:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

На гэты запыт трэба адказаць:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Дзе сігнатура вылічаецца як:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Калі ж прыходзіць вебхук, то структура Post-запыту выглядае так:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Адпаведна, у залежнасці ад запыту трэба зразумець, як апрацоўваць дадзеныя. Я абраў у якасці індыкатара запіс "Type":"SubscriptionConfirmation", паколькі яна прысутнічае ў запыце на пацверджанне падпіскі і не прысутнічае ў вэбхуку. Зыходзячы з наяўнасці/адсутнасці гэтага запісу ў POST-запыце, далейшае выкананне праграмы пераходзіць альбо ў функцыю SubscriptionConfirmation, альбо ў функцыю GotRecords.

Функцыю SubscriptionConfirmation дэталёва разглядаць не будзем, яна рэалізавана па прынцыпах, выкладзеным у дакументацыі. Вывучыць зыходны код гэтай функцыі можна ў git-рэпазітары праекта.

Функцыя GotRecords разбірае прыходны запыт і для кожнага аб'екта Record выклікае вонкавы скрыпт (імя якога было перададзена ў параметры -script) з параметрамі:

  • імя бакета
  • ключ аб'екта
  • дзеянне:
    • copy - калі ў зыходным запыце EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete - калі ў зыходным запыце EventName = ObjectRemoved | DeleteObject

Такім чынам, калі прыляцеў хук c Post-запытам, як апісана вышэй, і параметр -script=script.sh то скрыпт будзе выкліканы наступным чынам:

script.sh  bucketA some-file-to-bucket copy

Варта разумець, што дадзены сервер прыёму вебхуков – гэта не скончанае production-рашэнне, а спрошчаны прыклад магчымай рэалізацыі.

Прыклад працы

Зробім сінхранізацыю файлаў асноўнага бакета ў MCS у рэзервовы бакет у AWS. Асноўны бакет завецца myfiles-ash, рэзервовы - myfiles-backup (канфігурацыя бакета ў AWS выходзіць за межы дадзенага артыкула). Адпаведна, калі файл кладзецца ў асноўны бакет, яго копія павінна з'явіцца ў рэзервовым, калі выдаляецца з асноўнага - выдаляцца ў рэзервовым.

Працаваць з бакетамі будзем утылітай awscli, з якой сумяшчальна як хмарнае сховішча MCS, так і хмарнае сховішча AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Канфігуруем доступ да API S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Канфігуруем доступ да API S3 AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Праверым доступы:

Да AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Для MCS, пры працы каманды трэба дадаваць -endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Доступ атрыманы.

Цяпер напішам скрыпт апрацоўкі які прыходзіць хука, назавем яго s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Запускаем сервер:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Правяраем, як гэта спрацуе. Праз вэб-інтэрфейс MCS дадамо файл test.txt у бакет myfiles-ash. У логах у кансолі відаць, што быў зроблены запыт на сервер вебхуков:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Праверым змесціва бакета myfiles-backup у AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Цяпер праз вэб-інтэрфейс выдалім файл з бакета myfiles-ash.

Логі сервера:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Змесціва бакета:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Файл выдалены, задача вырашана.

Заключэнне і ToDo

Увесь код, які выкарыстоўваецца ў гэтым артыкуле, ляжыць у маім рэпазітары. Там жа ляжаць прыклады скрыптоў і прыклады падліку сігнатур для рэгістрацыі вэбхукаў.

Дадзены код – не больш за прыклад таго, як можна выкарыстоўваць S3-вебхукі ў сваёй дзейнасці. Як я сказаў у пачатку, калі планаваць выкарыстанне такога сервера ў прадуктыве, неабходна як мінімум перапісваць сервер пад асінхронную працу: якія прыходзяць вэбхукі рэгістраваць у чарзе (RabbitMQ ці NATS), а адтуль іх разбіраць і апрацоўваць worker-прыкладаннямі. Інакш пры масіраваным прыходзе вебхуков можна сутыкнуцца з недахопам рэсурсаў сервера для выканання задач. Наяўнасць жа чэргаў дазваляе разносіць сервер і workers, а таксама вырашаць пытанні з паўторам задач у выпадку збояў. Гэтак жа пажадана мяняць лагіраванне на больш падрабязнае і больш стандартызаванае.

Поспехаў!

Яшчэ пачытаць па тэме:

Крыніца: habr.com

Дадаць каментар