探索 Mediastreamer2 VoIP 引擎。 第 11 部分

文章素材取自我 禪頻道.

探索 Mediastreamer2 VoIP 引擎。 第 11 部分

資料移動機制

  • 資料塊dblk_t
  • 訊息mblk_t
  • 用於處理訊息 mblk_t 的函數
  • 隊列queue_t
  • 使用隊列queue_t的函數
  • 連接過濾器
  • 數據處理圖訊號點
  • 股票行情的幕後活動
  • 緩衝器(MSBufferizer)
  • 使用 MSBufferizer 的函數

在過去 文章 我們開發了自己的過濾器。 本文將重點放在在媒體串流過濾器之間移動資料的內部機制。 這將使您將來能夠更輕鬆地編寫複雜​​的過濾器。

資料移動機制

媒體流中的資料移動是使用結構描述的佇列執行的 隊列_t。 訊息字串如 mblk_t,它們本身不包含訊號數據,而僅連結到上一個、下一個訊息和資料區塊。 另外,我要特別強調的是,還有一個用於指向相同類型訊息的連結的字段,它允許您組織訊息的單一連結列表。 我們將由這樣一個列表聯合起來的一組訊息稱為元組。 因此,佇列的任何元素都可以是單一訊息 mblk_t,也許是訊息元組的頭部 mblk_t。 每個元組訊息可以有自己的病房資料塊。 稍後我們將討論為什麼需要元組。

如上所述,訊息本身不包含資料區塊;相反,它僅包含指向儲存該區塊的記憶體區域的指標。 在這一部分中,媒體串流工作的整體畫面讓人想起動畫片《怪物公司》中的門倉庫,其中門(連結到資料室)沿著高架傳送帶以瘋狂的速度移動,而房間本身保持一動不動。

現在,沿著層次結構從下到上,讓我們詳細考慮媒體串流傳輸器中資料傳輸機制的列出實體。

資料區塊 資料庫表

資料塊由標頭和資料緩衝區組成。 標頭由以下結構描述:

typedef struct datab
{
unsigned char *db_base; // Указатель на начало буфер данных.
unsigned char *db_lim;  // Указатель на конец буфер данных.
void (*db_freefn)(void*); // Функция освобождения памяти при удалении блока.
int db_ref; // Счетчик ссылок.
} dblk_t;

此結構的欄位包含指向緩衝區開頭、緩衝區結尾以及刪除資料緩衝區的函數的指標。 標題中的最後一個元素 資料庫引用 — 引用計數器,如果它達到零,則作為從記憶體中刪除該區塊的訊號。 如果資料塊是由函數建立的 datab_alloc() ,那麼資料緩衝區將緊接在標頭之後放置在記憶體中。 在所有其他情況下,緩衝區可以單獨位於某處。 資料緩衝區將包含我們想要用濾波器處理的訊號樣本或其他資料。

使用下列函數建立資料區塊的新實例:

dblk_t *datab_alloc(int size);

作為輸入參數,給出了區塊將儲存的資料的大小。 分配更多內存,以便將標頭 - 結構 - 放置在已分配內存的開頭 數據庫。 但當使用其他函數時,這種情況並不總是發生;在某些情況下,資料緩衝區可能與資料區塊頭分開。 建立結構體時,配置字段,使其字段 資料庫 指向資料區的開頭,並且 db_lim 直至其結束。 連結數 資料庫引用 設定為一。 資料清除函數指標設定為零。

信息 mblk_t

如上所述,隊列元素的類型 mblk_t, 它的定義如下:

typedef struct msgb
{
  struct msgb *b_prev;   // Указатель на предыдущий элемент списка.
  struct msgb *b_next;   // Указатель на следующий элемент списка.
  struct msgb *b_cont;   // Указатель для подклейки к сообщению других сообщений, для создания кортежа сообщений.
  struct datab *b_datap; // Указатель на структуру блока данных.
  unsigned char *b_rptr; // Указатель на начало области данных для чтения данных буфера b_datap.
  unsigned char *b_wptr; // Указатель на начало области данных для записи данных буфера b_datap.
  uint32_t reserved1;    // Зарезервированное поле1, медиастример помещает туда служебную информацию. 
  uint32_t reserved2;    // Зарезервированное поле2, медиастример помещает туда служебную информацию.
  #if defined(ORTP_TIMESTAMP)
  struct timeval timestamp;
  #endif
  ortp_recv_addr_t recv_addr;
} mblk_t;

結構 mblk_t 開頭包含指針 b_上一個, b_下一個,這是組織雙向鍊錶(這是一個隊列 隊列_t).

然後是指針 b_繼續,僅當訊息是元組的一部分時才使用。 對於元組中的最後一則訊息,該指標保持為空。

接下來我們看到一個指向資料塊的指針 b_datap,該訊息存在。 接下來是指向區塊資料緩衝區內區域的指標。 場地 b_rptr 指定將從緩衝區讀取資料的位置。 場地 b_wptr 指示將執行寫入緩衝區的位置。

其餘欄位具有服務性質,與資料傳輸機制的操作無關。

下面是一條帶有名稱的訊息 m1 和資料區塊 d1.
探索 Mediastreamer2 VoIP 引擎。 第 11 部分
下圖顯示了三個訊息的元組 m1, m1_1, m1_2.
探索 Mediastreamer2 VoIP 引擎。 第 11 部分

訊息傳遞功能 mblk_t

一條新消息 mblk_t 由函數建立:

mblk_t *allocb(int size, int pri); 

她在內存中放置了一條新訊息 mblk_t 具有指定大小的資料塊 尺寸,第二個參數 - PRI 在此版本的庫中未使用。 它應該保持為零。 在函數運行期間,將為新訊息的結構分配內存,並調用該函數 mblk_init(),這將重置所建立的結構實例的所有字段,然後使用上面提到的 datab_alloc(),將建立一個資料緩衝區。 之後將配置結構中的欄位:

mp->b_datap=datab;
mp->b_rptr=mp->b_wptr=datab->db_base;
mp->b_next=mp->b_prev=mp->b_cont=NULL;

在輸出中,我們收到一條新訊息,其中包含已初始化的欄位和一個空的資料緩衝區。 要將資料新增至訊息中,您需要將其複製到資料塊緩衝區:

memcpy(msg->b_rptr, data, size);

哪裡 數據 是指向資料來源的指針,並且 尺寸 - 他們的大小。
那麼您需要更新指向寫入點的指針,以便它再次指向緩衝區中空閒區域的開頭:

msg->b_wptr = msg->b_wptr + size

如果您需要從現有緩衝區建立訊息而不進行複製,請使用下列函數:

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*)); 

此函數在建立訊息和資料區塊的結構後,將其指標配置為指向位址處的數據 BUF。 那些。 在這種情況下,資料緩衝區不位於資料區塊的標頭欄位之後,就像使用函數建立資料區塊時的情況一樣 datab_alloc()。 傳遞給函數的資料緩衝區將保留在原來的位置,但在指標的幫助下,它將附加到新建立的資料區塊標頭,並相應地附加到訊息。

至一則訊息 mblk_t 多個資料塊可以依序連接。 這是透過以下函數完成的:

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad); 

mp — 將會新增另一個資料塊的訊息;
數據 — 指向區塊的指針,該區塊的副本將會加入到訊息中;
尺寸 ——數據大小;
— 分配記憶體的大小必須沿著 4 位元組邊界對齊的標誌(填充將用零完成)。

如果現有訊息資料緩衝區中有足夠的空間,則新資料將貼在已有資料的後面。 如果訊息資料緩衝區中的可用空間少於 尺寸,然後建立一個具有足夠緩衝區大小的新訊息,並將資料複製到其緩衝區。 這是一條新訊息,使用指針連結到原始訊息 b_繼續。 在這種情況下,訊息變成一個元組。

如果您需要為元組新增另一個資料塊,那麼您需要使用下列函數:

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

她會在元組中找到最後一則訊息(他有 b_繼續 將為空)並將呼叫該訊息的函數 追加().

您可以使用下列函數找出訊息或元組中資料的大小:

int msgdsize(const mblk_t *mp);

它將循環遍歷元組中的所有訊息並傳回這些訊息的資料緩衝區中的資料總量。 對於每條訊息,資料量計算如下:

 mp->b_wptr - mp->b_rptr

若要組合兩個元組,請使用下列函數:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

她附加了元組 紐姆 到元組的尾部 mp 並傳回指向結果元組的最後一條訊息的指標。

如有必要,可以將元組轉換為具有單一資料區塊的一條訊息;這是透過以下函數完成的:

void msgpullup(mblk_t *mp,int len);

如果參數 LEN 為-1,則自動決定分配的緩衝區的大小。 如果 LEN 是正數,將建立此大小的緩衝區,並將元組訊息資料複製到其中。 如果緩衝區用完,複製將在那裡停止。 元組的第一個訊息將接收一個新大小的緩衝區,其中包含複製的資料。 剩餘的訊息將被刪除,記憶體返回到堆。

刪除結構時 mblk_t 如果呼叫時,則考慮資料塊的引用計數 釋放() 結果為零,則資料緩衝區與實例一起被刪除 mblk_t,它指向它。

初始化新訊息的欄位:

void mblk_init(mblk_t *mp);

將另一條數據新增至訊息:

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

如果新資料無法放入訊息資料緩衝區的可用空間,則將具有所需大小緩衝區的單獨建立的訊息附加到該訊息(在第一則訊息中設定指向所新增訊息的指標),並且訊息變成一個元組。

在元組中新增一條資料:

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad); 

此函數在循環中呼叫appendb()。

將兩個元組合併為一個:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

信息 紐姆 將附於 mp.

複製單一訊息:

mblk_t *copyb(const mblk_t *mp);

完整複製包含所有資料塊的元組:

mblk_t *copymsg(const mblk_t *mp);

元組的元素由函數複製 複製b().

建立訊息的簡單副本 mblk_t。 在這種情況下,資料塊沒有被複製,但是它的引用計數器增加了 資料庫引用:

mblk_t *dupb(mblk_t *mp);

製作元組的輕量級副本。 資料塊不會被複製,只會增加它們的引用計數器 資料庫引用:

mblk_t *dupmsg(mblk_t* m);

將元組的所有訊息黏合到一條訊息中:

void msgpullup(mblk_t *mp,size_t len);

如果論據 LEN 為-1,則自動決定分配的緩衝區的大小。

刪除訊息,元組:

void freemsg(mblk_t *mp);

資料塊的引用計數減一。 如果達到零,資料塊也會被刪除。

計算訊息或元組中的資料總量。

size_t msgdsize(const mblk_t *mp);

從佇列尾部檢索訊息:

mblk_t *ms_queue_peek_last (q);

將一條訊息的保留欄位的內容複製到另一則訊息中(事實上,這些欄位包含媒體串流處理程序使用的標誌):

mblk_meta_copy(const mblk_t *source, mblk *dest);

隊列_t

媒體流中的消息隊列被實作為循環雙向鍊錶。 每個列表元素都包含一個指向帶有訊號樣本的資料塊的指標。 事實證明,只有指向資料區塊的指標依序移動,而資料本身保持不動。 那些。 僅移動到它們的連結。
描述隊列的結構 隊列_t,如下圖所示:

typedef struct _queue
{
   mblk_t _q_stopper; /* "Холостой" элемент очереди, не указывает на данные, используется только для управления очередью. При инициализации очереди (qinit()) его указатели настраиваются так, чтобы они указывали на него самого. */
   int q_mcount;        // Количество элементов в очереди.
} queue_t;

該結構包含一個字段 - 一個指針 _q_stopper 類型*mblk_t,它指向佇列中的第一個元素(訊息)。 該結構的第二個欄位是佇列中訊息的計數器。
下圖顯示了一個名為 q1 的佇列,其中包含 4 個訊息 m1、m2、m3、m4。
探索 Mediastreamer2 VoIP 引擎。 第 11 部分
下圖顯示了一個名為 q1 的佇列,其中包含 4 個訊息 m1,m2,m3,m4。 訊息 m2 是包含另外兩個訊息 m2_1 和 m2_2 的元組的頭。

探索 Mediastreamer2 VoIP 引擎。 第 11 部分

使用隊列queue_t的函數

隊列初始化:

void qinit(queue_t *q);

領域 _q_stopper (以下我們稱為“stopper”)由函數初始化 mblk_init(),其上一個元素和下一個元素指標被調整為指向自身。 隊列元素計數器重設為零。

新增元素(訊息):

void putq(queue_t *q, mblk_t *m);

新元素 m 添加到列表的末尾,調整元素指針,使 stopper 成為它的下一個元素,並且它成為 stopper 的前一個元素。 隊列元素計數器遞增。

從佇列中檢索元素:

mblk_t * getq(queue_t *q); 

檢索停止器之後出現的訊息,並且元素計數器會遞減。 如果佇列中除 stopper 之外沒有任何元素,則傳回 0。

將訊息插入佇列:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp); 

元素 mp 插入到元素之前 EMP。 如果 EMP=0,則該訊息被加入到佇列尾部。

從佇列頭部檢索訊息:

void remq(queue_t *q, mblk_t *mp); 

元素計數器遞減。

讀取指向佇列中第一個元素的指標:

mblk_t * peekq(queue_t *q); 

從佇列中刪除所有元素,同時刪除元素本身:

void flushq(queue_t *q, int how);

爭論 如何 不曾用過。 隊列元素計數器設定為零。

用於讀取指向佇列最後一個元素的指標的巨集:

mblk_t * qlast(queue_t *q);

使用訊息佇列時,請注意,當您調用 ms_queue_put(q, m) 如果訊息的空指標為空,則函數將循環。 你的程式將會凍結。 行為類似 ms_queue_next(q, m).

連接過濾器

上述隊列用於將訊息從一個過濾器傳遞到另一個過濾器或從一個過濾器傳遞到多個過濾器。 過濾器及其連接形成有向圖。 濾波器的輸入或輸出將被稱為通用詞“pin”。 為了描述過濾器彼此連接的順序,媒體串流媒體使用「訊號點」的概念。 訊號點是結構 _MSC點,其中包含指向濾波器的指針及其引腳之一的編號;相應地,它描述了濾波器的輸入或輸出之一的連接。

數據處理圖訊號點

typedef struct _MSCPoint{
struct _MSFilter *filter; // Указатель на фильтр медиастримера.
int pin;                        // Номер одного из входов или выходов фильтра, т.е. пин.
} MSCPoint;

濾波器引腳從零開始編號。

兩個引腳通過訊息隊列的連接由結構體描述 _MSQueue,其中包含一個訊息佇列和指向它連接的兩個訊號點的指標:

typedef struct _MSQueue
{
queue_t q;
MSCPoint prev;
MSCPoint next;
}MSQueue;

我們將這種結構稱為訊號鏈路。 每個媒體流過濾器包含一個輸入連結表和一個輸出連結表(訊息佇列)。 表的大小是在建立過濾器時設定的;我們已經使用類型的導出變數完成了此操作 MS過濾描述,當我們開發自己的過濾器時。 下面是描述媒體流中的任何過濾器的結構, 質譜過濾器:


struct _MSFilter{
    MSFilterDesc *desc;    /* Указатель на дескриптор фильтра. */
    /* Защищенные атрибуты, их нельзя сдвигать или убирать иначе будет нарушена работа с плагинами. */
    ms_mutex_t lock;      /* Семафор. */
    MSQueue **inputs;     /* Таблица входных линков. */
    MSQueue **outputs;    /* Таблица выходных линков. */
    struct _MSFactory *factory; /* Указатель на фабрику, которая создала данный экземпляр фильтра. */
    void *padding;              /* Не используется, будет задействован если добавятся защищенные поля. */
    void *data;                 /* Указатель на произвольную структуру для хранения данных внутреннего состояния фильтра и промежуточных вычислений. */
    struct _MSTicker *ticker;   /* Указатель на объект тикера, который не должен быть нулевым когда вызывается функция process(). */
    /*private attributes, they can be moved and changed at any time*/
    MSList *notify_callbacks; /* Список обратных вызовов, используемых для обработки событий фильтра. */
    uint32_t last_tick;       /* Номер последнего такта, когда выполнялся вызов process(). */ 
    MSFilterStats *stats;     /* Статистика работы фильтра.*/
    int postponed_task; /*Количество отложенных задач. Некоторые фильтры могут откладывать обработку данных (вызов process()) на несколько тактов.*/
    bool_t seen;  /* Флаг, который использует тикер, чтобы помечать что этот экземпляр фильтра он уже обслужил на данном такте.*/
};
typedef struct _MSFilter MSFilter;

當我們按照計劃連接C程式中的過濾器(但沒有連接股票)後,我們就創建了一個有向圖,其節點是結構體的實例 質譜過濾器,邊是連結的實例 訊息佇列.

股票行情的幕後活動

當我告訴你股票行情是對蜱蟲來源的過濾器時,這並不是全部真相。 股票代碼是一個在時鐘上運行函數的對象 過程() 它所連接的電路(圖)的所有濾波器。 當我們在 C 程式中將股票代碼連接到圖形過濾器時,我們會向股票代碼顯示從現在起它將控制的圖形,直到我們將其關閉。 連接後,股票行情自動收錄器開始檢查委託其處理的圖表,並編譯包含該圖表的過濾器清單。 為了不對同一過濾器“計數”兩次,它通過在其中放置一個複選框來標記檢測到的過濾器 看到。 使用每個過濾器具有的連結表來執行搜尋。

在對圖表進行介紹性瀏覽期間,自動收錄器會檢查篩選器中是否至少有一個充當資料區塊來源。 如果沒有,則圖表被認為不正確且股票行情崩潰。

如果圖表結果是“正確的”,對於每個找到的過濾器,將呼叫該函數進行初始化 預處理()。 一旦下一個處理週期的時刻到來(預設為每 10 毫秒),股票代碼就會呼叫該函數 過程() 對於所有先前找到的來源過濾器,然後對於清單中的其餘過濾器。 如果過濾器有輸入鏈接,則運行該函數 過程() 重複直到輸入連結佇列為空。 之後,它會移至清單中的下一個過濾器並「滾動」它,直到輸入連結沒有訊息。 股票代碼從一個過濾器移動到另一個過濾器,直到清單結束。 這樣就完成了循環的處理。

現在我們將回到元組並討論為什麼這樣的實體被添加到媒體串流媒體中。 一般來說,過濾器內部運行的演算法所需的資料量不一致,且不是輸入端接收的資料緩衝區大小的倍數。 例如,我們正在編寫一個執行快速傅立葉變換的濾波器,根據定義,它只能處理大小為 512 的冪的資料區塊。 設計數為 160。 如果資料是由電話通道產生的,那麼輸入處的每個訊息的資料緩衝區將為我們帶來XNUMX個訊號樣本。 在獲得所需的資料量之前,人們很容易不從輸入中收集資料。 但在這種情況下,會與股票代碼發生衝突,股票代碼將無法成功嘗試滾動過濾器,直到輸入連結為空。 之前,我們將此規則指定為過濾器的第三個原則。 根據這個原則,濾波器的process()函數必須從輸入佇列中取得所有資料。

此外,不可能僅從輸入中獲取 512 個樣本,因為您只能獲取整個區塊,即過濾器必須採集 640 個樣本,並使用其中的 512 個樣本,剩餘的樣本將在累積新的資料部分之前使用。 因此,我們的過濾器除了主要工作之外,還必須為輸入資料的中間儲存提供輔助操作。 媒體流和解決這個普遍問題的開發人員開發了一個特殊的物件 - MSBufferizer(緩衝區),它使用元組解決了這個問題。

緩衝器(MSBufferizer)

這是一個對象,它將在過濾器內累積輸入數據,並在資訊量足以運行過濾器演算法後立即開始提交數據進行處理。 當緩衝區累積資料時,過濾器將在空閒模式下運行,而不會耗盡處理器的處理能力。 但是,一旦從緩衝區的讀取函數返回非零值,過濾器的 process() 函數就開始按所需大小的部分從緩衝區獲取並處理數據,直到數據耗盡。
尚未需要的資料作為元組的第一個元素保留在緩衝區中,隨後的輸入資料區塊將附加到該元素。

描述緩衝區的結構:

struct _MSBufferizer{
queue_t q; /* Очередь сообщений. */
int size; /* Суммарный размер данных находящихся в буферизаторе в данный момент. */
};
typedef struct _MSBufferizer MSBufferizer;

使用 MSBufferizer 的函數

建立一個新的緩衝區實例:

MSBufferizer * ms_bufferizer_new(void);

記憶體被分配,初始化於 ms_bufferizer_init() 並返回一個指針。

初始化函數:

void ms_bufferizer_init(MSBufferizer *obj); 

隊列正在初始化 q, 場地 尺寸 被設定為零。

新增訊息:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m); 

訊息 m 被加入到佇列中。 將計算出的資料塊大小加到 尺寸.

將所有訊息從鏈路資料佇列傳輸到緩衝區 q:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);   

從連結傳輸訊息 q 在緩衝區中使用函數執行 ms_bufferizer_put().

從緩衝區讀取:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen); 

如果緩衝區中累積的資料大小小於請求的大小(數據倫),則函數傳回零,資料不複製到資料。 否則,執行來自位於緩衝區中的元組的資料的順序複製。 複製後,元組被刪除並釋放記憶體。 當 datalen 位元組被複製時,複製結束。 如果資料區塊中間的空間用完,則在該訊息中,資料區塊將縮減為剩餘未複製的部分。 下次呼叫時,複製將從此時開始繼續。

讀取緩衝區中目前可用的資料量:

int ms_bufferizer_get_avail(MSBufferizer *obj); 

傳回字段 尺寸 緩衝器。

丟棄緩衝區中的部分資料:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

檢索並丟棄指定數量的資料位元組。 最舊的數據將被丟棄。

刪除緩衝區中的所有訊息:

void ms_bufferizer_flush(MSBufferizer *obj); 

數據計數器重置為零。

刪除緩衝區中的所有訊息:

void ms_bufferizer_uninit(MSBufferizer *obj); 

計數器不重置。

刪除緩衝區並釋放記憶體:

void ms_bufferizer_destroy(MSBufferizer *obj);  

使用緩衝區的範例可以在幾個媒體串流過濾器的原始程式碼中找到。 例如,在 MS_L16_ENC 過濾器中,它將樣本中的位元組從網路順序重新排列為主機順序: l16.c

在下一篇文章中,我們將討論股票負載估計問題以及如何處理媒體流處理器中過多的計算負載。

來源: www.habr.com

添加評論