Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Hallo alle. In Kontakt mit Omelnitsky Sergey. Vor nicht allzu langer Zeit habe ich einen Stream zum Thema reaktive Programmierung moderiert, in dem ich über Asynchronität in JavaScript gesprochen habe. Heute möchte ich dieses Material zusammenfassen.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Aber bevor wir mit dem Hauptmaterial beginnen, müssen wir eine Einführung machen. Beginnen wir also mit den Definitionen: Was sind Stack und Queue?

Stapeln ist eine Sammlung, deren Elemente nach dem LIFO-Prinzip „Last In, First Out“ abgerufen werden

Warteschlange ist eine Sammlung, deren Elemente nach dem FIFO-Prinzip („First In, First Out“) gewonnen werden

Okay, lass uns weitermachen.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

JavaScript ist eine Single-Threaded-Programmiersprache. Das bedeutet, dass es nur einen Ausführungsthread und einen Stapel gibt, in dem Funktionen zur Ausführung in die Warteschlange gestellt werden. Daher kann JavaScript jeweils nur eine Operation ausführen, während andere Operationen auf dem Stapel warten, bis sie aufgerufen werden.

Aufrufstapel ist eine Datenstruktur, die, vereinfacht gesagt, Informationen über die Stelle im Programm aufzeichnet, an der wir uns befinden. Wenn wir in eine Funktion springen, verschieben wir ihren Eintrag an die Spitze des Stapels. Wenn wir von einer Funktion zurückkehren, entfernen wir das oberste Element vom Stapel und landen dort, wo wir diese Funktion aufgerufen haben. Das ist alles, was der Stapel tun kann. Und jetzt eine sehr interessante Frage. Wie funktioniert dann Asynchronität in JavaScript?

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Tatsächlich verfügen Browser zusätzlich zum Stack über eine spezielle Warteschlange für die Arbeit mit der sogenannten WebAPI. Funktionen aus dieser Warteschlange werden der Reihe nach erst ausgeführt, nachdem der Stapel vollständig geleert wurde. Erst danach werden sie zur Ausführung aus der Warteschlange auf den Stapel gelegt. Befindet sich im Moment mindestens ein Element auf dem Stapel, können diese nicht auf den Stapel gelangen. Aus diesem Grund ist der Aufruf von Funktionen per Timeout oft zeitlich ungenau, da die Funktion nicht von der Warteschlange auf den Stapel gelangen kann, solange dieser voll ist.

Betrachten Sie das folgende Beispiel und gehen wir es Schritt für Schritt durch. Schauen wir uns auch an, was im System passiert.

console.log('Hi');
setTimeout(function cb1() {
    console.log('cb1');
}, 5000);
console.log('Bye');

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

1) Bisher passiert nichts. Die Browserkonsole ist sauber, der Aufrufstapel ist leer.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

2) Anschließend wird der Befehl console.log('Hi') zum Aufrufstapel hinzugefügt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

3) Und es ist erfüllt

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

4) Dann wird console.log('Hi') aus dem Aufrufstapel entfernt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

5) Fahren wir nun mit dem Befehl setTimeout(function cb1() {… }) fort. Es wird dem Aufrufstapel hinzugefügt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

6) Der Befehl setTimeout(function cb1() {… }) wird ausgeführt. Der Browser erstellt einen Timer, der Teil der Web-API ist. Es wird ein Countdown durchgeführt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

7) Der Befehl setTimeout(function cb1() {… }) hat seine Arbeit abgeschlossen und wird aus dem Aufrufstapel entfernt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

8) Der Befehl console.log('Bye') wird zum Aufrufstapel hinzugefügt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

9) Der Befehl console.log('Bye') wird ausgeführt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

10) Der Befehl console.log('Bye') wird aus dem Aufrufstapel entfernt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

11) Nach Ablauf von mindestens 5000 ms endet der Timer und stellt den cb1-Callback in die Callback-Warteschlange.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

12) Die Ereignisschleife übernimmt die Funktion cb1 aus der Rückrufwarteschlange und schiebt sie auf den Aufrufstapel.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

13) Die cb1-Funktion wird ausgeführt und fügt console.log('cb1') zum Aufrufstapel hinzu.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

14) Der Befehl console.log('cb1') wird ausgeführt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

15) Der Befehl console.log('cb1') wird aus dem Aufrufstapel entfernt.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

16) Funktion cb1 wird aus dem Aufrufstapel entfernt.

Schauen wir uns ein Beispiel in der Dynamik an:

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Nun, wir haben uns angesehen, wie Asynchronität in JavaScript implementiert wird. Lassen Sie uns nun kurz über die Entwicklung des asynchronen Codes sprechen.

Die Entwicklung des asynchronen Codes.

a(function (resultsFromA) {
    b(resultsFromA, function (resultsFromB) {
        c(resultsFromB, function (resultsFromC) {
            d(resultsFromC, function (resultsFromD) {
                e(resultsFromD, function (resultsFromE) {
                    f(resultsFromE, function (resultsFromF) {
                        console.log(resultsFromF);
                    })
                })
            })
        })
    })
});

Asynchrone Programmierung, wie wir sie in JavaScript kennen, kann nur mit Funktionen durchgeführt werden. Sie können wie jede andere Variable an andere Funktionen übergeben werden. So wurden Rückrufe geboren. Und es ist cool, lustig und inbrünstig, bis es in Traurigkeit, Melancholie und Traurigkeit umschlägt. Warum? Ja, es ist ganz einfach:

  • Mit zunehmender Komplexität des Codes verwandelt sich das Projekt schnell in undurchsichtige, mehrfach verschachtelte Blöcke – die „Callback-Hölle“.
  • Die Fehlerbehandlung kann leicht übersehen werden.
  • Sie können Ausdrücke nicht mit return zurückgeben.

Mit dem Aufkommen von Promise hat sich die Situation etwas verbessert.

new Promise(function(resolve, reject) {
    setTimeout(() => resolve(1), 2000);

}).then((result) => {
    alert(result);
    return result + 2;

}).then((result) => {
    throw new Error('FAILED HERE');
    alert(result);
    return result + 2;

}).then((result) => {
    alert(result);
    return result + 2;

}).catch((e) => {
    console.log('error: ', e);
});

  • Es erschienen Promise-Ketten, die die Lesbarkeit des Codes verbesserten
  • Es gab eine separate Methode zum Abfangen von Fehlern
  • Parallele Ausführung mit Promise.all hinzugefügt
  • Wir können verschachtelte Asynchronität mit async/await lösen

Aber das Versprechen hat seine Grenzen. Zum Beispiel kann ein Versprechen, ohne mit einem Tamburin zu tanzen, nicht storniert werden, und vor allem funktioniert es mit einer Bedeutung.

Nun, hier nähern wir uns reibungslos der reaktiven Programmierung. Müde? Das Gute ist, Sie können ein paar Möwen brauen, ein Brainstorming durchführen und dann zurückkommen, um mehr zu lesen. Und ich werde weitermachen.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Reaktive Programmierung - ein Programmierparadigma, das sich auf Datenflüsse und die Verbreitung von Änderungen konzentriert. Schauen wir uns genauer an, was ein Datenstrom ist.

// Получаем ссылку на элемент
const input = ducument.querySelector('input');

const eventsArray = [];

// Пушим каждое событие в массив eventsArray
input.addEventListener('keyup',
    event => eventsArray.push(event)
);

Stellen wir uns vor, wir hätten ein Eingabefeld. Wir erstellen ein Array und speichern das Ereignis für jede Eingabe des Eingabeereignisses in unserem Array. Gleichzeitig möchte ich darauf hinweisen, dass unser Array nach Zeit sortiert ist, d.h. der Index späterer Ereignisse ist größer als der Index früherer. Ein solches Array ist ein vereinfachtes Datenflussmodell, aber noch kein Fluss. Damit dieses Array sicher als Stream bezeichnet werden kann, muss es Abonnenten irgendwie darüber informieren können, dass neue Daten darin angekommen sind. Damit kommen wir zur Definition des Flusses.

Datenstrom

const { interval } = Rx;
const { take } = RxOperators;

interval(1000).pipe(
    take(4)
)

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Fluss ist ein nach Zeit sortiertes Datenarray, das darauf hinweisen kann, dass sich die Daten geändert haben. Stellen Sie sich nun vor, wie praktisch es ist, Code zu schreiben, bei dem Sie für eine Aktion mehrere Ereignisse in verschiedenen Teilen des Codes auslösen müssen. Wir abonnieren einfach den Stream und er informiert uns über Änderungen. Und die RxJs-Bibliothek kann dies tun.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

RxJS ist eine Bibliothek für die Arbeit mit asynchronen und ereignisbasierten Programmen unter Verwendung beobachtbarer Sequenzen. Die Bibliothek stellt den Haupttyp bereit Beobachtbar, mehrere Hilfstypen (Beobachter, Planer, Subjekte) und Operatoren für die Arbeit mit Ereignissen wie mit Sammlungen (Zuordnen, Filtern, Reduzieren, alles und ähnliche aus dem JavaScript-Array).

Lassen Sie uns die Grundkonzepte dieser Bibliothek verstehen.

Beobachtbar, Beobachter, Produzent

Observable ist der erste Basistyp, den wir betrachten werden. Diese Klasse enthält den Hauptteil der RxJs-Implementierung. Es ist einem beobachtbaren Stream zugeordnet, der mit der subscribe-Methode abonniert werden kann.

Observable implementiert einen Hilfsmechanismus zum Erstellen von Updates, den sogenannten Beobachter. Die Wertequelle für einen Beobachter wird aufgerufen Produzent. Es kann ein Array, ein Iterator, ein Web-Socket, eine Art Ereignis usw. sein. Wir können also sagen, dass Observable ein Dirigent zwischen Produzent und Beobachter ist.

Observable verarbeitet drei Arten von Observer-Ereignissen:

  • als nächstes - neue Daten
  • Fehler – ein Fehler, wenn die Sequenz aufgrund einer Ausnahme beendet wurde. Dieses Ereignis impliziert auch das Ende der Sequenz.
  • abgeschlossen – ein Signal über das Ende der Sequenz. Dies bedeutet, dass es keine neuen Daten mehr geben wird

Sehen wir uns eine Demo an:

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Zu Beginn verarbeiten wir die Werte 1, 2, 3 und nach 1 Sek. Wir bekommen 4 und beenden unseren Thread.

Lautes Denken

Und dann wurde mir klar, dass es interessanter war, darüber zu erzählen, als darüber zu schreiben. 😀

Abonnement

Wenn wir einen Stream abonnieren, erstellen wir eine neue Klasse Abonnement, was uns die Möglichkeit gibt, sich mit der Methode abzumelden abmelden. Mit dieser Methode können wir auch Abonnements gruppieren hinzufügen. Nun, es ist logisch, dass wir die Gruppierung von Threads mit aufheben können entfernen. Die Add- und Remove-Methoden akzeptieren ein anderes Abonnement als Eingabe. Ich möchte darauf hinweisen, dass wir uns beim Abmelden von allen untergeordneten Abonnements abmelden, als ob sie auch die Abmeldemethode aufrufen würden. Fortfahren.

Arten von Streams

HEISS
KALT

Der Produzent entsteht außerhalb des Beobachtbaren
Der Produzent wird innerhalb des Beobachtbaren erstellt

Die Daten werden zum Zeitpunkt der Erstellung des Observablen übergeben
Die Daten werden zum Zeitpunkt des Abonnements bereitgestellt.

Benötigen Sie mehr Logik, um sich abzumelden
Der Thread wird von selbst beendet

Verwendet eine Eins-zu-Viele-Beziehung
Verwendet eine Eins-zu-eins-Beziehung

Alle Abonnements haben den gleichen Wert
Abonnements sind unabhängig

Wenn kein Abonnement besteht, kann es zu Datenverlust kommen
Gibt alle Streamwerte für ein neues Abonnement erneut aus

Als Vergleich würde ich mir einen heißen Stream wie einen Film im Kino vorstellen. Zu welchem ​​Zeitpunkt du gekommen bist, von diesem Moment an hast du angefangen zuzusehen. Da würde ich einen Cold Stream mit einem Call vergleichen. Unterstützung. Jeder Anrufer hört sich die Aufzeichnung des Anrufbeantworters von Anfang bis Ende an, aber Sie können mit Abmelden auflegen.

Ich möchte anmerken, dass es auch sogenannte warme Bäche gibt (eine solche Definition habe ich äußerst selten und nur in ausländischen Communities getroffen) – das ist ein Bach, der sich von einem kalten in einen heißen Bach verwandelt. Es stellt sich die Frage, wo man es verwenden soll)) Ich gebe ein Beispiel aus der Praxis.

Ich arbeite mit Angular. Er nutzt aktiv RXJS. Um Daten an den Server zu übertragen, erwarte ich einen kalten Stream und verwende diesen Stream in der Vorlage mithilfe von asyncPipe. Wenn ich diese Pipe mehrmals verwende, kehre ich zur Definition eines kalten Streams zurück und jede Pipe fordert Daten vom Server an, was gelinde gesagt seltsam ist. Und wenn ich einen kalten Stream in einen warmen umwandle, erfolgt die Anfrage einmal.

Im Allgemeinen ist es für Anfänger recht schwierig, die Art der Strömungen zu verstehen, aber wichtig.

Betreiber

return this.http.get(`${environment.apiUrl}/${this.apiUrl}/trade_companies`)
    .pipe(
        tap(({ data }: TradeCompanyList) => this.companies$$.next(cloneDeep(data))),
        map(({ data }: TradeCompanyList) => data)
    );

Betreiber bieten uns die Möglichkeit, mit Streams zu arbeiten. Sie helfen dabei, die im Observable fließenden Ereignisse zu kontrollieren. Wir werden einige der beliebtesten betrachten. Weitere Informationen zu den Betreibern finden Sie unter den Links in „Nützliche Informationen“.

Betreiber von

Beginnen wir mit dem Hilfsoperator von. Es erstellt ein Observable basierend auf einem einfachen Wert.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Operatoren-Filter

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Der Filteroperator filtert, wie der Name schon sagt, das Stream-Signal. Wenn der Operator „true“ zurückgibt, wird weiter gesprungen.

Operatoren - nehmen

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

take – Nimmt den Wert der Anzahl der Emission, nach der der Stream endet.

Operators-debounceTime

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

debounceTime – verwirft ausgegebene Werte, die in das angegebene Zeitintervall zwischen Ausgabedaten fallen – gibt nach Ablauf des Zeitintervalls den letzten Wert aus.

const { Observable } = Rx;
const { debounceTime, take } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++)
  }, 1000);

 // Испускаем значение раз в 1500мс
  setInterval(() => {
    observer.next(i++)
  }, 1500);
}).pipe(
  debounceTime(700),  // Ожидаем 700мс значения прежде чем обработать
  take(3)
);  

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Operatoren-takeWhile

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Gibt Werte aus, bis takeWhile false zurückgibt, und meldet sich dann vom Thread ab.

const { Observable } = Rx;
const { debounceTime, takeWhile } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  observer.next(i++);
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++)
  }, 1000);
}).pipe(
  takeWhile( producer =>  producer < 5 )
);  

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Operatoren-combineLatest

Der kombinierte Operator „combineLatest“ ähnelt in gewisser Weise „promise.all“. Es kombiniert mehrere Streams zu einem. Nachdem jeder Thread mindestens eine Emission durchgeführt hat, erhalten wir die neuesten Werte von jedem als Array. Darüber hinaus werden nach jeder Ausgabe aus den kombinierten Streams neue Werte ausgegeben.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

const { combineLatest, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750мс
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

combineLatest(observer_1, observer_2).pipe(take(5));

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Operatoren-zip

Zip – wartet auf einen Wert von jedem Stream und bildet ein Array basierend auf diesen Werten. Wenn der Wert aus keinem Thread stammt, wird die Gruppe nicht gebildet.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

const { zip, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
});

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
});

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 500
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
});

zip(observer_1, observer_2, observer_3).pipe(take(5));

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Operatoren – forkJoin

forkJoin verbindet auch Threads, gibt jedoch nur dann einen Wert aus, wenn alle Threads abgeschlossen sind.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

const { forkJoin, Observable } = Rx;
const { take } = RxOperators;

const observer_1 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next('a: ' + i++);
  }, 1000);
}).pipe(take(3));

const observer_2 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 750
  setInterval(() => {
    observer.next('b: ' + i++);
  }, 750);
}).pipe(take(5));

const observer_3 = Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 500
  setInterval(() => {
    observer.next('c: ' + i++);
  }, 500);
}).pipe(take(4));

forkJoin(observer_1, observer_2, observer_3);

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Operatoren-Karte

Der Kartentransformationsoperator wandelt den Emissionswert in einen neuen um.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

const {  Observable } = Rx;
const { take, map } = RxOperators;

Observable.create((observer) => {
  let i = 1;
  // Испускаем значение раз в 1000мс
  setInterval(() => {
    observer.next(i++);
  }, 1000);
}).pipe(
  map(x => x * 10),
  take(3)
);

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Betreiber – teilen, tippen

Mit dem Tap-Operator können Sie Nebeneffekte ausführen, also alle Aktionen, die sich nicht auf die Sequenz auswirken.

Der Shared-Utility-Betreiber kann einen kalten Strom in einen heißen Strom umwandeln.

Asynchrone Programmierung in JavaScript (Callback, Promise, RxJs)

Die Operatoren sind fertig. Kommen wir zum Betreff.

Lautes Denken

Und dann ging ich Tee trinken. Ich habe diese Beispiele satt 😀

Betreff Familie

Die Themenfamilie ist ein Paradebeispiel für heiße Threads. Bei diesen Klassen handelt es sich um eine Art Hybrid, der gleichzeitig als Observable und Beobachter fungiert. Da es sich bei dem Betreff um einen Hot Stream handelt, muss dieser abgemeldet werden. Wenn wir über die wichtigsten Methoden sprechen, dann sind dies:

  • Als nächstes – Übergabe neuer Daten an den Stream
  • Fehler – Fehler und Thread-Beendigung
  • abgeschlossen – Ende des Threads
  • Abonnieren – Abonnieren Sie einen Stream
  • abbestellen – Abmelden vom Stream
  • asObservable – in einen Beobachter verwandeln
  • toPromise – verwandelt sich in ein Versprechen

Ordnen Sie 4 5 Arten von Themen zu.

Lautes Denken

Ich habe im Stream 4 gesagt, aber es stellte sich heraus, dass sie noch einen hinzugefügt hatten. Wie das Sprichwort sagt: Lebe und lerne.

Einfaches Thema new Subject()- die einfachste Art von Themen. Ohne Parameter erstellt. Übergibt die Werte, die erst nach dem Abonnement kamen.

BehaviorSubject new BehaviorSubject( defaultData<T> ) - meiner Meinung nach die häufigste Art von Fächern. Die Eingabe übernimmt den Standardwert. Speichert immer die Daten der letzten Ausgabe, die beim Abonnieren übermittelt werden. Diese Klasse verfügt außerdem über eine nützliche Wertmethode, die den aktuellen Wert des Streams zurückgibt.

ReplaySubject new ReplaySubject(bufferSize?: number, windowTime?: number) - Optional kann es als erstes Argument die Größe des Wertepuffers annehmen, den es in sich selbst speichert, und als zweites Argument die Zeit, in der wir Änderungen benötigen.

asyncsubject new AsyncSubject() - Beim Abonnieren passiert nichts und der Wert wird erst zurückgegeben, wenn der Vorgang abgeschlossen ist. Es wird nur der letzte Wert des Streams zurückgegeben.

WebSocketSubject new WebSocketSubject(urlConfigOrSource: string | WebSocketSubjectConfig<T> | Observable<T>, destination?: Observer<T>) - Die Dokumentation schweigt darüber und ich selbst sehe es zum ersten Mal. Wer weiß, was er tut, schreiben, wir werden hinzufügen.

Puh. Nun, wir haben alles bedacht, was ich heute erzählen wollte. Ich hoffe, diese Informationen waren hilfreich. Die Literaturliste können Sie im Reiter „Nützliche Informationen“ selbst lesen.

Eine nützliche Information

Source: habr.com

Kommentar hinzufügen