Przykład aplikacji sterowanej zdarzeniami opartej na webhookach w obiektowej pamięci masowej S3 Mail.ru Cloud Solutions

Przykład aplikacji sterowanej zdarzeniami opartej na webhookach w obiektowej pamięci masowej S3 Mail.ru Cloud Solutions
Ekspres do kawy Rube Goldberg

Architektura sterowana zdarzeniami zwiększa efektywność kosztową wykorzystywanych zasobów, ponieważ są one wykorzystywane tylko w momencie, gdy są potrzebne. Istnieje wiele opcji wdrożenia tego rozwiązania i uniknięcia tworzenia dodatkowych jednostek w chmurze jako aplikacji roboczych. A dzisiaj nie będę mówić o FaaS, ale o webhookach. Pokażę samouczek dotyczący obsługi zdarzeń przy użyciu webhooków pamięci obiektowej.

Kilka słów o przechowywaniu obiektów i webhookach. Object Storage pozwala na przechowywanie dowolnych danych w chmurze w postaci obiektów dostępnych poprzez S3 lub inne API (w zależności od implementacji) poprzez HTTP/HTTPS. Elementy webhook to zazwyczaj niestandardowe wywołania zwrotne HTTP. Zwykle są wywoływane przez zdarzenie, takie jak przesłanie kodu do repozytorium lub opublikowanie komentarza na blogu. Po wystąpieniu zdarzenia witryna źródłowa wysyła żądanie HTTP na adres URL określony dla elementu webhook. W rezultacie możesz sprawić, że zdarzenia w jednej witrynie będą wyzwalać działania w innej (wiki). W przypadku, gdy witryną źródłową jest magazyn obiektowy, zdarzenia działają jako zmiany w jej zawartości.

Przykłady prostych przypadków, w których można zastosować taką automatyzację:

  1. Tworzenie kopii wszystkich obiektów w innym magazynie w chmurze. Kopie należy tworzyć na bieżąco za każdym razem, gdy pliki są dodawane lub zmieniane.
  2. Automatyczne tworzenie serii miniatur plików graficznych, dodawanie znaków wodnych do zdjęć i inne modyfikacje obrazu.
  3. Powiadomienia o nadejściu nowych dokumentów (np. rozproszona usługa księgowa przesyła raporty do chmury, a monitoring finansowy otrzymuje powiadomienia o nowych raportach, sprawdza je i analizuje).
  4. Nieco bardziej skomplikowane przypadki polegają np. na wygenerowaniu żądania do Kubernetesa, który tworzy pod z niezbędnymi kontenerami, przekazuje do niego parametry zadania, a po przetworzeniu zwija kontener.

Jako przykład zrobimy wariant zadania 1, gdy zmiany w zasobniku przechowywania obiektów Mail.ru Cloud Solutions (MCS) są synchronizowane w magazynie obiektów AWS za pomocą webhooków. W przypadku realnie obciążonym pracę asynchroniczną należy zapewnić rejestrując webhooki w kolejce, natomiast w przypadku zadania szkoleniowego implementację wykonamy bez tego.

Schemat pracy

Protokół interakcji opisano szczegółowo w Przewodnik po webhookach S3 w MCS. Schemat pracy zawiera następujące elementy:

  • Usługa wydawnicza, który znajduje się po stronie magazynu S3 i publikuje żądania HTTP po uruchomieniu webnhooka.
  • Serwer odbierający webhook, który nasłuchuje żądań z usługi publikowania HTTP i wykonuje odpowiednie akcje. Serwer można napisać w dowolnym języku, w naszym przykładzie napiszemy serwer w Go.

Cechą szczególną implementacji webhooków w API S3 jest rejestracja serwera odbierającego webhook w serwisie wydawniczym. W szczególności serwer odbiorczy webhooka musi potwierdzić subskrypcję wiadomości z usługi publikowania (w innych implementacjach webhooka potwierdzenie subskrypcji zwykle nie jest wymagane).

W związku z tym serwer odbierający webhook musi obsługiwać dwie główne operacje:

  • odpowiedzieć na prośbę serwisu wydawniczego o potwierdzenie rejestracji,
  • przetwarzać nadchodzące zdarzenia.

Instalowanie serwera odbierającego webhook

Aby uruchomić serwer odbierający webhook, potrzebujesz serwera Linux. W tym artykule jako przykład wykorzystaliśmy instancję wirtualną, którą wdrażamy w MCS.

Zainstalujmy niezbędne oprogramowanie i uruchommy serwer odbiorczy webhooka.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...

Sklonuj folder z serwerem odbiorczym webhooka:

ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.

Uruchommy serwer:

ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

Zapisz się na usługę wydawniczą

Możesz zarejestrować swój serwer odbierający webhook za pośrednictwem interfejsu API lub interfejsu internetowego. Dla uproszczenia zarejestrujemy się poprzez interfejs WWW:

  1. Przejdźmy do sekcji wiader w sterowni.
  2. Przejdź do segmentu, dla którego skonfigurujemy webhooki i kliknij koło zębate:

Przykład aplikacji sterowanej zdarzeniami opartej na webhookach w obiektowej pamięci masowej S3 Mail.ru Cloud Solutions

Przejdź do zakładki Webhooks i kliknij Dodaj:

Przykład aplikacji sterowanej zdarzeniami opartej na webhookach w obiektowej pamięci masowej S3 Mail.ru Cloud Solutions
Wypełnij pola:

Przykład aplikacji sterowanej zdarzeniami opartej na webhookach w obiektowej pamięci masowej S3 Mail.ru Cloud Solutions

ID — nazwa webhooka.

Zdarzenie – które zdarzenia mają zostać przesłane. Ustawiliśmy transmisję wszystkich zdarzeń występujących podczas pracy z plikami (dodawanie i usuwanie).

URL — webhook odbierający adres serwera.

Prefiks/sufiks filtra to filtr pozwalający na generowanie webhooków tylko dla obiektów, których nazwy odpowiadają określonym regułom. Na przykład, aby webhook uruchamiał tylko pliki z rozszerzeniem .png, w Sufiks filtra musisz napisać „png”.

Obecnie obsługiwane są tylko porty 80 i 443 w celu uzyskania dostępu do serwera odbierającego element webhook.

Kliknijmy Dodaj hak i zobaczymy co następuje:

Przykład aplikacji sterowanej zdarzeniami opartej na webhookach w obiektowej pamięci masowej S3 Mail.ru Cloud Solutions
Dodano hak.

Serwer odbierający webhook pokazuje w swoich logach postęp procesu rejestracji hooka:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

Rejestracja została zakończona. W kolejnym podrozdziale przyjrzymy się bliżej algorytmowi działania serwera odbiorczego webhooka.

Opis serwera odbierającego element webhook

W naszym przykładzie serwer jest napisany w Go. Przyjrzyjmy się podstawowym zasadom jego działania.

package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %sn", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pongn")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}

Rozważ główne funkcje:

  • Ping() - trasa odpowiadająca poprzez adres URL/ping, najprostsza implementacja sondy na żywo.
  • Webhook() - główna trasa, obsługa adresu URL/webhooka:
    • potwierdza rejestrację w serwisie wydawniczym (przejdź do funkcji Potwierdzenie subskrypcji),
    • przetwarza przychodzące webhooki (funkcja Gorecords).
  • Funkcje HmacSha256 i HmacSha256hex są implementacjami algorytmów szyfrowania HMAC-SHA256 i HMAC-SHA256 z wyjściem w postaci ciągu liczb szesnastkowych do obliczenia podpisu.
  • main jest funkcją główną, przetwarza parametry wiersza poleceń i rejestruje procedury obsługi adresów URL.

Parametry wiersza poleceń akceptowane przez serwer:

  • -port to port, na którym serwer będzie nasłuchiwał.
  • -adres - adres IP, którego serwer będzie nasłuchiwał.
  • -script to zewnętrzny program wywoływany dla każdego przychodzącego haka.

Przyjrzyjmy się bliżej niektórym funkcjom:

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %sn", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
""Type":"SubscriptionConfirmation"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}

Ta funkcja określa, czy nadeszła prośba o potwierdzenie rejestracji lub webhooka. Jak wynika z dokumentacja, jeśli rejestracja zostanie potwierdzona, w żądaniu Post otrzymana zostanie następująca struktura Json:

POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}

Należy odpowiedzieć na to zapytanie:

content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

Gdzie podpis jest obliczany jako:

signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))

Jeśli nadejdzie webhook, struktura żądania Post wygląda następująco:

POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}

W związku z tym, w zależności od żądania, musisz zrozumieć, w jaki sposób przetwarzane są dane. Wybrałem wpis jako wskaźnik "Type":"SubscriptionConfirmation", ponieważ jest on obecny w żądaniu potwierdzenia subskrypcji i nie jest obecny w webhooku. Na podstawie obecności/braku tego wpisu w żądaniu POST dalsza realizacja programu przechodzi albo do funkcji SubscriptionConfirmationlub do funkcji GotRecords.

Nie będziemy szczegółowo omawiać funkcji SubscriptionConfirmation, jest ona realizowana zgodnie z zasadami określonymi w dokumentacja. Kod źródłowy tej funkcji można zobaczyć pod adresem repozytoria git projektu.

Funkcja GotRecords analizuje przychodzące żądanie i dla każdego obiektu Record wywołuje zewnętrzny skrypt (którego nazwa została przekazana w parametrze -script) z parametrami:

  • nazwa wiadra
  • klucz obiektu
  • akcja:
    • copy - jeśli w pierwotnym żądaniu EventName = ObjectCreated | UmieśćObiekt | Umieść kopię obiektu
    • usuń - jeśli w pierwotnym żądaniu EventName = ObjectRemoved | Usuń obiekt

Tak więc, jeśli hak nadejdzie z żądaniem Post, zgodnie z opisem powyżeji parametrem -script=script.sh wówczas skrypt zostanie wywołany w następujący sposób:

script.sh  bucketA some-file-to-bucket copy

Należy rozumieć, że ten serwer odbierający webhook nie jest kompletnym rozwiązaniem produkcyjnym, ale uproszczonym przykładem możliwej implementacji.

Przykład pracy

Zsynchronizujmy pliki z głównego zasobnika w MCS z zasobnikiem zapasowym w AWS. Główny zasobnik nazywa się myfiles-ash, zapasowy nazywa się myfiles-backup (konfiguracja zasobnika w AWS wykracza poza zakres tego artykułu). Odpowiednio, gdy plik zostanie umieszczony w głównym zasobniku, jego kopia powinna pojawić się w zapasowym, a gdy zostanie usunięty z głównego, powinien zostać usunięty w zapasowym.

Będziemy pracować z zasobnikami za pomocą narzędzia awscli, które jest kompatybilne zarówno z chmurą MCS, jak i chmurą AWS.

ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...

Skonfigurujmy dostęp do API S3 MCS:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:

Skonfigurujmy dostęp do API AWS S3:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:

Sprawdźmy dostępy:

Do AWS-a:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup

W przypadku MCS podczas uruchamiania polecenia należy dodać —endpoint-url:

ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash

Dostęp.

Napiszmy teraz skrypt przetwarzający przychodzący hak, nazwijmy go s3_backup_mcs_aws.sh

#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET=""
SOURCE_FILE=""
ACTION=""

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: 
#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"
SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"
SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
case ${ACTION} in
"copy")
${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
rm ${TEMP}
;;
"delete")
${AWSCLI_AWS} rm ${TARGET}
;;
*)
echo "Usage: ${0} sourcebucket sourcefile copy/delete"
exit 1
;;
esac
sourcebucket sourcefile copy/delete" exit 1 ;; esac

Uruchommy serwer:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh

Zobaczmy jak to działa. Poprzez Interfejs sieciowy MCS dodaj plik test.txt do wiadra myfiles-ash. Dzienniki konsoli pokazują, że do serwera webhooka wysłano żądanie:

2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt

Sprawdźmy zawartość segmentu myfiles-backup w AWS:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt

Teraz za pośrednictwem interfejsu internetowego usuniemy plik z wiadra myfiles-ash.

Logi serwera:

2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt

Zawartość wiadra:

ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$

Plik został usunięty, problem rozwiązany.

Wnioski i zadania do wykonania

Cały kod użyty w tym artykule to w moim repozytorium. Znajdują się tam również przykłady skryptów i przykłady zliczania podpisów do rejestracji webhooków.

Ten kod to nic innego jak przykład wykorzystania webhooków S3 w swoich działaniach. Jak mówiłem na początku, jeśli planujesz używać takiego serwera w produkcji, musisz przynajmniej przepisać serwer do pracy asynchronicznej: rejestrować przychodzące webhooki w kolejce (RabbitMQ lub NATS), a stamtąd je analizować i przetwarzać z aplikacjami pracowniczymi. W przeciwnym razie, gdy webhooki pojawią się masowo, może wystąpić brak zasobów serwera do wykonania zadań. Obecność kolejek pozwala na dystrybucję serwera i pracowników, a także rozwiązywanie problemów z powtarzaniem zadań w przypadku awarii. Wskazana jest także zmiana sposobu logowania na bardziej szczegółowy i ustandaryzowany.

Good Luck!

Więcej lektury na ten temat:

Źródło: www.habr.com

Dodaj komentarz