Истраживање Медиастреамер2 ВоИП механизма. Део 11

Материјал чланка је преузет са мог зен канал.

Истраживање Медиастреамер2 ВоИП механизма. Део 11

Механизам кретања података

  • Блок података дблк_т
  • Порука мблк_т
  • Функције за рад са порукама мблк_т
  • Куеуе куеуе_т
  • Функције за рад са редовима куеуе_т
  • Повезивање филтера
  • Сигнална тачка графика обраде података
  • Активности иза кулиса
  • Баферизер (МСБуфферизер)
  • Функције за рад са МСБуфферизер-ом

У последњих Чланак развили смо сопствени филтер. Овај чланак ће се фокусирати на интерни механизам за премештање података између филтера медијских стримера. Ово ће вам омогућити да пишете софистициране филтере са мање напора у будућности.

Механизам кретања података

Кретање података у медијском стримеру се врши помоћу редова описаних структуром куеуе_т. Низови порука попут мблк_т, који сами по себи не садрже сигналне податке, већ само везују за претходну, следећу поруку и блок података. Поред тога, желим посебно да истакнем да постоји и поље за везу ка поруци истог типа, што вам омогућава да организујете једноструко повезану листу порука. Групу порука уједињених таквом листом назваћемо торком. Дакле, било који елемент реда може бити једна порука мблк_т, а можда и глава торке поруке мблк_т. Свака порука корте може имати свој блок података одељења. Разговараћемо о томе зашто су тупле потребне нешто касније.

Као што је горе поменуто, сама порука не садржи блок података; уместо тога, садржи само показивач на меморијску област у којој је блок ускладиштен. У овом делу укупна слика рада медијског стримера подсећа на врата магацина у цртаном филму „Монстерс, Инц.“, где се врата (линкови са подацима – собе) крећу лудом брзином дуж надземних транспортера, а саме просторије остати непомичан.

Сада, крећући се по хијерархији од дна ка врху, размотримо детаљно наведене ентитете механизма преноса података у медијском стримеру.

Блок података дблк_т

Блок података се састоји од заглавља и бафера података. Заглавље је описано следећом структуром,

typedef struct datab
{
unsigned char *db_base; // Указатель на начало буфер данных.
unsigned char *db_lim;  // Указатель на конец буфер данных.
void (*db_freefn)(void*); // Функция освобождения памяти при удалении блока.
int db_ref; // Счетчик ссылок.
} dblk_t;

Поља структуре садрже показиваче на почетак бафера, крај бафера и функцију за брисање бафера података. Последњи елемент у заглављу дб_реф — референтни бројач, ако достигне нулу, ово служи као сигнал за брисање овог блока из меморије. Ако је блок података креирала функција датаб_аллоц() , тада ће бафер података бити смештен у меморију одмах након заглавља. У свим осталим случајевима, бафер се може налазити негде одвојено. Бафер података ће садржати узорке сигнала или друге податке које желимо да обрадимо филтерима.

Нова инстанца блока података се креира помоћу функције:

dblk_t *datab_alloc(int size);

Као улазни параметар даје се величина података које ће блок чувати. Више меморије се додељује како би се заглавље - структура - поставило на почетак додељене меморије датаб. Али када користите друге функције, то се не дешава увек; у ​​неким случајевима, бафер података може бити лоциран одвојено од заглавља блока података. Приликом креирања структуре, поља се конфигуришу тако да њено поље дб_басе указао на почетак области података, и дб_лим до свог краја. Број линкова дб_реф је постављен на један. Показивач функције брисања података је постављен на нулу.

Порука мблк_т

Као што је наведено, елементи реда су типа мблк_т, дефинисан је на следећи начин:

typedef struct msgb
{
  struct msgb *b_prev;   // Указатель на предыдущий элемент списка.
  struct msgb *b_next;   // Указатель на следующий элемент списка.
  struct msgb *b_cont;   // Указатель для подклейки к сообщению других сообщений, для создания кортежа сообщений.
  struct datab *b_datap; // Указатель на структуру блока данных.
  unsigned char *b_rptr; // Указатель на начало области данных для чтения данных буфера b_datap.
  unsigned char *b_wptr; // Указатель на начало области данных для записи данных буфера b_datap.
  uint32_t reserved1;    // Зарезервированное поле1, медиастример помещает туда служебную информацию. 
  uint32_t reserved2;    // Зарезервированное поле2, медиастример помещает туда служебную информацию.
  #if defined(ORTP_TIMESTAMP)
  struct timeval timestamp;
  #endif
  ortp_recv_addr_t recv_addr;
} mblk_t;

Структура мблк_т садржи показиваче на почетку б_прев, б_нект, који су неопходни за организовање двоструко повезане листе (која је ред куеуе_т).

Затим долази показивач б_цонт, који се користи само када је порука део тупле. За последњу поруку у тупле, овај показивач остаје нулл.

Затим видимо показивач на блок података б_датап, за који порука постоји. Прате га показивачи на област унутар бафера података блока. Поље б_рптр одређује локацију са које ће се читати подаци из бафера. Поље б_вптр означава локацију са које ће се уписивати у бафер.

Преостала поља су услужне природе и не односе се на рад механизма за пренос података.

Испод је једна порука са именом m1 и блок података d1.
Истраживање Медиастреамер2 ВоИП механизма. Део 11
Следећа слика приказује низ од три поруке m1, м1_1, м1_2.
Истраживање Медиастреамер2 ВоИП механизма. Део 11

Функције за размену порука мблк_т

Нова порука мблк_т креирана од стране функције:

mblk_t *allocb(int size, int pri); 

она ставља нову поруку у меморију мблк_т са блоком података наведене величине величина, други аргумент - при не користи се у овој верзији библиотеке. Требало би да остане нула. Током рада функције, меморија ће бити додељена за структуру нове поруке и функција ће бити позвана мблк_инит(), који ће ресетовати сва поља креиране инстанце структуре и затим, користећи горе поменуто датаб_аллоц(), креираће бафер података. Након тога ће поља у структури бити конфигурисана:

mp->b_datap=datab;
mp->b_rptr=mp->b_wptr=datab->db_base;
mp->b_next=mp->b_prev=mp->b_cont=NULL;

На излазу добијамо нову поруку са иницијализованим пољима и празним бафером података. Да бисте додали податке у поруку, потребно је да их копирате у бафер блока података:

memcpy(msg->b_rptr, data, size);

где подаци је показивач на извор података, и величина - њихова величина.
онда морате да ажурирате показивач на тачку писања тако да поново показује на почетак слободне области у баферу:

msg->b_wptr = msg->b_wptr + size

Ако треба да креирате поруку из постојећег бафера, без копирања, користите функцију:

mblk_t *esballoc(uint8_t *buf, int size, int pri, void (*freefn)(void*)); 

Функција ће, након креирања поруке и структуре блока података, конфигурисати своје показиваче на податке на адреси буф. Оне. у овом случају бафер података се не налази иза поља заглавља блока података, као што је био случај када се креира блок података са функцијом датаб_аллоц(). Бафер са подацима прослеђеним функцији остаће тамо где је био, али ће уз помоћ показивача бити прикачен за новокреирано заглавље блока података, а то, сходно томе, и за поруку.

На једну поруку мблк_т Неколико блокова података може бити повезано узастопно. Ово ради функција:

mblk_t * appendb(mblk_t *mp, const char *data, int size, bool_t pad); 

mp — порука којој ће бити додат још један блок података;
подаци — показивач на блок, чија ће копија бити додата поруци;
величина — величина података;
пад — ознака да величина додељене меморије мора да буде поравната дуж границе од 4 бајта (допуна ће се вршити нулама).

Ако у постојећем баферу података за поруке има довољно простора, нови подаци ће бити налепљени иза података који су већ тамо. Ако има мање слободног простора у баферу података поруке од величина, онда се креира нова порука са довољном величином бафера и подаци се копирају у њен бафер. Ово је нова порука, повезана са оригиналном помоћу показивача б_цонт. У овом случају, порука се претвара у тупле.

Ако треба да додате још један блок података у тупле, онда морате да користите функцију:

void msgappend(mblk_t *mp, const char *data, int size, bool_t pad);

она ће пронаћи последњу поруку у тупле (он има б_цонт биће нулл) и позваће функцију за ову поруку аппендб().

Можете сазнати величину података у поруци или тупле користећи функцију:

int msgdsize(const mblk_t *mp);

проћи ће кроз све поруке у тупле-у и вратити укупну количину података у баферима података тих порука. За сваку поруку, количина података се израчунава на следећи начин:

 mp->b_wptr - mp->b_rptr

Да бисте комбиновали две торке, користите функцију:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

она додаје тупле невм до репа тупле mp и враћа показивач на последњу поруку резултујуће тупле.

Ако је потребно, тупле се може претворити у једну поруку са једним блоком података; то ради функција:

void msgpullup(mblk_t *mp,int len);

ако аргумент Лен је -1, тада се величина додељеног бафера одређује аутоматски. Ако Лен је позитиван број, креираће се бафер ове величине и подаци о поруци ће бити копирани у њега. Ако бафер понестане, копирање ће се ту зауставити. Прва порука тупле ће добити бафер нове величине са копираним подацима. Преостале поруке ће бити избрисане, а меморија враћена у гомилу.

Приликом брисања структуре мблк_т референтни број блока података се узима у обзир ако се при позивању фрееб() испоставило се да је нула, тада се бафер података брише заједно са инстанцом мблк_т, што указује на то.

Иницијализација поља нове поруке:

void mblk_init(mblk_t *mp);

Додавање још једног податка у поруку:

mblk_t * appendb(mblk_t *mp, const char *data, size_t size, bool_t pad);

Ако се нови подаци не уклапају у слободан простор бафера података поруке, тада се уз поруку прилаже посебно креирана порука са бафером потребне величине (у првој поруци се поставља показивач на додату поруку) и порука се претвара у тупле.

Додавање дела података у тупле:

void msgappend(mblk_t *mp, const char *data, size_t size, bool_t pad); 

Функција позива аппендб() у петљи.

Комбиновање две тупле у једну:

mblk_t *concatb(mblk_t *mp, mblk_t *newm);

Порука невм биће приложено mp.

Прављење копије једне поруке:

mblk_t *copyb(const mblk_t *mp);

Комплетно копирање тупле са свим блоковима података:

mblk_t *copymsg(const mblk_t *mp);

Елементе тупле копира функција цопиб().

Направите једноставну копију поруке мблк_т. У овом случају, блок података се не копира, већ се његов референтни бројач повећава дб_реф:

mblk_t *dupb(mblk_t *mp);

Прављење лаке копије торке. Блокови података се не копирају, само се њихови референтни бројачи повећавају дб_реф:

mblk_t *dupmsg(mblk_t* m);

Лепљење свих порука торке у једну поруку:

void msgpullup(mblk_t *mp,size_t len);

Ако аргумент Лен је -1, тада се величина додељеног бафера одређује аутоматски.

Брисање поруке, тупле:

void freemsg(mblk_t *mp);

Број референци блока података се смањује за један. Ако достигне нулу, блок података се такође брише.

Израчунавање укупне количине података у поруци или тупле.

size_t msgdsize(const mblk_t *mp);

Преузимање поруке са репа реда:

mblk_t *ms_queue_peek_last (q);

Копирање садржаја резервисаних поља једне поруке у другу поруку (у ствари, ова поља садрже заставице које користи медијски стример):

mblk_meta_copy(const mblk_t *source, mblk *dest);

Ред чекања куеуе_т

Ред порука у медиа стреамеру је имплементиран као кружна двоструко повезана листа. Сваки елемент листе садржи показивач на блок података са узорцима сигнала. Испоставило се да се само показивачи на блок података крећу редом, док сами подаци остају непомични. Оне. премештају се само везе ка њима.
Структура која описује ред куеуе_т, приказано испод:

typedef struct _queue
{
   mblk_t _q_stopper; /* "Холостой" элемент очереди, не указывает на данные, используется только для управления очередью. При инициализации очереди (qinit()) его указатели настраиваются так, чтобы они указывали на него самого. */
   int q_mcount;        // Количество элементов в очереди.
} queue_t;

Структура садржи поље – показивач _к_стоппер укуцајте *мблк_т, он указује на први елемент (поруку) у реду. Друго поље структуре је бројач порука у реду.
Слика испод приказује ред по имену к1 који садржи 4 поруке м1, м2, м3, м4.
Истраживање Медиастреамер2 ВоИП механизма. Део 11
Следећа слика приказује ред по имену к1 који садржи 4 поруке м1,м2,м3,м4. Порука м2 је глава торке која садржи још две поруке м2_1 и м2_2.

Истраживање Медиастреамер2 ВоИП механизма. Део 11

Функције за рад са редовима куеуе_т

Иницијализација реда:

void qinit(queue_t *q);

Поље _к_стоппер (у даљем тексту ћемо га звати „стопер“) иницијализује функција мблк_инит(), његов претходни елемент и показивач следећег елемента су прилагођени да указују на себе. Бројач елемената реда се ресетује на нулу.

Додавање новог елемента (поруке):

void putq(queue_t *q, mblk_t *m);

Нови елемент m се додаје на крај листе, показивачи елемента се подешавају тако да му стопер постане следећи елемент, а он претходни елемент за стопер. Бројач елемената реда се повећава.

Преузимање елемента из реда:

mblk_t * getq(queue_t *q); 

Порука која долази након заустављача се преузима, а бројач елемената се смањује. Ако у реду нема елемената осим стопера, онда се враћа 0.

Уметање поруке у ред:

void insq(queue_t *q, mblk_t *emp, mblk_t *mp); 

Елемент mp уметнут пре елемента ЕМП. Ако ЕМП=0, тада се порука додаје на реп реда.

Преузимање поруке са главе реда:

void remq(queue_t *q, mblk_t *mp); 

Бројач елемената се смањује.

Читање показивача на први елемент у реду:

mblk_t * peekq(queue_t *q); 

Уклањање свих елемената из реда уз брисање самих елемената:

void flushq(queue_t *q, int how);

Расправа како не користи. Бројач елемената реда је постављен на нулу.

Макро за читање показивача на последњи елемент реда:

mblk_t * qlast(queue_t *q);

Када радите са редовима порука, имајте на уму да када зовете мс_куеуе_пут(к, м) са нултим показивачем на поруку, функција се понавља. Ваш програм ће се замрзнути. понаша се слично мс_куеуе_нект(к, м).

Повезивање филтера

Горе описани ред се користи за прослеђивање порука од једног филтера до другог или од једног до неколико филтера. Филтери и њихове везе формирају усмерени граф. Улаз или излаз филтера ће се звати општом речју „пин“. Да би описао редослед којим су филтери повезани један са другим, медијски стример користи концепт „сигналне тачке“. Сигнална тачка је структура _МСЦПоинт, који садржи показивач на филтер и број једног од његових пинова; сходно томе, описује везу једног од улаза или излаза филтера.

Сигнална тачка графика обраде података

typedef struct _MSCPoint{
struct _MSFilter *filter; // Указатель на фильтр медиастримера.
int pin;                        // Номер одного из входов или выходов фильтра, т.е. пин.
} MSCPoint;

Игле филтера су нумерисане почевши од нуле.

Повезивање два пина редом порука је описано структуром _МСКуеуе, који садржи ред порука и показиваче на две сигналне тачке које повезује:

typedef struct _MSQueue
{
queue_t q;
MSCPoint prev;
MSCPoint next;
}MSQueue;

Ову структуру ћемо назвати сигналном везом. Сваки медијски стреамер филтер садржи табелу улазних веза и табелу излазних веза (МСКуеуе). Величина табела се поставља приликом креирања филтера; то смо већ урадили користећи извезену променљиву типа МСФилтерДесц, када смо развили сопствени филтер. Испод је структура која описује било који филтер у медијском стримеру, МСФилтер:


struct _MSFilter{
    MSFilterDesc *desc;    /* Указатель на дескриптор фильтра. */
    /* Защищенные атрибуты, их нельзя сдвигать или убирать иначе будет нарушена работа с плагинами. */
    ms_mutex_t lock;      /* Семафор. */
    MSQueue **inputs;     /* Таблица входных линков. */
    MSQueue **outputs;    /* Таблица выходных линков. */
    struct _MSFactory *factory; /* Указатель на фабрику, которая создала данный экземпляр фильтра. */
    void *padding;              /* Не используется, будет задействован если добавятся защищенные поля. */
    void *data;                 /* Указатель на произвольную структуру для хранения данных внутреннего состояния фильтра и промежуточных вычислений. */
    struct _MSTicker *ticker;   /* Указатель на объект тикера, который не должен быть нулевым когда вызывается функция process(). */
    /*private attributes, they can be moved and changed at any time*/
    MSList *notify_callbacks; /* Список обратных вызовов, используемых для обработки событий фильтра. */
    uint32_t last_tick;       /* Номер последнего такта, когда выполнялся вызов process(). */ 
    MSFilterStats *stats;     /* Статистика работы фильтра.*/
    int postponed_task; /*Количество отложенных задач. Некоторые фильтры могут откладывать обработку данных (вызов process()) на несколько тактов.*/
    bool_t seen;  /* Флаг, который использует тикер, чтобы помечать что этот экземпляр фильтра он уже обслужил на данном такте.*/
};
typedef struct _MSFilter MSFilter;

Након што смо повезали филтере у Ц програму у складу са нашим планом (али нисмо повезали тикер), креирали смо усмерени граф чији су чворови инстанце структуре МСФилтер, а ивице су примери веза МСКуеуе.

Активности иза кулиса

Када сам вам рекао да је тикер филтер за извор крпеља, није била цела истина о томе. Тикер је објекат који покреће функције на сату процес() сви филтери кола (графа) на које је повезан. Када повежемо тикер са филтером графикона у Ц програму, приказујемо тикер график који ће од сада контролисати све док га не искључимо. Након повезивања, тикер почиње да испитује граф који му је поверен на бригу, састављајући листу филтера који га укључују. Да не би „пребројао“ исти филтер двапут, он означава откривене филтере тако што ће у њих ставити поље за потврду види. Претрага се врши помоћу табела веза које сваки филтер има.

Током свог уводног обиласка графикона, тикер проверава да ли међу филтерима постоји бар један који делује као извор блокова података. Ако их нема, онда се графикон сматра нетачним и ознака пада.

Ако се покаже да је графикон „тачан“, за сваки пронађени филтер, функција се позива за иницијализацију препроцесс(). Чим дође тренутак за следећи циклус обраде (подразумевано сваких 10 милисекунди), ознака позива функцију процес() за све претходно пронађене изворне филтере, а затим за преостале филтере на листи. Ако филтер има улазне везе, покрените функцију процес() понавља све док редови улазних веза нису празни. Након тога, прелази на следећи филтер на листи и „помиче“ га док улазне везе не буду ослобођене порука. Ознака се помера од филтера до филтера док се листа не заврши. Овим се завршава обрада циклуса.

Сада ћемо се вратити на торке и разговарати о томе зашто је такав ентитет додат у медијски стример. Генерално, количина података коју захтева алгоритам који ради унутар филтера се не поклапа и није вишекратник величине бафера података примљених на улазу. На пример, пишемо филтер који врши брзу Фуријеову трансформацију, који по дефиницији може да обрађује само блокове података чија је величина степен два. Нека буде 512 тачака. Ако се подаци генеришу преко телефонског канала, тада ће нам бафер података сваке поруке на улазу донети 160 узорака сигнала. У искушењу је не прикупљати податке са улаза док не постоји потребна количина података. Али у овом случају ће се десити колизија са тикером, који ће безуспешно покушати да скролује филтер док улазна веза не буде празна. Раније смо ово правило означили као трећи принцип филтера. Према овом принципу, функција филтера процесс() мора узети све податке из улазних редова.

Поред тога, неће бити могуће узети само 512 узорака са улаза, пошто можете узети само целе блокове, тј. филтер ће морати да узме 640 узорака и користи 512 од њих, остатак пре него што акумулира нови део података. Дакле, наш филтер, поред свог главног рада, мора да обезбеди и помоћне радње за међускладиштење улазних података. Програмери медијског стримера и решења овог општег проблема развили су посебан објекат - МСБуфферизер (буфферер), који овај проблем решава помоћу тупле-а.

Баферизер (МСБуфферизер)

Ово је објекат који ће акумулирати улазне податке унутар филтера и почети да их шаље на обраду чим количина информација буде довољна за покретање алгоритма филтера. Док бафер акумулира податке, филтер ће радити у режиму мировања, без искоришћавања процесорске снаге. Али чим функција читања из бафера врати вредност различиту од нуле, функција процесс() филтера почиње да узима и обрађује податке из бафера у деловима потребне величине, све док се не исцрпе.
Подаци који још нису потребни остају у баферу као први елемент тупле, на који су прикачени наредни блокови улазних података.

Структура која описује бафер:

struct _MSBufferizer{
queue_t q; /* Очередь сообщений. */
int size; /* Суммарный размер данных находящихся в буферизаторе в данный момент. */
};
typedef struct _MSBufferizer MSBufferizer;

Функције за рад са МСБуфферизер-ом

Креирање нове инстанце бафера:

MSBufferizer * ms_bufferizer_new(void);

Меморија је додељена, иницијализована у мс_буфферизер_инит() а показивач се враћа.

Функција иницијализације:

void ms_bufferizer_init(MSBufferizer *obj); 

Ред се иницијализира q, поље величина је постављен на нулу.

Додавање поруке:

void ms_bufferizer_put(MSBufferizer *obj, mblk_t *m); 

Порука м се додаје у ред. Израчуната величина блокова података се додаје на величина.

Пренос свих порука из реда података везе у бафер q:

void ms_bufferizer_put_from_queue(MSBufferizer *obj, MSQueue *q);   

Пренос порука са везе q у баферу се врши помоћу функције мс_буфферизер_пут().

Читање из бафера:

int ms_bufferizer_read(MSBufferizer *obj, uint8_t *data, int datalen); 

Ако је величина података акумулираних у баферу мања од тражене (датален), тада функција враћа нулу, подаци се не копирају у податке. У супротном, врши се секвенцијално копирање података из торки које се налазе у баферу. Након копирања, тупле се брише и меморија се ослобађа. Копирање се завршава у тренутку када се бајтови података копирају. Ако у средини блока података понестане простора, онда ће у овој поруци блок података бити смањен на преостали некопирани део. Следећи пут када позовете, копирање ће се наставити од ове тачке.

Читање количине података која је тренутно доступна у баферу:

int ms_bufferizer_get_avail(MSBufferizer *obj); 

Враћа поље величина тампон.

Одбацивање дела података у баферу:

void ms_bufferizer_skip_bytes(MSBufferizer *obj, int bytes);

Наведени број бајтова података се преузима и одбацује. Најстарији подаци се одбацују.

Брисање свих порука у баферу:

void ms_bufferizer_flush(MSBufferizer *obj); 

Бројач података се ресетује на нулу.

Брисање свих порука у баферу:

void ms_bufferizer_uninit(MSBufferizer *obj); 

Бројач није ресетован.

Уклањање бафера и ослобађање меморије:

void ms_bufferizer_destroy(MSBufferizer *obj);  

Примери коришћења баферера могу се наћи у изворном коду неколико филтера медијских стримова. На пример, у филтеру МС_Л16_ЕНЦ, који преуређује бајтове у узорцима из редоследа мреже у ред хоста: л16.ц

У следећем чланку ћемо размотрити питање процене оптерећења тикера и како се носити са превеликим оптерећењем рачунара у медијском стримеру.

Извор: ввв.хабр.цом

Додај коментар