Asynchronous programming in JavaScript (Callback, Promise, RxJS)

Hi all. Omelnitsky Sergey here. Not long ago I hosted a stream on reactive programming, where I talked about asynchrony in JavaScript. Today I would like to write that material up.

But before we get to the main material, we need an introduction. So let's start with definitions: what are a stack and a queue?

A stack is a collection whose elements are retrieved on a "last in, first out" (LIFO) basis.

A queue is a collection whose elements are retrieved on a "first in, first out" (FIFO) basis.
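
Both structures can be tried out with a plain JavaScript array. A minimal sketch; the variable names are made up for illustration:

```javascript
// A plain array can play both roles.
const stack = [];
stack.push('a');
stack.push('b');
stack.push('c');
// LIFO: pop removes 'c', the most recently added element.
const lastIn = stack.pop();

const queue = [];
queue.push('a');
queue.push('b');
queue.push('c');
// FIFO: shift removes 'a', the earliest added element.
const firstIn = queue.shift();

console.log(lastIn, firstIn); // c a
```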

Okay, let's continue.

JavaScript is a single-threaded programming language. This means it has only one thread of execution and one call stack on which functions are placed for execution. JavaScript can therefore perform only one operation at a time, while other operations wait their turn until the current one finishes.

The call stack is a data structure that, in simple terms, records where in the program we currently are. When we step into a function, we push an entry for it onto the top of the stack. When we return from a function, we pop the topmost entry off the stack and end up back where the function was called from. That is all the stack can do. And now a very interesting question: how, then, does asynchrony work in JavaScript?

In fact, in addition to the stack, browsers have a special queue for working with the so-called Web API. Functions from this queue are executed in order, and only after the stack has been completely cleared; only then are they moved from the queue onto the stack for execution. While there is at least one entry on the stack, they cannot get onto it. This is exactly why calling functions by timeout is often imprecise: the function cannot move from the queue to the stack until the stack is empty.

Let's take the following example and go through it step by step, watching what happens in the system.

console.log('Hi');
setTimeout(function cb1() {
    console.log('cb1');
}, 5000);
console.log('Bye');

1) So far nothing is happening. The browser console is clean and the call stack is empty.
2) The console.log('Hi') command is added to the call stack.
3) It is executed.
4) The console.log('Hi') command is removed from the call stack.
5) Now we move on to the setTimeout(function cb1() {… }) command. It is added to the call stack.
6) The setTimeout(function cb1() {… }) command is executed. The browser creates a timer, which is part of the Web API, and the timer starts counting down.
7) The setTimeout(function cb1() {… }) command has finished its work and is removed from the call stack.
8) The console.log('Bye') command is added to the call stack.
9) The console.log('Bye') command is executed.
10) The console.log('Bye') command is removed from the call stack.
11) After at least 5000 ms have elapsed, the timer completes and puts the cb1 callback into the callback queue.
12) The event loop takes the cb1 function from the callback queue and pushes it onto the call stack.
13) The cb1 function is executed and adds console.log('cb1') to the call stack.
14) The console.log('cb1') command is executed.
15) The console.log('cb1') command is removed from the call stack.
16) The cb1 function is removed from the call stack.
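
The same sequence can be reproduced in a few lines. This is a minimal sketch, assuming a browser or Node.js environment; the order array and the 50 ms busy loop are illustrative additions, not part of the original example. It shows that even a zero-delay callback has to wait until the call stack is empty:

```javascript
const order = [];

order.push('Hi');
setTimeout(function cb1() {
    // Runs only once the timer has fired AND the call stack is empty.
    order.push('cb1');
}, 0);

// Keep the call stack busy for about 50 ms with synchronous work.
const start = Date.now();
while (Date.now() - start < 50) { /* busy wait */ }

order.push('Bye');
// cb1 has not run yet, although its 0 ms timer expired long ago.
console.log(order.join(' -> ')); // Hi -> Bye
```

The callback is appended to order only after this script has finished, which is why the delay passed to setTimeout is a lower bound rather than an exact time.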

Here is the same example in motion:

[Animation: the example above, step by step]

Well, we looked at how asynchrony is implemented in JavaScript. Now let's talk briefly about the evolution of asynchronous code.

The evolution of asynchronous code.

a(function (resultsFromA) {
    b(resultsFromA, function (resultsFromB) {
        c(resultsFromB, function (resultsFromC) {
            d(resultsFromC, function (resultsFromD) {
                e(resultsFromD, function (resultsFromE) {
                    f(resultsFromE, function (resultsFromF) {
                        console.log(resultsFromF);
                    })
                })
            })
        })
    })
});

Asynchronous programming as we know it in JavaScript can only be done with functions. They can be passed to other functions like any other variable. This is how callbacks were born. And it is cool and fun, right up until it turns into sadness and melancholy. Why? It's simple:

  • As the code grows more complex, the project quickly turns into obscure, deeply nested blocks: "callback hell".
  • Error handling is easy to overlook.
  • You cannot return a result with return; it is only available inside the callback.
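
The last two points are easy to see in a small example. A sketch, assuming a hypothetical loadUser function that follows the common error-first callback convention (neither the function nor its data comes from a real API):

```javascript
// Hypothetical asynchronous function using an error-first callback.
function loadUser(id, callback) {
    setTimeout(() => {
        if (id < 0) {
            callback(new Error('invalid id'));
            return;
        }
        callback(null, { id, name: 'user' + id });
    }, 10);
}

const outcomes = [];

// A return inside the callback does not reach the caller of loadUser.
const returned = loadUser(1, (err, user) => {
    if (err) return;            // the error has to be checked by hand
    outcomes.push(user.name);
    return user;                // this value goes nowhere
});
console.log(returned); // undefined

// Without the explicit err check, this failure would go unnoticed.
loadUser(-1, (err, user) => {
    if (err) {
        outcomes.push('error: ' + err.message);
        return;
    }
    outcomes.push(user.name);
});
```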

With the advent of Promise, the situation has become a little better.

new Promise(function(resolve, reject) {
    setTimeout(() => resolve(1), 2000);

}).then((result) => {
    alert(result);
    return result + 2;

}).then((result) => {
    throw new Error('FAILED HERE');
    alert(result);
    return result + 2;

}).then((result) => {
    alert(result);
    return result + 2;

}).catch((e) => {
    console.log('error: ', e);
});

  • Promise chains appeared, which improved the readability of the code.
  • A separate method for catching errors appeared (catch).
  • Parallel execution became possible with Promise.all.
  • Nested asynchrony can be flattened with async/await.
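
The last two points can be illustrated together. A minimal sketch, assuming a hypothetical delay helper that stands in for any real asynchronous operation:

```javascript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
function delay(ms, value) {
    return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

async function main() {
    // Sequential steps read top to bottom instead of nesting.
    const a = await delay(20, 1);
    const b = await delay(20, a + 2);

    // Independent operations can run in parallel.
    const [x, y] = await Promise.all([delay(20, 'x'), delay(10, 'y')]);

    try {
        await Promise.reject(new Error('FAILED HERE'));
    } catch (e) {
        // A rejected promise surfaces as an ordinary exception.
        return { b, x, y, error: e.message };
    }
}

const resultPromise = main();
// Logs an object with b = 3, x = 'x', y = 'y', error = 'FAILED HERE'.
resultPromise.then((result) => console.log(result));
```

Note that Promise.all keeps the order of its input array, so x and y do not depend on which delay finishes first.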

But promises have their limitations. For example, a promise cannot be canceled without jumping through hoops, and, most importantly, it works with only one value.

Well, here we are smoothly approaching reactive programming. Tired? The good news is that you can go brew some tea, think it all over, and come back to read on. And I will continue.

Reactive programming is a programming paradigm focused on data streams and the propagation of change. Let's take a closer look at what a data stream is.

// Get a reference to the element
const input = document.querySelector('input');

const eventsArray = [];

// Push every event into the eventsArray array
input.addEventListener('keyup',
    event => eventsArray.push(event)
);

Let's imagine that we have an input field. We create an array, and on every keyup event of the input we store the event in that array. Note that the array is ordered by time: the index of a later event is greater than the index of an earlier one. Such an array is a simplified model of a data stream, but it is not yet a stream. For the array to be safely called a stream, it must be able to somehow inform subscribers that new data has arrived. This brings us to the definition of a stream.
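
The missing piece, notifying subscribers, can be sketched in plain JavaScript. This is a hand-rolled illustration, not an RxJS class; the SimpleStream name and its methods are assumptions made for the example:

```javascript
// An array that tells its subscribers when new data arrives.
class SimpleStream {
    constructor() {
        this.items = [];        // time-ordered history, like eventsArray
        this.subscribers = [];  // callbacks to notify about new data
    }

    subscribe(callback) {
        this.subscribers.push(callback);
    }

    push(item) {
        this.items.push(item);
        this.subscribers.forEach((callback) => callback(item));
    }
}

const keyups = new SimpleStream();
const seen = [];
keyups.subscribe((key) => seen.push(key));

// In a browser, push would be called from a keyup event listener.
keyups.push('h');
keyups.push('i');

console.log(seen); // [ 'h', 'i' ]
```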

Data stream

const { interval } = Rx;
const { take } = RxOperators;

interval(1000).pipe(
    take(4)
)

A stream is an array of data ordered by time that can signal that its data has changed. Now imagine how convenient it becomes to write code where a single action has to trigger several events in different parts of the code. We simply subscribe to the stream, and it tells us when changes occur. The RxJS library can do exactly that.

RxJS is a library for working with asynchronous and event-based programs using observable sequences. The library provides one core type, Observable, several helper types (Observer, Schedulers, Subjects), and operators for working with events as with collections (map, filter, reduce, every, and similar ones from JavaScript Array).

Let's understand the basic concepts of this library.

Observable, Observer, Producer

Observable is the first base type we'll look at. This class contains the main part of the RxJS implementation. It represents an observable stream, which can be subscribed to using the subscribe method.

Observable implements a helper mechanism for delivering updates, the so-called Observer. The source of values for an Observer is called the Producer. It can be an array, an iterator, a web socket, some kind of event, and so on. So we can say that the Observable is a conductor between the Producer and the Observer.

Observable handles three kinds of Observer events:

  • next - new data.
  • error - an error, if the sequence terminated due to an exception. This event also implies the end of the sequence.
  • complete - a signal that the sequence has ended. This means there will be no more new data.

Let's see a demo:

In the demo, we first process the values 1, 2, 3, and after 1 second we get 4 and the stream completes.
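
The original demo is not reproduced here, so the following is a dependency-free sketch of the same behavior. The createObservable helper is hypothetical and only imitates the contract described above (next, error, complete); it is not the RxJS Observable class:

```javascript
// Hypothetical helper: wraps a producer function and returns an object
// with a subscribe method.
function createObservable(producer) {
    return {
        subscribe(observer) {
            let closed = false;
            // After error or complete, nothing more reaches the observer.
            producer({
                next: (value) => { if (!closed) observer.next(value); },
                error: (err) => { if (!closed) { closed = true; observer.error(err); } },
                complete: () => { if (!closed) { closed = true; observer.complete(); } },
            });
        },
    };
}

const numbers = createObservable((observer) => {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    setTimeout(() => {
        observer.next(4);
        observer.complete();
        observer.next(5); // ignored: the sequence has already completed
    }, 1000);
});

const log = [];
numbers.subscribe({
    next: (value) => log.push(value),
    error: (err) => log.push('error: ' + err.message),
    complete: () => { log.push('done'); console.log(log); },
});
```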

Thinking out loud

And then I realized that it was more interesting to tell than to write about it. 😀

Subscription

When we subscribe to a stream, we get a new Subscription object, which lets us unsubscribe with the unsubscribe method. We can also group subscriptions using the add method and, logically enough, ungroup them using remove. The add and remove methods accept another subscription as input. Note that when we unsubscribe, we also unsubscribe from all child subscriptions, as if unsubscribe had been called on each of them. Moving on.
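
The grouping behavior can be modeled in a few lines. This is a simplified stand-in written for illustration, not the RxJS Subscription class; the SimpleSubscription name and its teardown callback are assumptions:

```javascript
class SimpleSubscription {
    constructor(teardown) {
        this.teardown = teardown; // what to do when unsubscribing
        this.children = [];
        this.closed = false;
    }

    add(child) {
        this.children.push(child);
    }

    remove(child) {
        this.children = this.children.filter((c) => c !== child);
    }

    unsubscribe() {
        if (this.closed) return;
        this.closed = true;
        this.teardown();
        // Unsubscribing the parent also unsubscribes every child.
        this.children.forEach((child) => child.unsubscribe());
    }
}

const stopped = [];
const parent = new SimpleSubscription(() => stopped.push('parent'));
const childA = new SimpleSubscription(() => stopped.push('childA'));
const childB = new SimpleSubscription(() => stopped.push('childB'));

parent.add(childA);
parent.add(childB);
parent.remove(childB); // childB is no longer tied to parent

parent.unsubscribe();
console.log(stopped); // [ 'parent', 'childA' ]
```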

Types of streams

Hot | Cold
Producer is created outside the observable | Producer is created inside the observable
Data is passed at the time the observable is created | Data is provided at the time of subscription
More logic is needed to unsubscribe | The stream terminates on its own
Uses a one-to-many relationship | Uses a one-to-one relationship
All subscriptions have the same value | Subscriptions are independent
Data can be lost if there is no subscription | Re-emits all stream values for a new subscription

To give an analogy, I would compare a hot stream to a movie in a cinema: whatever moment you walk in, that is the moment you start watching from. A cold stream I would compare to a call to tech support: every caller hears the answering machine recording from start to finish, but can hang up with unsubscribe.

I would also note that there are so-called warm streams (I have come across this term extremely rarely, and only in foreign communities): a warm stream is one that turns from cold into hot. The question is where to use it. Here is an example from practice.

I work with Angular, which makes heavy use of RxJS. To get data from the server, I expect a cold stream, and I use this stream in the template via asyncPipe. If I use this pipe several times, then, going back to the definition of a cold stream, each pipe will request the data from the server, which is strange to say the least. If I convert the cold stream into a warm one, the request happens only once.

In general, understanding stream types is quite difficult for beginners, but important.
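
The difference between a cold stream and a shared one can be sketched without any library. In this sketch the "server request" is just a counter, the streams are synchronous and single-valued, and shareOnce is a hypothetical helper rather than an RxJS operator:

```javascript
let requests = 0;

// Cold: the producer (a pretend server request) lives inside subscribe,
// so every subscriber triggers its own request.
const cold = {
    subscribe(next) {
        requests += 1;
        next('data #' + requests);
    },
};

// Hypothetical helper: runs the source once, remembers its value, and
// hands that value to every subscriber. Assumes a synchronous source.
function shareOnce(source) {
    let started = false;
    let value;
    return {
        subscribe(next) {
            if (!started) {
                started = true;
                source.subscribe((v) => { value = v; });
            }
            next(value);
        },
    };
}

const received = [];
cold.subscribe((v) => received.push(v)); // request 1
cold.subscribe((v) => received.push(v)); // request 2

const warm = shareOnce(cold);
warm.subscribe((v) => received.push(v)); // request 3
warm.subscribe((v) => received.push(v)); // no new request

console.log(requests); // 3
console.log(received); // [ 'data #1', 'data #2', 'data #3', 'data #3' ]
```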

Operators

return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
    .pipe(
        tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
        map(({ data }: TradeCompanyList) => data)
    );

Operators give us a way to work with streams. They help control the events flowing through an Observable. We will look at a few of the most popular ones; more information about operators can be found via the links in the helpful information section.

Operators - of

Let's start with the helper operator of. It creates an Observable from a simple value.

Operators - filter

The filter operator, as the name suggests, filters the values in the stream. If the predicate returns true, the value is passed on.

Operators - take

take accepts the number of emissions to let through, after which the stream completes.
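
The behavior of these three operators can be approximated in plain JavaScript. This is a simplified, synchronous sketch written for illustration: the of, filter, take, and pipe functions below are hand-rolled stand-ins, not the RxJS implementations.

```javascript
// Here an "observable" is a function that takes an observer and feeds it.
const of = (...values) => (observer) => {
    values.forEach((value) => observer.next(value));
    observer.complete();
};

// Passes a value on only when the predicate returns true.
const filter = (predicate) => (source) => (observer) =>
    source({
        next: (value) => { if (predicate(value)) observer.next(value); },
        complete: () => observer.complete(),
    });

// Lets `count` values through, then completes (assumes count >= 1).
const take = (count) => (source) => (observer) => {
    let taken = 0;
    let done = false;
    source({
        next: (value) => {
            if (done) return;
            taken += 1;
            observer.next(value);
            if (taken >= count) {
                done = true;
                observer.complete();
            }
        },
        complete: () => {
            if (!done) {
                done = true;
                observer.complete();
            }
        },
    });
};

// Applies operators left to right, like the pipe method in the examples.
const pipe = (source, ...operators) =>
    operators.reduce((current, operator) => operator(current), source);

const out = [];
const evens = pipe(
    of(1, 2, 3, 4, 5, 6, 7, 8),
    filter((x) => x % 2 === 0),
    take(3)
);
evens({
    next: (value) => out.push(value),
    complete: () => out.push('complete'),
});

console.log(out); // [ 2, 4, 6, 'complete' ]
```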

Operators - debounceTime

debounceTime drops emitted values that are followed by another emission within the specified time interval; once the interval passes with no new emission, it emits the latest value.

const { Observable } = Rx;
const { debounceTime, take } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next(i++)
  }, 1000);

  // Emit a value every 1500 ms
  setInterval(() => {
    observer.next(i++)
  }, 1500);
}).pipe(
  debounceTime(700),  // Wait for 700 ms of silence before passing a value on
  take(3)
);  

Operators - takeWhile

takeWhile emits values as long as its predicate returns true; once the predicate returns false, the stream completes.

const { Observable } = Rx;
const { takeWhile } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next(i++)
  }, 1000);
}).pipe(
  takeWhile( producer =>  producer < 5 )
);  

Operators - combineLatest

The combination operator combineLatest is somewhat similar to Promise.all. It combines multiple streams into one. After every stream has emitted at least once, we get the latest value from each of them as an array. From then on, any emission from any of the combined streams produces a new array of latest values.

const { combineLatest, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 750 ms
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

combineLatest(observer_1, observer_2).pipe(take(5));

Operators - zip

zip waits for a value from each stream and forms an array from these values. If any of the streams does not deliver a value, the group is not formed.

const { zip, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 750 ms
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 500 ms
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
});

zip(observer_1, observer_2, observer_3).pipe(take(5));

Operators - forkJoin

forkJoin also combines streams, but it emits only once, when all the streams have completed, and the result contains the last value from each of them.

const { forkJoin, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
}).pipe(take(3));

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 750 ms
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
}).pipe(take(5));

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Emit a value every 500 ms
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
}).pipe(take(4));

forkJoin(observer_1, observer_2, observer_3);

Operators - map

The transformation operator map converts each emitted value into a new one.

const {  Observable } = Rx;
const { take, map } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Emit a value every 1000 ms
  setInterval(() => {
    observer.next(i++);
  }, 1000);
}).pipe(
  map(x => x * 10),
  take(3)
);

Operators - share, tap

The tap operator lets you perform side effects, that is, any actions that do not affect the sequence.

The utility operator share can turn a cold stream into a hot one.

That's it for operators. Let's move on to Subject.

Thinking out loud

And then I went to drink tea. I'm tired of these examples 😀

Subject family

The Subject family is a prime example of hot streams. These classes are a kind of hybrid that acts as an observable and an observer at the same time. Since a subject is a hot stream, you must unsubscribe from it. The main methods are:

  • next - passes new data into the stream
  • error - signals an error and terminates the stream
  • complete - completes the stream
  • subscribe - subscribes to the stream
  • unsubscribe - unsubscribes from the stream
  • asObservable - converts the subject into an observable
  • toPromise - converts it into a promise

There are 5 (not 4) types of subjects.

Thinking out loud

I said 4 on the stream, but it turned out they added one more. As the saying goes, live and learn.

Simple Subject, new Subject() - the simplest kind of subject. It is created without parameters and passes on only the values that arrive after subscription.

BehaviorSubject, new BehaviorSubject(defaultData<T>) - in my opinion, the most common kind of subject. It takes a default value as input. It always stores the most recently emitted value and delivers it on subscription. This class also has a useful value accessor that returns the current value of the stream.

ReplaySubject, new ReplaySubject(bufferSize?: number, windowTime?: number) - optionally takes as its first argument the size of the buffer of values it will keep, and as its second the time window during which those values are kept.

AsyncSubject, new AsyncSubject() - nothing happens on subscription; a value is delivered only on complete, and only the last value of the stream is delivered.

WebSocketSubject, new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) - the documentation says nothing about it, and I am seeing it for the first time myself. If you know what it does, write in and we will add it.
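
The differences between the first four kinds show up most clearly for a late subscriber. The classes below are simplified stand-ins written for illustration (no error handling, no unsubscribe, no time window); the Mini* names are assumptions, and this is not how RxJS implements them:

```javascript
class MiniSubject {
    constructor() { this.observers = []; }
    subscribe(next) { this.observers.push(next); }
    next(value) { this.observers.forEach((observer) => observer(value)); }
}

class MiniBehaviorSubject extends MiniSubject {
    constructor(initial) { super(); this.value = initial; }
    // A new subscriber immediately receives the current value.
    subscribe(next) { super.subscribe(next); next(this.value); }
    next(value) { this.value = value; super.next(value); }
}

class MiniReplaySubject extends MiniSubject {
    constructor(bufferSize) { super(); this.bufferSize = bufferSize; this.buffer = []; }
    // A new subscriber first receives the buffered values.
    subscribe(next) { this.buffer.forEach((v) => next(v)); super.subscribe(next); }
    next(value) {
        this.buffer.push(value);
        if (this.buffer.length > this.bufferSize) this.buffer.shift();
        super.next(value);
    }
}

class MiniAsyncSubject extends MiniSubject {
    // Values are held back; only the last one is emitted on complete.
    next(value) { this.last = value; this.hasValue = true; }
    complete() { if (this.hasValue) super.next(this.last); }
}

const late = { subject: [], behavior: [], replay: [], asyncSubject: [] };

const subject = new MiniSubject();
subject.next(1);
subject.subscribe((v) => late.subject.push(v));
subject.next(2);

const behavior = new MiniBehaviorSubject(0);
behavior.next(1);
behavior.subscribe((v) => late.behavior.push(v));
behavior.next(2);

const replay = new MiniReplaySubject(2);
[1, 2, 3].forEach((v) => replay.next(v));
replay.subscribe((v) => late.replay.push(v));
replay.next(4);

const asyncSubject = new MiniAsyncSubject();
asyncSubject.subscribe((v) => late.asyncSubject.push(v));
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.complete();

console.log(late);
// subject: [2], behavior: [1, 2], replay: [2, 3, 4], asyncSubject: [2]
```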

Phew. Well, we have covered everything I wanted to tell you today. I hope this information was helpful. You can go through the reading list on your own in the Helpful information tab.

Helpful information

Source: habr.com
