Příklad aplikace řízené událostmi založené na webhoocích v úložišti objektů S3 Mail.ru Cloud Solutions

Příklad aplikace řízené událostmi založené na webhoocích v úložišti objektů S3 Mail.ru Cloud Solutions
Kávovar Rube Goldberg

Architektura řízená událostmi zvyšuje nákladovou efektivitu používaných zdrojů, protože jsou využívány pouze v okamžiku, kdy jsou potřeba. Existuje mnoho možností, jak to implementovat a nevytvářet další cloudové entity jako pracovní aplikace. A dnes nebudu mluvit o FaaS, ale o webhoocích. Ukážu výukový příklad zpracování událostí pomocí webhooků pro ukládání objektů.

Pár slov o ukládání objektů a webhoocích. Objektové úložiště umožňuje ukládat jakákoli data v cloudu ve formě objektů, přístupných přes S3 nebo jiné API (v závislosti na implementaci) přes HTTP/HTTPS. Webhooky jsou obecně vlastní zpětná volání HTTP. Obvykle jsou spouštěny událostí, jako je odeslání kódu do úložiště nebo zveřejnění komentáře na blogu. Když dojde k události, původní web odešle požadavek HTTP na adresu URL zadanou pro webhook. V důsledku toho můžete události na jednom webu spouštět akce na jiném (wiki). V případě, že je zdrojovým webem úložiště objektů, události fungují jako změny jeho obsahu.

Příklady jednoduchých případů, kdy lze takovou automatizaci použít:

  1. Vytváření kopií všech objektů v jiném cloudovém úložišti. Kopie musí být vytvářeny za chodu při každém přidání nebo změně souborů.
  2. Automatické vytváření série náhledů grafických souborů, přidávání vodoznaků k fotografiím a další úpravy obrázků.
  3. Upozornění na příchod nových dokumentů (například distribuovaná účetní služba nahraje sestavy do cloudu a finanční monitoring přijímá upozornění na nové sestavy, kontroluje je a analyzuje).
  4. O něco složitější případy zahrnují například vygenerování požadavku na Kubernetes, který vytvoří pod s potřebnými kontejnery, předá mu parametry úkolu a po zpracování kontejner sbalí.

Jako příklad uděláme variantu úlohy 1, kdy se změny v bucketu úložiště objektů Mail.ru Cloud Solutions (MCS) synchronizují v úložišti objektů AWS pomocí webhooků. Ve skutečně nabitém případě by asynchronní práce měla být zajištěna registrací webhooků ve frontě, ale pro tréninkový úkol provedeme implementaci bez tohoto.

Pracovní schéma

Interakční protokol je podrobně popsán v Průvodce webhooky S3 na MCS. Pracovní schéma obsahuje následující prvky:

  • Vydavatelská služba, který je na straně úložiště S3 a publikuje požadavky HTTP, když je spuštěn webnhook.
  • Webhookový přijímací server, která naslouchá požadavkům od služby publikování HTTP a provádí příslušné akce. Server může být napsán v libovolném jazyce, v našem příkladu napíšeme server v Go.

Zvláštností implementace webhooků v S3 API je registrace přijímajícího serveru webhooku na publikační službě. Přijímající server webhooku musí zejména potvrdit přihlášení k odběru zpráv ze služby publikování (v jiných implementacích webhooku se potvrzení předplatného obvykle nevyžaduje).

V souladu s tím musí přijímací server webhooku podporovat dvě hlavní operace:

  • reagovat na žádost vydavatelské služby o potvrzení registrace,
  • zpracovávat příchozí události.

Instalace přijímacího serveru webhooku

Ke spuštění přijímacího serveru webhooku potřebujete server Linux. V tomto článku jako příklad používáme virtuální instanci, kterou nasazujeme na MCS.

Nainstalujme potřebný software a spusťte přijímací server webhooku.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Naklonujte složku s přijímajícím serverem webhooku:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Spustíme server:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Přihlaste se k odběru publikační služby

Váš webhookový přijímací server můžete zaregistrovat přes API nebo webové rozhraní. Pro zjednodušení provedeme registraci přes webové rozhraní:

  1. Pojďme do sekce lopaty ve velínu.
  2. Přejděte do kbelíku, pro který budeme konfigurovat webhooky, a klikněte na ozubené kolo:

Příklad aplikace řízené událostmi založené na webhoocích v úložišti objektů S3 Mail.ru Cloud Solutions

Přejděte na kartu Webhooky a klikněte na Přidat:

Příklad aplikace řízené událostmi založené na webhoocích v úložišti objektů S3 Mail.ru Cloud Solutions
Vyplňte pole:

Příklad aplikace řízené událostmi založené na webhoocích v úložišti objektů S3 Mail.ru Cloud Solutions

ID — název webhooku.

Událost – které události přenést. Nastavili jsme přenos všech událostí, ke kterým dochází při práci se soubory (přidávání a mazání).

URL — adresa přijímajícího serveru webhooku.

Prefix/suffix filtru je filtr, který umožňuje generovat webhooky pouze pro objekty, jejichž názvy odpovídají určitým pravidlům. Například, aby webhook spouštěl pouze soubory s příponou .png, in Přípona filtru musíte napsat „png“.

V současné době jsou pro přístup k přijímacímu serveru webhooku podporovány pouze porty 80 a 443.

Pojďme kliknout Přidejte háček a uvidíme následující:

Příklad aplikace řízené událostmi založené na webhoocích v úložišti objektů S3 Mail.ru Cloud Solutions
dodal Hook.

Přijímající server webhooku zobrazuje ve svých protokolech průběh procesu registrace háku:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Registrace je dokončena. V další části se blíže podíváme na algoritmus fungování přijímacího serveru webhooku.

Popis přijímacího serveru webhooku

V našem příkladu je server napsán v Go. Podívejme se na základní principy jeho fungování.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Zvažte hlavní funkce:

  • Ping() – cesta, která odpovídá přes URL/ping, nejjednodušší implementace sondy živosti.
  • Webhook() – hlavní trasa, obslužný program URL/webhooku:
    • potvrdí registraci na vydavatelské službě (přejděte na funkci SubscriptionConfirmation),
    • zpracovává příchozí webhooky (funkce Gorecords).
  • Funkce HmacSha256 a HmacSha256hex jsou implementacemi šifrovacích algoritmů HMAC-SHA256 a HMAC-SHA256 s výstupem jako řetězec hexadecimálních čísel pro výpočet podpisu.
  • main je hlavní funkce, zpracovává parametry příkazového řádku a registruje obsluhu URL.

Parametry příkazového řádku akceptované serverem:

  • -port je port, na kterém bude server naslouchat.
  • -address - IP adresa, kterou bude server poslouchat.
  • -script je externí program, který je volán pro každý příchozí háček.

Podívejme se blíže na některé funkce:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Tato funkce určuje, zda přišel požadavek na potvrzení registrace nebo webhook. Jak vyplývá z dokumentace, pokud je registrace potvrzena, v žádosti o příspěvek je přijata následující struktura Json:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

Na tento dotaz je potřeba odpovědět:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Kde se podpis vypočítá takto:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Pokud dorazí webhook, struktura požadavku Post vypadá takto:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

Podle toho musíte v závislosti na požadavku pochopit, jak data zpracovávat. Jako indikátor jsem zvolil vstup "Type":"SubscriptionConfirmation", protože je přítomen v žádosti o potvrzení předplatného a není přítomen ve webhooku. Na základě přítomnosti/nepřítomnosti tohoto záznamu v požadavku POST přejde další provádění programu buď do funkce SubscriptionConfirmation, nebo do funkce GotRecords.

Funkci SubscriptionConfirmation nebudeme podrobně zvažovat, je implementována podle zásad uvedených v dokumentace. Zdrojový kód této funkce si můžete prohlédnout na projektové git repozitáře.

Funkce GotRecords analyzuje příchozí požadavek a pro každý objekt Record zavolá externí skript (jehož jméno bylo předáno v parametru -script) s parametry:

  • název kbelíku
  • objektový klíč
  • akce:
    • kopie - pokud je v původním požadavku EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete - pokud v původním požadavku EventName = ObjectRemoved | SmazatObjekt

Pokud tedy přijde hák s požadavkem Post, jak je popsáno nada parametr -script=script.sh, pak bude skript volán následovně:

script.sh  bucketA some-file-to-bucket copy

Mělo by být zřejmé, že tento webhookový přijímací server není kompletním produkčním řešením, ale zjednodušeným příkladem možné implementace.

Pracovní příklad

Pojďme synchronizovat soubory z hlavního bucketu v MCS do záložního bucketu v AWS. Hlavní bucket se nazývá myfiles-ash, záložní se nazývá myfiles-backup (konfigurace bucketu v AWS je nad rámec tohoto článku). V souladu s tím, když je soubor umístěn do hlavního segmentu, jeho kopie by se měla objevit v záložním, a když je odstraněn z hlavního, měl by být odstraněn v záložním.

S buckety budeme pracovat pomocí utility awscli, která je kompatibilní jak s cloudovým úložištěm MCS, tak s cloudovým úložištěm AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Pojďme nakonfigurovat přístup k S3 MCS API:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Pojďme nakonfigurovat přístup k AWS S3 API:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Zkontrolujeme přístupy:

Do AWS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

Pro MCS musíte při spuštění příkazu přidat —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Zpřístupněno.

Nyní napíšeme skript pro zpracování příchozího háku, nazvěme ho s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Spustíme server:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Pojďme se podívat, jak to funguje. Přes webové rozhraní MCS přidejte soubor test.txt do kbelíku myfiles-ash. Protokoly konzoly ukazují, že byl odeslán požadavek na server webhooku:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Pojďme zkontrolovat obsah kbelíku zálohování myfiles v AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Nyní prostřednictvím webového rozhraní smažeme soubor z kbelíku myfiles-ash.

Protokoly serveru:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Obsah kbelíku:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Soubor je smazán, problém je vyřešen.

Závěr a úkol

Veškerý kód použitý v tomto článku je v mém úložišti. Nechybí ani ukázky skriptů a příklady počítání podpisů pro registraci webhooků.

Tento kód není nic jiného než příklad toho, jak můžete webhooky S3 využít ve svých aktivitách. Jak jsem řekl na začátku, pokud plánujete používat takový server v produkci, musíte server alespoň přepsat pro asynchronní práci: zaregistrovat příchozí webhooky do fronty (RabbitMQ nebo NATS) a odtud je analyzovat a zpracovat s pracovními aplikacemi. V opačném případě, když webhooky dorazí masivně, můžete narazit na nedostatek serverových zdrojů k dokončení úkolů. Přítomnost front vám umožňuje distribuovat server a pracovníky a také řešit problémy s opakujícími se úkoly v případě selhání. Je také vhodné změnit protokolování na podrobnější a standardizovanější.

Good Luck!

Další čtení k tématu:

Zdroj: www.habr.com

Přidat komentář