Practical RxJava with an Air Cannon

Introduction

My name is Nicholas DiPatri, I’m from Philadelphia, and I work at Comcast. I have a background in electrical engineering: I enjoy experimenting with Bluetooth, internet of things, and hardware in general.

I have been working on an Android application that discovers and controls nearby devices. This application required a complex remote API that initially resulted in an ugly software solution, and as a result, the code was hard to manage. Using RxJava, I was able to come up with a solution to this problem, and I want to cover that here.

Bluetooth Beacon

Bluetooth Beacon is a small device - it’s a low energy device that transmits its identity over a short distance. It has an accelerometer and thermometer. As you walk, nearby beacons provide semantic location as to where you are.

Google has a proximity API that lets you update beacons - I can even change the name of a beacon. The complexity of this API, however, forced me to revisit some basic software principles, such as imperative programming in a synchronous world.

Imperative Programming in a Synchronous World

Imperative programming means the command of a control flow of our program - it is a program, a sequence of function calls. This is how we like to see our business logic, nice and simple. A code can handle this fine when events are happening in an expected order, such as in a synchronous world.

Example

Let’s consider my application. When you approach a nearby beacon, a little card pops up and declares that you’re near something. In this case, I’m near the robo-cannon. If I want to change the name, I click on that and this dialogue fragment pops up and then I can change the name of the beacon and I can click done.

Get more development news like this

In an imperative programming paradigm in a synchronous world, there is an onClickListener.

I can immediately update the name of my local beacon with my edit text value, update that card at the bottom of the screen, and report this to the user that the change was successful. This is a simple, sequential example of a synchronous world.

Imperative programming in an Asynchronous World

We do not operate in a synchronous world on mobile. Mobile events are happening out of order. But imperative coding is still possible in an asynchronous world, and the control flow gets more complicated and hard to manage.

Example

Here’s imperative programming in an asynchronous world (see video). There is an onClickListener as before, and we request a remote server to update the beacon name and only after that occurs and we know that was successful, can we tell the user, “your beacon name has been updated.”

First, we need to do a network call on the background thread.

For the AsyncTask, first you define what will be done in the background, then you will define what will be done after the background task is completed. This is already weird: I’ve executed this task, and at some point in the future, once the thread becomes available, the message gets dequeued in the operating system and the callback is done in a new background thread.

Suppose I get a network call and get back a response. At some point even farther into the future, I want to report that response back to the main thread to update my UI element in the postexecute.

Lastly, I want to check that my view is in existence, and my activity is not destroyed. Only then can I report the results to the user. I update my local DTO - my local state representation of that beacon, and finally the card at the bottom.

The point of this: such a simple task of doing one network call requires a bunch of code that looks ugly.

Back To The Future

How can we define the future control flow of our program without confusion? It would be nice if we had a domain specific language (DSL), for future program flow management.

Let’s consider some possible solutions.

Abstractions

Object oriented coding makes our code more readable, manageable and testable. But it can only help us so much with this problem of future program flow.

We can apply OO principles to this problem I showed you back with this complicated flow, but it doesn’t help. Instead, we need to defer on the when, not defer on the how - that’s RxJava.

RxJava

RxJava gives us an Observable. It’s called Observable on Android. On server-side Java, it’s called a Future. This is called a Promise in Javascript. All three has the same underlying concept: an Observable to an event that has yet to occur. You subscribe to an Observable to receive notification of these future events.

Observables can be passed around. In this example (see video), I have an interface I’ve described, and the method is set for observation, this is essentially the method I call on that remote API to get back the beacon name.

Observable

One way to describe Observables are through the metaphor of temperature - there are hot and cold Observables. A hot Observable is something that repeatedly emits whether or not you’re listening. On the other hand cold Observables happen only when you subscribe to them.

A remote API call is a cold Observable. You make the API call and it responds. RxJava2 gives us Single and a Completable.

In my example (see video), this is where I’m asking the remote API for this beacon information and responding with a Single as part of one response.

A Completable, is like a remote API call in the future but has a return type of void.

In this method where the beacon returns a Completable, it will do nothing until I subscribe to it. Once I subscribe to it, there are different callbacks, but the most important one is onComplete, which indicates completion.

Observable - how can they help?

Observables let you define future events. There’s an Operator, which let you define how multiple Observables relate to each other in the future. Because so many things are expected to occur in the future, you want to assert how they’re going to relate to each other.

Operator

Operators can create, transform, combine. They can do many things on future events or in our case, future API calls.

This first method returns a Single (see video).

OAuth example

First, you need to get to get an oAuth token, that may or may not require a network call. Because this will occur in the background, we use RxJava to return a Single.

Once the single is finished, we want to combine two future events together with the acquired token. In this example (see video), I chain it the method that gets the token to getBeacon with the Single that first came out.

Operators and Lambdas

One of the first things you’ll run into and it gets ugly very fast: the anonymous inner class. It has one function - it takes a value and returns an output.

Instead, we can convert that to something more simple: a Lambda expression. On the left side of the Lambda expression, is what you’re passing into the anonymous function. And on the right side of the Lambda expression, is what’s spitting out of the anonymous function.

In my experience, RxJava and Lambdas are inextricable - you need to use both of them together.

More Operators

Now that we have a way to deal with events in the future, what if something goes wrong, such as an error? We will have to deal with it with a branch.

doOnError() is an equivalent to a try-catch where if something goes wrong, this is a way that you can mitigate your error.

The getOAuthToken() call returns a Single of type string. I transform that from a string into a beacon into a beacon Single. We start using Singles, Completables, and Observables.

In this example, because this method is returning a Completable, when I initialize the beacon, I want to know when it is completed. If instead of completing, it throws a 404 error, I can mitigate this.

zip() joins together results of one or more future API calls. This is a Completable. This is an Operator that transforms a Completable to a Single. Once that’s completed, we get an OAuthToken because we’re going to need that to do our next step.

timeout() throws an exception when a timeout occurs.

subscribeOn() and observeOn() is what you use to do your back-grounding. Using observeOn will provide callbacks that can be used on the main thread. Callbacks for subscribeOn occur on the background thread instead.

cache() will return the last result indefinitely. When you subscribe, you will be interacting with the cache first, as it’s the last on the Operator chain.

Implementation

I used RxJava and Google Proximity API to name my nearby device. I embed these in my devices, so I name my devices. First, you convert your local REST API client to RxJava. Retrofit is a library that you use to build a REST client and you can do it by defining an interface. Here, a single Proximity API beacon update request requires seven chained API calls.

Practical RxJava with an Air Cannon

Demo

This is my hardware test that I made, a particle photon microprocessor. Is a beacon - this device declares itself to my app.

We updated the beacon using RxJava. When I click on a button, it turns on and off the device.

I will need a volunteer for this. Countdown: 5, 4, 3, 2, 1… (IoT Particle Photon shooting projectiles across the room with compressed air!)

Q & A

Q: With timeout, can you create your own exception/emit a new Observable?

Nicholas: Yes. Instead of emitting a timeout exception, which is what it does by default, you can emit an Observable.

Q: Can each of the calls subscribe on a different thread? Nicholas: Yes you can put subscribers elsewhere.

Q: I noticed that you had a subscribe and observeOn for each of the different chains, what’s the benefit of having it inside of each of the chains instead of having it around the whole call? Nicholas: Because you don’t know who’s going to call your method and you don’t know if they’re going to set you up to run a background thread.

Next Up: Crafting Reactive Apps with Realm Mobile Platform

General link arrow white

Nicholas DiPatri

Nick DiPatri is an Android consultant at Comcast. He has a degree in Electrical Engineering from Rutgers and has been doing software development since his first Commodore 64. Earlier in his career, Nick developed software/firmware/hardware solutions including RF modelling, large-scale traffic sensor arrays, and N-Tier highly available Java enterprise systems.