An example of an event-driven application based on webhooks in the S3 object storage of Mail.ru Cloud Solutions
(Illustration: a Rube Goldberg coffee machine)

Event-driven architecture improves the cost efficiency of the resources you use, because they are activated only when they are needed. There are many ways to implement this without creating additional cloud entities as worker applications. Today I will talk not about FaaS but about webhooks, and show a case study of event handling with object storage webhooks.

A few words about object storage and webhooks. Object storage allows you to store any data in the cloud as objects, available via S3 or another API (depending on the implementation) over HTTP/HTTPS. Webhooks are custom HTTP callbacks. They are usually triggered by an event, such as a code push to a repository or a comment posted on a blog. When the event occurs, the originating site sends an HTTP request to the URL configured for the webhook. As a result, events on one site can trigger actions on another (wiki). When the source site is an object storage, changes to its contents act as the events.

Examples of simple cases where such automation can be used:

  1. Create copies of all objects in another cloud storage. Copies should be created on the fly, whenever files are added or changed.
  2. Automatically create a series of thumbnails of graphic files, add watermarks to photos, and perform other image modifications.
  3. Send notifications about the arrival of new documents (for example, a distributed accounting service uploads reports to the cloud, and financial monitoring receives notifications about new reports, then checks and analyzes them).
  4. Slightly more complex cases, for example, generating a request to Kubernetes that creates a pod with the necessary containers, passes task parameters to it, and tears the pod down after processing.

As an example, we will implement a variant of task 1: changes in a Mail.ru Cloud Solutions (MCS) object storage bucket are synchronized to AWS object storage using webhooks. In a real, loaded case, asynchronous operation should be provided by registering incoming webhooks in a queue, but for this training task we will do without one.

How it works

The interaction protocol is described in detail in the guide to S3 webhooks on MCS. The scheme of work includes the following elements:

  • Publishing service, which runs on the side of the S3 storage and publishes HTTP requests when a webhook fires.
  • Webhook server, which listens for requests from the publishing service over HTTP and takes the appropriate action. The server can be written in any language; in our example we will write it in Go.

A feature of the webhook implementation in the S3 API is that the webhook receiving server must be registered on the publishing service. In particular, the webhook receiving server must confirm its subscription to the publishing service's messages (in other webhook implementations, such subscription confirmation is usually not required).

Accordingly, the webhook receiving server must support two main operations:

  • respond to a publishing service request for registration confirmation,
  • handle incoming events.

Setting up a webhook receiver

To run the webhook receiving server, you need a Linux server. In this article, we use a virtual instance deployed in MCS as an example.

Install the necessary software and start the webhook receiving server.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Clone the repository with the webhook receiving server:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Let's start the server:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Subscribing to a publishing service

You can register your webhook receiving server through the API or web interface. For simplicity, we will register through the web interface:

  1. Go to the buckets section in the control panel.
  2. Open the bucket for which we will configure webhooks and click the gear icon.

Go to the Webhooks tab and click Add, then fill in the fields:

ID - the name of the webhook.

Event - what events to send. We set the transmission of all events that occur when working with files (adding and deleting).

URL - the address of the webhook receiving server.

Filter prefix/suffix - a filter that allows you to generate webhooks only for objects whose names match certain rules. For example, in order for the webhook to work only on files with a .png extension, in Filter suffix you need to write "png".

Currently, only ports 80 and 443 are supported for accessing the webhook receiving server.

Let's press Add hook; the interface will confirm that the hook has been added.

The webhook receiving server shows the progress of the hook registration in its logs:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Registration is complete. In the next section, we will examine the algorithm of the webhook receiving server in more detail.

Description of the webhook server

In our example, the server is written in Go. Let's analyze the basic principles of its work.

package main

import (
    "flag"
    "fmt"
    "log"
    "net/http"
    "strconv"
)

// path to the external script called for each incoming hook (-script flag)
var actionScript string

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
    // ... (full implementation in the repository)
    return ""
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
    // ... (full implementation in the repository)
    return ""
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
    // ... (full implementation in the repository)
}

// Process records from incoming webhooks
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
    // ... (full implementation in the repository)
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pong\n")
}

// Webhook handler, shown in detail below
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

    log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Consider the main functions:

  • Ping() - the handler for the /ping URL, the simplest implementation of a liveness probe.
  • Webhook() - the main handler, for the /webhook URL:
    • confirms registration on the publishing service (via the SubscriptionConfirmation function),
    • handles incoming webhooks (via the GotRecords function).
  • HmacSha256() and HmacSha256hex() - implementations of the HMAC-SHA256 message authentication algorithm, the latter with output as a string of hexadecimal digits; they are used to calculate the signature.
  • main() - the main function: it handles command line options and registers the URL handlers.

Command line options accepted by the server:

  • -port is the port on which the server will listen.
  • -address is the IP address that the server will listen on.
  • -script - an external program that is called on each incoming hook.

Let's take a closer look at some of the functions:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)

    // check if we got a subscription confirmation request
    if strings.Contains(string(body), `"Type":"SubscriptionConfirmation"`) {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }
}

This function determines whether the request is a registration confirmation or a webhook. As follows from the documentation, in the case of registration confirmation, the following JSON structure arrives in the POST request:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":Β«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tAΒ»
}

This request must be answered with:

content-type: application/json

{"signature":Β«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37afΒ»}

Where the signature is calculated as:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))
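
For concreteness, here is a minimal Go sketch of this calculation, using the values from the example request above. It assumes, matching the HmacSha256(message, secret) signature in the server skeleton, that the first argument is the message and the second is the key, and that intermediate results are chained as raw bytes with only the final value hex-encoded; verify the exact byte handling against the documentation.

package main

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// hmacSha256 treats the first argument as the message and the second as the key
func hmacSha256(message string, secret string) string {
    h := hmac.New(sha256.New, []byte(secret))
    h.Write([]byte(message))
    return string(h.Sum(nil)) // raw bytes; hex-encode only the final result
}

func main() {
    // values from the example subscription confirmation request above
    timestamp := "2019-12-26T19:29:12+03:00"
    topicArn := "mcs2883541269|bucketA|s3:ObjectCreated:Put"
    token := "RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA"
    url := "http://test.com"

    // signature = hmac_sha256(url, hmac_sha256(TopicArn, hmac_sha256(Timestamp, Token)))
    sig := hmacSha256(url, hmacSha256(topicArn, hmacSha256(timestamp, token)))
    fmt.Println(hex.EncodeToString([]byte(sig)))
}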

If a webhook arrives, the structure of the POST request looks like this:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Accordingly, depending on the request, you need to decide how to process the data. I chose the string "Type":"SubscriptionConfirmation" as the indicator, since it is present in the subscription confirmation request and absent from webhooks. Based on the presence or absence of this string in the POST body, further execution goes either to the SubscriptionConfirmation function or to the GotRecords function.

We will not consider the SubscriptionConfirmation function in detail; it is implemented according to the principles set out in the documentation. You can view its source code in the project's git repository.

The GotRecords function parses the incoming request and for each Record object calls an external script (whose name was passed in the -script parameter) with the following parameters:

  • bucket name
  • object key
  • action:
    • copy - if the eventName in the original request matches ObjectCreated | PutObject | PutObjectCopy
    • delete - if the eventName in the original request matches ObjectRemoved | DeleteObject

Thus, if a hook arrives with a POST request as described above and the server was started with -script=script.sh, the script will be called as follows:

script.sh bucketA some-file-to-bucket copy
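
For illustration, here is a minimal sketch of what GotRecords might look like; the actual implementation is in the repository. The struct mirrors only the fields of the JSON example above that we need, and the excerpt assumes the encoding/json, os/exec, strings, log and net/http imports in addition to the skeleton shown earlier.

// Event mirrors just the parts of the incoming JSON we need
type Event struct {
    Records []struct {
        EventName string `json:"eventName"`
        S3        struct {
            Bucket struct {
                Name string `json:"name"`
            } `json:"bucket"`
            Object struct {
                Key string `json:"key"`
            } `json:"object"`
        } `json:"s3"`
    } `json:"Records"`
}

// GotRecords parses the request body and calls the external script
// once per record with <bucket> <key> <action> as arguments
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
    var e Event
    if err := json.Unmarshal(body, &e); err != nil {
        http.Error(w, err.Error(), 400)
        return
    }
    for _, r := range e.Records {
        var action string
        switch {
        case strings.Contains(r.EventName, "ObjectCreated"),
            strings.Contains(r.EventName, "PutObject"):
            action = "copy"
        case strings.Contains(r.EventName, "ObjectRemoved"),
            strings.Contains(r.EventName, "DeleteObject"):
            action = "delete"
        default:
            continue
        }
        // call the external script: script.sh <bucket> <key> <action>
        out, err := exec.Command(actionScript, r.S3.Bucket.Name, r.S3.Object.Key, action).CombinedOutput()
        if err != nil {
            log.Printf("script error: %v: %s", err, out)
        }
    }
}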

It should be understood that this webhook receiving server is not a complete production solution, but a simplified example of a possible implementation.

Work example

Let's synchronize files from the main bucket in MCS to a backup bucket in AWS. The main bucket is called myfiles-ash, the backup one myfiles-backup (configuring the bucket in AWS is outside the scope of this article). When a file is placed in the main bucket, a copy should appear in the backup bucket; when it is removed from the main bucket, it should also be deleted from the backup.

We will work with buckets using the awscli utility, which is compatible with both MCS cloud storage and AWS cloud storage.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Let's configure access to the S3 MCS API:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Let's configure access to the AWS S3 API:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Let's check the access:

To AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

For MCS, when running the command, add --endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Access works.

Now let's write a script for processing incoming hooks; let's call it s3_backup_mcs_aws.sh:

#!/bin/bash
# Requires aws cli
# If a file is added, copy it to the backup bucket
# If a file is removed, remove it from the backup bucket

# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm "${TEMP}"
    ;;

    "delete")
    ${AWSCLI_AWS} rm "${TARGET}"
    ;;

    *)
    echo "Usage: ${0} sourcebucket sourcefile copy/delete"
    exit 1
    ;;
esac
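
Assuming the script is saved as scripts/s3_backup_mcs_aws.sh and made executable, you can test it by hand before wiring it to the server (test.txt here is just a sample object name):

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ ./scripts/s3_backup_mcs_aws.sh myfiles-ash test.txt copy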

We start the server:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -script scripts/s3_backup_mcs_aws.sh

Let's check how it works. Through the MCS web interface, add the test.txt file to the myfiles-ash bucket. The console logs show that a request arrived at the webhook server:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Let's check the contents of the myfiles-backup bucket in AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Now, through the web interface, delete the file from the myfiles-ash bucket.

Server logs:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Bucket content:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

File deleted, problem solved.

Conclusion and ToDo

All the code used in this article is in my repository. It also contains example scripts and examples of calculating signatures for registering webhooks.

This code is nothing more than an example of how you can use S3 webhooks in your own work. As I said at the beginning, if you plan to use such a server in production, you need at least to rewrite it for asynchronous operation: register incoming webhooks in a queue (RabbitMQ or NATS) and let worker applications parse and process them from there. Otherwise, under a massive influx of webhooks, you may run out of server resources to complete the tasks. Queues also let you scale the server and the workers separately and solve the problem of retrying tasks after failures. It is also desirable to switch to more detailed and standardized logging.
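
As an illustration of that direction, here is a minimal sketch of the asynchronous variant, assuming the github.com/nats-io/nats.go client; the subject name s3.webhooks is made up for the example. The handler only enqueues the raw body and answers immediately, leaving parsing and script execution to worker applications; the subscription confirmation exchange would still have to be handled synchronously, as before.

package main

import (
    "io/ioutil"
    "log"
    "net/http"

    "github.com/nats-io/nats.go"
)

func main() {
    // connect to a local NATS server
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    http.HandleFunc("/webhook", func(w http.ResponseWriter, req *http.Request) {
        body, err := ioutil.ReadAll(req.Body)
        req.Body.Close()
        if err != nil {
            http.Error(w, err.Error(), 500)
            return
        }
        // enqueue and return at once; workers do the heavy lifting
        if err := nc.Publish("s3.webhooks", body); err != nil {
            http.Error(w, err.Error(), 500)
            return
        }
        w.WriteHeader(http.StatusAccepted)
    })

    log.Fatal(http.ListenAndServe(":80", nil))
}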

Good luck!


Source: habr.com
