前言
我的網站是我的業餘愛好,旨在託管有趣的主頁和個人網站。 在我的程式設計之旅一開始,我就開始對這個主題感興趣;在那一刻,我著迷於尋找那些寫下自己、他們的愛好和專案的偉大專業人士。 自己發現它們的習慣仍然存在:在幾乎所有商業網站和非商業網站上,我繼續在頁腳中找到作者的連結。
想法的實施
第一個版本只是我個人網站上的一個 html 頁面,我將帶有簽名的連結放入 ul 列表中。 在一段時間內打了 20 頁後,我開始認為這不是很有效,並決定嘗試自動化這個過程。 在stackoverflow 上,我注意到很多人在他們的個人資料中指明了站點,所以我用php 編寫了一個解析器,它只是簡單地瀏覽了個人資料,從第一個開始(到目前為止的地址是這樣的:`/users/1` ),從所需標籤中提取連結並將其新增至 SQLite 中。
這可以稱為第二個版本:在 SQLite 表中收集數萬個 URL,它取代了 HTML 中的靜態清單。 我對此列表進行了簡單的搜尋。 因為只有 URL,搜尋就只是基於它們。
在這個階段我放棄了這個專案並在很長一段時間後重新開始。 這個階段我的工作經驗已經三年多了,我覺得我可以做一些更認真的事。 此外,人們非常渴望掌握相對較新的技術。
現代版
問題
新網站透過控制台命令添加,該命令同步執行以下操作:
- 透過 URL 下載內容
- 設定一個標誌來指示 HTTPS 是否可用
- 保留網站的本質
- 來源 HTML 和標頭保存在「索引」歷史記錄中
- 解析內容,擷取標題和描述
- 將資料保存到單獨的集合中
這足以簡單地儲存網站並將其顯示在清單中:
但自動對所有內容進行索引、分類和排名、保持所有內容最新的想法並不適合這種範式。 即使只是簡單地新增一個 Web 方法來新增頁面,也需要進行程式碼重複和阻止,以避免潛在的 DDoS。
當然,一般來說,一切都可以同步完成,並且在 Web 方法中,您可以簡單地保存 URL,以便可怕的守護程序執行清單中 URL 的所有任務。 但即使在這裡,「隊列」這個詞還是很明顯的。 而如果實作了佇列,那麼所有任務至少可以被分割並非同步執行。
解決方法
實作佇列並建立一個事件驅動系統來處理所有任務。 我很早就想嘗試 Redis Streams。
在 PHP 中使用 Redis 串流
因為由於我的框架不是 Symfony、Laravel、Yii 三大巨頭之一,所以我想找一個獨立的函式庫。 但是,事實證明(第一次檢查),不可能找到個別嚴肅的圖書館。 與隊列相關的所有內容要么是五年前 3 次提交的項目,要么與框架相關。
我聽說過很多關於 Symfony 作為單一有用組件的供應商的消息,並且我已經使用了其中的一些組件。 而且 Laravel 的一些東西也可以使用,例如它們的 ORM,而無需框架本身。
交響樂/信使
第一個候選人立即看起來很理想,毫無疑問我安裝了它。 但事實證明,用 google 搜尋 Symfony 以外的使用範例更加困難。 如何從一堆具有通用、無意義名稱的類別、用於傳遞訊息的匯流排、甚至在 Redis 上組裝?
官方網站上的文件非常詳細,但僅描述了 Symfony 的初始化,使用他們最喜歡的 YML 和其他非交響樂演奏家的魔術方法。 我對安裝過程本身沒有興趣,尤其是在新年假期期間。 但我不得不這樣做出乎意料的很長一段時間。
在緊迫的期限內,試著弄清楚如何使用 Symfony 原始碼實例化系統也不是最簡單的任務:
在深入研究這一切並嘗試用手做一些事情之後,我得出的結論是我正在做某種拐杖,並決定嘗試其他事情。
照明/隊列
事實證明,這個庫與 Laravel 基礎設施和一堆其他依賴項緊密相關,所以我沒有花太多時間在它上面:我安裝了它,查看了它,查看了依賴項並刪除了它。
yiisoft/yii2-佇列
好吧,這裡從名字上就可以立即看出它與 Yii2 的嚴格聯繫。 我不得不使用這個庫,它還不錯,但我沒有想到它完全依賴 Yii2。
其餘的
我在 GitHub 上發現的其他所有內容都是不可靠、過時和廢棄的項目,沒有星星、分叉和大量提交。
返回 symfony/messenger,技術細節
我必須弄清楚這個圖書館,在花了更多時間之後,我終於做到了。 事實證明,一切都非常簡潔簡單。 為了實例化總線,我做了一個小工廠,因為...... 我應該有幾個輪胎和不同的處理程序。
只需幾個步驟:
- 我們創建應該可以簡單調用的訊息處理程序
- 我們將它們包裝在 HandlerDescriptor (庫中的類別)中
- 我們將這些「描述符」包裝在 HandlersLocator 實例中
- 將 HandlersLocator 新增至 MessageBus 實例
- 我們將一組「SenderInterface」傳遞給 SendersLocator,在我的例子中是「RedisTransport」類別的實例,這些實例以明顯的方式進行配置
- 將 SendersLocator 新增至 MessageBus 實例
MessageBus 有一個 `->dispatch()` 方法,它在 HandlersLocator 中尋找適當的處理程序並將訊息傳遞給它們,使用對應的 `SenderInterface` 透過匯流排(Redis 流)傳送訊息。
在容器配置中(在本例中為 php-di),整個套件可以這樣配置:
CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
$sendersLocator = new SendersLocator([
AppMessagesSecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
AppMessagesDaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
], $c);
$middleware[] = new SendMessageMiddleware($sendersLocator);
return new MessageBus($middleware);
},
CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'secret',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'log',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
在這裡您可以看到,在 SendersLocator 中,我們為兩個不同的訊息分配了不同的“傳輸”,每個訊息都有自己到相應流的連接。
我製作了一個單獨的演示項目,演示了三個守護程序使用以下總線相互通信的應用程式:
但我將向您展示如何建立消費者:
use AppMessagesDaemonLogMessage;
use SymfonyComponentMessengerHandlerHandlerDescriptor;
use SymfonyComponentMessengerHandlerHandlersLocator;
use SymfonyComponentMessengerMessageBus;
use SymfonyComponentMessengerMiddlewareHandleMessageMiddleware;
use SymfonyComponentMessengerMiddlewareSendMessageMiddleware;
use SymfonyComponentMessengerTransportSenderSendersLocator;
require_once __DIR__ . '/../vendor/autoload.php';
/** @var PsrContainerContainerInterface $container */
$container = require_once('config/container.php');
$handlers = [
DaemonLogMessage::class => [
new HandlerDescriptor(
function (DaemonLogMessage $m) {
error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
},
['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
)
],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);
$bus = new MessageBus($middleware);
$receivers = [
CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new SymfonyComponentMessengerWorker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();
在應用程式中使用此基礎設施
在我的後端實現總線後,我將各個階段與舊的同步命令分開,並創建了單獨的處理程序,每個處理程序都做自己的事情。
將新網站新增至資料庫的管道如下所示:
在那之後,我添加新功能變得更加容易,例如提取和解析 Rss。 因為此過程還需要原始內容,然後 RSS 連結提取器處理程序(如 WebsiteIndexHistoryPersistor)訂閱「Content/HtmlContent」訊息,對其進行處理並沿著其管道進一步傳遞所需的訊息。
最後,我們得到了幾個守護進程,每個守護進程僅維護與必要資源的連結。 例如惡魔 爬蟲 包含所有需要存取 Internet 取得內容的處理程序以及守護進程 堅持 保持與資料庫的連線。
現在,持久化器插入後所需的 id 不再從資料庫中進行選擇,而是透過匯流排簡單地傳輸到所有感興趣的處理程序。
來源: www.habr.com