前言
我的网站是我的业余爱好,旨在托管有趣的主页和个人网站。 在我的编程之旅一开始,我就开始对这个话题感兴趣;在那一刻,我着迷于寻找那些写下自己、他们的爱好和项目的伟大专业人士。 自己发现它们的习惯至今仍然存在:在几乎所有商业网站和非商业网站上,我继续在页脚中查找作者的链接。
想法的实施
第一个版本只是我个人网站上的一个 html 页面,我将带有签名的链接放入 ul 列表中。 在一段时间内打了 20 页后,我开始认为这不是很有效,并决定尝试自动化该过程。 在 stackoverflow 上,我注意到很多人在他们的个人资料中指明了站点,所以我用 php 编写了一个解析器,它只是简单地浏览了个人资料,从第一个开始(到目前为止的地址是这样的:`/users/1` ),从所需标签中提取链接并将其添加到 SQLite 中。
这可以称为第二个版本:在 SQLite 表中收集数万个 URL,它取代了 HTML 中的静态列表。 我对此列表进行了简单的搜索。 因为只有 URL,然后搜索就只是基于它们。
在这个阶段我放弃了这个项目并在很长一段时间后重新开始。 这个阶段我的工作经验已经三年多了,我觉得自己可以做一些更认真的事情。 此外,人们非常渴望掌握相对较新的技术。
现代版
问题
新站点通过控制台命令添加,该命令同步执行以下操作:
- 通过 URL 下载内容
- 设置一个标志来指示 HTTPS 是否可用
- 保留网站的本质
- 源 HTML 和标头保存在“索引”历史记录中
- 解析内容,提取标题和描述
- 将数据保存到单独的集合中
这足以简单地存储站点并将其显示在列表中:
但自动对所有内容进行索引、分类和排名、使所有内容保持最新的想法并不适合这种范式。 即使只是简单地添加一个 Web 方法来添加页面,也需要进行代码重复和阻止,以避免潜在的 DDoS。
当然,一般来说,一切都可以同步完成,并且在 Web 方法中,您可以简单地保存 URL,以便可怕的守护进程执行列表中 URL 的所有任务。 但即使在这里,“队列”这个词还是很明显的。 而如果实现了队列,那么所有任务至少可以被划分并异步执行。
解
实现队列并创建一个事件驱动系统来处理所有任务。 我很早就想尝试 Redis Streams。
在 PHP 中使用 Redis 流
因为由于我的框架不是 Symfony、Laravel、Yii 三大巨头之一,所以我想找一个独立的库。 但是,事实证明(第一次检查),不可能找到个别严肃的图书馆。 与队列相关的所有内容要么是五年前 3 次提交的项目,要么与框架相关。
我听说过很多关于 Symfony 作为单个有用组件的供应商的消息,并且我已经使用了其中的一些组件。 而且 Laravel 的一些东西也可以使用,例如它们的 ORM,而无需框架本身。
交响乐/信使
第一个候选者立即看起来很理想,毫无疑问我安装了它。 但事实证明,用 google 搜索 Symfony 之外的使用示例更加困难。 如何从一堆具有通用、无意义名称的类、用于传递消息的总线、甚至在 Redis 上组装起来?
官方网站上的文档非常详细,但仅描述了 Symfony 的初始化,使用他们最喜欢的 YML 和其他非交响乐演奏家的魔术方法。 我对安装过程本身没有兴趣,尤其是在新年假期期间。 但我不得不这样做出乎意料的很长一段时间。
在紧迫的期限内,尝试弄清楚如何使用 Symfony 源代码实例化系统也不是最简单的任务:
在深入研究这一切并尝试用手做一些事情之后,我得出的结论是我正在做某种拐杖,并决定尝试其他事情。
照明/队列
事实证明,这个库与 Laravel 基础设施和一堆其他依赖项紧密相关,所以我没有花太多时间在它上面:我安装了它,查看了它,查看了依赖项并删除了它。
yiisoft/yii2-队列
好吧,这里从名字上就可以立即看出它与 Yii2 的严格联系。 我不得不使用这个库,它还不错,但我没有想到它完全依赖于 Yii2。
其余的
我在 GitHub 上发现的其他所有内容都是不可靠、过时和废弃的项目,没有星星、分叉和大量提交。
返回 symfony/messenger,技术细节
我必须弄清楚这个图书馆,在花了更多时间之后,我终于做到了。 事实证明,一切都非常简洁和简单。 为了实例化总线,我做了一个小工厂,因为...... 我应该有几个轮胎和不同的处理程序。
只需几个步骤:
- 我们创建应该可以简单调用的消息处理程序
- 我们将它们包装在 HandlerDescriptor (库中的类)中
- 我们将这些“描述符”包装在 HandlersLocator 实例中
- 将 HandlersLocator 添加到 MessageBus 实例
- 我们将一组“SenderInterface”传递给 SendersLocator,在我的例子中是“RedisTransport”类的实例,这些实例以明显的方式进行配置
- 将 SendersLocator 添加到 MessageBus 实例
MessageBus 有一个 `->dispatch()` 方法,它在 HandlersLocator 中查找适当的处理程序并将消息传递给它们,使用相应的 `SenderInterface` 通过总线(Redis 流)发送。
在容器配置中(在本例中为 php-di),整个包可以这样配置:
CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
$sendersLocator = new SendersLocator([
AppMessagesSecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
AppMessagesDaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
], $c);
$middleware[] = new SendMessageMiddleware($sendersLocator);
return new MessageBus($middleware);
},
CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'secret',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'log',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
在这里您可以看到,在 SendersLocator 中,我们为两个不同的消息分配了不同的“传输”,每个消息都有自己到相应流的连接。
我制作了一个单独的演示项目,演示了三个守护进程使用以下总线相互通信的应用程序:
但我将向您展示如何构建消费者:
use AppMessagesDaemonLogMessage;
use SymfonyComponentMessengerHandlerHandlerDescriptor;
use SymfonyComponentMessengerHandlerHandlersLocator;
use SymfonyComponentMessengerMessageBus;
use SymfonyComponentMessengerMiddlewareHandleMessageMiddleware;
use SymfonyComponentMessengerMiddlewareSendMessageMiddleware;
use SymfonyComponentMessengerTransportSenderSendersLocator;
require_once __DIR__ . '/../vendor/autoload.php';
/** @var PsrContainerContainerInterface $container */
$container = require_once('config/container.php');
$handlers = [
DaemonLogMessage::class => [
new HandlerDescriptor(
function (DaemonLogMessage $m) {
error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
},
['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
)
],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);
$bus = new MessageBus($middleware);
$receivers = [
CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new SymfonyComponentMessengerWorker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();
在应用程序中使用此基础设施
在我的后端实现总线后,我将各个阶段与旧的同步命令分开,并创建了单独的处理程序,每个处理程序都做自己的事情。
将新站点添加到数据库的管道如下所示:
在那之后,我添加新功能变得更加容易,例如提取和解析 Rss。 因为此过程还需要原始内容,然后 RSS 链接提取器处理程序(如 WebsiteIndexHistoryPersistor)订阅“Content/HtmlContent”消息,对其进行处理并沿着其管道进一步传递所需的消息。
最后,我们得到了几个守护进程,每个守护进程仅维护与必要资源的连接。 例如恶魔 爬虫 包含需要访问 Internet 获取内容的所有处理程序以及守护进程 坚持 保持与数据库的连接。
现在,持久化器插入后所需的 id 不再从数据库中进行选择,而是通过总线简单地传输到所有感兴趣的处理程序。
来源: habr.com