事件驱动架构提高了所用资源的成本效率,因为它们仅在需要时使用。 关于如何实现这一点而不是创建额外的云实体作为工作应用程序,有很多选择。 今天我要讲的不是 FaaS,而是 webhooks。 我将展示使用对象存储 Webhooks 处理事件的教程示例。
关于对象存储和 Webhooks 的几句话。 对象存储允许您以对象的形式在云中存储任何数据,可通过 S3 或其他 API(取决于实现)通过 HTTP/HTTPS 访问。 Webhooks 通常是自定义 HTTP 回调。 它们通常由事件触发,例如代码被推送到存储库或评论被发布在博客上。 当事件发生时,源站点将 HTTP 请求发送到为 Webhook 指定的 URL。 因此,您可以使一个站点上的事件触发另一个站点上的操作(
可以使用此类自动化的简单情况示例:
- 创建另一个云存储中所有对象的副本。 每当添加或更改文件时,都必须即时创建副本。
- 自动创建一系列图形文件的缩略图、为照片添加水印以及其他图像修改。
- 有关新文档到达的通知(例如,分布式会计服务将报告上传到云端,财务监控接收有关新报告的通知,对其进行检查和分析)。
- 例如,稍微复杂的情况涉及生成对 Kubernetes 的请求,Kubernetes 创建一个包含必要容器的 pod,将任务参数传递给它,并在处理后折叠容器。
例如,我们将制作任务 1 的变体,即使用 Webhooks 在 AWS 对象存储中同步 Mail.ru Cloud Solutions (MCS) 对象存储桶中的更改。 在实际加载的情况下,应该通过在队列中注册 webhook 来提供异步工作,但对于训练任务,我们将在没有此情况下进行实现。
工作计划
交互协议详细描述于
- 出版服务,位于S3存储端,当webnhook被触发时发布HTTP请求。
- Webhook 接收服务器,它侦听来自 HTTP 发布服务的请求并执行适当的操作。 服务器可以用任何语言编写;在我们的示例中,我们将用 Go 编写服务器。
S3 API 中 Webhook 实现的一个特殊功能是在发布服务上注册 Webhook 接收服务器。 特别是,Webhook 接收服务器必须确认对来自发布服务的消息的订阅(在其他 Webhook 实现中,通常不需要确认订阅)。
因此,webhook 接收服务器必须支持两个主要操作:
- 响应发布服务的请求以确认注册,
- 处理传入事件。
安装 webhook 接收服务器
要运行 webhook 接收服务器,您需要一台 Linux 服务器。 在本文中,我们使用部署在 MCS 上的虚拟实例作为示例。
让我们安装必要的软件并启动 webhook 接收服务器。
ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat
python3-click python3-constantly python3-hyperlink
python3-incremental python3-pam python3-pyasn1-modules
python3-service-identity python3-twisted python3-twisted-bin
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...
使用 webhook 接收服务器克隆文件夹:
ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.
让我们启动服务器:
ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
订阅发布服务
您可以通过 API 或 Web 界面注册您的 Webhook 接收服务器。 为简单起见,我们将通过网络界面注册:
让我们进入桶部分 在控制室里。- 转到我们将为其配置 Webhooks 的存储桶,然后单击齿轮:
转到 Webhooks 选项卡并单击添加:
填写字段:
ID — Webhook 的名称。
事件 - 要传输哪些事件。 我们已经设置了处理文件(添加和删除)时发生的所有事件的传输。
URL — webhook 接收服务器地址。
Filter prefix/suffix 是一个过滤器,允许您仅针对名称符合某些规则的对象生成 webhook。 例如,为了使 Webhook 仅触发扩展名为 .png 的文件,请在 过滤后缀 你需要写“png”。
目前仅支持80和443端口访问webhook接收服务器。
我们点击一下 添加挂钩 我们将看到以下内容:
胡克补充道。
Webhook 接收服务器在其日志中显示挂钩注册过程的进度:
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn:
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token:
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL:
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature:
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d
注册完成。 在下一节中,我们将仔细研究 webhook 接收服务器的操作算法。
webhook接收服务器的描述
在我们的示例中,服务器是用 Go 编写的。 我们来看看它的基本运作原理。
package main
// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}
// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}
// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}
// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}
// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
// log request
log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
fmt.Fprintf(w, "Pongn")
}
//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}
func main() {
// get command line args
bindPort := flag.Int("port", 80, "number between 1-65535")
bindAddr := flag.String("address", "", "ip address in dot format")
flag.StringVar(&actionScript, "script", "", "external script to execute")
flag.Parse()
http.HandleFunc("/ping", Ping)
http.HandleFunc("/webhook", Webhook)
log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}
考虑主要功能:
- Ping() - 通过 URL/ping 响应的路由,是活性探测的最简单实现。
- Webhook() - 主路由、URL/webhook 处理程序:
- 确认在发布服务上的注册(转到 SubscriptionConfirmation 函数),
- 处理传入的 webhook(Gorecords 函数)。
- 函数 HmacSha256 和 HmacSha256hex 是 HMAC-SHA256 和 HMAC-SHA256 加密算法的实现,其输出为用于计算签名的十六进制数字字符串。
- main 是主函数,处理命令行参数并注册 URL 处理程序。
服务器接受的命令行参数:
- -port 是服务器将侦听的端口。
- -address - 服务器将侦听的 IP 地址。
- -script 是为每个传入的钩子调用的外部程序。
让我们仔细看看其中的一些函数:
//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
// Read body
body, err := ioutil.ReadAll(req.Body)
defer req.Body.Close()
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// log request
log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
// check if we got subscription confirmation request
if strings.Contains(string(body),
""Type":"SubscriptionConfirmation"") {
SubscriptionConfirmation(w, req, body)
} else {
GotRecords(w, req, body)
}
}
此函数确定确认注册的请求或 Webhook 是否已到达。 如下来自
POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json
{
"Timestamp":"2019-12-26T19:29:12+03:00",
"Type":"SubscriptionConfirmation",
"Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
"TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
"SignatureVersion":1,
"Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}
需要回答这个问题:
content-type: application/json
{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}
其中签名计算如下:
signature = hmac_sha256(url, hmac_sha256(TopicArn,
hmac_sha256(Timestamp, Token)))
POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
{ "Records":
[
{
"s3": {
"object": {
"eTag":"aed563ecafb4bcc5654c597a421547b2",
"sequencer":1577453615,
"key":"some-file-to-bucket",
"size":100
},
"configurationId":"1",
"bucket": {
"name": "bucketA",
"ownerIdentity": {
"principalId":"mcs2883541269"}
},
"s3SchemaVersion":"1.0"
},
"eventVersion":"1.0",
"requestParameters":{
"sourceIPAddress":"185.6.245.156"
},
"userIdentity": {
"principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
},
"eventName":"s3:ObjectCreated:Put",
"awsRegion":"ru-msk",
"eventSource":"aws:s3",
"responseElements": {
"x-amz-request-id":"VGJR5rtJ"
}
}
]
}
因此,根据请求,您需要了解如何处理数据。 我选择条目作为指标 "Type":"SubscriptionConfirmation"
,因为它存在于订阅确认请求中,但不存在于 webhook 中。 根据 POST 请求中是否存在此条目,程序的进一步执行将转到函数 SubscriptionConfirmation
,或进入函数 GotRecords
.
我们不会详细考虑 SubscriptionConfirmation 函数;它是根据中规定的原则实现的
GotRecords 函数解析传入请求,并为每个 Record 对象调用一个带有以下参数的外部脚本(其名称在 -script 参数中传递):
- 存储桶名称
- 对象键
- 行动:
- 复制 - 如果在原始请求中 EventName = ObjectCreated | 放置对象 | 放置对象复制
- 删除 - 如果在原始请求中 EventName = ObjectRemoved | 删除对象
因此,如果钩子带着 Post 请求到达,如上所述
script.sh bucketA some-file-to-bucket copy
应该理解的是,这个 webhook 接收服务器不是一个完整的生产解决方案,而是一个可能实现的简化示例。
工作示例
让我们将文件从 MCS 中的主存储桶同步到 AWS 中的备份存储桶。 主存储桶称为 myfiles-ash,备份存储桶称为 myfiles-backup(AWS 中的存储桶配置超出了本文的范围)。 因此,当一个文件被放入主存储桶时,它的副本应该出现在备份存储桶中,而当它从主存储桶中删除时,它应该在备份存储桶中删除。
我们将使用 awscli 实用程序来处理存储桶,该实用程序与 MCS 云存储和 AWS 云存储兼容。
ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...
让我们配置对 S3 MCS API 的访问:
ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:
让我们配置对 AWS S3 API 的访问:
ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:
我们来检查一下访问情况:
至 AWS:
ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup
对于 MCS,运行命令时需要添加 —endpoint-url:
ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash
已访问。
现在让我们编写一个脚本来处理传入的钩子,我们将其命名为 s3_backup_mcs_aws.sh
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: #!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete"
exit 1
;;
esac
让我们启动服务器:
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh
让我们看看它是如何工作的。 通过
2020/07/06 09:43:08 [POST] incoming HTTP request from
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to
s3://myfiles-backup/test.txt
让我们检查一下 AWS 中 myfiles-backup 存储桶的内容:
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls
myfiles-backup
2020-07-06 09:43:10 1104 test.txt
现在,通过 Web 界面,我们将从 myfiles-ash 存储桶中删除该文件。
服务器日志:
2020/07/06 09:44:46 [POST] incoming HTTP request from
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt
桶内容:
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$
文件删除,问题解决。
结论和待办事项
本文使用的所有代码都是
此代码只不过是如何在活动中使用 S3 Webhook 的示例。 正如我在一开始所说的,如果您计划在生产中使用这样的服务器,您至少需要重写服务器以进行异步工作:在队列(RabbitMQ 或 NATS)中注册传入的 webhooks,然后从那里解析它们并处理它们与工人应用程序。 否则,当 webhooks 大量到达时,您可能会遇到缺乏服务器资源来完成任务的情况。 队列的存在使您可以分配服务器和工作人员,并解决出现故障时重复任务的问题。 还建议将日志记录更改为更详细、更标准化的日志记录。
祝你好运!
有关该主题的更多阅读:
来源: habr.com