Links
This blog post is based on the free training video
"Learn RxJS in 60 Minutes for Beginners - Free Crash Course" by Gary Simon.
Code Samples
Cold Observable
import { Observable } from 'rxjs/Observable';
// Cold observable: the producer function below is activated once per
// subscription, so every subscriber gets its own counter and its own timer.
const observable = Observable.create((observer: any) => {
  let intervalId: ReturnType<typeof setInterval> | undefined;
  try {
    observer.next('Hey guys!'); // Emit a value
    observer.next('how are you?'); // Emit a value
    let counter = 0;
    intervalId = setInterval(() => {
      observer.next(`I am good [${counter++}]`);
    }, 2000);
    // observer.complete();
    // observer.next('This will not be sent');
  } catch (ex) {
    observer.error(ex);
  }
  // Teardown logic: runs when this subscription is unsubscribed, so the
  // timer does not keep firing after nobody is listening any more.
  return () => {
    if (intervalId !== undefined) {
      clearInterval(intervalId);
    }
  };
});
// `observer` and `observer2` are the Subscription handles returned by subscribe().
const observer = observable.subscribe(
  (x: string) => { addItem(`[1] ${x}`); }, // next
  (error: unknown) => { addItem(`[ERROR]${error}`); }, // error
  () => { addItem('Completed'); }, // complete
);
const observer2 = observable.subscribe(
  (x: string) => { addItem(`[2] ${x}`); }, // next
);
observer.add(observer2); // Now observer2 will be cancelled when observer is unsubscribed
//observer.remove(observer2); // undo the add
setTimeout(() => { // Cancel the subscription(s) just after 6 seconds
  observer.unsubscribe();
}, 6001);
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
Warm Observable
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/share';
// The producer is activated on subscription (cold), then share() multicasts
// it so that all subscribers listen to one single execution.
const observable = Observable.create((observer: any) => {
  let intervalId: ReturnType<typeof setInterval> | undefined;
  try {
    observer.next('Hey guys!'); // Emit a value
    observer.next('how are you?'); // Emit a value
    let counter = 0;
    intervalId = setInterval(() => {
      observer.next(`I am good [${counter++}]`);
    }, 2000);
  } catch (ex) {
    observer.error(ex);
  }
  // Teardown logic: stops the timer once the shared execution is torn down
  // (i.e. when the last subscriber has unsubscribed).
  return () => {
    if (intervalId !== undefined) {
      clearInterval(intervalId);
    }
  };
}).share(); // Make it a warm Observable: subscribers only receive
            // the stream from the moment they subscribed rather
            // than from the beginning
const observer = observable.subscribe(
  (x: string) => { addItem(`[1] ${x}`); }, // next
  (error: unknown) => { addItem(`[ERROR]${error}`); }, // error
  () => { addItem('Completed'); }, // complete
);
// The second subscriber joins one second later and therefore misses the
// two values that were emitted synchronously to the first subscriber.
setTimeout(() => {
  const observer2 = observable.subscribe(
    (x: string) => { addItem(`[2] ${x}`); }, // next
  );
}, 1000);
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
Hot Observable
import { Observable } from 'rxjs/Observable';
// NOTE: the directory is lowercase `observable`; the capitalised path only
// resolves on case-insensitive file systems.
import { fromEvent } from 'rxjs/observable/fromEvent';
// Create hot Observable: mouse events are produced by the document whether
// or not anybody is subscribed.
const observable = fromEvent<MouseEvent>(document, 'mousemove');
// Subscribe after two seconds; mouse moves that happened earlier are not seen.
setTimeout(() => {
  const subscription = observable.subscribe(
    // Show the pointer position; passing the event object itself would
    // only render as "[object MouseEvent]".
    (event: MouseEvent) => { addItem(`Mouse moved to (${event.clientX}, ${event.clientY})`); }
  );
}, 2000);
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
Subject
import { Subject } from 'rxjs/Subject';
// A Subject is an observable that can also emit values: whatever is pushed
// in with next() is delivered to every observer subscribed at that moment.
const subject = new Subject();
subject.subscribe({
  next: (data) => addItem(`Observer 1: ${data}`), // value notification
  error: (err) => addItem(err),
  complete: () => addItem(`Observer 1 Completed`),
});
// Only Observer 1 is subscribed when this value is emitted.
subject.next('this first thing has been sent');
// Second observer: it will only receive values emitted from now on.
const secondSubscription = subject.subscribe({
  next: (data) => addItem(`Observer 2: ${data}`),
});
for (const message of ['this SECOND thing has been sent', 'this THIRD thing has been sent']) {
  subject.next(message);
}
secondSubscription.unsubscribe();
// Observer 2 has unsubscribed, so only Observer 1 receives this value.
subject.next('this FINAL thing has been sent');
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
BehaviorSubject
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
// A BehaviorSubject is a Subject seeded with an initial value ('First'
// here); each new observer immediately receives the most recent value.
const subject = new BehaviorSubject('First');
subject.subscribe({
  next: (data) => addItem(`Observer 1: ${data}`), // value notification
  error: (err) => addItem(err),
  complete: () => addItem(`Observer 1 Completed`),
});
for (const message of ['this first thing has been sent', '...Observer 2 is about to subscribe']) {
  subject.next(message); // Emit a value
}
// Second observer: it starts with the LAST value emitted before it subscribed.
const secondSubscription = subject.subscribe({
  next: (data) => addItem(`Observer 2: ${data}`),
});
for (const message of ['this SECOND thing has been sent', 'this THIRD thing has been sent']) {
  subject.next(message);
}
secondSubscription.unsubscribe();
// Observer 2 has unsubscribed, so only Observer 1 receives this value.
subject.next('this FINAL thing has been sent');
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
ReplaySubject
import { ReplaySubject } from 'rxjs/ReplaySubject';
// A ReplaySubject is a Subject that buffers recent values and replays
// them to observers that subscribe later.
const REPLAY_COUNT = 2; // <= Number of values to send to new observer
const subject = new ReplaySubject(REPLAY_COUNT);
subject.subscribe({
  next: (data) => addItem(`Observer 1: ${data}`), // value notification
  error: (err) => addItem(err),
  complete: () => addItem(`Observer 1 Completed`),
});
// Falls out of the 2-value buffer before Observer 2 arrives.
subject.next('this first thing has been sent');
// These two are still buffered, so both observers receive them.
subject.next('Another thing has been sent');
subject.next('...Observer 2 is about to subscribe');
// Second observer: it first receives the last 2 values emitted before
// it subscribed, then every new value.
const secondSubscription = subject.subscribe({
  next: (data) => addItem(`Observer 2: ${data}`),
});
for (const message of ['this SECOND thing has been sent', 'this THIRD thing has been sent']) {
  subject.next(message);
}
secondSubscription.unsubscribe();
// Observer 2 has unsubscribed, so only Observer 1 receives this value.
subject.next('this FINAL thing has been sent');
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
AsyncSubject
import { AsyncSubject } from 'rxjs/AsyncSubject';
// With AsyncSubject nothing is delivered while values are being emitted;
// only the LAST value is sent to the observers, when the subject completes.
const subject = new AsyncSubject<number>();
subject.subscribe(
  (data) => addItem(`Observer 1: ${data}`), // Subscribe call back/notification
  (err) => addItem(err),
  () => addItem(`Observer 1 Completed`)
);
let i = 1;
// Emit an integer every 200 ms (1 through 9), then complete on the next
// tick and stop the timer so it does not keep firing forever.
const intervalId = setInterval(() => {
  if (i < 10) {
    subject.next(i++);
  } else {
    subject.complete();
    clearInterval(intervalId);
  }
}, 200);
setTimeout(() => {
  // Second observer subscribes while values are still being emitted; like
  // the first one it only receives the final value once the subject completes.
  const observer2 = subject.subscribe(
    (data) => addItem(`Observer 2: ${data}`)
  );
}, 1600);
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
Merge Operator
import { Observable } from 'rxjs/Observable';
import { merge } from 'rxjs/observable/merge';
// Two independent single-value sources...
const observable = Observable.create((subscriber: any) => {
  subscriber.next('Hey guys');
});
const observable2 = Observable.create((subscriber: any) => {
  subscriber.next('How is it going?');
});
// ...combined with merge(): the resulting observable emits the values of
// both sources on one stream.
const newObservable = merge(observable, observable2);
newObservable.subscribe((value: any) => {
  addItem(value);
});
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
Map Operator
import { Observable } from 'rxjs/Observable';
import { merge } from 'rxjs/observable/merge';
import 'rxjs/add/operator/map'; // patches Observable.prototype with map()
/*
RX JS OPERATOR:
https://rxjs-dev.firebaseapp.com/api/operators
https://rxjs-dev.firebaseapp.com/guide/operators
*/
const observable = Observable.create((observer: any) => {
  observer.next(`Hey guys`);
});
const observable2 = Observable.create((observer: any) => {
  observer.next(`How is it going?`);
});
// map() transforms every value emitted by the source before it reaches the
// subscriber; here each merged message is converted to upper case.
const newObservable = merge(observable, observable2)
  .map((val: string) => val.toUpperCase());
newObservable.subscribe((x: string) => {
  addItem(x);
});
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
Pluck Operator
// NOTE: the directory is lowercase `observable`; the capitalised path only
// resolves on case-insensitive file systems.
import { from } from 'rxjs/observable/from';
import 'rxjs/add/operator/pluck';
// from() turns the array into an observable that emits each element in
// turn; pluck('first') then maps every object to its `first` property.
from([
  { first: 'Gary', last: 'Simon', age: 34 },
  { first: 'Frederic', last: 'Torres', age: 35 },
])
  .pluck('first')
  .subscribe((firstName: string) => {
    addItem(firstName);
  });
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}
SkipUntil Operator
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
// NOTE: the directory is lowercase `observable`; the capitalised path only
// resolves on case-insensitive file systems.
import { interval } from 'rxjs/observable/interval';
import 'rxjs/add/operator/skipUntil';
// Source: emits 1, 2, 3, ... once per second.
const observable1 = Observable.create((observer: any) => { // observer receives the emitted values
  let i = 1;
  const intervalId = setInterval(() => {
    observer.next(i++);
  }, 1000);
  // Teardown logic: stop the timer when the subscription is unsubscribed.
  return () => clearInterval(intervalId);
});
// Notifier: its first emission (after 3.5 s) opens the gate.
const observable2 = new Subject<string>();
setTimeout(() => {
  observable2.next('Hey!');
}, 3500);
// skipUntil() drops every source value until the notifier has emitted, so
// the values produced during the first 3.5 seconds are never displayed.
const newObservable = observable1.skipUntil(observable2);
newObservable.subscribe((x: number) => {
  addItem(x);
});
/**
 * Appends a new `<li>` containing the text form of `val` to the element
 * whose id is "output".
 *
 * @param val Value to display; it is converted to a string.
 * @throws Error when the document has no element with id "output".
 */
function addItem(val: unknown): void {
  const list = document.getElementById('output');
  // getElementById returns null when the element is missing; fail with a
  // clear message instead of an opaque "cannot read properties of null".
  if (list === null) {
    throw new Error('Element with id "output" not found');
  }
  const node = document.createElement('li');
  node.appendChild(document.createTextNode(String(val)));
  list.appendChild(node);
}