Rxify: A Simple Spell For Complex RxJava Operators

With RxJava many complex tasks can be accomplished easily and without errors. With Rxify, you just need to cast the spells and all your work will be done. From hitting the cache first, to performing retries, just cast the spells and you are good to go!


Introduction

I am Garima, and I am from New Delhi, India. I’ve been an Android developer for the past four years. Currently, I’m working with Fueled, a service-based company with clients including Rite Aid Pharmacy, QuizUp, Barnes and Noble College, and Barneys New York. At Fueled, we have been working with reactive programming.

What is Rxify?

Rxify is a term that signifies the use of RxJava or any of its operators. RxJava is the Reactive Extensions library for the JVM, and it is built on the observer pattern. With RxJava, you do not have to worry about low-level threading, synchronization, background processing, retrying, repeating, timeouts, and the like.

The Basics

RxJava has an observable, which produces items, and a subscriber, which consumes them. An observable talks to its subscriber through three callbacks:

  • onNext - called each time the observable emits an item
  • onComplete - called when the stream is finished
  • onError - called if an error happens in between.

In between an observable and an observer is a stream, and a stream can be manipulated by using operators.

This is the basic observable pattern.

Here, the producer is emitting one item at a time: A, B, and then C.
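As a rough sketch of that pattern, the producer below emits A, B, and C and then completes, while the consumer records each callback it receives. It assumes RxJava 2.x (the `io.reactivex` package) is on the classpath; the class and method names are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

public class BasicObservable {

    // The producer: emits A, B, C one at a time, then signals completion.
    static Observable<String> letters() {
        return Observable.create(emitter -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onComplete();
        });
    }

    // The consumer: records every callback it receives, in order.
    static List<String> collectEvents() {
        List<String> events = new ArrayList<>();
        letters().subscribe(
            item -> events.add("onNext " + item),
            error -> events.add("onError " + error.getMessage()),
            () -> events.add("onComplete"));
        return events;
    }

    public static void main(String[] args) {
        collectEvents().forEach(System.out::println);
    }
}
```

Because nothing here switches threads, the whole stream runs synchronously inside `subscribe`.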


Justify Operator

Suppose you have a string. You can use the just operator to convert it into a stream, resulting in an observable that emits that string. If you pass an array or a list to just, the observable emits the whole collection as a single item. To emit the items one by one, use the from operator instead (fromArray or fromIterable): it picks each item and emits them one at a time.
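The difference between the two operators can be seen by counting emissions. This is a minimal sketch assuming RxJava 2.x (`io.reactivex`); the spell list and the class name are illustrative only.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class JustVsFromIterable {

    static final List<String> SPELLS = Arrays.asList("Lumos", "Nox", "Accio");

    // just(list) emits the list itself as a single item.
    static long countWithJust() {
        return Observable.just(SPELLS).count().blockingGet();
    }

    // fromIterable(list) emits each element of the list separately.
    static long countWithFromIterable() {
        return Observable.fromIterable(SPELLS).count().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(countWithJust());         // 1
        System.out.println(countWithFromIterable()); // 3
    }
}
```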

Map Operator

Map takes a data type or element, and it converts it into something else. Suppose there are three integers being emitted: 1, 2, and 3. And that we want the values to be multiplied by 10, resulting in 10, 20, and 30. The map operator can accomplish this.

	Observable.fromIterable(Arrays.asList(1, 2, 3))
		.map(integer -> integer * 10)
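To run the snippet above end to end, one option is to collect the mapped values into a list and block for the result. The sketch assumes RxJava 2.x (`io.reactivex`); the wrapper class is hypothetical.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class MapExample {

    // Multiplies every emitted integer by 10 and gathers the results.
    static List<Integer> timesTen() {
        return Observable.fromIterable(Arrays.asList(1, 2, 3))
            .map(integer -> integer * 10)
            .toList()        // Single<List<Integer>>
            .blockingGet();  // wait for the stream to finish
    }

    public static void main(String[] args) {
        System.out.println(timesTen()); // [10, 20, 30]
    }
}
```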

FlatMap Operator

A FlatMap is different from a Map in that instead of converting each item into another value, it converts each item into an observable of items, and then flattens those observables into a single stream.

Suppose I have the strings “Hello World” and “How Are You”, and that I want a stream of the individual words, split on the space (“Hello”, “World”, and so on). FlatMap can accomplish this.

	List<String> source = Arrays.asList("Hello World", "How Are You");
	Observable.fromIterable(source)
		.flatMap(s -> Observable.fromIterable(
			Arrays.asList(s.split(" "))));
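A runnable version of the same idea might look like the following, assuming RxJava 2.x (`io.reactivex`); the class name is made up. The inner observables here are synchronous, so the words come out in source order. With asynchronous inner observables flatMap may interleave them, and concatMap is the operator that preserves order.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class FlatMapExample {

    // Splits each sentence into words and flattens them into one stream.
    static List<String> words() {
        List<String> source = Arrays.asList("Hello World", "How Are You");
        return Observable.fromIterable(source)
            .flatMap(s -> Observable.fromIterable(Arrays.asList(s.split(" "))))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(words()); // [Hello, World, How, Are, You]
    }
}
```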

Filter Operator

Using Filter, if you have a stream, you can filter it according to a predicate or a condition.

Suppose I want only odd numbers out of an integer stream:

	getSourceObservable()
		.filter(integer -> integer % 2 != 0) // != 0 also matches negative odd numbers
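Here is a small self-contained sketch of the filter, assuming RxJava 2.x (`io.reactivex`). A fixed list stands in for the source observable, and the predicate is written as `% 2 != 0` so that negative odd numbers pass as well (in Java, `-3 % 2` is `-1`).

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class FilterExample {

    // Keeps only the odd numbers from the source stream.
    static List<Integer> oddNumbers() {
        return Observable.fromIterable(Arrays.asList(-3, 1, 2, 3, 4, 5, 6))
            .filter(integer -> integer % 2 != 0)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(oddNumbers()); // [-3, 1, 3, 5]
    }
}
```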

The Complex

Zip Operator

The Zip operator allows you to work with more than one observable at a time. It waits until each source has emitted an item, combines those items with a function you supply, and emits the result.

Suppose Hermione wants to brew a Polyjuice potion, but it requires FluxWeed from Ron and some human hair from Harry. Both tasks, performed by Ron and Harry, are executing asynchronously and simultaneously. Only after both tasks have completed can we start brewing our Polyjuice potion.

To implement this, the first observable will emit FluxWeed and the second observable will emit CrabHair; once both have emitted, we can emit the Polyjuice Potion. This is as simple as calling Observable.zip, which is a static method of the Observable class.

	return Observable.zip(
		fluxWeedObservable,
		hairObservable,
		(fluxWeed, student) ->
			new PolyJuice(fluxWeed, student.getHair())
				.prepare()
		);
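The sketch below shows the same zip with two sources that run on different threads. It assumes RxJava 2.x (`io.reactivex`) and uses plain strings in place of the talk's FluxWeed, Student, and PolyJuice types; the 50 ms delay only simulates one ingredient arriving later.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ZipExample {

    // Waits for both ingredients, then combines them into one result.
    static String brew() {
        Observable<String> fluxWeedObservable =
            Observable.just("FluxWeed").subscribeOn(Schedulers.io());
        Observable<String> hairObservable =
            Observable.just("Hair")
                .delay(50, TimeUnit.MILLISECONDS) // arrives a little later
                .subscribeOn(Schedulers.io());

        return Observable.zip(
                fluxWeedObservable,
                hairObservable,
                (fluxWeed, hair) -> "PolyJuice(" + fluxWeed + ", " + hair + ")")
            .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(brew()); // PolyJuice(FluxWeed, Hair)
    }
}
```

Zip pairs items by position, so the result is the same no matter which source finishes first.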

Concat and Merge

The Concat operator emits the data from two or more observables without interleaving them. Merge combines multiple observables into one by merging their emissions.

Suppose you have two observables, the first emitting four items and the second emitting nine. Concat subscribes to the first observable, and only when that one has finished emitting all of its items does it subscribe to the second observable.

As an example, suppose a professor asks students to submit essays, and the house that submits the most essays will win. Those who submit closer to being first will get more points. The students are divided into four houses: Gryffindor, Slytherin, Hufflepuff, and Ravenclaw. These are the four observables, and this is how students are turning in their essays.

Suppose Slytherin wants to get an unfair advantage. They can use concat, and all of their essays will appear to have arrived first, regardless of when they were actually completed.

	Observable<Student> concatify() {
		Observable<Student> slytherinObservable = getObservable(House.SLYTHERIN);
		Observable<Student> huffleObservable = getObservable(House.HUFFLEPUFF);
		Observable<Student> ravenObservable = getObservable(House.RAVENCLAW);
		Observable<Student> gryffindorObservable = getObservable(House.GRYFFINDOR);
		return Observable.concat(
			slytherinObservable,
			huffleObservable,
			ravenObservable,
			gryffindorObservable);
	}

If Hermione, who is not from Slytherin, wants to rectify this, she can use merge.

	Observable<Student> mergeos() {
		Observable<Student> slytherinObservable = getObservable(House.SLYTHERIN);
		Observable<Student> huffleObservable = getObservable(House.HUFFLEPUFF);
		Observable<Student> ravenObservable = getObservable(House.RAVENCLAW);
		Observable<Student> gryffindorObservable = getObservable(House.GRYFFINDOR);
		return Observable.merge(
			slytherinObservable,
			huffleObservable,
			ravenObservable,
			gryffindorObservable);
	}

Now everything is back in order.
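To make the ordering difference concrete, here is a sketch with two houses on a virtual clock. It assumes RxJava 2.x (`io.reactivex`); the timings, the item labels, and the class name are invented for the example, and a `TestScheduler` is used so that the result does not depend on real time.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ConcatVsMerge {

    static List<String> submissions(boolean useConcat) {
        TestScheduler scheduler = new TestScheduler();

        // Each house hands in two essays; the delays are counted from subscription.
        // Slytherin: after 20 ms and 40 ms. Gryffindor: after 10 ms and 30 ms.
        Observable<String> slytherin = Observable
            .intervalRange(1, 2, 20, 20, TimeUnit.MILLISECONDS, scheduler)
            .map(i -> "Slytherin-" + i);
        Observable<String> gryffindor = Observable
            .intervalRange(1, 2, 10, 20, TimeUnit.MILLISECONDS, scheduler)
            .map(i -> "Gryffindor-" + i);

        Observable<String> combined;
        if (useConcat) {
            // Gryffindor is not even subscribed to until Slytherin completes.
            combined = Observable.concat(slytherin, gryffindor);
        } else {
            // Both are subscribed to at once; items arrive in time order.
            combined = Observable.merge(slytherin, gryffindor);
        }

        List<String> received = new ArrayList<>();
        combined.subscribe(received::add);
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(submissions(true));
        // [Slytherin-1, Slytherin-2, Gryffindor-1, Gryffindor-2]
        System.out.println(submissions(false));
        // [Gryffindor-1, Slytherin-1, Gryffindor-2, Slytherin-2]
    }
}
```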

Caching Example

In the real world, we check the cache first for the data we're looking for, and if it's not present we can make a network call. Here, we can use concat to accomplish this:

	Observable<List<Lecture>> cacheObservable = getLecturesFromCache();
	Observable<List<Lecture>> lecturesFromServer = getLecturesFromServer();
	return cacheObservable
		.concatWith(lecturesFromServer)
			.take(1);

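We have two observables: the first is getLecturesFromCache, and the second is getLecturesFromServer. Concat subscribes to the server observable only after the cache observable completes, and take(1) ends the stream at the first item. So if the cache emits a result, the network call is never made; if the cache completes without emitting anything, the result comes from the server.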
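The cache-first behaviour can be checked with a small sketch that counts how often the "server" is actually hit. It assumes RxJava 2.x (`io.reactivex`); the lecture names, the counter, and the helper methods are hypothetical stand-ins for a real cache and network layer.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheFirst {

    // Counts how many times the simulated network call really runs.
    static final AtomicInteger serverCalls = new AtomicInteger();

    static Observable<List<String>> lecturesFromServer() {
        // fromCallable defers the work until something subscribes.
        return Observable.fromCallable(() -> {
            serverCalls.incrementAndGet();
            return Arrays.asList("Potions", "Charms");
        });
    }

    // Cache first; fall back to the server only if the cache emits nothing.
    static Observable<List<String>> lectures(Observable<List<String>> cache) {
        return cache
            .concatWith(lecturesFromServer())
            .take(1);
    }

    static List<String> loadWithWarmCache() {
        Observable<List<String>> cache =
            Observable.just(Arrays.asList("Cached Potions"));
        return lectures(cache).blockingFirst();
    }

    static List<String> loadWithEmptyCache() {
        Observable<List<String>> cache = Observable.empty();
        return lectures(cache).blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(loadWithWarmCache() + " server calls: " + serverCalls.get());
        System.out.println(loadWithEmptyCache() + " server calls: " + serverCalls.get());
    }
}
```

Note that this relies on the cache observable completing without emitting when it has no data; a cache that emits an empty list would count as a hit.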

Backpressure

What if the producer emits items faster than the consumer can consume them?

Buffering

A way to solve this issue is by buffering. Instead of passing along one item at a time, the stream can deliver items in chunks. With a buffer of two, for example, the consumer receives and processes the items two at a time.

	getSpellsObservable()
		.buffer(2); //count
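A quick sketch of count-based buffering, assuming RxJava 2.x (`io.reactivex`), with a fixed list of spells standing in for getSpellsObservable():

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class BufferExample {

    // Groups the stream into lists of at most two items each.
    static List<List<String>> spellsInPairs() {
        return Observable
            .fromIterable(Arrays.asList("Lumos", "Nox", "Accio", "Reparo", "Alohomora"))
            .buffer(2)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(spellsInPairs());
        // [[Lumos, Nox], [Accio, Reparo], [Alohomora]]
    }
}
```

The last chunk is smaller when the number of items is not a multiple of the buffer size.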

Debounce

Debounce emits an item only if a given amount of time has passed without the source emitting another item. Items that are followed too quickly by a newer one are dropped.

	getSpellsObservable()
		.debounce(300, TimeUnit.MILLISECONDS);
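The following sketch simulates a burst of rapid emissions followed by a pause. It assumes RxJava 2.x (`io.reactivex`); a `PublishSubject` stands in for the spell source and a `TestScheduler` supplies virtual time, both chosen here only to keep the example deterministic.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DebounceExample {

    static List<String> debounced() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> spells = PublishSubject.create();
        List<String> received = new ArrayList<>();

        spells.debounce(300, TimeUnit.MILLISECONDS, scheduler)
            .subscribe(received::add);

        // Three emissions only 100 ms apart: the first two are dropped.
        spells.onNext("L");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        spells.onNext("Lu");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        spells.onNext("Lumos");
        // 300 ms of silence: "Lumos" is let through.
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

        spells.onNext("Nox");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(debounced()); // [Lumos, Nox]
    }
}
```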

Thread Travel With RxJava

A thread is a single path of execution in a program. Whenever anything is running, it is running on a thread. To introduce multithreading into your apps, RxJava has schedulers.

There are different types of schedulers.

  • Computation scheduler (for CPU-intensive work)
  • I/O scheduler (for blocking input-output such as file access)
  • Main thread scheduler, which executes our work on the main thread
  • TestScheduler, which is intended for testing purposes.

Threading or a new thread is like a parallel universe. We all live in this main thread, this life that we have. If we switch to a thread, we open a portal and we go into the different thread.

SubscribeOn and ObserveOn Operators

Suppose Hermione wants to attend more than one class at a time. She can use subscribeOn, which chooses the thread on which an observable starts its work. This allows her to “travel back in time” so that she can learn from both lectures.

The observeOn operator switches the thread on which the operators below it and the subscriber run, and it can be applied several times in one chain. Suppose Hermione’s first lecture is on the main thread and the second one is in the background. The following is how this can be implemented.

	lectureOneObservable = getLectureObservable(1)
		.subscribeOn(AndroidSchedulers.mainThread())
		.observeOn(AndroidSchedulers.mainThread())
		.doOnNext(lecture -> {
			if (getView() != null) {
				getView().append("Attended Lecture " + lecture.getId());
			}
		});


	lectureTwoObservable = getLectureObservable(2)
		.subscribeOn(Schedulers.io()) // second lecture runs in the background
		.observeOn(AndroidSchedulers.mainThread())
		.doOnNext(lecture -> {
			if (getView() != null) {
				getView().append("Attended Lecture " + lecture.getId());
			}
		});	

With the two observables, lectureOneObservable and lectureTwoObservable, we can zip the lectures that have been completed.

	Observable.zip(lectureOneObservable, lectureTwoObservable, 
		(lecture, lecture2) ->
		String.format("Attended Lectures %s and %s",
			lecture.getId(), lecture2.getId()))
		.subscribe(message -> {
			if (getView() != null) {
				getView().append(message);
			}
		});	
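Outside Android there is no AndroidSchedulers, so the sketch below uses two of RxJava's own schedulers to show where each part of a chain runs. It assumes RxJava 2.x (`io.reactivex`); the class name and the "lecture" value are illustrative. The source work runs on the thread chosen by subscribeOn, and everything below observeOn runs on the thread chosen there.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ThreadTravel {

    // Returns { thread that produced the item, thread that observed it }.
    static String[] threadNames() {
        String[] names = new String[2];
        Observable
            .fromCallable(() -> {
                names[0] = Thread.currentThread().getName(); // producing side
                return "lecture";
            })
            .subscribeOn(Schedulers.io())           // where the source does its work
            .observeOn(Schedulers.computation())    // where downstream callbacks run
            .doOnNext(lecture ->
                names[1] = Thread.currentThread().getName()) // observing side
            .blockingFirst();                       // wait so the program does not exit early
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("Produced on: " + names[0]);
        System.out.println("Observed on: " + names[1]);
    }
}
```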

Questions

When I combined the merge operator with another observable that was in a flatMap, onComplete was not firing back up through the chain. Is that typical?

It happens when one of the sources involved never emits onComplete; merge and flatMap complete only after all of their sources have completed. You would have to look at the definitions of those operators carefully.

I haven’t used RxJava and I’m wondering if I use RxJava in Android, how much overhead does it add?

It adds as much overhead as any library call.

Garima Jain

Garima is an IIT Jodhpur (India) post-graduate (M.Tech) currently working as an Android Developer in Fueled Digital Media Ltd. She has worked with some of the top tech startups and high profile clients such as Quizup, Porsche, Venturebeats, Gilt, HBO, Chicago Bulls, Kapture, KeyMe, Ducati, Rite Aid, Afterlight and more.

Transcribed by Joseph Buelow