Exploring the Mediastreamer2 VoIP engine. Part 11

This article is based on material from my Zen channel.


Data movement mechanism

  • dblk_t data block
  • mblk_t message
  • Functions for working with mblk_t messages
  • queue_t
  • queue_t functions
  • Connecting filters
  • Data Processing Graph Signal Point
  • The behind-the-scenes activity of the ticker
  • Bufferizer (MSBufferizer)
  • Functions for working with MSBufferizer

In the previous article we developed our own filter. In this article we will look at how the internal mechanism that moves data between media streamer filters is built. This will let you write sophisticated filters with less effort in the future.

Data movement mechanism

Data in the media streamer is moved using queues described by the structure queue_t. Messages of type mblk_t travel along the queues in chains; the messages themselves do not contain signal data, only links to the previous message, the next message and a data block. In addition, and I want to emphasize this, there is a field for a link to another message of the same type, which makes it possible to organize a singly linked list of messages. We will call a group of messages joined by such a list a tuple. Thus, any element of the queue can be either a single mblk_t message or the head of a tuple of mblk_t messages. Each message of a tuple can have its own data block. We will discuss why tuples are needed a little later.

As mentioned above, the message itself does not contain the data block; it contains only a pointer to the memory area where the block is stored. In this respect, the overall picture of how the media streamer works resembles the door warehouse in the cartoon Monsters, Inc., where the doors (links to the data, that is, to the rooms) race along overhead conveyors while the rooms themselves stay where they are.

Now, moving up the hierarchy from bottom to top, let's look in detail at the listed entities of the media streamer's data transfer mechanism.

Data block dblk_t

The data block consists of a header and a data buffer. The header is described by the following structure:

typedef struct datab
{
unsigned char *db_base; // Pointer to the beginning of the data buffer.
unsigned char *db_lim;  // Pointer to the end of the data buffer.
void (*db_freefn)(void*); // Function that frees the memory when the block is deleted.
int db_ref; // Reference counter.
} dblk_t;

The fields of the structure contain pointers to the beginning of the buffer, to the end of the buffer, and to the function that frees the data buffer. The last item in the header, db_ref, is the reference counter; when it reaches zero, this is the signal to remove the block from memory. If the data block was created by the function datab_alloc(), the data buffer is placed in memory immediately after the header. In all other cases the buffer may be located somewhere else. The data buffer contains signal samples or other data that we want to process with filters.

A new data block instance is created using the function:

dblk_t *datab_alloc(int size);

Its input parameter is the size of the data that the block will store. More memory than that is allocated, so that the header, the structure datab, can be placed at the beginning of the allocated memory. With other functions this is not always the case; sometimes the data buffer is located separately from the data block header. During creation the fields are set so that db_base points to the beginning of the data area and db_lim to its end. The reference counter db_ref is set to one. The pointer to the function that frees the data is set to NULL.
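The layout described above, a header followed directly by its buffer in one allocation, can be sketched as follows. This is a simplified model and not the library source: the names model_dblk_t, model_datab_alloc() and model_datab_capacity() are hypothetical and only mirror the fields listed earlier.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Simplified model of the data block header (hypothetical, not the oRTP definition). */
typedef struct model_datab {
    unsigned char *db_base;     /* start of the data buffer */
    unsigned char *db_lim;      /* end of the data buffer (one past the last byte) */
    void (*db_freefn)(void *);  /* NULL here: the buffer lives in the same chunk as the header */
    int db_ref;                 /* reference counter */
} model_dblk_t;

/* Allocate header and buffer as a single chunk; the buffer starts right after the header. */
model_dblk_t *model_datab_alloc(size_t size)
{
    model_dblk_t *db = malloc(sizeof(model_dblk_t) + size);
    if (db == NULL)
        return NULL;
    db->db_base = (unsigned char *)db + sizeof(model_dblk_t);
    db->db_lim = db->db_base + size;
    db->db_freefn = NULL;
    db->db_ref = 1;
    return db;
}

/* Capacity of the buffer in bytes, derived from the two boundary pointers. */
size_t model_datab_capacity(const model_dblk_t *db)
{
    assert(db != NULL);
    return (size_t)(db->db_lim - db->db_base);
}
```

Because header and buffer share one chunk in this model, a single free() on the header pointer releases both.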

Message mblk_t

As mentioned, the elements of the queue are of type mblk_t, which is defined like this:

typedef struct msgb
{
  struct msgb *b_prev;   // Pointer to the previous element of the list.
  struct msgb *b_next;   // Pointer to the next element of the list.
  struct msgb *b_cont;   // Pointer used to attach other messages to this one, forming a message tuple.
  struct datab *b_datap; // Pointer to the data block structure.
  unsigned char *b_rptr; // Pointer to the start of the area for reading data from the b_datap buffer.
  unsigned char *b_wptr; // Pointer to the start of the area for writing data to the b_datap buffer.
  uint32_t reserved1;    // Reserved field 1; the media streamer stores service information there.
  uint32_t reserved2;    // Reserved field 2; the media streamer stores service information there.
  #if defined(ORTP_TIMESTAMP)
  struct timeval timestamp;
  #endif
  ortp_recv_addr_t recv_addr;
} mblk_t;

The structure mblk_t begins with the pointers b_prev and b_next, which are needed to organize a doubly linked list (this list is the queue queue_t).

Then comes the pointer b_cont, which is used only when the message is part of a tuple. For the last message in the tuple this pointer remains null.

Next we see b_datap, a pointer to the data block for whose sake the message exists. It is followed by pointers into the block's data buffer. The field b_rptr specifies the location from which data will be read from the buffer. The field b_wptr specifies the location at which data will be written to the buffer.

The remaining fields are service fields and are not related to the operation of the data transfer mechanism.

[Figure: a single message m1 with its data block d1.]
[Figure: a tuple of three messages m1, m1_1, m1_2.]

Functions for working with mblk_t messages

A new mblk_t message is created by the function:

mblk_t *allocb(int size, int pri); 

It places in memory a new mblk_t message with a data block of the specified size; the second argument, pri, is not used in the version of the library under consideration and must remain zero. While the function runs, memory is allocated for the structure of the new message and mblk_init() is called, which zeroes all fields of the created structure instance; then the data buffer is created with the datab_alloc() function described above. After that, the fields of the structure are set up:

mp->b_datap=datab;
mp->b_rptr=mp->b_wptr=datab->db_base;
mp->b_next=mp->b_prev=mp->b_cont=NULL;

At the output, we get a new message with initialized fields and an empty data buffer. To add data to the message, you need to copy it to the data block buffer:

memcpy(msg->b_rptr, data, size);

where data is a pointer to the data source and size is its size.
Then we need to update the write pointer so that it again points to the beginning of the free area in the buffer:

msg->b_wptr = msg->b_wptr + size;
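The two steps above (copy, then advance the write pointer) can be tried out on a small stand-alone model. The sketch below assumes a hypothetical model_msg_t with a fixed 64-byte buffer embedded in the message; a real mblk_t keeps its buffer in a separate data block, so treat this only as an illustration of the pointer arithmetic.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical model: a message with read/write cursors over a fixed buffer. */
typedef struct model_msg {
    unsigned char buf[64];
    unsigned char *b_rptr;   /* next byte to read */
    unsigned char *b_wptr;   /* next free byte to write */
} model_msg_t;

/* An empty message: both cursors point to the start of the buffer. */
void model_msg_init(model_msg_t *m)
{
    assert(m != NULL);
    m->b_rptr = m->b_wptr = m->buf;
}

/* Copy data into the free area and advance the write pointer.
 * Returns 0 on success, -1 if the data does not fit. */
int model_msg_write(model_msg_t *m, const void *data, size_t size)
{
    size_t room;
    assert(m != NULL && data != NULL);
    room = (size_t)(m->buf + sizeof(m->buf) - m->b_wptr);
    if (size > room)
        return -1;
    memcpy(m->b_wptr, data, size);
    m->b_wptr += size;
    return 0;
}

/* Amount of unread data: the distance between the two cursors. */
size_t model_msg_size(const model_msg_t *m)
{
    assert(m != NULL);
    return (size_t)(m->b_wptr - m->b_rptr);
}
```

The model copies to b_wptr rather than b_rptr; for a freshly created message the two are equal, but writing at b_wptr stays correct when data is appended a second time.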

If you want to create a message from an existing buffer, without copying, then the following function is used for this:

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*)); 

After creating the message and the data block structure, the function sets the block's pointers to the data at the address buf. That is, in this case the data buffer is not located right after the data block header fields, as it was when the data block was created by datab_alloc(). The data buffer passed to the function stays where it was, but by means of pointers it is attached to the newly created data block header, and the header, in turn, to the message.

Several data blocks can be chained one after another to a single mblk_t message. This is done by the function:

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad); 

mp - the message to which one more data block will be added;
data - pointer to the block, a copy of which will be added to the message;
size - data size;
pad - a flag indicating that the size of the allocated memory should be aligned to a 4-byte boundary (padding is done with zeros).

If there is enough space in the existing data buffer of the message, the new data is appended right after the data already there. If the free space in the message's data buffer is smaller than size, a new message with a sufficiently large buffer is created and the data is copied into its buffer. This new message is hooked to the original one through the pointer b_cont. In this case the message turns into a tuple.
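The "append in place or chain a new message" behaviour can be modelled in a few lines. The sketch below is an assumption-laden illustration, not the library code: model_mblk_t, model_allocb(), model_appendb() and model_freemsg() are hypothetical names, the buffer bounds are stored directly in the message instead of a separate data block, and padding is left out.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified message: buffer bounds are kept in the message itself. */
typedef struct model_msgb {
    struct model_msgb *b_cont;      /* next message of the tuple, NULL for the last one */
    unsigned char *base, *lim;      /* buffer bounds */
    unsigned char *b_rptr, *b_wptr; /* read and write cursors */
} model_mblk_t;

model_mblk_t *model_allocb(size_t size)
{
    model_mblk_t *m = malloc(sizeof *m);
    if (m == NULL)
        return NULL;
    m->base = malloc(size ? size : 1);
    if (m->base == NULL) {
        free(m);
        return NULL;
    }
    m->lim = m->base + size;
    m->b_rptr = m->b_wptr = m->base;
    m->b_cont = NULL;
    return m;
}

/* Append data to mp if it fits; otherwise chain a new message through b_cont.
 * Returns the message that received the data, or NULL on allocation failure. */
model_mblk_t *model_appendb(model_mblk_t *mp, const void *data, size_t size)
{
    assert(mp != NULL && mp->b_cont == NULL && data != NULL);
    if ((size_t)(mp->lim - mp->b_wptr) < size) {
        model_mblk_t *n = model_allocb(size);
        if (n == NULL)
            return NULL;
        mp->b_cont = n;   /* the original message becomes the head of a tuple */
        mp = n;
    }
    memcpy(mp->b_wptr, data, size);
    mp->b_wptr += size;
    return mp;
}

/* Free every message of the tuple together with its buffer. */
void model_freemsg(model_mblk_t *mp)
{
    while (mp != NULL) {
        model_mblk_t *next = mp->b_cont;
        free(mp->base);
        free(mp);
        mp = next;
    }
}
```

With an 8-byte message, appending 4 bytes fills it in place, while appending 6 more bytes creates a second message reachable through b_cont.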

If you need to add another block of data to the tuple, then you need to use the function:

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

It finds the last message in the tuple (the one whose b_cont is null) and calls appendb() for that message.

You can find out the size of data in a message or tuple using the function:

int msgdsize(const mblk_t *mp);

It iterates over all the messages in the tuple and returns the total amount of data in the data buffers of those messages. For each message, the amount of data is calculated as follows:

 mp->b_wptr - mp->b_rptr
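As a small illustration of this calculation, the sketch below walks a tuple through b_cont and sums the per-message differences. The type model_mblk_t and the function model_msgdsize() are hypothetical stand-ins that keep only the three fields needed here.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced message: only the fields needed to measure a tuple. */
typedef struct model_msgb {
    struct model_msgb *b_cont;  /* next message of the tuple, NULL for the last one */
    unsigned char *b_rptr;      /* start of unread data */
    unsigned char *b_wptr;      /* end of written data */
} model_mblk_t;

/* Sum b_wptr - b_rptr over every message of the tuple. */
size_t model_msgdsize(const model_mblk_t *mp)
{
    size_t total = 0;
    for (; mp != NULL; mp = mp->b_cont) {
        assert(mp->b_wptr >= mp->b_rptr);
        total += (size_t)(mp->b_wptr - mp->b_rptr);
    }
    return total;
}
```

A single message is just a tuple of length one, so the same loop covers both cases.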

To concatenate two tuples, use the function:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

It appends the tuple newm to the tail of the tuple mp and returns a pointer to the last message of the resulting tuple.

If necessary, a tuple can be turned into a single message with a single data block. This is done by the function:

void msgpullup(mblk_t *mp,int len);

If the argument len is -1, the size of the allocated buffer is determined automatically. If len is a positive number, a buffer of that size is created and the data of the tuple's messages is copied into it. If the buffer runs out, copying stops there. The first message of the tuple receives the buffer of the new size with the copied data. The remaining messages are removed and their memory is returned to the heap.

When an mblk_t structure is deleted, the reference count of the data block is taken into account: if it drops to zero when freeb() is called, the data buffer is deleted together with the mblk_t instance that points to it.

Initialization of new message fields:

void mblk_init(mblk_t *mp);

Adding another piece of data to the message:

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

If the new data does not fit into the free space of the message's data buffer, a separately created message with a buffer of the required size is attached to it (the pointer to the added message is stored in the first message), and the message becomes a tuple.

Adding a chunk of data to a tuple:

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad); 

The function calls appendb() in a loop.

Combining two tuples into one:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

Message newm will be attached to mp.

Create a copy of a single message:

mblk_t *copyb(const mblk_t *mp);

Full copy of a tuple with all data blocks:

mblk_t *copymsg(const mblk_t *mp);

The elements of the tuple are copied by the function copyb().

Create a lightweight copy of an mblk_t message. In this case the data block is not copied; instead its reference counter db_ref is incremented:

mblk_t *dupb(mblk_t *mp);

Make a lightweight copy of a tuple. The data blocks are not copied; only their reference counters db_ref are incremented:

mblk_t *dupmsg(mblk_t* m);

Joining all messages of a tuple into one message:

void msgpullup(mblk_t *mp,size_t len);

If the argument len is -1, then the size of the allocated buffer is determined automatically.

Removing a message, tuple:

void freemsg(mblk_t *mp);

The data block reference count is decremented by one. If it reaches zero at the same time, then the data block is also deleted.
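How a lightweight copy and reference counting interact can be shown on a reduced model. In the sketch below, model_allocb(), model_dupb() and model_freeb() are hypothetical functions written for this illustration, and model_blocks_freed is a demo-only counter that records when a data block is really released.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, reduced data block and message. */
typedef struct model_datab {
    unsigned char *db_base;  /* data buffer */
    int db_ref;              /* number of messages that reference this block */
} model_dblk_t;

typedef struct model_msgb {
    model_dblk_t *b_datap;
    unsigned char *b_rptr;
    unsigned char *b_wptr;
} model_mblk_t;

static int model_blocks_freed = 0;  /* demo-only: counts released data blocks */

model_mblk_t *model_allocb(size_t size)
{
    model_mblk_t *m = malloc(sizeof *m);
    model_dblk_t *d = malloc(sizeof *d);
    unsigned char *buf = malloc(size ? size : 1);
    if (m == NULL || d == NULL || buf == NULL) {
        free(m);
        free(d);
        free(buf);
        return NULL;
    }
    d->db_base = buf;
    d->db_ref = 1;
    m->b_datap = d;
    m->b_rptr = m->b_wptr = buf;
    return m;
}

/* Lightweight copy: a new message that shares the same data block. */
model_mblk_t *model_dupb(const model_mblk_t *src)
{
    model_mblk_t *m;
    assert(src != NULL);
    m = malloc(sizeof *m);
    if (m == NULL)
        return NULL;
    *m = *src;
    m->b_datap->db_ref++;
    return m;
}

/* Release a message; the data block goes away only with its last reference. */
void model_freeb(model_mblk_t *m)
{
    assert(m != NULL);
    if (--m->b_datap->db_ref == 0) {
        free(m->b_datap->db_base);
        free(m->b_datap);
        model_blocks_freed++;
    }
    free(m);
}
```

Freeing the original after a duplicate was made leaves the buffer alive; it is released only when the duplicate is freed as well.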

Count the total amount of data in a message or tuple.

size_t msgdsize(const mblk_t *mp);

Reading a pointer to the message at the tail of the queue:

mblk_t *ms_queue_peek_last (q);

Copying the contents of the reserved fields of one message to another message (in fact, these fields contain flags that are used by the media streamer):

void mblk_meta_copy(const mblk_t *source, mblk_t *dest);

Queue queue_t

The message queue in the media streamer is implemented as a circular doubly linked list. Each element of the list contains a pointer to a data block with signal samples. As a result, only the pointers to the data blocks move along the queue, while the data itself stays in place. That is, only the links to the data are moved.
The structure queue_t that describes the queue is shown below:

typedef struct _queue
{
   mblk_t _q_stopper; /* "Idle" queue element; it does not point to data and is used only to manage the queue. When the queue is initialized (qinit()), its pointers are set to point to itself. */
   int q_mcount;        // Number of elements in the queue.
} queue_t;

The structure contains the field _q_stopper of type mblk_t, a dummy element whose pointers lead to the first and the last element (message) of the queue. The second field of the structure is the counter of messages in the queue.
[Figure: queue q1 containing four messages m1, m2, m3, m4.]
[Figure: queue q1 containing four messages m1, m2, m3, m4; message m2 is the head of a tuple that contains two more messages, m2_1 and m2_2.]

queue_t functions

Queue initialization:

void qinit(queue_t *q);

The field _q_stopper (we will call it the "stopper" from now on) is initialized by the function mblk_init(), and its previous-element and next-element pointers are set to point to itself. The queue element counter is reset to zero.

Adding a new element (message):

void putq(queue_t *q, mblk_t *m);

The new element m is added to the end of the list; the pointers are adjusted so that the stopper becomes the next element for it, and it becomes the previous element for the stopper. The queue element counter is incremented.

Retrieving an element from the queue:

mblk_t * getq(queue_t *q); 

The message that follows the stopper is retrieved and the element counter is decremented. If the queue holds no elements other than the stopper, 0 is returned.
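The stopper mechanics described for qinit(), putq() and getq() can be reproduced on a reduced model. The sketch below is written from the description in this section and is not the library source; model_queue_t, model_qinit(), model_putq() and model_getq() are hypothetical names, and the message keeps only its two list pointers.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced message: only the list pointers. */
typedef struct model_msgb {
    struct model_msgb *b_prev;
    struct model_msgb *b_next;
} model_mblk_t;

typedef struct model_queue {
    model_mblk_t stopper;  /* dummy element that carries no data */
    int q_mcount;          /* number of real elements */
} model_queue_t;

/* Empty queue: the stopper points to itself in both directions. */
void model_qinit(model_queue_t *q)
{
    assert(q != NULL);
    q->stopper.b_prev = q->stopper.b_next = &q->stopper;
    q->q_mcount = 0;
}

/* Insert m at the tail, i.e. between the current last element and the stopper. */
void model_putq(model_queue_t *q, model_mblk_t *m)
{
    assert(q != NULL && m != NULL);
    m->b_next = &q->stopper;
    m->b_prev = q->stopper.b_prev;
    q->stopper.b_prev->b_next = m;
    q->stopper.b_prev = m;
    q->q_mcount++;
}

/* Take the element that follows the stopper; NULL if only the stopper is left. */
model_mblk_t *model_getq(model_queue_t *q)
{
    model_mblk_t *m;
    assert(q != NULL);
    m = q->stopper.b_next;
    if (m == &q->stopper)
        return NULL;
    q->stopper.b_next = m->b_next;
    m->b_next->b_prev = &q->stopper;
    m->b_next = m->b_prev = NULL;
    q->q_mcount--;
    return m;
}
```

Thanks to the stopper, neither function needs a special case for an empty list: there is always a valid previous and next element to relink.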

Inserting a message into the queue:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp); 

The element mp is inserted before the element emp. If emp=0, the message is added to the tail of the queue.

Removing the given message mp from the queue:

void remq(queue_t *q, mblk_t *mp); 

The element counter is decremented.

Reading a pointer to the first element in the queue:

mblk_t * peekq(queue_t *q); 

Removing all elements from the queue with the removal of the elements themselves:

void flushq(queue_t *q, int how);

The argument how is not used. The queue element counter is set to zero.

Macro for reading a pointer to the last element of the queue:

mblk_t * qlast(queue_t *q);

When working with message queues, keep in mind that calling ms_queue_put(q, m) with a null message pointer makes the function loop forever, and your program will hang. ms_queue_next(q, m) behaves the same way.

Connecting filters

The queue described above is used to pass messages from one filter to another, or from one filter to several filters. Filters and their interconnections form a directed graph. We will use the general word "pin" for an input or an output of a filter. To describe the order in which filters are connected to each other, the media streamer uses the concept of a "signal point". A signal point is the structure _MSCPoint, which contains a pointer to a filter and the number of one of its pins; accordingly, it describes the connection of one of the filter's inputs or outputs.

Data Processing Graph Signal Point

typedef struct _MSCPoint{
struct _MSFilter *filter; // Pointer to a media streamer filter.
int pin;                        // Number of one of the filter's inputs or outputs, i.e. the pin.
} MSCPoint;

Filter pins are numbered starting from zero.

The connection of two pins by a message queue is described by the structure _MSQueue, which contains the message queue and pointers to the two signal points it connects:

typedef struct _MSQueue
{
queue_t q;
MSCPoint prev;
MSCPoint next;
}MSQueue;

We will call this structure a signal link. Each media streamer filter contains a table of input links and a table of output links (MSQueue). The sizes of the tables are set when the filter is created; we already did this through the exported variable of type MSFilterDesc when we developed our own filter. Below is the structure that describes any filter in the media streamer, MSFilter:


struct _MSFilter{
    MSFilterDesc *desc;    /* Pointer to the filter descriptor. */
    /* Protected attributes; they must not be moved or removed, otherwise plugins will break. */
    ms_mutex_t lock;      /* Mutex. */
    MSQueue **inputs;     /* Table of input links. */
    MSQueue **outputs;    /* Table of output links. */
    struct _MSFactory *factory; /* Pointer to the factory that created this filter instance. */
    void *padding;              /* Not used; will be put to use if protected fields are added. */
    void *data;                 /* Pointer to an arbitrary structure holding the filter's internal state and intermediate results. */
    struct _MSTicker *ticker;   /* Pointer to the ticker object; must not be NULL when process() is called. */
    /*private attributes, they can be moved and changed at any time*/
    MSList *notify_callbacks; /* List of callbacks used to handle filter events. */
    uint32_t last_tick;       /* Number of the last tick on which process() was called. */
    MSFilterStats *stats;     /* Filter statistics. */
    int postponed_task; /* Number of postponed tasks. Some filters may postpone data processing (the process() call) for several ticks. */
    bool_t seen;  /* Flag the ticker uses to mark that it has already serviced this filter instance on the current tick. */
};
typedef struct _MSFilter MSFilter;

Once we have connected the filters in a C program according to our design (but have not yet connected the ticker), we have created a directed graph whose nodes are instances of the structure MSFilter and whose edges are instances of the links MSQueue.

The behind-the-scenes activity of the ticker

When I told you that the ticker is a filter that serves as a clock source, that was not the whole truth. A ticker is an object that, on each clock tick, runs the process() functions of all filters of the scheme (graph) it is connected to. When we connect a ticker to a graph filter in a C program, we show the ticker the graph that it will control from then on, until we disconnect it. After connecting, the ticker begins to inspect the graph entrusted to it, compiling a list of the filters it includes. In order not to count the same filter twice, it marks the filters it finds by setting their seen flag. The search follows the link tables that each filter has.

During its introductory tour of the graph, the ticker checks whether at least one of the filters acts as a source of data blocks. If there is none, the graph is considered invalid and the ticker aborts.

If the graph turns out to be "correct", the initialization function preprocess() is called for each filter found. As soon as the time comes for the next processing cycle (every 10 milliseconds by default), the ticker calls process() for all the source filters found earlier, and then for the remaining filters in the list. If a filter has input links, its process() function is run repeatedly until the queues of the input links are empty. After that, the ticker moves to the next filter in the list and "spins" it until its input links hold no more messages. The ticker moves from filter to filter until the list ends. This completes the processing of one tick.
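The order of calls within one tick can be modelled with a toy scheduler. The sketch below is only an illustration of the rule just described, under several assumptions: model_filter_t, model_process() and model_tick() are hypothetical, each filter has at most one input and one output, the filter list is already ordered from sources to sinks, and every process() call consumes exactly one input message.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical toy filter: counts waiting input messages instead of holding queues. */
typedef struct model_filter {
    int pending_in;                 /* messages waiting on the input link */
    int is_source;                  /* 1 if the filter has no inputs */
    int process_calls;              /* how many times process() has run */
    struct model_filter *next_out;  /* filter fed by this one, or NULL */
} model_filter_t;

/* Toy process(): a source emits one message; other filters consume one and forward it. */
static void model_process(model_filter_t *f)
{
    f->process_calls++;
    if (!f->is_source)
        f->pending_in--;
    if (f->next_out != NULL)
        f->next_out->pending_in++;
}

/* One tick: sources first, then every other filter until its input is drained. */
void model_tick(model_filter_t **list, size_t n)
{
    size_t i;
    assert(list != NULL);
    for (i = 0; i < n; i++)
        if (list[i]->is_source)
            model_process(list[i]);
    for (i = 0; i < n; i++) {
        if (list[i]->is_source)
            continue;
        while (list[i]->pending_in > 0)
            model_process(list[i]);
    }
}
```

The while loop also shows the danger of a process() that leaves messages in its input: in this model such a filter would keep the loop spinning forever.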

Now let's return to tuples and talk about why such an entity was added to the media streamer. In general, the amount of data required by the algorithm running inside a filter neither matches nor is a multiple of the size of the data buffers arriving at the input. For example, suppose we are writing a filter that performs a Fast Fourier Transform, which by definition can only process blocks of data whose size is a power of two. Let it be 512 samples. If the data comes from a telephone channel, the data buffer of each input message brings us 160 signal samples. It is tempting not to take data from the input until the required amount has accumulated there. But in that case there will be a conflict with the ticker, which will keep trying in vain to spin the filter until its input link is empty. Earlier we called this rule the third principle of the filter. According to this principle, the filter's process() function must take all the data from the input queues.

In addition, it is not possible to take exactly 512 samples from the input, since only whole blocks can be taken; that is, the filter will have to take 640 samples, use 512 of them, and keep the remaining 128 until a new portion of data accumulates. Thus our filter, in addition to its main work, must take care of auxiliary intermediate storage of the input data. To solve this common problem, the developers of the media streamer created a special object, MSBufferizer (bufferizer), which handles it using tuples.

Bufferizer (MSBufferizer)

This is an object that accumulates input data inside the filter and starts handing it over for processing as soon as there is enough of it to run the filter's algorithm. While the bufferizer is accumulating data, the filter idles without spending the processor's computing power. But as soon as the function that reads from the bufferizer returns a non-zero value, the filter's process() function starts taking data from the bufferizer in portions of the required size and processing it, until the data is exhausted.
Data that has not yet been claimed stays in the bufferizer as the first element of a tuple, to which subsequent blocks of input data are attached.

The structure that describes a bufferizer:

struct _MSBufferizer{
queue_t q; /* Message queue. */
int size; /* Total size of the data currently held in the bufferizer. */
};
typedef struct _MSBufferizer MSBufferizer;

Functions for working with MSBufferizer

Creating a new bufferizer instance:

MSBufferizer * ms_bufferizer_new(void);

Memory is allocated and initialized in ms_bufferizer_init(), and a pointer is returned.

Initialization function:

void ms_bufferizer_init(MSBufferizer *obj); 

The queue q is initialized and the field size is set to zero.

Adding a message:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m); 

The message m is added to the queue. The computed size of its data is added to size.

Transferring all messages from the data queue of link q to the bufferizer:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);   

Messages are transferred from the link q to the bufferizer using the function ms_bufferizer_put().

Reading from the bufferizer:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen); 

If the size of the data accumulated in the bufferizer is less than the requested size (datalen), the function returns zero and nothing is copied to data. Otherwise, data is copied sequentially from the tuples held in the bufferizer. Once a tuple has been copied, it is removed and its memory is freed. Copying ends when datalen bytes have been copied. If the requested amount ends in the middle of a data block, the data block in that message is reduced to the remaining uncopied part. On the next call, copying continues from that point.
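The "all or nothing" reading rule is easy to check on a toy accumulator. The sketch below is a simplified stand-in, not MSBufferizer itself: model_bufferizer_t and its functions are hypothetical, and the data is kept in one flat array of bytes instead of a queue of messages. The usage note assumes 16-bit samples, so a 160-sample frame is 320 bytes and a 512-sample block is 1024 bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MODEL_CAP 4096  /* capacity of the toy accumulator, in bytes */

/* Hypothetical toy bufferizer: a flat byte FIFO. */
typedef struct model_bufferizer {
    unsigned char bytes[MODEL_CAP];
    size_t size;  /* number of bytes currently stored */
} model_bufferizer_t;

void model_buf_init(model_bufferizer_t *b)
{
    assert(b != NULL);
    b->size = 0;
}

/* Append len bytes; returns 0 on success, -1 if they do not fit. */
int model_buf_put(model_bufferizer_t *b, const void *data, size_t len)
{
    assert(b != NULL && data != NULL);
    if (len > MODEL_CAP - b->size)
        return -1;
    memcpy(b->bytes + b->size, data, len);
    b->size += len;
    return 0;
}

/* Copy out exactly datalen bytes, oldest first, or nothing at all.
 * Returns datalen on success and 0 if not enough data has accumulated. */
size_t model_buf_read(model_bufferizer_t *b, void *out, size_t datalen)
{
    assert(b != NULL && out != NULL);
    if (b->size < datalen)
        return 0;
    memcpy(out, b->bytes, datalen);
    memmove(b->bytes, b->bytes + datalen, b->size - datalen);
    b->size -= datalen;
    return datalen;
}
```

After three 320-byte frames a 1024-byte read still returns 0; after the fourth frame it succeeds and 256 bytes (128 samples) stay behind for the next block.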

Reading the amount of data currently available in the bufferizer:

int ms_bufferizer_get_avail(MSBufferizer *obj); 

Returns the size field of the bufferizer.

Discarding part of the data in the bufferizer:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

The specified number of data bytes are retrieved and discarded. The oldest data is discarded.

Deleting all messages in the bufferizer:

void ms_bufferizer_flush(MSBufferizer *obj); 

The data counter is reset to zero.

Deleting all messages in the bufferizer:

void ms_bufferizer_uninit(MSBufferizer *obj); 

The counter is not reset.

Destroying the bufferizer and freeing its memory:

void ms_bufferizer_destroy(MSBufferizer *obj);  

Examples of using the bufferizer can be found in the source code of several media streamer filters. For example, in the MS_L16_ENC filter, which swaps the bytes of samples from network order to host order: l16.c

In the next article, we will look at the issue of ticker load estimation and how to deal with excessive computing load in the media streamer.

Source: habr.com
