S3 物件儲存中基於 Webhook 的事件驅動應用程式範例 Mail.ru Cloud Solutions

S3 物件儲存中基於 Webhook 的事件驅動應用程式範例 Mail.ru Cloud Solutions
魯布‧戈德堡咖啡機

事件驅動架構提高了所用資源的成本效率,因為它們只在需要時使用。 關於如何實現這一點而不是創建額外的雲端實體作為工作應用程序,有很多選擇。 今天要講的不是 FaaS,而是 webhooks。 我將展示使用物件儲存 Webhooks 處理事件的教學範例。

關於物件儲存和 Webhooks 的幾句話。 物件儲存允許您以物件的形式在雲端中儲存任何數據,可透過 S3 或其他 API(取決於實作)透過 HTTP/HTTPS 存取。 Webhooks 通常是自訂 HTTP 回呼。 它們通常由事件觸發,例如程式碼被推送到儲存庫或評論被發佈在部落格上。 當事件發生時,來源網站將 HTTP 請求傳送到為 Webhook 指定的 URL。 因此,您可以使一個站點上的事件觸發另一個站點上的操作(維基)。 在來源網站是物件儲存的情況下,事件充當其內容的變更。

可以使用此類自動化的簡單情況範例:

  1. 建立另一個雲端儲存中所有物件的副本。 每當新增或變更檔案時,都必須即時建立副本。
  2. 自動建立一系列圖形檔案的縮圖、為照片添加浮水印以及其他影像修改。
  3. 有關新文件到達的通知(例如,分散式會計服務將報告上傳到雲端,財務監控接收有關新報告的通知,對其進行檢查和分析)。
  4. 例如,稍微複雜的情況涉及產生對 Kubernetes 的請求,Kubernetes 會建立一個包含必要容器的 pod,將任務參數傳遞給它,並在處理後折疊容器。

例如,我們將製作任務 1 的變體,即使用 Webhooks 在 AWS 物件儲存中同步 Mail.ru Cloud Solutions (MCS) 物件儲存桶中的變更。 在實際載入的情況下,應該透過在佇列中註冊 webhook 來提供非同步工作,但對於訓練任務,我們將在沒有此情況下進行實作。

工作計劃

互動協議詳細描述於 MCS 上的 S3 webhook 指南。 工作方案包含以下內容:

  • 出版服務,位於S3儲存端,當webnhook被觸發時發布HTTP請求。
  • Webhook 接收伺服器,它會偵聽來自 HTTP 發布服務的請求並執行適當的操作。 伺服器可以用任何語言編寫;在我們的範例中,我們將用 Go 編寫伺服器。

S3 API 中 Webhook 實作的一個特殊功能是在發布服務上註冊 Webhook 接收伺服器。 特別是,Webhook 接收伺服器必須確認對來自發布服務的訊息的訂閱(在其他 Webhook 實作中,通常不需要確認訂閱)。

因此,webhook 接收伺服器必須支援兩個主要操作:

  • 回應發布服務的請求以確認註冊,
  • 處理傳入事件。

安裝 webhook 接收伺服器

要運行 webhook 接收伺服器,您需要一台 Linux 伺服器。 在本文中,我們使用部署在 MCS 上的虛擬實例作為範例。

讓我們安裝必要的軟體並啟動 webhook 接收伺服器。

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

使用 webhook 接收伺服器克隆資料夾:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

讓我們啟動伺服器:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

訂閱發布服務

您可以透過 API 或 Web 介面註冊您的 Webhook 接收伺服器。 為簡單起見,我們將透過網路介面註冊:

  1. 讓我們進入桶部分 在控制室裡。
  2. 轉到我們將為其配置 Webhooks 的儲存桶,然後按一下齒輪:

S3 物件儲存中基於 Webhook 的事件驅動應用程式範例 Mail.ru Cloud Solutions

前往 Webhooks 標籤並點擊新增:

S3 物件儲存中基於 Webhook 的事件驅動應用程式範例 Mail.ru Cloud Solutions
填寫欄位:

S3 物件儲存中基於 Webhook 的事件驅動應用程式範例 Mail.ru Cloud Solutions

ID — Webhook 的名稱。

事件 - 要傳輸哪些事件。 我們已經設定了處理文件(新增和刪除)時發生的所有事件的傳輸。

URL — webhook 接收伺服器位址。

Filter prefix/suffix 是一個過濾器,可讓您僅針對名稱符合某些規則的物件產生 webhook。 例如,為了使 Webhook 僅觸發副檔名為 .png 的文件,請在 過濾後綴 你需要寫“png”。

目前僅支援80和443連接埠存取webhook接收伺服器。

我們點擊一下 添加掛鉤 我們將看到以下內容:

S3 物件儲存中基於 Webhook 的事件驅動應用程式範例 Mail.ru Cloud Solutions
胡克補充道。

Webhook 接收伺服器在其日誌中顯示掛鉤註冊過程的進度:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

註冊完成。 在下一節中,我們將仔細研究 webhook 接收伺服器的操作演算法。

webhook接收伺服器的描述

在我們的範例中,伺服器是用 Go 編寫的。 讓我們來看看它的基本運作原理。

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

考慮主要功能:

  • Ping() - 透過 URL/ping 回應的路由,是活性探測的最簡單實作。
  • Webhook() - 主路由、URL/webhook 處理程序:
    • 確認在發布服務上的註冊(前往 SubscriptionConfirmation 函數),
    • 處理傳入的 webhook(Gorecords 函數)。
  • 函數 HmacSha256 和 HmacSha256hex 是 HMAC-SHA256 和 HMAC-SHA256 加密演算法的實現,其輸出為用於計算簽章的十六進位數位字串。
  • main 是主函數,處理命令列參數並註冊 URL 處理程序。

伺服器接受的命令列參數:

  • -port 是伺服器將偵聽的連接埠。
  • -address - 伺服器將偵聽的 IP 位址。
  • -script 是為每個傳入的鉤子呼叫的外部程式。

讓我們仔細看看其中的一些函數:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

此函數決定確認註冊的請求或 Webhook 是否已到達。 如下來自 文件,如果註冊成功,Post請求中會收到如下Json結構:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

需要回答這個問題:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

其中簽章計算如下:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

如果 webhook 到達,Post 請求的結構如下所示:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

因此,根據請求,您需要了解如何處理資料。 我選擇條目作為指標 "Type":"SubscriptionConfirmation",因為它存在於訂閱確認請求中,但不存在於 webhook 中。 根據 POST 請求中是否存在此條目,程式的進一步執行將轉到函數 SubscriptionConfirmation,或進入一個函數 GotRecords.

我們不會詳細考慮 SubscriptionConfirmation 函數;它是根據中規定的原則實現的 文件。 您可以在以下位置查看該函數的原始程式碼 專案 git 儲存庫.

GotRecords 函數解析傳入請求,並為每個 Record 物件呼叫一個帶有以下參數的外部腳本(其名稱在 -script 參數中傳遞):

  • 儲存桶名稱
  • 物件鍵
  • 行動:
    • 複製 - 如果在原始請求中 EventName = ObjectCreated | 放置對象 | 放置物件複製
    • 刪除 - 如果在原始請求中 EventName = ObjectRemoved | 刪除對象

因此,如果鉤子帶著 Post 請求到達,如上所述 以上,以及參數-script=script.sh則腳本將被調用,如下:

script.sh  bucketA some-file-to-bucket copy

應該要理解的是,這個 webhook 接收伺服器不是一個完整的生產解決方案,而是一個可能實現的簡化範例。

工作範例

讓我們將檔案從 MCS 中的主儲存桶同步到 AWS 中的備份儲存桶。 主儲存桶稱為 myfiles-ash,備份儲存桶稱為 myfiles-backup(AWS 中的儲存桶配置超出了本文的範圍)。 因此,當一個檔案被放入主儲存桶時,它的副本應該出現在備份儲存桶中,而當它從主儲存桶中刪除時,它應該在備份儲存桶中刪除。

我們將使用 awscli 實用程式來處理儲存桶,該公用程式與 MCS 雲端儲存和 AWS 雲端儲存相容。

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

讓我們配置對 S3 MCS API 的存取:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

讓我們配置對 AWS S3 API 的存取:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

我們來檢查一下訪問情況:

至 AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

對於 MCS,運行命令時需要新增 —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

已訪問。

現在讓我們編寫一個腳本來處理傳入的鉤子,我們將其命名為 s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

讓我們啟動伺服器:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

讓我們看看它是如何工作的。 透過 MCS 網路介面 將 test.txt 檔案新增至 myfiles-ash 儲存桶中。 控制台日誌顯示已向 Webhook 伺服器發出請求:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

讓我們檢查一下 AWS 中 myfiles-backup 儲存桶的內容:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

現在,透過 Web 介面,我們將從 myfiles-ash 儲存桶中刪除該檔案。

伺服器日誌:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

桶內容:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

文件刪除,問題解決。

結論和待辦事項

本文所使用的所有程式碼都是 在我的儲存庫中。 還有用於註冊 Webhook 的腳本範例和計數簽名範例。

此程式碼只不過是如何在活動中使用 S3 Webhook 的範例。 正如我在一開始所說的,如果您打算在生產中使用這樣的伺服器,您至少需要重寫伺服器以進行非同步工作:在佇列(RabbitMQ 或NATS)中註冊傳入的webhooks,然後從那裡解析它們並處理它們與工人應用程式。 否則,當 webhooks 大量到達時,您可能會遇到缺乏伺服器資源來完成任務的情況。 佇列的存在使您可以分配伺服器和工作人員,並解決出現故障時重複任務的問題。 也建議將日誌記錄變更為更詳細、更標準化的日誌記錄。

祝你好運!

有關該主題的更多閱讀:

來源: www.habr.com

添加評論