將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

前言

我的網站是我的業餘愛好,旨在託管有趣的主頁和個人網站。 在我的程式設計之旅一開始,我就開始對這個主題感興趣;在那一刻,我著迷於尋找那些寫下自己、他們的愛好和專案的偉大專業人士。 自己發現它們的習慣仍然存在:在幾乎所有商業網站和非商業網站上,我繼續在頁腳中找到作者的連結。

想法的實施

第一個版本只是我個人網站上的一個 html 頁面,我將帶有簽名的連結放入 ul 列表中。 在一段時間內打了 20 頁後,我開始認為這不是很有效,並決定嘗試自動化這個過程。 在stackoverflow 上,我注意到很多人在他們的個人資料中指明了站點,所以我用php 編寫了一個解析器,它只是簡單地瀏覽了個人資料,從第一個開始(到目前為止的地址是這樣的:`/users/1` ),從所需標籤中提取連結並將其新增至 SQLite 中。

這可以稱為第二個版本:在 SQLite 表中收集數萬個 URL,它取代了 HTML 中的靜態清單。 我對此列表進行了簡單的搜尋。 因為只有 URL,搜尋就只是基於它們。

在這個階段我放棄了這個專案並在很長一段時間後重新開始。 這個階段我的工作經驗已經三年多了,我覺得我可以做一些更認真的事。 此外,人們非常渴望掌握相對較新的技術。

現代版

項目 部署在Docker中,資料庫轉移到了mongoDb,最近又加了蘿蔔,一開始只是為了快取。 使用 PHP 微框架之一作為基礎。

問題

新網站透過控制台命令添加,該命令同步執行以下操作:

  • 透過 URL 下載內容
  • 設定一個標誌來指示 HTTPS 是否可用
  • 保留網站的本質
  • 來源 HTML 和標頭保存在「索引」歷史記錄中
  • 解析內容,擷取標題和描述
  • 將資料保存到單獨的集合中

這足以簡單地儲存網站並將其顯示在清單中:

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

但自動對所有內容進行索引、分類和排名、保持所有內容最新的想法並不適合這種範式。 即使只是簡單地新增一個 Web 方法來新增頁面,也需要進行程式碼重複和阻止,以避免潛在的 DDoS。

當然,一般來說,一切都可以同步完成,並且在 Web 方法中,您可以簡單地保存 URL,以便可怕的守護程序執行清單中 URL 的所有任務。 但即使在這裡,「隊列」這個詞還是很明顯的。 而如果實作了佇列,那麼所有任務至少可以被分割並非同步執行。

解決方法

實作佇列並建立一個事件驅動系統來處理所有任務。 我很早就想嘗試 Redis Streams。

在 PHP 中使用 Redis 串流

因為由於我的框架不是 Symfony、Laravel、Yii 三大巨頭之一,所以我想找一個獨立的函式庫。 但是,事實證明(第一次檢查),不可能找到個別嚴肅的圖書館。 與隊列相關的所有內容要么是五年前 3 次提交的項目,要么與框架相關。

我聽說過很多關於 Symfony 作為單一有用組件的供應商的消息,並且我已經使用了其中的一些組件。 而且 Laravel 的一些東西也可以使用,例如它們的 ORM,而無需框架本身。

交響樂/信使

第一個候選人立即看起來很理想,毫無疑問我安裝了它。 但事實證明,用 google 搜尋 Symfony 以外的使用範例更加困難。 如何從一堆具有通用、無意義名稱的類別、用於傳遞訊息的匯流排、甚至在 Redis 上組裝?

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

官方網站上的文件非常詳細,但僅描述了 Symfony 的初始化,使用他們最喜歡的 YML 和其他非交響樂演奏家的魔術方法。 我對安裝過程本身沒有興趣,尤其是在新年假期期間。 但我不得不這樣做出乎意料的很長一段時間。

在緊迫的期限內,試著弄清楚如何使用 Symfony 原始碼實例化系統也不是最簡單的任務:

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

在深入研究這一切並嘗試用手做一些事情之後,我得出的結論是我正在做某種拐杖,並決定嘗試其他事情。

照明/隊列

事實證明,這個庫與 Laravel 基礎設施和一堆其他依賴項緊密相關,所以我沒有花太多時間在它上面:我安裝了它,查看了它,查看了依賴項並刪除了它。

yiisoft/yii2-佇列

好吧,這裡從名字上就可以立即看出它與 Yii2 的嚴格聯繫。 我不得不使用這個庫,它還不錯,但我沒有想到它完全依賴 Yii2。

其餘的

我在 GitHub 上發現的其他所有內容都是不可靠、過時和廢棄的項目,沒有星星、分叉和大量提交。

返回 symfony/messenger,技術細節

我必須弄清楚這個圖書館,在花了更多時間之後,我終於做到了。 事實證明,一切都非常簡潔簡單。 為了實例化總線,我做了一個小工廠,因為...... 我應該有幾個輪胎和不同的處理程序。

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

只需幾個步驟:

  • 我們創建應該可以簡單調用的訊息處理程序
  • 我們將它們包裝在 HandlerDescriptor (庫中的類別)中
  • 我們將這些「描述符」包裝在 HandlersLocator 實例中
  • 將 HandlersLocator 新增至 MessageBus 實例
  • 我們將一組「SenderInterface」傳遞給 SendersLocator,在我的例子中是「RedisTransport」類別的實例,這些實例以明顯的方式進行配置
  • 將 SendersLocator 新增至 MessageBus 實例

MessageBus 有一個 `->dispatch()` 方法,它在 HandlersLocator 中尋找適當的處理程序並將訊息傳遞給它們,使用對應的 `SenderInterface` 透過匯流排(Redis 流)傳送訊息。

在容器配置中(在本例中為 php-di),整個套件可以這樣配置:

        CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
            return new RedisTransport(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
                $c->get(CONTAINER_SERIALIZER))
            ;
        },
        CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
            return new RedisTransport(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
                $c->get(CONTAINER_SERIALIZER))
            ;
        },
        CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
            return new RedisReceiver(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
                $c->get(CONTAINER_SERIALIZER)
            );
        },
        CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
            return new RedisReceiver(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
                $c->get(CONTAINER_SERIALIZER)
            );
        },
        CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
            $sendersLocator = new SendersLocator([
                AppMessagesSecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
                AppMessagesDaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
            ], $c);
            $middleware[] = new SendMessageMiddleware($sendersLocator);

            return new MessageBus($middleware);
        },
        CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
            $host = 'bu-02-redis';
            $port = 6379;
            $dsn = "redis://$host:$port";
            $options = [
                'stream' => 'secret',
                'group' => 'default',
                'consumer' => 'default',
            ];

            return Connection::fromDsn($dsn, $options);
        },
        CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
            $host = 'bu-02-redis';
            $port = 6379;
            $dsn = "redis://$host:$port";
            $options = [
                'stream' => 'log',
                'group' => 'default',
                'consumer' => 'default',
            ];

            return Connection::fromDsn($dsn, $options);
        },

在這裡您可以看到,在 SendersLocator 中,我們為兩個不同的訊息分配了不同的“傳輸”,每個訊息都有自己到相應流的連接。

我製作了一個單獨的演示項目,演示了三個守護程序使用以下總線相互通信的應用程式: https://github.com/backend-university/products/tree/master/products/02-redis-streams-bus.

但我將向您展示如何建立消費者:

use AppMessagesDaemonLogMessage;
use SymfonyComponentMessengerHandlerHandlerDescriptor;
use SymfonyComponentMessengerHandlerHandlersLocator;
use SymfonyComponentMessengerMessageBus;
use SymfonyComponentMessengerMiddlewareHandleMessageMiddleware;
use SymfonyComponentMessengerMiddlewareSendMessageMiddleware;
use SymfonyComponentMessengerTransportSenderSendersLocator;

require_once __DIR__ . '/../vendor/autoload.php';
/** @var PsrContainerContainerInterface $container */
$container = require_once('config/container.php');

$handlers = [
    DaemonLogMessage::class => [
        new HandlerDescriptor(
            function (DaemonLogMessage $m) {
                error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
            },
            ['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
        )
    ],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);

$bus = new MessageBus($middleware);
$receivers = [
    CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new SymfonyComponentMessengerWorker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();

在應用程式中使用此基礎設施

在我的後端實現總線後,我將各個階段與舊的同步命令分開,並創建了單獨的處理程序,每個處理程序都做自己的事情。

將新網站新增至資料庫的管道如下所示:

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

在那之後,我添加新功能變得更加容易,例如提取和解析 Rss。 因為此過程還需要原始內容,然後 RSS 連結提取器處理程序(如 WebsiteIndexHistoryPersistor)訂閱「Content/HtmlContent」訊息,對其進行處理並沿著其管道進一步傳遞所需的訊息。

將 PHP 後端轉移到 Redis 串流匯流排並選擇獨立於框架的函式庫

最後,我們得到了幾個守護進程,每個守護進程僅維護與必要資源的連結。 例如惡魔 爬蟲 包含所有需要存取 Internet 取得內容的處理程序以及守護進程 堅持 保持與資料庫的連線。

現在,持久化器插入後所需的 id 不再從資料庫中進行選擇,而是透過匯流排簡單地傳輸到所有感興趣的處理程序。

來源: www.habr.com

添加評論