Thursday, December 19, 2019

RxJS Reminders

 Links

This blog post is based on the free training video "Learn RxJS in 60 Minutes for Beginners - Free Crash Course" from Gary Simon.


Code Samples



Cold Observable

import { Observable } from 'rxjs/Observable';

// Cold Observable: the producer function below is activated separately for
// every subscriber, at the moment that subscriber subscribes.
const observable = Observable.create((observer: any) => {
    let intervalId: ReturnType<typeof setInterval> | undefined;
    try {
        observer.next('Hey guys!'); // Emit a value
        observer.next('how are you?'); // Emit a value
        let counter = 0;
        intervalId = setInterval(() => {
            observer.next(`I am good [${counter++}]`);
        }, 2000);
        // observer.complete();
        // observer.next('This will not be sent');
    }
    catch(ex) {
        observer.error(ex);
    }
    // Teardown logic: runs when the subscriber unsubscribes, so the timer does
    // not keep firing (and leaking) after nobody is listening any more.
    return () => {
        if (intervalId !== undefined) {
            clearInterval(intervalId);
        }
    };
});

// NOTE: despite its name, `observer` holds the Subscription returned by subscribe().
const observer = observable.subscribe(
    (x:any) => { addItem(`[1] ${x}`); }, // next
    (error:any) => { addItem(`[ERROR]${error}`); }, // error
    () => { addItem('Completed'); }, // complete
);

// Second, independent subscription: it gets its own run of the producer.
const observer2 = observable.subscribe(
    (x:any) => { addItem(`[2] ${x}`); }, // next
);
observer.add(observer2); // Now observer2 will be cancelled when observer is unsubscribed
//observer.remove(observer2); // undo the add

setTimeout(() => { // Cancel the subscription(s) so no more values are received after 6 seconds
    observer.unsubscribe();
}, 6001);

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

Warm Observable

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/share';

// The producer below is a cold Observable (activated on subscription);
// the trailing share() is what turns it into a warm one.
const observable = Observable.create((observer: any) => {
    let intervalId: ReturnType<typeof setInterval> | undefined;
    try {
        observer.next('Hey guys!'); // Emit a value
        observer.next('how are you?'); // Emit a value
        let counter = 0;
        intervalId = setInterval(() => {
            observer.next(`I am good [${counter++}]`);
        }, 2000);
    }
    catch(ex) {
        observer.error(ex);
    }
    // Teardown logic: stop the timer once the source is unsubscribed so it
    // does not keep running in the background.
    return () => {
        if (intervalId !== undefined) {
            clearInterval(intervalId);
        }
    };
}).share(); // Make it a warm Observable: subscribers only receive
            // the stream from the moment they subscribed rather
            // than from the beginning

// NOTE: despite its name, `observer` holds the Subscription returned by subscribe().
const observer = observable.subscribe(
    (x:any) => { addItem(`[1] ${x}`); }, // next
    (error:any) => { addItem(`[ERROR]${error}`); }, // error
    () => { addItem('Completed'); }, // complete
);

// The second subscriber joins one second later and shares the same source.
setTimeout(() => {
    const observer2 = observable.subscribe(
        (x:any) => { addItem(`[2] ${x}`); }, // next
    );
}, 1000);

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

Hot Observable

import { Observable } from 'rxjs/Observable';
// Creation functions live in the lowercase `rxjs/observable/` directory; the
// capitalised path only resolves on case-insensitive file systems.
import { fromEvent } from 'rxjs/observable/fromEvent';

// Create a hot Observable: mouse events are produced by the document whether
// or not anybody is subscribed.
const observable = fromEvent(document, 'mousemove');

// Subscribe after 2 seconds; mouse moves that happened before that are not received.
setTimeout(() => {
    const subscription = observable.subscribe(
        // Show the pointer position: passing the raw event would only
        // render the text "[object MouseEvent]".
        (x:any) => { addItem(`mousemove at (${x.clientX}, ${x.clientY})`); }
    );
}, 2000);

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

Subject

import { Subject } from 'rxjs/Subject';

// A Subject can be subscribed to like an Observable, and values can also be
// pushed into it from the outside with next().
const subject = new Subject();

// Observer 1, registered with an observer object instead of positional callbacks.
subject.subscribe({
    next: (value) => { addItem(`Observer 1: ${value}`); },
    error: (failure) => { addItem(failure); },
    complete: () => { addItem(`Observer 1 Completed`); },
});

// Only Observer 1 is subscribed at this point.
subject.next('this first thing has been sent');

// Observer 2 joins now and only receives values emitted from here on.
const observer2 = subject.subscribe({
    next: (value) => { addItem(`Observer 2: ${value}`); },
});

subject.next('this SECOND thing has been sent');
subject.next('this THIRD thing has been sent');

// Detach Observer 2 before the last emission.
observer2.unsubscribe();

subject.next('this FINAL thing has been sent');

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

BehaviorSubject

import { BehaviorSubject } from 'rxjs/BehaviorSubject';

// A BehaviorSubject is a Subject created with an initial value ('First' here).
const subject = new BehaviorSubject('First');

// Observer 1, registered with an observer object instead of positional callbacks.
subject.subscribe({
    next: (value) => { addItem(`Observer 1: ${value}`); },
    error: (failure) => { addItem(failure); },
    complete: () => { addItem(`Observer 1 Completed`); },
});

// Two emissions while only Observer 1 is subscribed.
subject.next('this first thing has been sent');
subject.next('...Observer 2 is about to subscribe');

// Observer 2 joins now; it starts from the LAST value emitted before it subscribed.
const observer2 = subject.subscribe({
    next: (value) => { addItem(`Observer 2: ${value}`); },
});

subject.next('this SECOND thing has been sent');
subject.next('this THIRD thing has been sent');

// Detach Observer 2 before the last emission.
observer2.unsubscribe();

subject.next('this FINAL thing has been sent');

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

ReplaySubject

import { ReplaySubject } from 'rxjs/ReplaySubject';

// A ReplaySubject is a Subject that replays past values to new subscribers.
// The constructor argument (2) is the number of values sent to a new observer.
const subject = new ReplaySubject(2);

// Observer 1, registered with an observer object instead of positional callbacks.
subject.subscribe({
    next: (value) => { addItem(`Observer 1: ${value}`); },
    error: (failure) => { addItem(failure); },
    complete: () => { addItem(`Observer 1 Completed`); },
});

// Three emissions while only Observer 1 is subscribed. With a replay count
// of 2, the last two of them are also delivered to Observer 2 when it joins.
subject.next('this first thing has been sent');
subject.next('Another thing has been sent');
subject.next('...Observer 2 is about to subscribe');

// Observer 2 joins now and first receives the last 2 values emitted before it subscribed.
const observer2 = subject.subscribe({
    next: (value) => { addItem(`Observer 2: ${value}`); },
});

subject.next('this SECOND thing has been sent');
subject.next('this THIRD thing has been sent');

// Detach Observer 2 before the last emission.
observer2.unsubscribe();

subject.next('this FINAL thing has been sent');

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

AsyncSubject

import { AsyncSubject } from 'rxjs/AsyncSubject';

// With AsyncSubject, subscribers receive only the LAST value emitted, and
// only once the subject has completed.
const subject = new AsyncSubject();

subject.subscribe(
    (data) => addItem(`Observer 1: ${data}`), // Subscribe call back/notification
    (err) => addItem(err),
    () => addItem(`Observer 1 Completed`)
);
let i = 1;
const intervalId = setInterval(() => {
    if (i < 10) {
        subject.next(i++); // Emit 1..9, one integer every 200 ms
    } else {
        // All values sent: complete the subject (this is what releases the
        // last value to the observers) and stop the timer so it does not
        // keep firing forever.
        subject.complete();
        clearInterval(intervalId);
    }
}, 200);

setTimeout(() => {
    // Add a second observer while values are still being emitted; like the
    // first one it only receives the last value, when the subject completes.
    const observer2 = subject.subscribe(
        (data) => addItem(`Observer 2: ${data}`)
    );
}, 1600);

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

Merge Operator

import { Observable } from 'rxjs/Observable';
import { merge } from 'rxjs/observable/merge';

// First source stream: emits a single greeting.
const observable = Observable.create(function (observer: any) {
    observer.next('Hey guys');
});
// Second source stream: emits a single question.
const observable2 = Observable.create(function (observer: any) {
    observer.next('How is it going?');
});

// merge() combines both sources into one stream.
const newObservable = merge(observable, observable2);

// Display every value coming out of the merged stream.
newObservable.subscribe({
    next: (x:any) => { addItem(x); },
});

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

Map Operator

import { Observable } from 'rxjs/Observable';
import { merge } from 'rxjs/observable/merge';
import 'rxjs/add/operator/map'; // Patches .map() onto Observable

/* 
    RX JS OPERATOR: 
    https://rxjs-dev.firebaseapp.com/api/operators
    https://rxjs-dev.firebaseapp.com/guide/operators
*/

// Two single-value source streams.
const observable = Observable.create((observer: any) => {
    observer.next(`Hey guys`);
});
const observable2 = Observable.create((observer: any) => {
    observer.next(`How is it going?`);
});

// map() transforms every value flowing through the stream before it reaches
// the subscriber; here each message is converted to upper case.
const newObservable = merge(observable, observable2)
    .map((val: any) => val.toUpperCase());

newObservable.subscribe((x:any) => {
    addItem(x);
});

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

Pluck Operator

// Creation functions live in the lowercase `rxjs/observable/` directory; the
// capitalised path only resolves on case-insensitive file systems.
import { from } from 'rxjs/observable/from';
import 'rxjs/add/operator/pluck'; // Patches .pluck() onto Observable

// from() turns the array into a stream of person objects; pluck('first')
// keeps only the `first` property of each one before it is displayed.
from([
    { first:'Gary', last: 'Simon', age:34 },
    { first:'Frederic', last: 'Torres', age:35 },
])
    .pluck('first')
    .subscribe((firstName:string) => {
        addItem(firstName);
    });
    
/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

SkipUntil Operator

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
// Creation functions live in the lowercase `rxjs/observable/` directory; the
// capitalised path only resolves on case-insensitive file systems.
import { interval } from 'rxjs/observable/interval';
import 'rxjs/add/operator/skipUntil'; // Patches .skipUntil() onto Observable

// Source stream: emits 1, 2, 3, ... once per second.
const observable1 = Observable.create((data: any) => { // data is the observer receiving the values
    let i=1;
    const intervalId = setInterval(() => {
        data.next(i++);
    }, 1000);
    // Teardown logic: stop the timer when the subscriber unsubscribes so it
    // does not keep running in the background.
    return () => clearInterval(intervalId);
});

// Notifier stream: emits once, after 3.5 seconds.
const observable2 = new Subject();
setTimeout(() => {
    observable2.next('Hey!');
}, 3500);

// skipUntil() drops the source values until the notifier has emitted.
const newObservable = observable1.skipUntil(observable2);
newObservable.subscribe((x: number) => {
    addItem(x);
}); 

/**
 * Appends `val` as a new `<li>` entry to the element whose id is "output".
 *
 * @param val - Value to display; it is passed to `document.createTextNode`.
 * @throws Error if the document contains no element with id "output".
 */
function addItem(val:any) {
    const output = document.getElementById("output");
    if (output === null) {
        // Fail with an explicit message instead of an opaque
        // "cannot read properties of null" TypeError.
        throw new Error('addItem: no element with id "output" found in the document');
    }
    const node = document.createElement("li");
    node.appendChild(document.createTextNode(val));
    output.appendChild(node);
}

No comments:

Post a Comment