S3 对象存储中基于 Webhook 的事件驱动应用程序示例 Mail.ru Cloud Solutions

S3 对象存储中基于 Webhook 的事件驱动应用程序示例 Mail.ru Cloud Solutions
鲁布·戈德堡咖啡机

事件驱动架构提高了所用资源的成本效率,因为它们仅在需要时使用。 关于如何实现这一点而不是创建额外的云实体作为工作应用程序,有很多选择。 今天我要讲的不是 FaaS,而是 webhooks。 我将展示使用对象存储 Webhooks 处理事件的教程示例。

关于对象存储和 Webhooks 的几句话。 对象存储允许您以对象的形式在云中存储任何数据,可通过 S3 或其他 API(取决于实现)通过 HTTP/HTTPS 访问。 Webhooks 通常是自定义 HTTP 回调。 它们通常由事件触发,例如代码被推送到存储库或评论被发布在博客上。 当事件发生时,源站点将 HTTP 请求发送到为 Webhook 指定的 URL。 因此,您可以使一个站点上的事件触发另一个站点上的操作(维基)。 在源站点是对象存储的情况下,事件充当其内容的更改。

可以使用此类自动化的简单情况示例:

  1. 创建另一个云存储中所有对象的副本。 每当添加或更改文件时,都必须即时创建副本。
  2. 自动创建一系列图形文件的缩略图、为照片添加水印以及其他图像修改。
  3. 有关新文档到达的通知(例如,分布式会计服务将报告上传到云端,财务监控接收有关新报告的通知,对其进行检查和分析)。
  4. 例如,稍微复杂的情况涉及生成对 Kubernetes 的请求,Kubernetes 创建一个包含必要容器的 pod,将任务参数传递给它,并在处理后折叠容器。

例如,我们将制作任务 1 的变体,即使用 Webhooks 在 AWS 对象存储中同步 Mail.ru Cloud Solutions (MCS) 对象存储桶中的更改。 在实际加载的情况下,应该通过在队列中注册 webhook 来提供异步工作,但对于训练任务,我们将在没有此情况下进行实现。

工作计划

交互协议详细描述于 MCS 上的 S3 webhook 指南。 工作方案包含以下内容:

  • 出版服务,位于S3存储端,当webnhook被触发时发布HTTP请求。
  • Webhook 接收服务器,它侦听来自 HTTP 发布服务的请求并执行适当的操作。 服务器可以用任何语言编写;在我们的示例中,我们将用 Go 编写服务器。

S3 API 中 Webhook 实现的一个特殊功能是在发布服务上注册 Webhook 接收服务器。 特别是,Webhook 接收服务器必须确认对来自发布服务的消息的订阅(在其他 Webhook 实现中,通常不需要确认订阅)。

因此,webhook 接收服务器必须支持两个主要操作:

  • 响应发布服务的请求以确认注册,
  • 处理传入事件。

安装 webhook 接收服务器

要运行 webhook 接收服务器,您需要一台 Linux 服务器。 在本文中,我们使用部署在 MCS 上的虚拟实例作为示例。

让我们安装必要的软件并启动 webhook 接收服务器。

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

使用 webhook 接收服务器克隆文件夹:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

让我们启动服务器:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

订阅发布服务

您可以通过 API 或 Web 界面注册您的 Webhook 接收服务器。 为简单起见,我们将通过网络界面注册:

  1. 让我们进入桶部分 在控制室里。
  2. 转到我们将为其配置 Webhooks 的存储桶,然后单击齿轮:

S3 对象存储中基于 Webhook 的事件驱动应用程序示例 Mail.ru Cloud Solutions

转到 Webhooks 选项卡并单击添加:

S3 对象存储中基于 Webhook 的事件驱动应用程序示例 Mail.ru Cloud Solutions
填写字段:

S3 对象存储中基于 Webhook 的事件驱动应用程序示例 Mail.ru Cloud Solutions

ID — Webhook 的名称。

事件 - 要传输哪些事件。 我们已经设置了处理文件(添加和删除)时发生的所有事件的传输。

URL — webhook 接收服务器地址。

Filter prefix/suffix 是一个过滤器,允许您仅针对名称符合某些规则的对象生成 webhook。 例如,为了使 Webhook 仅触发扩展名为 .png 的文件,请在 过滤后缀 你需要写“png”。

目前仅支持80和443端口访问webhook接收服务器。

我们点击一​​下 添加挂钩 我们将看到以下内容:

S3 对象存储中基于 Webhook 的事件驱动应用程序示例 Mail.ru Cloud Solutions
胡克补充道。

Webhook 接收服务器在其日志中显示挂钩注册过程的进度:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

注册完成。 在下一节中,我们将仔细研究 webhook 接收服务器的操作算法。

webhook接收服务器的描述

在我们的示例中,服务器是用 Go 编写的。 我们来看看它的基本运作原理。

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

考虑主要功能:

  • Ping() - 通过 URL/ping 响应的路由,是活性探测的最简单实现。
  • Webhook() - 主路由、URL/webhook 处理程序:
    • 确认在发布服务上的注册(转到 SubscriptionConfirmation 函数),
    • 处理传入的 webhook(Gorecords 函数)。
  • 函数 HmacSha256 和 HmacSha256hex 是 HMAC-SHA256 和 HMAC-SHA256 加密算法的实现,其输出为用于计算签名的十六进制数字字符串。
  • main 是主函数,处理命令行参数并注册 URL 处理程序。

服务器接受的命令行参数:

  • -port 是服务器将侦听的端口。
  • -address - 服务器将侦听的 IP 地址。
  • -script 是为每个传入的钩子调用的外部程序。

让我们仔细看看其中的一些函数:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

此函数确定确认注册的请求或 Webhook 是否已到达。 如下来自 文件资料,如果注册成功,Post请求中会收到如下Json结构:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

需要回答这个问题:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

其中签名计算如下:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

如果 webhook 到达,Post 请求的结构如下所示:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

因此,根据请求,您需要了解如何处理数据。 我选择条目作为指标 "Type":"SubscriptionConfirmation",因为它存在于订阅确认请求中,但不存在于 webhook 中。 根据 POST 请求中是否存在此条目,程序的进一步执行将转到函数 SubscriptionConfirmation,或进入函数 GotRecords.

我们不会详细考虑 SubscriptionConfirmation 函数;它是根据中规定的原则实现的 文件资料。 您可以在以下位置查看该函数的源代码 项目 git 存储库.

GotRecords 函数解析传入请求,并为每个 Record 对象调用一个带有以下参数的外部脚本(其名称在 -script 参数中传递):

  • 存储桶名称
  • 对象键
  • 行动:
    • 复制 - 如果在原始请求中 EventName = ObjectCreated | 放置对象 | 放置对象复制
    • 删除 - 如果在原始请求中 EventName = ObjectRemoved | 删除对象

因此,如果钩子带着 Post 请求到达,如上所述 更高,以及参数-script=script.sh则脚本将被调用,如下:

script.sh  bucketA some-file-to-bucket copy

应该理解的是,这个 webhook 接收服务器不是一个完整的生产解决方案,而是一个可能实现的简化示例。

工作示例

让我们将文件从 MCS 中的主存储桶同步到 AWS 中的备份存储桶。 主存储桶称为 myfiles-ash,备份存储桶称为 myfiles-backup(AWS 中的存储桶配置超出了本文的范围)。 因此,当一个文件被放入主存储桶时,它的副本应该出现在备份存储桶中,而当它从主存储桶中删除时,它应该在备份存储桶中删除。

我们将使用 awscli 实用程序来处理存储桶,该实用程序与 MCS 云存储和 AWS 云存储兼容。

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

让我们配置对 S3 MCS API 的访问:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

让我们配置对 AWS S3 API 的访问:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

我们来检查一下访问情况:

至 AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

对于 MCS,运行命令时需要添加 —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

已访问。

现在让我们编写一个脚本来处理传入的钩子,我们将其命名为 s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

让我们启动服务器:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

让我们看看它是如何工作的。 通过 MCS 网络界面 将 test.txt 文件添加到 myfiles-ash 存储桶中。 控制台日志显示已向 Webhook 服务器发出请求:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

让我们检查一下 AWS 中 myfiles-backup 存储桶的内容:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

现在,通过 Web 界面,我们将从 myfiles-ash 存储桶中删除该文件。

服务器日志:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

桶内容:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

文件删除,问题解决。

结论和待办事项

本文使用的所有代码都是 在我的存储库中。 还有用于注册 Webhook 的脚本示例和计数签名示例。

此代码只不过是如何在活动中使用 S3 Webhook 的示例。 正如我在一开始所说的,如果您计划在生产中使用这样的服务器,您至少需要重写服务器以进行异步工作:在队列(RabbitMQ 或 NATS)中注册传入的 webhooks,然后从那里解析它们并处理它们与工人应用程序。 否则,当 webhooks 大量到达时,您可能会遇到缺乏服务器资源来完成任务的情况。 队列的存在使您可以分配服务器和工作人员,并解决出现故障时重复任务的问题。 还建议将日志记录更改为更详细、更标准化的日志记录。

祝你好运!

有关该主题的更多阅读:

来源: habr.com

添加评论