Angular 2: the new craze of Observables & Operators

Ashish Singh

If you have been working with Angular 2 and using the reactive approach a lot, you are probably aware of the crazy world of Observables and their operators.

What is most astonishing when working with observables is the overwhelming number of operators available. Don't worry if you can't recall or use these operators at a glance; believe me, it takes time to get your head around Observables and this world of operators.

I think Observables can be explained very intuitively using the chart below.

In short, observables can be thought of as an evolution from Objects to arrays to promises and finally observables.

  1. Objects are good at returning a value.
  2. Promises are good at returning a value asynchronously.
  3. Arrays can return multiple values but again synchronously.

Welcome Observables, they return multiple values asynchronously.

Promises die once they return a value, whereas observables stay alive even after they have returned values. They keep returning values, and it's in our control to set a rule for when they die.
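The difference between "one value, then done" and "many values" can be sketched in a few lines of plain JavaScript. This is only an illustration: tinyObservable is a hypothetical helper written for this post, not the RxJS Observable, and it pushes its values synchronously to keep the sketch short (with timers or events they would arrive over time).

```javascript
// Hypothetical stand-in for an observable (NOT the RxJS class):
// it is just a function that pushes values into a callback.
function tinyObservable(producer) {
  return { subscribe: (next) => producer(next) };
}

// A promise settles once; the second resolve() call is ignored.
const once = new Promise((resolve) => {
  resolve('first');
  resolve('second');
});
once.then((value) => console.log('promise:', value));

// The stand-in can hand over as many values as it likes.
const many = tinyObservable((next) => {
  next('first');
  next('second');
  next('third');
});

const received = [];
many.subscribe((value) => received.push(value));
console.log('observable:', received);
```

The promise only ever delivers 'first', while the subscriber of the stand-in observable receives all three values.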

Rule

Disclaimer: I call it a rule internally at Aviabird; it's not a standard term. A rule is nothing but a description of how an observable should behave over time. For example, I can say: create an observable using range(0, 10), which emits the ten integers from 0 through 9. Or I can say: create an observable which listens to click events on a button. Or: create an observable which hits the Wikipedia API and fetches some results. All of these are nothing but Rules.

Let’s see an observable in action. IMO this is the most basic example of observables.

var obs = Rx.Observable.range(0, 10);

obs.subscribe(
  data => {
    console.log(data);  
  }
);

Let’s look at line 1. It creates an observable, which is only a rule. It doesn’t really do anything. It just states that an observable emitting ten numbers, 0 through 9, should be created. Remember that nothing happens yet; you must subscribe to an observable to get anything out of it.

RULE OF THUMB: To get anything out of an observable (which is reactive by nature), one has to subscribe to it.

That’s what we do on line 3 of the range example, and the values emitted by the observable are caught inside the first callback of the subscribe method. The 2nd and 3rd callbacks are reserved for the error and completed states; we’ll come back to them later. In the above example, when we subscribe to the observable it just spits out all the values at once, pretty straightforward. Next, let’s look at a slightly more complex scenario.

var source = Rx.Observable.create((observer) => {
  setTimeout(() => {
    console.log('timeout hit');
    // onNext is the RxJS 4 name; RxJS 5 (the version Angular 2 ships with) calls it observer.next
    observer.onNext('Observable 101');
  }, 1000);
  console.log('observable initialized');
});

source.subscribe(x => console.log(x));

In this example, we create an observable but we have not attached a ready-made rule to it, as we did with range in the previous example.

Instead, we rely on the onNext function, which plays the same role. From within the observable, it tells the subscriber: something has changed inside me, please pay attention and use the first callback to handle the data I am sending you. Since I wanted to delay that a bit, I added a timeout, which simply delays the emission from the observable.
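To make the subscribe callbacks and the "nothing happens until you subscribe" idea concrete, here is a rough sketch of what a create-style observable could look like underneath. createSource is a hypothetical helper, not how RxJS is actually implemented; it borrows the onNext / onError / onCompleted names from the example above and leaves out the timeout so that it runs synchronously.

```javascript
// Hypothetical stand-in for Rx.Observable.create (NOT the RxJS implementation).
// It stores the producer and runs it only when subscribe() is called.
function createSource(producer) {
  return {
    subscribe(onNext, onError, onCompleted) {
      try {
        producer({ onNext, onError, onCompleted });
      } catch (err) {
        // Anything thrown by the producer goes to the second callback.
        onError(err);
      }
    },
  };
}

const events = [];

const greetings = createSource((observer) => {
  events.push('producer started');
  observer.onNext('Observable 101');
  observer.onCompleted();
});

// The producer has not run yet: creating the source does nothing by itself.
events.push('created, nothing emitted yet');

greetings.subscribe(
  (value) => events.push('next: ' + value),
  (err) => events.push('error: ' + err.message),
  () => events.push('completed')
);

// A producer that throws ends up in the error callback instead.
createSource(() => {
  throw new Error('boom');
}).subscribe(
  (value) => events.push('next: ' + value),
  (err) => events.push('error: ' + err.message),
  () => events.push('completed')
);

console.log(events);
```

The log shows that the producer only starts after subscribe is called, and that each of the three callbacks handles exactly one kind of signal.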

Now you must be wondering where this could be used in a real-world scenario. Imagine you are a celebrity with a Twitter profile. You break the internet whenever you post something on Twitter. What if you got a push notification in the browser for each like or retweet? Wouldn’t it make your life very difficult if each push notification came with a pop sound? Damn, so many ding-dongs.

Possible Observable solution: keep listening to the notification observable and use a very cool operator called buffer, which can collect notifications for a window of 5 minutes (not used in this example) or until a refresh-notifications button is clicked, and then summarize them all in one notification. For example: You’ve got 1.5 million likes since you last checked. You can see it in action here.

//Create an observable that emits a value every second
const myInterval = Rx.Observable.interval(1000);

//Create an observable that emits every time button is clicked
var button = document.getElementById('clickButton');
const bufferBy = Rx.Observable.fromEvent(button, 'click');

/*
Collect all values emitted by our interval observable until the button is clicked. The click causes the bufferBy observable to emit a value, which closes the buffer and passes us all values collected since the last buffer as an array.
*/
const myBufferedInterval = myInterval.buffer(bufferBy);

/* ASCII REPRESENTATION 
 *
 * --x----x----x----x------->
 *         
 * ----c----------c----c---->
 * 
 * ----x----------xx---x---->  
 */

//Print values to console
const subscribe = myBufferedInterval.subscribe(
  val => {
    console.clear();
    console.log('new notification ids', val);
    var numberOfNotifications = val.length;
    console.log('New Notifications ', numberOfNotifications)
  }
);

Whenever you click the check-new-notifications button, it counts the notifications that have arrived since the last time you clicked the button and emits them. The time/stream diagram in the code above visualizes how the buffer operator works.
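The collect-then-flush behaviour described above can also be imitated without any library. The sketch below is only a mental model: NotificationBuffer is a hypothetical class, push() stands in for a value arriving from the interval observable, and flush() stands in for a button click.

```javascript
// Hypothetical helper (not part of RxJS): collects values until flush() is called.
class NotificationBuffer {
  constructor() {
    this.pending = [];
  }

  // A new notification id arrives.
  push(id) {
    this.pending.push(id);
  }

  // The "button click": hand over everything collected so far and start over.
  flush() {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}

const buffer = new NotificationBuffer();

buffer.push(0);
buffer.push(1);
buffer.push(2);
const firstClick = buffer.flush();

buffer.push(3);
const secondClick = buffer.flush();

// Clicking again with nothing new gives an empty batch.
const thirdClick = buffer.flush();

console.log(firstClick.length, secondClick.length, thirdClick.length);
```

Each flush returns only what arrived since the previous one, which is the same idea as counting val.length in the subscriber above.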

I think the observable pattern is a bit clearer after this example. Believe me, I have not even scratched the surface of what observables are capable of, and I’ll be writing a lot of blog posts on the many ways we can use observables to make our lives easier when dealing with complex cases in day-to-day work. That covers observables and the first part of this post; continue if you want to learn about operators.

Operators: real power of observables

Observables are very powerful, but their real power can only be harnessed when we use the right operators. For this purpose I am creating a small list of the most used operators. This list will keep growing over time. Remember that operators can be chained, and each operator, in turn, returns an observable. When you have a new hammer, everything looks like a nail, including screws.

So a word of caution: when you use an operator, you should know what it does and whether it’s a good fit.
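Why chaining works is easier to see with a toy version. In the sketch below, fromArray, map and filter are hypothetical standalone functions written for this post, not the RxJS operators; each one takes a source and returns a new object with its own subscribe method, which is what makes it possible to stack them.

```javascript
// Hypothetical source: pushes every array item to the subscriber.
function fromArray(items) {
  return {
    subscribe(next) {
      items.forEach((item) => next(item));
    },
  };
}

// Hypothetical operator: returns a NEW source that transforms each value.
function map(source, project) {
  return {
    subscribe(next) {
      source.subscribe((value) => next(project(value)));
    },
  };
}

// Hypothetical operator: returns a NEW source that lets only some values through.
function filter(source, predicate) {
  return {
    subscribe(next) {
      source.subscribe((value) => {
        if (predicate(value)) next(value);
      });
    },
  };
}

// Chain: add 10 to each value, then keep only the odd results.
const chained = filter(
  map(fromArray([1, 2, 3, 4, 5]), (value) => value + 10),
  (value) => value % 2 === 1
);

const result = [];
chained.subscribe((value) => result.push(value));
console.log(result);
```

Because every step returns something subscribable, the order of the steps is the only thing that decides what the subscriber finally sees.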

Map

Applies some logic to each element passed through. A very good example would be converting data coming from the backend to JSON format.

//emit (1,2,3,4,5)
const source = Rx.Observable.from([1,2,3,4,5]);
//add 10 to each value
const example = source.map(val => val + 10);
//output: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

//emit ({name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50})
const sourceTwo = Rx.Observable.from([{name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50}]);
//grab each persons name
const exampleTwo = sourceTwo.map(person => person.name);
//output: "Joe","Frank","Ryan"
const subscribeTwo = exampleTwo.subscribe(val => console.log(val));

SwitchMap

When the source emits, switchMap switches to the latest inner observable and emits its values, discarding any values that result from previous source emissions. Consider the Google search bar, which fires a request after each keystroke. Say a user types very fast and enters 7 letters: there would be 7 backend requests and 7 corresponding responses. We probably don’t care about the responses to the earlier requests, only the one for the final search term. switchMap fits right in here: it discards all the previous responses and emits only the response corresponding to the latest search request.

//emit immediately, then every 5s
const source = Rx.Observable.timer(0, 5000);
//switch to new inner observable when source emits, emit items that are emitted
const example = source.switchMap(() => Rx.Observable.interval(500));
//output: 0,1,2,3,4,5,6,7,8,9...0,1,2,3,4,5,6,7,8
const subscribe = example.subscribe(val => console.log(val));

//emit every click
const sourceTwo = Rx.Observable.fromEvent(document, 'click');
//if another click comes within 3s, message will not be emitted
const exampleTwo = sourceTwo.switchMap(val => Rx.Observable.interval(3000).mapTo('Hello, I made it!'));
//(click)...3s...'Hello I made it!'...(click)...2s(click)...
const subscribeTwo = exampleTwo.subscribe(val => console.log(val));

FlatMap

When the source emits, flatMap subscribes to the new inner observable and merges its values with those of the earlier inner observables; it does not discard values that result from previous source emissions. Suppose you are building a platform which merges feeds from multiple APIs. A possible way to represent that is to create 2 observables, one for each API endpoint. We don’t care when the responses from these observables arrive; we just want to merge them and show them together. This is where flatMap comes into the picture.

var source = Rx.Observable
    .range(1, 2)
    .flatMap(function (x) {
        return Rx.Observable.range(x, 2);
    });

var subscription = source.subscribe(
    function (x) { console.log('Next: ' + x); },
    function (err) { console.log('Error: ' + err); },
    function () { console.log('Completed'); });

// Next: 1, 2, 2, 3
// when 1 from the outer observable is passed in, it results in 1, 2
// when 2 from the outer observable is passed in, it results in 2, 3
// All the results from the source matter here, unlike with switchMap.

Classic web search example: flatMap should be used when all results matter, regardless of their timing, and switchMap should be used when only the results from the latest Observable matter.
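The difference can be played through on paper with a made-up timeline of search requests. The sketch below does not use RxJS at all: the request data, mergeAll and switchLatest are hypothetical, and it assumes each request produces exactly one response. It only models which responses a subscriber would end up seeing under each strategy.

```javascript
// Made-up timeline: when each request was sent and when its response arrives (ms).
const requests = [
  { term: 'a', sentAt: 0, respondsAt: 300 },
  { term: 'an', sentAt: 100, respondsAt: 250 },
  { term: 'ang', sentAt: 200, respondsAt: 400 },
];

// flatMap-like: every response is delivered, in the order it arrives.
function mergeAll(reqs) {
  return [...reqs]
    .sort((x, y) => x.respondsAt - y.respondsAt)
    .map((req) => req.term);
}

// switchMap-like: a response is dropped if a newer request
// was sent before that response arrived.
function switchLatest(reqs) {
  return reqs
    .filter(
      (req) =>
        !reqs.some(
          (other) => other.sentAt > req.sentAt && other.sentAt < req.respondsAt
        )
    )
    .sort((x, y) => x.respondsAt - y.respondsAt)
    .map((req) => req.term);
}

const merged = mergeAll(requests);
const switched = switchLatest(requests);

console.log('flatMap-like:', merged);
console.log('switchMap-like:', switched);
```

With this timeline, the merge strategy delivers all three responses (and not in the order they were requested), while the switch strategy delivers only the response for the last search term.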

For more musings about programming, follow me so you’ll get notified when I write new posts.