探索 Mediastreamer2 VoIP 引擎。 第 11 部分

文章素材取自我 禅频道.

探索 Mediastreamer2 VoIP 引擎。 第 11 部分

数据移动机制

  • 数据块dblk_t
  • 消息mblk_t
  • 用于处理消息 mblk_t 的函数
  • 队列queue_t
  • 使用队列queue_t的函数
  • 连接过滤器
  • 数据处理图信号点
  • 股票行情的幕后活动
  • 缓冲器(MSBufferizer)
  • 使用 MSBufferizer 的函数

在最后 文章 我们开发了自己的过滤器。 本文将重点介绍在媒体流过滤器之间移动数据的内部机制。 这将使您将来能够更轻松地编写复杂的过滤器。

数据移动机制

媒体流中的数据移动是使用结构描述的队列执行的 队列_t。 消息字符串如 mblk_t,它们本身不包含信号数据,而仅链接到上一个、下一个消息和数据块。 另外,我要特别强调的是,还有一个用于指向相同类型消息的链接的字段,它允许您组织消息的单链接列表。 我们将由这样一个列表联合起来的一组消息称为元组。 因此,队列的任何元素都可以是单个消息 mblk_t,也许是消息元组的头部 mblk_t。 每个元组消息可以有自己的病房数据块。 稍后我们将讨论为什么需要元组。

如上所述,消息本身不包含数据块;相反,它仅包含指向存储该块的内存区域的指针。 在这一部分中,媒体流媒体工作的整体画面让人想起动画片《怪物公司》中的门仓库,其中门(链接到数据室)沿着高架传送带以疯狂的速度移动,而房间本身保持一动不动。

现在,沿着层次结构从下到上,让我们详细考虑媒体流传输器中数据传输机制的列出实体。

数据块 数据库表

数据块由标头和数据缓冲区组成。 标头由以下结构描述:

typedef struct datab
{
unsigned char *db_base; // Указатель на начало буфер данных.
unsigned char *db_lim;  // Указатель на конец буфер данных.
void (*db_freefn)(void*); // Функция освобождения памяти при удалении блока.
int db_ref; // Счетчик ссылок.
} dblk_t;

该结构的字段包含指向缓冲区开头、缓冲区结尾以及删除数据缓冲区的函数的指针。 标题中的最后一个元素 数据库引用 — 引用计数器,如果它达到零,则作为从内存中删除该块的信号。 如果数据块是由函数创建的 datab_alloc() ,那么数据缓冲区将紧接在标头之后放置在内存中。 在所有其他情况下,缓冲区可以单独位于某处。 数据缓冲区将包含我们想要用滤波器处理的信号样本或其他数据。

使用以下函数创建数据块的新实例:

dblk_t *datab_alloc(int size);

作为输入参数,给出了块将存储的数据的大小。 分配更多内存,以便将标头 - 结构 - 放置在已分配内存的开头 数据库。 但当使用其他函数时,这种情况并不总是发生;在某些情况下,数据缓冲区可能与数据块头分开。 创建结构体时,配置字段,使其字段 数据库库 指向数据区的开头,并且 db_lim 直至其结束。 链接数 数据库引用 设置为一。 数据清除函数指针设置为零。

信息 mblk_t

如上所述,队列元素的类型 mblk_t, 它的定义如下:

typedef struct msgb
{
  struct msgb *b_prev;   // Указатель на предыдущий элемент списка.
  struct msgb *b_next;   // Указатель на следующий элемент списка.
  struct msgb *b_cont;   // Указатель для подклейки к сообщению других сообщений, для создания кортежа сообщений.
  struct datab *b_datap; // Указатель на структуру блока данных.
  unsigned char *b_rptr; // Указатель на начало области данных для чтения данных буфера b_datap.
  unsigned char *b_wptr; // Указатель на начало области данных для записи данных буфера b_datap.
  uint32_t reserved1;    // Зарезервированное поле1, медиастример помещает туда служебную информацию. 
  uint32_t reserved2;    // Зарезервированное поле2, медиастример помещает туда служебную информацию.
  #if defined(ORTP_TIMESTAMP)
  struct timeval timestamp;
  #endif
  ortp_recv_addr_t recv_addr;
} mblk_t;

产品管理 mblk_t 开头包含指针 b_上一个, b_下一个,这是组织双向链表(这是一个队列 队列_t).

然后是指针 b_继续,仅当消息是元组的一部分时才使用。 对于元组中的最后一条消息,该指针保持为空。

接下来我们看到一个指向数据块的指针 b_datap,该消息存在。 接下来是指向块数据缓冲区内区域的指针。 场地 b_rptr 指定将从缓冲区读取数据的位置。 场地 b_wptr 指示将执行写入缓冲区的位置。

其余字段具有服务性质,与数据传输机制的操作无关。

下面是一条带有名称的消息 m1 和数据块 d1.
探索 Mediastreamer2 VoIP 引擎。 第 11 部分
下图显示了三个消息的元组 m1, m1_1, m1_2.
探索 Mediastreamer2 VoIP 引擎。 第 11 部分

消息传递功能 mblk_t

一条新消息 mblk_t 由函数创建:

mblk_t *allocb(int size, int pri); 

她在内存中放置了一条新消息 mblk_t 具有指定大小的数据块 尺寸,第二个参数 - PRI 在此版本的库中未使用。 它应该保持为零。 在函数运行期间,将为新消息的结构分配内存,并调用该函数 mblk_init(),这将重置所创建的结构实例的所有字段,然后使用上面提到的 datab_alloc(),将创建一个数据缓冲区。 之后将配置结构中的字段:

mp->b_datap=datab;
mp->b_rptr=mp->b_wptr=datab->db_base;
mp->b_next=mp->b_prev=mp->b_cont=NULL;

在输出中,我们收到一条新消息,其中包含已初始化的字段和一个空的数据缓冲区。 要将数据添加到消息中,您需要将其复制到数据块缓冲区:

memcpy(msg->b_rptr, data, size);

哪里 data 是指向数据源的指针,并且 尺寸 - 他们的大小。
那么您需要更新指向写入点的指针,以便它再次指向缓冲区中空闲区域的开头:

msg->b_wptr = msg->b_wptr + size

如果您需要从现有缓冲区创建消息而不进行复制,请使用以下函数:

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*)); 

该函数在创建消息和数据块的结构后,将其指针配置为指向地址处的数据 BUF。 那些。 在这种情况下,数据缓冲区不位于数据块的标头字段之后,就像使用函数创建数据块时的情况一样 datab_alloc()。 传递给函数的数据缓冲区将保留在原来的位置,但在指针的帮助下,它将附加到新创建的数据块标头,并相应地附加到消息。

至一条消息 mblk_t 多个数据块可以按顺序连接起来。 这是通过以下函数完成的:

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad); 

mp — 将添加另一个数据块的消息;
data — 指向块的指针,该块的副本将添加到消息中;
尺寸 ——数据大小;
— 分配内存的大小必须沿 4 字节边界对齐的标志(填充将用零完成)。

如果现有消息数据缓冲区中有足够的空间,则新数据将粘贴在已有数据的后面。 如果消息数据缓冲区中的可用空间少于 尺寸,然后创建一个具有足够缓冲区大小的新消息,并将数据复制到其缓冲区。 这是一条新消息,使用指针链接到原始消息 b_继续。 在这种情况下,消息变成一个元组。

如果您需要向元组添加另一个数据块,那么您需要使用以下函数:

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

她会在元组中找到最后一条消息(他有 b_继续 将为空)并将调用该消息的函数 追加().

您可以使用以下函数找出消息或元组中数据的大小:

int msgdsize(const mblk_t *mp);

它将循环遍历元组中的所有消息并返回这些消息的数据缓冲区中的数据总量。 对于每条消息,数据量计算如下:

 mp->b_wptr - mp->b_rptr

要组合两个元组,请使用以下函数:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

她附加了元组 纽姆 到元组的尾部 mp 并返回指向结果元组的最后一条消息的指针。

如有必要,可以将元组转换为具有单个数据块的一条消息;这是通过以下函数完成的:

void msgpullup(mblk_t *mp,int len);

如果论证 LEN 为-1,则自动确定分配的缓冲区的大小。 如果 LEN 是正数,将创建此大小的缓冲区,并将元组消息数据复制到其中。 如果缓冲区用完,复制将在那里停止。 元组的第一条消息将接收一个新大小的缓冲区,其中包含复制的数据。 剩余的消息将被删除,内存返回到堆。

删除结构时 mblk_t 如果调用时,则考虑数据块的引用计数 释放() 结果为零,则数据缓冲区与实例一起被删除 mblk_t,它指向它。

初始化新消息的字段:

void mblk_init(mblk_t *mp);

将另一条数据添加到消息中:

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

如果新数据无法放入消息数据缓冲区的可用空间,则将具有所需大小缓冲区的单独创建的消息附加到该消息(在第一条消息中设置指向所添加消息的指针),并且消息变成一个元组。

向元组添加一条数据:

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad); 

该函数在循环中调用appendb()。

将两个元组合并为一个:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

信息 纽姆 将附于 mp.

复制单个消息:

mblk_t *copyb(const mblk_t *mp);

完整复制包含所有数据块的元组:

mblk_t *copymsg(const mblk_t *mp);

元组的元素由函数复制 复制b().

创建消息的简单副本 mblk_t。 在这种情况下,数据块没有被复制,但是它的引用计数器增加了 数据库引用:

mblk_t *dupb(mblk_t *mp);

制作元组的轻量级副本。 数据块不会被复制,只会增加它们的引用计数器 数据库引用:

mblk_t *dupmsg(mblk_t* m);

将元组的所有消息粘合到一条消息中:

void msgpullup(mblk_t *mp,size_t len);

如果论证 LEN 为-1,则自动确定分配的缓冲区的大小。

删除消息,元组:

void freemsg(mblk_t *mp);

数据块的引用计数减一。 如果达到零,数据块也会被删除。

计算消息或元组中的数据总量。

size_t msgdsize(const mblk_t *mp);

从队列尾部检索消息:

mblk_t *ms_queue_peek_last (q);

将一条消息的保留字段的内容复制到另一条消息中(事实上,这些字段包含媒体流处理程序使用的标志):

mblk_meta_copy(const mblk_t *source, mblk *dest);

队列 队列_t

媒体流中的消息队列被实现为循环双向链表。 每个列表元素都包含一个指向带有信号样本的数据块的指针。 事实证明,只有指向数据块的指针依次移动,而数据本身保持不动。 那些。 仅移动到它们的链接。
描述队列的结构 队列_t,如下图所示:

typedef struct _queue
{
   mblk_t _q_stopper; /* "Холостой" элемент очереди, не указывает на данные, используется только для управления очередью. При инициализации очереди (qinit()) его указатели настраиваются так, чтобы они указывали на него самого. */
   int q_mcount;        // Количество элементов в очереди.
} queue_t;

该结构包含一个字段 - 一个指针 _q_stopper 类型*mblk_t,它指向队列中的第一个元素(消息)。 该结构的第二个字段是队列中消息的计数器。
下图显示了一个名为 q1 的队列,其中包含 4 个消息 m1、m2、m3、m4。
探索 Mediastreamer2 VoIP 引擎。 第 11 部分
下图显示了名为 q1 的队列,其中包含 4 条消息 m1,m2,m3,m4。 消息 m2 是包含另外两条消息 m2_1 和 m2_2 的元组的头。

探索 Mediastreamer2 VoIP 引擎。 第 11 部分

使用队列queue_t的函数

队列初始化:

void qinit(queue_t *q);

领域 _q_stopper (以下我们称之为“stopper”)由函数初始化 mblk_init(),其上一个元素和下一个元素指针被调整为指向自身。 队列元素计数器重置为零。

添加新元素(消息):

void putq(queue_t *q, mblk_t *m);

新元素 m 添加到列表的末尾,调整元素指针,使 stopper 成为它的下一个元素,并且它成为 stopper 的前一个元素。 队列元素计数器递增。

从队列中检索元素:

mblk_t * getq(queue_t *q); 

检索停止器之后出现的消息,并且元素计数器递减。 如果队列中除 stopper 之外没有任何元素,则返回 0。

将消息插入队列:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp); 

元素 mp 插入到元素之前 EMP。 如果 EMP=0,则该消息被添加到队列尾部。

从队列头部检索消息:

void remq(queue_t *q, mblk_t *mp); 

元素计数器递减。

读取指向队列中第一个元素的指针:

mblk_t * peekq(queue_t *q); 

从队列中删除所有元素,同时删除元素本身:

void flushq(queue_t *q, int how);

争论 形成一种 不曾用过。 队列元素计数器设置为零。

用于读取指向队列最后一个元素的指针的宏:

mblk_t * qlast(queue_t *q);

使用消息队列时,请注意,当您调用 ms_queue_put(q, m) 如果消息的空指针为空,则该函数将循环。 你的程序将会冻结。 行为类似 ms_queue_next(q, m).

连接过滤器

上述队列用于将消息从一个过滤器传递到另一个过滤器或从一个过滤器传递到多个过滤器。 过滤器及其连接形成有向图。 滤波器的输入或输出将被称为通用词“pin”。 为了描述过滤器彼此连接的顺序,媒体流媒体使用“信号点”的概念。 信号点是结构 _MSC点,其中包含指向滤波器的指针及其引脚之一的编号;相应地,它描述了滤波器的输入或输出之一的连接。

数据处理图信号点

typedef struct _MSCPoint{
struct _MSFilter *filter; // Указатель на фильтр медиастримера.
int pin;                        // Номер одного из входов или выходов фильтра, т.е. пин.
} MSCPoint;

滤波器引脚从零开始编号。

两个引脚通过消息队列的连接由结构体描述 _MSQueue,其中包含一个消息队列和指向它连接的两个信号点的指针:

typedef struct _MSQueue
{
queue_t q;
MSCPoint prev;
MSCPoint next;
}MSQueue;

我们将这种结构称为信号链路。 每个媒体流过滤器包含一个输入链接表和一个输出链接表(消息队列)。 表的大小是在创建过滤器时设置的;我们已经使用类型的导出变量完成了此操作 MS过滤描述,当我们开发自己的过滤器时。 下面是描述媒体流中的任何过滤器的结构, 质谱过滤器:


struct _MSFilter{
    MSFilterDesc *desc;    /* Указатель на дескриптор фильтра. */
    /* Защищенные атрибуты, их нельзя сдвигать или убирать иначе будет нарушена работа с плагинами. */
    ms_mutex_t lock;      /* Семафор. */
    MSQueue **inputs;     /* Таблица входных линков. */
    MSQueue **outputs;    /* Таблица выходных линков. */
    struct _MSFactory *factory; /* Указатель на фабрику, которая создала данный экземпляр фильтра. */
    void *padding;              /* Не используется, будет задействован если добавятся защищенные поля. */
    void *data;                 /* Указатель на произвольную структуру для хранения данных внутреннего состояния фильтра и промежуточных вычислений. */
    struct _MSTicker *ticker;   /* Указатель на объект тикера, который не должен быть нулевым когда вызывается функция process(). */
    /*private attributes, they can be moved and changed at any time*/
    MSList *notify_callbacks; /* Список обратных вызовов, используемых для обработки событий фильтра. */
    uint32_t last_tick;       /* Номер последнего такта, когда выполнялся вызов process(). */ 
    MSFilterStats *stats;     /* Статистика работы фильтра.*/
    int postponed_task; /*Количество отложенных задач. Некоторые фильтры могут откладывать обработку данных (вызов process()) на несколько тактов.*/
    bool_t seen;  /* Флаг, который использует тикер, чтобы помечать что этот экземпляр фильтра он уже обслужил на данном такте.*/
};
typedef struct _MSFilter MSFilter;

当我们按照计划连接C程序中的过滤器(但没有连接股票)后,我们就创建了一个有向图,其节点是结构体的实例 质谱过滤器,边是链接的实例 消息队列.

股票行情的幕后活动

当我告诉你股票行情是对蜱虫来源的过滤器时,这并不是全部真相。 股票代码是一个在时钟上运行函数的对象 过程() 它所连接的电路(图)的所有滤波器。 当我们在 C 程序中将股票代码连接到图形过滤器时,我们会向股票代码显示从现在起它将控制的图形,直到我们将其关闭。 连接后,股票行情自动收录器开始检查委托其处理的图表,并编译包含该图表的过滤器列表。 为了不对同一过滤器“计数”两次,它通过在其中放置一个复选框来标记检测到的过滤器 看到。 使用每个过滤器具有的链接表来执行搜索。

在对图表进行介绍性浏览期间,自动收录器会检查过滤器中是否至少有一个充当数据块源。 如果没有,则图表被认为不正确并且股票行情崩溃。

如果图表结果是“正确的”,对于每个找到的过滤器,将调用该函数进行初始化 预处理()。 一旦下一个处理周期的时刻到来(默认情况下每 10 毫秒),股票代码就会调用该函数 过程() 对于所有先前找到的源过滤器,然后对于列表中的其余过滤器。 如果过滤器有输入链接,则运行该函数 过程() 重复直到输入链接队列为空。 之后,它会移至列表中的下一个过滤器并“滚动”它,直到输入链接没有消息。 股票代码从一个过滤器移动到另一个过滤器,直到列表结束。 这样就完成了循环的处理。

现在我们将回到元组并讨论为什么这样的实体被添加到媒体流媒体中。 一般来说,过滤器内部运行的算法所需的数据量不一致,并且不是输入端接收的数据缓冲区大小的倍数。 例如,我们正在编写一个执行快速傅立叶变换的滤波器,根据定义,它只能处理大小为 512 的幂的数据块。 设计数为 160。 如果数据是由电话通道生成的,那么输入处的每条消息的数据缓冲区将为我们带来XNUMX个信号样本。 在获得所需的数据量之前,人们很容易不从输入中收集数据。 但在这种情况下,会与股票代码发生冲突,股票代码将无法成功尝试滚动过滤器,直到输入链接为空。 之前,我们将此规则指定为过滤器的第三个原则。 根据这个原则,过滤器的process()函数必须从输入队列中获取所有数据。

此外,不可能仅从输入中获取 512 个样本,因为您只能获取整个块,即过滤器必须采集 640 个样本,并使用其中的 512 个样本,剩余的样本将在累积新的数据部分之前使用。 因此,我们的过滤器除了其主要工作之外,还必须为输入数据的中间存储提供辅助操作。 媒体流和解决这个普遍问题的开发人员开发了一个特殊的对象 - MSBufferizer(缓冲区),它使用元组解决了这个问题。

缓冲器(MSBufferizer)

这是一个对象,它将在过滤器内累积输入数据,并在信息量足以运行过滤器算法后立即开始提交数据进行处理。 当缓冲区积累数据时,过滤器将在空闲模式下运行,而不会耗尽处理器的处理能力。 但是,一旦从缓冲区的读取函数返回一个非零值,过滤器的 process() 函数就开始按所需大小的部分从缓冲区获取和处理数据,直到数据耗尽。
尚未需要的数据作为元组的第一个元素保留在缓冲区中,随后的输入数据块将附加到该元素。

描述缓冲区的结构:

struct _MSBufferizer{
queue_t q; /* Очередь сообщений. */
int size; /* Суммарный размер данных находящихся в буферизаторе в данный момент. */
};
typedef struct _MSBufferizer MSBufferizer;

使用 MSBufferizer 的函数

创建一个新的缓冲区实例:

MSBufferizer * ms_bufferizer_new(void);

内存被分配,初始化于 ms_bufferizer_init() 并返回一个指针。

初始化函数:

void ms_bufferizer_init(MSBufferizer *obj); 

队列正在初始化 q, 场地 尺寸 被设置为零。

添加消息:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m); 

消息 m 被添加到队列中。 将计算出的数据块大小添加到 尺寸.

将所有消息从链路数据队列传输到缓冲区 q:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);   

从链接传输消息 q 在缓冲区中使用函数执行 ms_bufferizer_put().

从缓冲区读取:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen); 

如果缓冲区中累积的数据大小小于请求的大小(数据伦),则函数返回零,数据不复制到数据。 否则,执行来自位于缓冲区中的元组的数据的顺序复制。 复制后,元组被删除并释放内存。 当 datalen 字节被复制时,复制结束。 如果数据块中间的空间用完,则在该消息中,数据块将被缩减为剩余未复制的部分。 下次调用时,复制将从此时开始继续。

读取缓冲区中当前可用的数据量:

int ms_bufferizer_get_avail(MSBufferizer *obj); 

返回字段 尺寸 缓冲器。

丢弃缓冲区中的部分数据:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

检索并丢弃指定数量的数据字节。 最旧的数据将被丢弃。

删除缓冲区中的所有消息:

void ms_bufferizer_flush(MSBufferizer *obj); 

数据计数器重置为零。

删除缓冲区中的所有消息:

void ms_bufferizer_uninit(MSBufferizer *obj); 

计数器不重置。

删除缓冲区并释放内存:

void ms_bufferizer_destroy(MSBufferizer *obj);  

使用缓冲区的示例可以在几个媒体流过滤器的源代码中找到。 例如,在 MS_L16_ENC 过滤器中,它将样本中的字节从网络顺序重新排列为主机顺序: l16.c

在下一篇文章中,我们将研究估计股票代码负载的问题以及解决媒体流媒体中过多计算负载的方法。

来源: habr.com

添加评论