Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Всім привіт. На зв'язку Сергій Омельницький. Нещодавно я вів стрим по реактивному програмуванню, де розповідав про асинхронність у JavaScript. Сьогодні я хотів би законспектувати цей матеріал.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Але перед тим як розпочати основний матеріал нам потрібно зробити вступну. Отже, почнемо з визначень: що таке стек і черга?

Стек - це колекція, елементи якої отримують за принципом «останній увійшов, перший вийшов» LIFO

черга - Це колекція, елементи якої отримують за принципом («перший увійшов, перший вийшов» FIFO

Окей, продовжимо.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

JavaScript – це однопотокова мова програмування. Це означає, що він є тільки один потік виконання і один стек, який міститься функції в чергу на виконання. Отже в один момент часу JavaScript може виконати тільки одну операцію, інші операції при цьому чекатимуть на свою чергу в стеку, поки їх не викличуть.

Стек викликів — це структура даних, яка спрощено кажучи записує відомості про місце в програмі, де ми знаходимося. Якщо ми переходимо в функцію, поміщаємо запис про неї у верхню частину стека. Коли ми з функції повертаємося, ми витягуємо зі стека найвищий елемент і опиняємось там, звідки викликали цю функцію. Це все, що вміє стек. А тепер дуже цікаве питання. Як тоді працює асинхронність у Javascript?

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Насправді крім стека в браузерах є особлива черга для роботи з так званим WebAPI. Функції цієї черги виконуються по порядку тільки після того, як стек буде повністю очищений. Тільки після цього вони поміщаються з черги до стеку на виконання. Якщо в стеку зараз знаходиться хоча б один елемент, то вони в стек потрапити не можуть. Саме через це виклик функцій по таймууту часто буває не точним за часом, тому що функція не може потрапити з черги в стек, поки він заповнений.

Розглянемо наступний приклад і займемося його покроковим виконанням. Також подивимося, що при цьому відбувається у системі.

console.log('Hi');
setTimeout(function cb1() {
    console.log('cb1');
}, 5000);
console.log('Bye');

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

1) Поки що нічого не відбувається. Консоль браузера чиста, стек викликів порожній.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

2) Потім команда console.log('Hi') додається до стек викликів.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

3) І вона виконується

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

4) Потім console.log('Hi') видаляється зі стека викликів.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

5) Тепер переходимо до команди setTimeout(function cb1() {…}). Вона додається до стека дзвінків.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

6) Команда setTimeout(function cb1() {…}) виконується. Браузер створює таймер, який є частиною Web API. Він виконуватиме зворотній відлік часу.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

7) Команда setTimeout(function cb1() {… }) завершила роботу та видаляється зі стека викликів.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

8) Команда console.log('Bye') додається до стека дзвінків.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

9) Команда console.log('Bye') виконується.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

10) Команда console.log('Bye') видаляється зі стека викликів.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

11) Після того, як пройдуть, як мінімум, 5000 мс., таймер завершує роботу і поміщає коллбек cb1 в чергу коллбеків.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

12) Цикл подій бере c функцію cb1 з черги коллбеків і поміщає їх у стек викликів.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

13) Функція cb1 виконується та додає console.log('cb1') у стек дзвінків.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

14) Команда console.log('cb1') виконується.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

15) Команда console.log('cb1') видаляється зі стека викликів.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

16) Функція cb1 видаляється зі стека дзвінків.

Погляньмо на приклад у динаміці:

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Ну ось ми і розглянули як у JavaScript реалізована асинхронність. Тепер давайте коротко поговоримо про еволюцію асинхронного коду.

Еволюція асинхронного коду

a(function (resultsFromA) {
    b(resultsFromA, function (resultsFromB) {
        c(resultsFromB, function (resultsFromC) {
            d(resultsFromC, function (resultsFromD) {
                e(resultsFromD, function (resultsFromE) {
                    f(resultsFromE, function (resultsFromF) {
                        console.log(resultsFromF);
                    })
                })
            })
        })
    })
});

Асинхронне програмування, яким ми знаємо його в JavaScript, може бути реалізовано тільки функціями. Вони можуть бути передані як будь-яка інша змінна іншим функціям. Так народилися колбеки. І це прикольно, весело і задерикувато, поки не перетворюється на сум, тугу і смуток. Чому? Та все просто:

  • Зі зростанням складності коду, проект швидко перетворюється на малозрозумілі багаторазово вкладені блоки — callback hell.
  • Обробку помилок можна легко упустити.
  • Не можна повертати вирази з return.

З появою Promise ситуація стала трохи кращою.

new Promise(function(resolve, reject) {
    setTimeout(() => resolve(1), 2000);

}).then((result) => {
    alert(result);
    return result + 2;

}).then((result) => {
    throw new Error('FAILED HERE');
    alert(result);
    return result + 2;

}).then((result) => {
    alert(result);
    return result + 2;

}).catch((e) => {
    console.log('error: ', e);
});

  • З'явилися ланцюжки промісів, що покращило читання коду
  • З'явився окремий метод перехоплення помилок
  • З'явилася можливість паралельного виконання за допомогою Promise.all
  • Вкладену асинхронність ми можемо вирішити за допомогою async/await

Але проміс має свої обмеження. Наприклад, проміс, без танців з бубном, не можна скасувати, а що найголовніше — працює з одним значенням.

Ну ось ми й плавно підійшли до реактивного програмування. Втомилися? Ну благо справа можна піти заварити чайок, обмозгувати і повернутися читати далі. А я продовжу.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Реактивне програмування - Парадигма програмування, орієнтована на потоки даних і поширення змін. Давайте детальніше розберемо що таке потік даних.

// Получаем ссылку на элемент
const input = ducument.querySelector('input');

const eventsArray = [];

// Пушим каждое событие в массив eventsArray
input.addEventListener('keyup',
    event => eventsArray.push(event)
);

Припустимо, що у нас є поле введення. Ми створюємо масив, та на кожен keyup події input ми будемо зберігати подію у нашому масиві. У цьому хотілося б відзначити, наш масив відсортований за часом тобто. індекс пізніших подій більший, ніж індекс більш ранніх. Такий масив є спрощеною моделлю потоку даних, але це ще не потік. Для того, щоб цей масив можна було сміливо назвати потоком, він повинен уміти якимось чином повідомляти передплатникам, що до нього надійшли нові дані. Таким чином, ми підійшли до визначення потоку.

Потік даних

const { interval } = Rx;
const { take } = RxOperators;

interval(1000).pipe(
    take(4)
)

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

потік - Це масив даних, відсортованих за часом, який може повідомляти про те, що дані змінилися. А тепер уявіть, як зручно стає писати код, в якому на одну дію потрібно викликати кілька подій у різних ділянках коду. Ми просто робимо передплату на потік і він нам сам повідомить коли відбудуться зміни. І це вміє робити бібліотека RxJs.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

RxJS — це бібліотека для роботи з асинхронними та заснованими на подіях програмами з використанням послідовностей, що спостерігаються. Бібліотека надає основний тип Спостерігається, кілька допоміжних типів (Observer, Schedulers, Subjects) та оператори роботи з подіями як з колекціями (map, filter, reduce, every та подібні до JavaScript Array).

Розберемося з основними поняттями цієї бібліотеки.

Observable, Observer, Producer

Observable – перший базовий тип, який ми розглянемо. Цей клас містить у собі основну частину реалізації RxJs. Він пов'язаний із спостережуваним потоком, на який можна підписатися за допомогою методу subscribe.

В Observable реалізується допоміжний механізм для створення оновлень, так званий Спостерігач. Джерелом значень для Observer називається Виробник. Це може бути масив, ітератор, web socket, якась подія тощо. Так що можна сказати, що observable є провідником між Producer та Observer.

Observable обробляє три види подій Observer:

  • next – нові дані
  • error – помилку, якщо послідовність завершилася через виняткову ситуацію. ця подія також передбачає завершення послідовності.
  • complete - сигнал про завершення послідовності. Це означає, що нових даних більше не буде

Подивимося демо:

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Спочатку ми обробимо значення 1, 2, 3, а через 1 сек. ми отримаємо 4 та завершимо наш потік.

Думки вголос

І тут я зрозумів, що розповідати було цікавіше, ніж писати про це. 😀

Підписка

Коли ми робимо передплату на потік, ми створюємо новий клас передплата, який дає нам можливість скасувати передплату за допомогою методу відмовитися від підписки. Також ми можемо згрупувати підписки за допомогою методу додавати. Ну і логічно, що ми можемо розгрупувати потоки за допомогою видаляти. Методи add та remove на вхід приймають іншу підписку. Хотілося б відзначити, що коли ми робимо відписку, то ми відписуємося від усіх дочірніх підписок начебто й у них викликали метод unsubscribe. Йдемо далі.

Види потоків

ГАРЯЧА
Холодний

Producer створюється зовні observable
Producer створюється всередині observable

Дані передаються в момент створення observable
Дані повідомляються у момент передплати

Потрібна додаткова логіка для відписки
Потік завершується самостійно

Використовує зв'язок один до багатьох
Використовує зв'язок виду один до одного

Усі передплати мають єдине значення
Підписки незалежні

Дані можна втратити, якщо немає передплати
Перевидає всі значення потоку для нової передплати

Якщо наводити аналогію, то я представив би гарячий потік як фільм у кінотеатрі. В який час ти прийшов, з того моменту і почав перегляд. Холодний потік я б порівняв з дзвінком у тих. підтримку. Будь-хто, хто зателефонував, слухає запис автовідповідача від початку до кінця, але ти можеш кинути трубку за допомогою unsubscribe.

Хотілося б відзначити, що існують ще так звані warm потоки (таке визначення я зустрічав вкрай рідко і тільки на зарубіжних спільнотах) - це потік, який трансформується з холодного потоку в гарячий. Виникає питання – де використовувати)) Наведу приклад із практики.

Я працюю з ангуляром. Він активно використовує rxjs. Для отримання даних на сервер я чекаю на холодний потік і цей потік використовую в шаблоні за допомогою asyncPipe. Якщо я використовую цей пайп кілька разів, то, повертаючись до визначення холодного потоку, кожен pipe запитуватиме дані з сервера, що м'яко кажучи дивно. А якщо я перетворю холодний потік на теплий, то запит відбудеться одного разу.

Взагалі розуміння виду потоків досить складна для початківців, але важлива.

Оператори

return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
    .pipe(
        tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
        map(({ data }: TradeCompanyList) => data)
    );

Розширити можливість роботи із потоками нам надають оператори. Вони допомагають контролювати події, які відбуваються в Observable. Ми розглянемо кілька найбільш популярних, а докладніше з операторами можна ознайомитися за посиланнями в корисній інформації.

Operators - of

Почнемо з допоміжного оператора of. Він створює Observable на основі простого значення.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators - filter

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Оператор фільтрації filter, як можна зрозуміти назвою, фільтрує сигнал потоку. Якщо оператор повертає істину, пропускає далі.

Operators - take

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

take - Приймає значення кількості емітів, після якого завершує потік.

Operators - debounceTime

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

debounceTime — відкидає значення, що випускаються, які потрапляють у зазначений проміжок часу між вихідними даними — після тимчасового інтервалу емітує останнє значення.

const { Observable } = Rx;
const { debounceTime, take } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++)
  }, 1000);

 // Испускаем значение раз в 1500мс
  setInterval(() => {
    observer.next(i++)
  }, 1500);
}).pipe(
  debounceTime(700),  // Ожидаем 700мс значения прежде чем обработать
  take(3)
);  

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators - takeWhile

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Емітит значення, поки не поверне false, після чого відпишеться від потоку.

const { Observable } = Rx;
const { debounceTime, takeWhile } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++)
  }, 1000);
}).pipe(
  takeWhile( producer =>  producer < 5 )
);  

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators - combineLatest

Комбінований оператор combineLatest чимось схожий на promise.all. Він поєднує кілька потоків в один. Після того, як кожен потік зробить хоча б один еміт, ми отримуємо останні значення від кожного у вигляді масиву. Далі, після будь-якого еміту з об'єднаних потоків він віддаватиме нові значення.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

const { combineLatest, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750мс
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

combineLatest(observer_1, observer_2).pipe(take(5));

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators - zip

Zip – чекає значення з кожного потоку та формує масив на основі цих значень. Якщо значення не прийде з якогось потоку, то група не буде сформована.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

const { zip, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 500
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
});

zip(observer_1, observer_2, observer_3).pipe(take(5));

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators - forkJoin

forkJoin також поєднує потоки, але він емітніть значення тільки коли всі потоки будуть завершені (complete).

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

const { forkJoin, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
}).pipe(take(3));

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
}).pipe(take(5));

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 500
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
}).pipe(take(4));

forkJoin(observer_1, observer_2, observer_3);

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators - map

Оператор трансформації map перетворює значення еміту на нове.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

const {  Observable } = Rx;
const { take, map } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++);
  }, 1000);
}).pipe(
  map(x => x * 10),
  take(3)
);

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Operators – share, tap

Оператор tap дозволяє робити side ефекти, тобто які-небудь дії, які не впливають на послідовність.

Утилітний оператор share здатний з холодного потоку зробити гарячим.

Асинхронне програмування в JavaScript (Callback, Promise, RxJs)

Із операторами закінчили. Перейдемо до Subject.

Думки вголос

І тут я пішов чайку пити. Втомили мене ці приклади 😀

Сімейство subject-ів

Сімейство subject-ів є яскравим прикладом гарячих потоків. Ці класи є якимось гібридом, які виступають одночасно як observable і observer. Оскільки subject є гарячим потоком, від нього необхідно відписуватися. Якщо говорити за основними методами, то це:

  • next – передача нових даних у потік
  • error – помилка та завершення потоку
  • complete – завершення потоку
  • subscribe – підписатися на потік
  • unsubscribe - відписатися від потоку
  • asObservable – трансформуємо у спостерігача
  • toPromise – трансформує у проміс

Виділяють 4 5 типів subject-ів.

Думки вголос

На стримі говорив 4, а виявилось вони ще один додали. Як то кажуть вік живи вік вчися.

Простий Subject new Subject()- Найпростіший вид subject-ів. Створюється без параметрів. Передає значення, що прийшли лише після підписки.

BehaviorSubject new BehaviorSubject( defaultData<T> ) - На мій погляд найпоширеніший вид subject-ів. На вхід приймає значення за промовчанням. Завжди зберігає дані останнього еміту, які передає під час передплати. Цей клас має також корисний метод value, який повертає поточне значення потоку.

ReplaySubject new ReplaySubject(bufferSize?: number, windowTime?: number) — На вхід опціонально може прийняти першим аргументом розмір буфера значень, які він зберігатиме в собі, а другим час протягом якого нам потрібні зміни.

AsyncSubject new AsyncSubject() — при підписці нічого не відбувається, і значення буде повернено лише за умови complete. Повернеться лише останнє значення потоку.

WebSocketSubject new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) — Про нього документація мовчить, і я сам його вперше бачу. Хто знає, що він робить пишіть, доповнимо.

фуф. От ми й розглянули все, що я хотів сьогодні розповісти. Сподіваюся, ця інформація була корисною. Самостійно ознайомитись зі списком літератури можна у вкладці корисна інформація.

Корисна інформація

Джерело: habr.com

Додати коментар або відгук