Изучаем VoIP-движок Mediastreamer2. Часть 11

Материал статьи взят с моего дзен-канала.

Изучаем VoIP-движок Mediastreamer2. Часть 11

Механизм перемещения данных

  • Блок данных dblk_t
  • Сообщение mblk_t
  • Функции работы с сообщениями mblk_t
  • Очередь queue_t
  • Функции работы с очередями queue_t
  • Соединение фильтров
  • Сигнальная точка графа обработки данных
  • Закулисная деятельность тикера
  • Буферизатор (MSBufferizer)
  • Функции работы с MSBufferizer

В прошлой статье мы разработали свой собственный фильтр. Эту статью мы посветим устройству внутреннего механизма перемещения данных между фильтрами медиастримера. Это позволит в дальнейшем писать изощренные фильтры с меньшими усилиями.

Механизм перемещения данных

Перемещение данных в медиастримере выполняется с помощью очередей описываемых структурой queue_t. По очередям перемещаются вереницы сообщений типа mblk_t, которые сами по себе не содержат данных сигнала, а только ссылки на предыдущее, следующее сообщение и на блок данных. Кроме этого, хочу подчеркнуть особо, есть еще поле для ссылки на сообщение такого же типа, которое позволяет организовать односвязный список из сообщений. Группу сообщений, объединенных таким списком, будем называть кортежем. Таким образом, любой элемент очереди может оказаться одиночным сообщением mblk_t, а может и головой кортежа сообщений mblk_t. Каждое сообщение кортежа может иметь свой подопечный блок данных. Зачем нужны кортежи мы обсудим немного позже.

Как было сказано выше, само по себе сообщение не содержит блок данных, вместо этого оно содержит только указатель на область памяти где хранится блок. В этой части общая картина работы медиастримера напоминает склад дверей в мультфильме "Корпорация монстров", где двери (ссылки на данные — комнаты) с безумной скоростью движутся по подвесным конвейерам, при этом сами комнаты остаются неподвижными.

Теперь, двигаясь по иерархии снизу вверх, рассмотрим детально перечисленные сущности механизма передачи данных в медиастримере.

Блок данных dblk_t

Блок данных состоит из заголовка и буфера данных. Заголовок описывается следующей структурой,

typedef struct datab
{
unsigned char *db_base; // Указатель на начало буфер данных.
unsigned char *db_lim;  // Указатель на конец буфер данных.
void (*db_freefn)(void*); // Функция освобождения памяти при удалении блока.
int db_ref; // Счетчик ссылок.
} dblk_t;

Поля структуры содержат указатели на начало буфера, конец буфера, функцию удаления буфера данных. Последний элемент в заголовке db_ref — счетчик ссылок, если он достигает нуля, это служит сигналом к удалению данного блока из памяти. Если блок данных был создан функцией datab_alloc() , то буфер данных будет размешен в памяти сразу после заголовка. Во всех других случаях буфер может располагаться где-то отдельно. В буфере данных будут располагаться отсчеты сигнала или другие данные, которые мы хотим обрабатывать фильтрами.

Новый экземпляр блока данных создается с помощью функции:

dblk_t *datab_alloc(int size);

В качестве входного параметра ей передается размер данных, которые будет хранить блок. Памяти выделяется больше, чтобы в начале выделенной памяти разместить заголовок — структуру datab. Но при использовании других функций так происходит не всегда, в некоторых случаях буфер данных может располагаться отдельно от заголовка блока данных. Поля структуры при создании настраиваются так, чтобы её поле db_base указывало на начало области данных, а db_lim на её конец. Счетчик ссылок db_ref устанавливается в единицу. Указатель функции очистки данных устанавливается в ноль.

Сообщение mblk_t

Как было сказано, элементы очереди имеют тип mblk_t, он определен следующим образом:

typedef struct msgb
{
  struct msgb *b_prev;   // Указатель на предыдущий элемент списка.
  struct msgb *b_next;   // Указатель на следующий элемент списка.
  struct msgb *b_cont;   // Указатель для подклейки к сообщению других сообщений, для создания кортежа сообщений.
  struct datab *b_datap; // Указатель на структуру блока данных.
  unsigned char *b_rptr; // Указатель на начало области данных для чтения данных буфера b_datap.
  unsigned char *b_wptr; // Указатель на начало области данных для записи данных буфера b_datap.
  uint32_t reserved1;    // Зарезервированное поле1, медиастример помещает туда служебную информацию. 
  uint32_t reserved2;    // Зарезервированное поле2, медиастример помещает туда служебную информацию.
  #if defined(ORTP_TIMESTAMP)
  struct timeval timestamp;
  #endif
  ortp_recv_addr_t recv_addr;
} mblk_t;

Структура mblk_t в начале содержит указатели b_prev, b_next, которые необходимы для организации двусвязного списка (которым является очередь queue_t).

Затем идет указатель b_cont, который используется только тогда, когда сообщение входит в кортеж. Для последнего сообщения в кортеже это указатель остается нулевым.

Далее мы видим указатель на блок данных b_datap, ради которого и существует сообщение. За ним идут указатели, на область внутри буфера данных блока. Поле b_rptr указывает место, с которого будут читаться данные из буфера. Поле b_wptr указывает место, с которого будут выполняться запись в буфер.

Оставшиеся поля носят служебный характер и не относятся к работе механизма передачи данных.

Ниже показано одиночное сообщение с именем m1 и блоком данных d1.
Изучаем VoIP-движок Mediastreamer2. Часть 11
На следующем рисунке изображен кортеж из трех сообщений m1, m1_1, m1_2.
Изучаем VoIP-движок Mediastreamer2. Часть 11

Функции работы с сообщениями mblk_t

Новое сообщение mblk_t создается функцией:

mblk_t *allocb(int size, int pri); 

она размещает в памяти новое сообщение mblk_t с блоком данных указанного размера size, второй аргумент — pri не используется в рассматриваемой версии библиотеки. Он должен оставаться нулевым. В ходе работы функции будет выделена память под структуру нового сообщения и вызвана функция mblk_init(), которая обнулит все поля созданного экземпляра структуры и затем, с помощью упомянутой выше datab_alloc(), создаст буфер данных. После чего будет выполнена настройка полей в структуре:

mp->b_datap=datab;
mp->b_rptr=mp->b_wptr=datab->db_base;
mp->b_next=mp->b_prev=mp->b_cont=NULL;

На выходе получаем новое сообщение с инициализированными полями и пустым буфером данных. Чтобы добавить в сообщение данные, нужно выполнить их копирование в буфер блока данных:

memcpy(msg->b_rptr, data, size);

где data — указатель на источник данных, а size — их размер.
затем нужно обновить указатель на точку записи, чтобы он снова указывал на начало свободной области в буфере:

msg->b_wptr = msg->b_wptr + size

Если требуется создать сообщение из уже имеющегося буфера, без копирования, то для этого используется функция:

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*)); 

Функция после создания сообщения и структуры блока данных, настроит его указатели на данные по адресу buf. Т.е. в данном случае буфер данных не располагается вслед за полями заголовка блока данных, как это было при создании блока данных функцией datab_alloc(). Переданный функции буфер с данными останется там где был, но с помощью указателей будет подстегнут к только что созданному заголовку блока данных, а тот соответственно к сообщению.

К одному сообщению mblk_t могут быть последовательно прицеплены несколько блоков данных. Это делается функцией:

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad); 

mp — сообщение к которому будет добавлен еще один блок данных;
data — указатель на блок, копия которого будет добавлена в сообщение;
size — размер данных;
pad — флаг того, что размер выделяемой памяти должен быть выравнен по границе 4 байт (дополнение будет выполнено нулями).

Если в имеющемся буфере данных сообщения достаточно места, то новые данные будут подклеены за уже имеющимися там данными. Если свободного места в буфере данных сообщения окажется меньше чем size, то создается новое сообщение, с достаточным размером буфера и данные копируются в его буфер. Это новое сообщение, подцепляется к исходному с помощью указателя b_cont. В этом случае сообщение превращается в кортеж.

Если в кортеж требуется добавить еще один блок данных, то нужно использовать функцию:

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

она отыщет последнее сообщение в кортеже (у него b_cont будет нулевым) и вызовет для этого сообщения функцию appendb().

Узнать размер данных в сообщении или кортеже можно с помощью функции:

int msgdsize(const mblk_t *mp);

она пробежится по всем сообщениям кортежа и вернет суммарное количество данных в буферах данных этих сообщений. Для каждого сообщения количество данных вычисляется так:

 mp->b_wptr - mp->b_rptr

Чтобы объединить два кортежа применяется функция:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

она присоединяет кортеж newm в хвост кортежа mp и возвращает указатель на последнее сообщение получившегося кортежа.

При необходимости, кортеж можно превратить в одно сообщение с единым блоком данных, это делается функцией:

void msgpullup(mblk_t *mp,int len);

если аргумент len равен -1, то размер отводимого буфера определяется автоматически. Если len положительное число, то будет создан буфер этого размера и в него будут скопированы данные сообщений кортежа. Если буфер закончится, то копирование будет на этом прекращено. Первое сообщение кортежа получит буфер нового размера со скопированными данными. Остальные сообщения будут удалены, а память возвращена в кучу.

При удалении структуры mblk_t учитывается счетчик ссылок блока данных, если при вызове freeb() он оказывается равен нулю, то буфер данных удаляется вместе с экземпляром mblk_t, который на него указывает.

Инициализация полей нового сообщения:

void mblk_init(mblk_t *mp);

Добавление в сообщение еще одной порции данных:

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

Если новые данные не помещаются в свободное место буфера данных сообщения, то к сообщению прицепляется отдельно созданное сообщение с буфером нужного размера (в первом сообщении устанавливается указатель на добавленное сообщение) сообщение превращается в кортеж.

Добавление порции данных в кортеж:

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad); 

Функция вызывает appendb() в цикле.

Объединение двух кортежей в один:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

Сообщение newm будет присоединено к mp.

Создание копии одиночного сообщения:

mblk_t *copyb(const mblk_t *mp);

Полное копирование кортежа со всеми блоками данных:

mblk_t *copymsg(const mblk_t *mp);

Элементы кортежа при этом копируются функцией copyb().

Создание легкой копии сообщения mblk_t. При этом блок данных не копируется, а увеличивается счетчик его ссылок db_ref:

mblk_t *dupb(mblk_t *mp);

Создание легкой копии кортежа. Блоки данных не копируются, только увеличиваются их счетчики ссылок db_ref:

mblk_t *dupmsg(mblk_t* m);

Склейка всех сообщений кортежа в одно сообщение:

void msgpullup(mblk_t *mp,size_t len);

Если аргумент len равен -1, то размер отводимого буфера определяется автоматически.

Удаление сообщения, кортежа:

void freemsg(mblk_t *mp);

Счетчик ссылок блока данных уменьшается на единицу. Если он при этом достигает нуля, то блок данных тоже удаляется.

Подсчет суммарной объем данных в сообщении или кортеже.

size_t msgdsize(const mblk_t *mp);

Извлечение сообщения из хвоста очереди:

mblk_t *ms_queue_peek_last (q);

Копирование содержимого зарезервированных полей одного сообщения в другое сообщение(на самом деле в этих полях находятся флаги, которые используются медиастримером):

mblk_meta_copy(const mblk_t *source, mblk *dest);

Очередь queue_t

Очередь сообщений в медиастримере реализована как кольцевой двусвязный список. Каждый элемент списка содержит указатель на блок данных с отсчетами сигнала. Получается, что по очереди перемещаются только указатели на блок данных, в то время как сами данные остаются неподвижными. Т.е. перемещаются только ссылки на них.
Структура описывающая очередь queue_t, показана ниже:

typedef struct _queue
{
   mblk_t _q_stopper; /* "Холостой" элемент очереди, не указывает на данные, используется только для управления очередью. При инициализации очереди (qinit()) его указатели настраиваются так, чтобы они указывали на него самого. */
   int q_mcount;        // Количество элементов в очереди.
} queue_t;

Структура содержит поле — указатель _q_stopper типа *mblk_t, он указывает на первый элемент (сообщение) в очереди. Второе поле структуры это счетчик сообщений, находящихся в очереди.
На рисунке ниже показана очередь c именем q1, содержащая 4 сообщения m1,m2, m3, m4.
Изучаем VoIP-движок Mediastreamer2. Часть 11
На следующем рисунке показана очередь c именем q1, содержащая 4 сообщения m1,m2, m3, m4. Сообщение m2 является головой кортежа, в который водят еще два сообщения m2_1 и m2_2.

Изучаем VoIP-движок Mediastreamer2. Часть 11

Функции работы с очередями queue_t

Инициализацию очереди:

void qinit(queue_t *q);

Поле _q_stopper (далее будем называть его "стопор") инициализируется функцией mblk_init(), его указатель предыдущего элемента и следующего элемента настраиваются так, чтобы они показывали на него самого. Счетчик элементов очереди обнуляется.

Добавление нового элемента (сообщения):

void putq(queue_t *q, mblk_t *m);

Новый элемент m добавляется в конец списка, указатели элемента настраиваются так, чтобы стопор становился для него следующим элементом, а он для стопора предыдущим. Инкрементируется счетчик элементов очереди.

Извлечение элемента из очереди:

mblk_t * getq(queue_t *q); 

извлекается то сообщение, которое стоит после стопора, счетчик элементов декрементируется. Если в очереди, кроме стопора, элементов нет, то возвращается 0.

Вставка сообщения в очередь:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp); 

Элемент mp вставляется перед элементом emp. Если emp=0, то сообщение добавляется в хвост очереди.

Извлечение сообщения из головы очереди:

void remq(queue_t *q, mblk_t *mp); 

Счетчик элементов декрементируется.

Чтение указателя на первый элемент в очереди:

mblk_t * peekq(queue_t *q); 

Удаление всех элементов из очереди с удалением самих элементов:

void flushq(queue_t *q, int how);

Аргумент how не используется. Счетчик элементов очереди устанавливается в ноль.

Макрос чтения указателя на последний элемент очереди:

mblk_t * qlast(queue_t *q);

При работе с очередями сообщений следует иметь в виду, что при вызове ms_queue_put(q, m) с нулевым указателем на сообщение, функция зацикливается. Ваша программа зависнет. Аналогично ведет себя ms_queue_next(q, m).

Соединение фильтров

Описанная выше очередь используются для передачи сообщений от одного фильтра к другому или от одного к нескольким фильтрам. Фильтры и их соединения между собой образуют направленный граф. Вход или выход фильтра будем называть обобщающим словом "пин". Для описания порядка соединений фильтров между собой, в медиастримере используется понятие "сигнальной точки". Сигнальная точка это структура _MSCPoint, которая содержит указатель на фильтр и номер одного из его пинов, соответственно она описывает соединение одного из входов или выходов фильтра.

Сигнальная точка графа обработки данных

typedef struct _MSCPoint{
struct _MSFilter *filter; // Указатель на фильтр медиастримера.
int pin;                        // Номер одного из входов или выходов фильтра, т.е. пин.
} MSCPoint;

Пины фильтров нумеруются начиная с нуля.

Соединение двух пинов очередью сообщений описывается структурой _MSQueue, которая содержит очередь сообщений и указатели на две сигнальные точки, которые она соединяет:

typedef struct _MSQueue
{
queue_t q;
MSCPoint prev;
MSCPoint next;
}MSQueue;

Будем называть эту структуру сигнальным линком. Каждый фильтр медиастримера, содержит таблицу входных линков и таблицу выходных линков (MSQueue). Размер таблиц задается при создании фильтра, мы это уже делали с помощью экспортируемой переменной типа MSFilterDesc, когда разрабатывали наш собственный фильтр. Ниже показана структура описывающая любой фильтр в медиастримере, MSFilter:


struct _MSFilter{
    MSFilterDesc *desc;    /* Указатель на дескриптор фильтра. */
    /* Защищенные атрибуты, их нельзя сдвигать или убирать иначе будет нарушена работа с плагинами. */
    ms_mutex_t lock;      /* Семафор. */
    MSQueue **inputs;     /* Таблица входных линков. */
    MSQueue **outputs;    /* Таблица выходных линков. */
    struct _MSFactory *factory; /* Указатель на фабрику, которая создала данный экземпляр фильтра. */
    void *padding;              /* Не используется, будет задействован если добавятся защищенные поля. */
    void *data;                 /* Указатель на произвольную структуру для хранения данных внутреннего состояния фильтра и промежуточных вычислений. */
    struct _MSTicker *ticker;   /* Указатель на объект тикера, который не должен быть нулевым когда вызывается функция process(). */
    /*private attributes, they can be moved and changed at any time*/
    MSList *notify_callbacks; /* Список обратных вызовов, используемых для обработки событий фильтра. */
    uint32_t last_tick;       /* Номер последнего такта, когда выполнялся вызов process(). */ 
    MSFilterStats *stats;     /* Статистика работы фильтра.*/
    int postponed_task; /*Количество отложенных задач. Некоторые фильтры могут откладывать обработку данных (вызов process()) на несколько тактов.*/
    bool_t seen;  /* Флаг, который использует тикер, чтобы помечать что этот экземпляр фильтра он уже обслужил на данном такте.*/
};
typedef struct _MSFilter MSFilter;

После того, как мы в Си-программе соединили фильтры в соответствии с нашим замыслом (но не подключили тикер), мы тем самым создали направленный граф, узлы которого, это экземпляры структуры MSFilter, а ребра это экземпляры линков MSQueue.

Закулисная деятельность тикера

Когда я говорил вам что тикер — это фильтр источник тактов, то была не вся правда о нем. Тикер это объект, который по часам выполняет запуск функций process() всех фильтров схемы (графа) к которой он подключен. Когда мы в Си-программе подключаем тикер к фильтру графа, мы показываем тикеру граф, которым он с этого момента будет управлять, пока мы его не отключим. После подключения, тикер начинает осматривать вверенный ему на попечение граф, составляя список фильтров в которые в него входят. Чтобы не "сосчитать" один и тот же фильтр дважды он помечает обнаруженные фильтры, устанавливая в них флажок seen. Поиск осуществляется по таблицам линков, которые есть у каждого фильтра.

Во время свое ознакомительной экскурсии по графу, тикер проверяет есть ли среди фильтров, хотя бы один, который выполняет роль источника блоков данных. Если таковых не находится, то граф признается неправильным и тикер аварийно завершает работу.

Если граф оказался "правильным", у каждого найденного фильтра, для инициализации, вызывается функция preprocess(). Как только наступает момент для очередного такта обработки (по умолчанию каждые 10 миллисекунд), тикер вызывает функцию process() для всех найденных ранее фильтров источников, а затем и для остальных фильтров списка. Если фильтр имеет входные линки, то запуск функции process() повторяется до тех пор, пока очереди входных линков не опустеют. После этого, он переходит к следующему фильтру списка и "прокручивает" его до освобождения входных линков от сообщений. Тикер переходит от фильтра к фильтру пока не закончится список. На этом обработка такта заканчивается.

Теперь мы вернемся к кортежам и поговорим о том для чего в медиастример была добавлена такая сущность. В общем случае, объем данных, необходимый алгоритму, работающему внутри фильтра не совпадает и не кратен, размеру буферов данных поступающих на входу. Например, мы пишем фильтр, который выполняет быстрое преобразование Фурье, которое по определению может обрабатывать только блоки данных чей размер равен степени двойки. Пусть это будет 512 отсчетов. Если данные генерируются телефонным каналом, то буфер данных каждого сообщения на входе будет приносить нам по 160 отсчетов сигнала. Есть соблазн не забирать данные со входа, пока там не окажется необходимое количество данных. Но в этом случае возникнет коллизия с тикером, который будет безуспешно пытаться прокрутить фильтр до опустошения входного линка. Ранее мы обозначили это правило как третий принцип работы фильтра. В соответствии с этим принципом, функция process() фильтра должна забрать все данные из входных очередей.

Кроме этого со входа нельзя будет забрать только 512 отсчетов, так как забирать можно только целыми блоками, т.е. фильтру придется забрать 640 отсчетов и использовав 512 из них, остаток до накопления новой порции данных. Таким образом, наш фильтр, помимо основной своей работы должен обеспечить вспомогательные действия по промежуточному хранению входных данных. Разработчики медиастримера да решения этой общей задачи разработали специальный объект — MSBufferizer (буферизатор), который решает эту задачу с помощью кортежей.

Буферизатор (MSBufferizer)

Это объект который будет накапливать входные данные внутри фильтра и начнет отдавать их в обработку, как только количество информации окажется достаточным для проворачивания алгоритма фильтра. Пока буферизатор копит данные, фильтр будет работать в холостом режиме, не затрачивая вычислительной мощности процессора. Но как только функция чтения из буферизатора вернет значение отличное от нуля, функция process() фильтра начинает забирать и обрабатывать из буферизатора данные порциями нужного размера, до их исчерпания.
Невостребованные пока данные остаются в буферизаторе как первый элемент кортежа, к которому прицепляются последующие блоки входных данных.

Структура, которая описывает буферизатор:

struct _MSBufferizer{
queue_t q; /* Очередь сообщений. */
int size; /* Суммарный размер данных находящихся в буферизаторе в данный момент. */
};
typedef struct _MSBufferizer MSBufferizer;

Функции работы с MSBufferizer

Создание нового экземпляра буферизатора:

MSBufferizer * ms_bufferizer_new(void);

Выделяется память, инициализируется в ms_bufferizer_init() и возвращается указатель.

Функция инициализации:

void ms_bufferizer_init(MSBufferizer *obj); 

Инициализируется очередь q, поле size устанавливается в ноль.

Добавление сообщения:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m); 

Сообщение m добавляется в очередь. Вычисленный размер блоков данных прибавляется к size.

Перекладка в буферизатор всех сообщений очереди данных линка q:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);   

Перенос сообщений из линка q в буферизатор выполняется с помощью функции ms_bufferizer_put().

Чтение из буферизатора:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen); 

Если размер накопленных в буферизаторе данных оказывается меньше запрошенного (datalen), то функция возвращает ноль, копирование данных в data не выполняется. В противном случае выполняется последовательное копирование данных из кортежей находящихся в буферизаторе. После копирования кортеж удаляется и память освобождается. Копирование заканчивается в тот момент, когда будет скопировано datalen байтов. Если место кончается посреди блока данных, то в данном сообщении, блок данных будет сокращен до оставшейся некопированной части. При следующем вызове копирование продолжится с этого места.

Чтение количества данных, которые доступны данный момент в буферизаторе:

int ms_bufferizer_get_avail(MSBufferizer *obj); 

Возвращает поле size буферизатора.

Отбрасывание части данных, находящихся в буферизаторе:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

Указанное количество байтов данных извлекается и отбрасывается. Отбрасываются самые старые данные.

Удаление всех сообщений находящихся в буферизаторе:

void ms_bufferizer_flush(MSBufferizer *obj); 

Счетчик данных сбрасывается в ноль.

Удаление всех сообщений находящихся в буферизаторе:

void ms_bufferizer_uninit(MSBufferizer *obj); 

Обнуление счетчика не выполняется.

Удаление буферизатора и освобождение памяти:

void ms_bufferizer_destroy(MSBufferizer *obj);  

Примеры использования буферизатора можно найти в исходном коде нескольких фильтров медиастримера. Например в фильтре MS_L16_ENC, который выполняет перестановку байтов в отсчетах из сетевого порядка, в порядок хоста: l16.c

В следующей статье, мы рассмотрим вопрос оценки нагрузки на тикер и способы борьбы с чррезмерной вычислительной нагрузкой в медиастримере.

Источник: habr.com

Добавить комментарий