将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

前言

我的网站是我的业余爱好,旨在托管有趣的主页和个人网站。 在我的编程之旅一开始,我就开始对这个话题感兴趣;在那一刻,我着迷于寻找那些写下自己、他们的爱好和项目的伟大专业人士。 自己发现它们的习惯至今仍然存在:在几乎所有商业网站和非商业网站上,我继续在页脚中查找作者的链接。

想法的实施

第一个版本只是我个人网站上的一个 html 页面,我将带有签名的链接放入 ul 列表中。 在一段时间内打了 20 页后,我开始认为这不是很有效,并决定尝试自动化该过程。 在 stackoverflow 上,我注意到很多人在他们的个人资料中指明了站点,所以我用 php 编写了一个解析器,它只是简单地浏览了个人资料,从第一个开始(到目前为止的地址是这样的:`/users/1` ),从所需标签中提取链接并将其添加到 SQLite 中。

这可以称为第二个版本:在 SQLite 表中收集数万个 URL,它取代了 HTML 中的静态列表。 我对此列表进行了简单的搜索。 因为只有 URL,然后搜索就只是基于它们。

在这个阶段我放弃了这个项目并在很长一段时间后重新开始。 这个阶段我的工作经验已经三年多了,我觉得自己可以做一些更认真的事情。 此外,人们非常渴望掌握相对较新的技术。

现代版

项目 部署在Docker中,数据库转移到了mongoDb,最近又加了萝卜,一开始只是为了缓存。 使用 PHP 微框架之一作为基础。

问题

新站点通过控制台命令添加,该命令同步执行以下操作:

  • 通过 URL 下载内容
  • 设置一个标志来指示 HTTPS 是否可用
  • 保留网站的本质
  • 源 HTML 和标头保存在“索引”历史记录中
  • 解析内容,提取标题和描述
  • 将数据保存到单独的集合中

这足以简单地存储站点并将其显示在列表中:

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

但自动对所有内容进行索引、分类和排名、使所有内容保持最新的想法并不适合这种范式。 即使只是简单地添加一个 Web 方法来添加页面,也需要进行代码重复和阻止,以避免潜在的 DDoS。

当然,一般来说,一切都可以同步完成,并且在 Web 方法中,您可以简单地保存 URL,以便可怕的守护进程执行列表中 URL 的所有任务。 但即使在这里,“队列”这个词还是很明显的。 而如果实现了队列,那么所有任务至少可以被划分并异步执行。

实现队列并创建一个事件驱动系统来处理所有任务。 我很早就想尝试 Redis Streams。

在 PHP 中使用 Redis 流

因为由于我的框架不是 Symfony、Laravel、Yii 三大巨头之一,所以我想找一个独立的库。 但是,事实证明(第一次检查),不可能找到个别严肃的图书馆。 与队列相关的所有内容要么是五年前 3 次提交的项目,要么与框架相关。

我听说过很多关于 Symfony 作为单个有用组件的供应商的消息,并且我已经使用了其中的一些组件。 而且 Laravel 的一些东西也可以使用,例如它们的 ORM,而无需框架本身。

交响乐/信使

第一个候选者立即看起来很理想,毫无疑问我安装了它。 但事实证明,用 google 搜索 Symfony 之外的使用示例更加困难。 如何从一堆具有通用、无意义名称的类、用于传递消息的总线、甚至在 Redis 上组装起来?

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

官方网站上的文档非常详细,但仅描述了 Symfony 的初始化,使用他们最喜欢的 YML 和其他非交响乐演奏家的魔术方法。 我对安装过程本身没有兴趣,尤其是在新年假期期间。 但我不得不这样做出乎意料的很长一段时间。

在紧迫的期限内,尝试弄清楚如何使用 Symfony 源代码实例化系统也不是最简单的任务:

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

在深入研究这一切并尝试用手做一些事情之后,我得出的结论是我正在做某种拐杖,并决定尝试其他事情。

照明/队列

事实证明,这个库与 Laravel 基础设施和一堆其他依赖项紧密相关,所以我没有花太多时间在它上面:我安装了它,查看了它,查看了依赖项并删除了它。

yiisoft/yii2-队列

好吧,这里从名字上就可以立即看出它与 Yii2 的严格联系。 我不得不使用这个库,它还不错,但我没有想到它完全依赖于 Yii2。

其余的

我在 GitHub 上发现的其他所有内容都是不可靠、过时和废弃的项目,没有星星、分叉和大量提交。

返回 symfony/messenger,技术细节

我必须弄清楚这个图书馆,在花了更多时间之后,我终于做到了。 事实证明,一切都非常简洁和简单。 为了实例化总线,我做了一个小工厂,因为...... 我应该有几个轮胎和不同的处理程序。

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

只需几个步骤:

  • 我们创建应该可以简单调用的消息处理程序
  • 我们将它们包装在 HandlerDescriptor (库中的类)中
  • 我们将这些“描述符”包装在 HandlersLocator 实例中
  • 将 HandlersLocator 添加到 MessageBus 实例
  • 我们将一组“SenderInterface”传递给 SendersLocator,在我的例子中是“RedisTransport”类的实例,这些实例以明显的方式进行配置
  • 将 SendersLocator 添加到 MessageBus 实例

MessageBus 有一个 `->dispatch()` 方法,它在 HandlersLocator 中查找适当的处理程序并将消息传递给它们,使用相应的 `SenderInterface` 通过总线(Redis 流)发送。

在容器配置中(在本例中为 php-di),整个包可以这样配置:

        CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
            return new RedisTransport(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
                $c->get(CONTAINER_SERIALIZER))
            ;
        },
        CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
            return new RedisTransport(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
                $c->get(CONTAINER_SERIALIZER))
            ;
        },
        CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
            return new RedisReceiver(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
                $c->get(CONTAINER_SERIALIZER)
            );
        },
        CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
            return new RedisReceiver(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
                $c->get(CONTAINER_SERIALIZER)
            );
        },
        CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
            $sendersLocator = new SendersLocator([
                AppMessagesSecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
                AppMessagesDaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
            ], $c);
            $middleware[] = new SendMessageMiddleware($sendersLocator);

            return new MessageBus($middleware);
        },
        CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
            $host = 'bu-02-redis';
            $port = 6379;
            $dsn = "redis://$host:$port";
            $options = [
                'stream' => 'secret',
                'group' => 'default',
                'consumer' => 'default',
            ];

            return Connection::fromDsn($dsn, $options);
        },
        CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
            $host = 'bu-02-redis';
            $port = 6379;
            $dsn = "redis://$host:$port";
            $options = [
                'stream' => 'log',
                'group' => 'default',
                'consumer' => 'default',
            ];

            return Connection::fromDsn($dsn, $options);
        },

在这里您可以看到,在 SendersLocator 中,我们为两个不同的消息分配了不同的“传输”,每个消息都有自己到相应流的连接。

我制作了一个单独的演示项目,演示了三个守护进程使用以下总线相互通信的应用程序: https://github.com/backend-university/products/tree/master/products/02-redis-streams-bus.

但我将向您展示如何构建消费者:

use AppMessagesDaemonLogMessage;
use SymfonyComponentMessengerHandlerHandlerDescriptor;
use SymfonyComponentMessengerHandlerHandlersLocator;
use SymfonyComponentMessengerMessageBus;
use SymfonyComponentMessengerMiddlewareHandleMessageMiddleware;
use SymfonyComponentMessengerMiddlewareSendMessageMiddleware;
use SymfonyComponentMessengerTransportSenderSendersLocator;

require_once __DIR__ . '/../vendor/autoload.php';
/** @var PsrContainerContainerInterface $container */
$container = require_once('config/container.php');

$handlers = [
    DaemonLogMessage::class => [
        new HandlerDescriptor(
            function (DaemonLogMessage $m) {
                error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
            },
            ['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
        )
    ],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);

$bus = new MessageBus($middleware);
$receivers = [
    CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new SymfonyComponentMessengerWorker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();

在应用程序中使用此基础设施

在我的后端实现总线后,我将各个阶段与旧的同步命令分开,并创建了单独的处理程序,每个处理程序都做自己的事情。

将新站点添加到数据库的管道如下所示:

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

在那之后,我添加新功能变得更加容易,例如提取和解析 Rss。 因为此过程还需要原始内容,然后 RSS 链接提取器处理程序(如 WebsiteIndexHistoryPersistor)订阅“Content/HtmlContent”消息,对其进行处理并沿着其管道进一步传递所需的消息。

将 PHP 后端转移到 Redis 流总线并选择独立于框架的库

最后,我们得到了几个守护进程,每个守护进程仅维护与必要资源的连接。 例如恶魔 爬虫 包含需要访问 Internet 获取内容的所有处理程序以及守护进程 坚持 保持与数据库的连接。

现在,持久化器插入后所需的 id 不再从数据库中进行选择,而是通过总线简单地传输到所有感兴趣的处理程序。

来源: habr.com

添加评论