This is the fifth post in my Introduction to Eclipse Vert.x series. In the last post, we saw how Vert.x can interact with a database. To tame the asynchronous nature of Vert.x, we used Future objects. In this post, we are going to see another way to manage asynchronous code: reactive programming. We will see how Vert.x combined with Reactive eXtensions gives you superpowers.
Let’s start by refreshing our memory with the previous posts:
- The first post described how to build a Vert.x application with Apache Maven and execute unit tests.
- The second post described how this application became configurable.
- The third post introduced vertx-web, and a collection management application was developed. This application exposes a REST API used by an HTML/JavaScript front end.
- In the fourth post, we replaced the in-memory back end with a database and introduced Future to orchestrate our asynchronous operations.
In this post, we are not going to add a new feature. Instead, we'll explore another programming paradigm: reactive programming.
The code of this post is available on the GitHub repo, in the post-5 directory.
Thinking Reactively
Forget everything you know about code and look around. Modeling this world with code is challenging. As developers, we tend to use counter-intuitive approaches. Since the 1980s, object-oriented computing has been seen as the silver bullet. Every entity from our world is represented by an object containing fields and exposing methods. Most of the time, interacting with these objects is done using a blocking and synchronous protocol. You invoke a method and wait for a response. But...the world in which we are living is asynchronous. The interactions are done using events, messages, and stimuli. To overcome the limitations of object orientation, many patterns and paradigms have emerged. Recently, functional programming has been making a comeback, not to replace object orientation, but to complement it. Reactive programming is a functional, event-driven programming approach that is used in combination with the regular object-oriented paradigm.
A few years ago, Microsoft created a reactive programming framework for .NET called Reactive eXtensions (also called ReactiveX or RX). RX is an API for asynchronous programming with observable streams. This API has been ported to several languages such as JavaScript, Python, C++, and Java.
Let's observe our world for a moment. Observe entities in motion: traffic jams, weather, conversations, and financial markets. Things are moving and evolving concurrently. Multiple things happen at the same time, sometimes independently, sometimes in an orchestrated manner. Each object is creating a stream of events. For instance, your mouse cursor position is moving. The sequence of positions is a stream. The number of people in the room may be stable, but someone can come in or go out, generating a new value. So we have another stream of values. There is a fundamental mantra behind reactive programming: events are data and data are events.
What's important to understand about RX and asynchronous programming is the asynchronous nature of streams. You observe a stream, and you are notified when an item is emitted by the stream. You don't know when that will happen, but you are observing. This observation is done using a subscribe operation.
RxJava is a straightforward implementation of RX for the Java programming language. It is an extremely popular library for reactive programming in Java, with applications in networked data processing and graphical user interfaces with JavaFX and Android. RxJava is the lingua franca for reactive libraries in Java, and it provides the following five types to describe publishers:
Use case | Number of items in the stream | RxJava 2 types | RX signature | Callback signature | Future signature |
---|---|---|---|---|---|
Notification, data flow | 0..n | Observable, Flowable | Observable<T> stream(), Flowable<T> stream() | ReadStream<T> method() | N/A |
Asynchronous operation producing a result | 1 | Single | Single<T> get() | void get(Handler<AsyncResult<T>> handler) | Future<T> get() |
Asynchronous operation producing no or one result | 0..1 | Maybe | Maybe<T> findById(String id) | void get(String id, Handler<AsyncResult<T>> handler) | Future<T> get(String id) |
Asynchronous operation producing no result | 0 | Completable | Completable flush() | void flush(Handler<AsyncResult<Void>> handler) | Future<Void> flush() |
The difference between Observable and Flowable is that Flowable handles back-pressure (implementing the reactive streams protocol) while Observable does not. Flowable is better suited for large streams of data coming from a back-pressure-supporting source (for example, a TCP connection) while Observable is better suited for handling “hot” observables for which back-pressure cannot be applied (for example, GUI events).
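To make the back-pressure idea concrete without pulling in RxJava, here is a minimal sketch that uses the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which mirror the reactive streams protocol that Flowable implements. It is a stand-in, not RxJava code, and the names BackPressureSketch, OneAtATime, and run are made up for illustration. The part to look at is the request(1) calls: the subscriber, not the publisher, decides when the next item may arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackPressureSketch {

    /** Hypothetical subscriber that pulls items one at a time. */
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // signal demand for exactly one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..5 and returns what the subscriber received. */
    static List<Integer> run() {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // blocks when the subscriber's buffer is full
            }
        } // close() signals onComplete after the submitted items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A source that cannot be slowed down this way, such as mouse clicks, has no use for request(n), which is why Observable skips the protocol.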
This post is not an introduction to reactive programming or RX. If you need an introduction-level class about reactive programming and RX, check out this tutorial.
In the previous post, we used Future to compose asynchronous operations. In this post, we are going to use streams and RxJava. How? Thanks to Vert.x and RxJava 2 APIs. Indeed, Vert.x provides a set of RX-ified APIs. However, don't forget that:
- You can use RxJava without Vert.x
- You can use Vert.x without RxJava
Combining them gives you superpowers because it extends the asynchronous execution model from Vert.x with the power of RxJava streams and operators.
Enough Talking; Show Me Some Code
It always starts with a Maven dependency. In your pom.xml file add this:
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rx-java2</artifactId>
  <version>${vertx.version}</version>
</dependency>
Then, open the io.vertx.intro.first.MyFirstVerticle class and replace the import statements with this:
import io.reactivex.Completable;
import io.reactivex.Single;
import io.vertx.core.Future;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.sql.SQLOptions;
import io.vertx.reactivex.CompletableHelper;
import io.vertx.reactivex.config.ConfigRetriever;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.jdbc.JDBCClient;
import io.vertx.reactivex.ext.sql.SQLConnection;
import io.vertx.reactivex.ext.web.Router;
import io.vertx.reactivex.ext.web.RoutingContext;
import io.vertx.reactivex.ext.web.handler.BodyHandler;
import io.vertx.reactivex.ext.web.handler.StaticHandler;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
Notice the io.vertx.reactivex packages. It's where the Vert.x RX API is implemented. So, instead of extending io.vertx.core.AbstractVerticle, we are now extending io.vertx.reactivex.core.AbstractVerticle. The injected vertx instance offers new methods starting with the rx prefix, such as rxDeployVerticle or rxClose. Methods prefixed with rx return RxJava 2 types such as Single or Completable.
From Returning Future to Returning Single and Completable
To benefit from the RX API and be able to use RX operators, we need to use the RX types. For example, previously we had this:
private Future<Void> createHttpServer(JsonObject config,
    Router router) {
  Future<Void> future = Future.future();
  vertx
    .createHttpServer()
    .requestHandler(router::accept)
    .listen(
      config.getInteger("HTTP_PORT", 8080),
      res -> future.handle(res.mapEmpty())
    );
  return future;
}
Future<Void> is mapped to Completable in RX, that is, a stream that only signals its completion. So with RX, this code becomes the following:
private Completable createHttpServer(JsonObject config,
Router router) {
return vertx
.createHttpServer()
.requestHandler(router::accept)
.rxListen(config.getInteger("HTTP_PORT", 8080))
.toCompletable();
}
Do you spot the difference? We use the rxListen method returning a Single<HttpServer>. Because we don't need the server, we transform it into a Completable using the toCompletable method. The rxListen method is available because we used the rx-ified vertx instance.
Let's now rewrite the connect method. connect was returning a Future<SQLConnection>. This is translated into a Single<SQLConnection>:
private Single<SQLConnection> connect() {
  return jdbc.rxGetConnection()
    .map(c -> c.setOptions(
      new SQLOptions().setAutoGeneratedKeys(true)));
}
The jdbc client is also providing an rx API. rxGetConnection returns a Single<SQLConnection>. To enable the key generation, we use the map method. map takes the result from the observed Single and transforms it using a mapper function. Here we just adapt the options.
Following the same principles, the insert method is rewritten as follows:
private Single<Article> insert(SQLConnection connection, Article article,
    boolean closeConnection) {
  String sql = "INSERT INTO Articles (title, url) VALUES (?, ?)";
  return connection
    .rxUpdateWithParams(sql,
      new JsonArray().add(article.getTitle()).add(article.getUrl()))
    .map(res -> new Article(res.getKeys().getLong(0),
      article.getTitle(), article.getUrl()))
    .doFinally(() -> {
      if (closeConnection) {
        connection.close();
      }
    });
}
Here, we execute the INSERT statement using rxUpdateWithParams. The result is transformed into an Article. Notice the doFinally. This method is called when the operation has completed or failed. In both cases, if requested, we close the connection.
The same approach is applied to the query method that uses the rxQuery method:
private Single<List<Article>> query(SQLConnection connection) {
  return connection.rxQuery("SELECT * FROM articles")
    .map(rs -> rs.getRows().stream()
      .map(Article::new)
      .collect(Collectors.toList())
    )
    .doFinally(connection::close);
}
queryOne needs to throw an error if the searched article is not present:
private Single<Article> queryOne(SQLConnection connection, String id) {
  String sql = "SELECT * FROM articles WHERE id = ?";
  return connection.rxQueryWithParams(sql,
      new JsonArray().add(Integer.valueOf(id))
    )
    .doFinally(connection::close)
    .map(rs -> {
      List<JsonObject> rows = rs.getRows();
      if (rows.size() == 0) {
        throw new NoSuchElementException(
          "No article with id " + id);
      } else {
        JsonObject row = rows.get(0);
        return new Article(row);
      }
    });
}
The exception thrown by the mapper function is propagated to the stream. So the observer can react to it and recover.
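The same "throw in the mapper, react at the end" shape can be tried with nothing but the JDK. The sketch below swaps RxJava's Single for CompletableFuture purely so that it runs without extra dependencies; firstRow and describe are hypothetical helpers, and the status strings are only illustrative. Note one difference from RxJava: CompletableFuture wraps the thrown exception in a CompletionException, so the observer side has to unwrap it.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

final class ErrorPropagationSketch {

    // Hypothetical stand-in for queryOne: maps fetched rows to the first row,
    // or throws from inside the mapper when there is none.
    static CompletableFuture<String> firstRow(
            CompletableFuture<List<String>> rows, String id) {
        return rows.thenApply(list -> {
            if (list.isEmpty()) {
                throw new NoSuchElementException("No article with id " + id);
            }
            return list.get(0);
        });
    }

    // The "observer": receives either the value or the propagated error.
    static String describe(CompletableFuture<String> result) {
        return result
            .handle((row, err) -> {
                if (err == null) {
                    return "200 " + row;
                }
                // Unwrap the CompletionException added by CompletableFuture.
                Throwable cause = err.getCause() != null ? err.getCause() : err;
                return cause instanceof NoSuchElementException
                    ? "404 " + cause.getMessage()
                    : "500";
            })
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(
            firstRow(CompletableFuture.completedFuture(List.of("row-1")), "1")));
        System.out.println(describe(
            firstRow(CompletableFuture.completedFuture(List.of()), "42")));
    }
}
```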
Transforming Types
We have already seen the toCompletable method above discarding the result from a Single and just informing the subscriber of the successful completion or failure of the operation. In the update and delete methods, we need to do almost the same thing. We execute SQL statements, and if we realize that no rows have been changed by these statements, we report an error. To implement this, we are using flatMapCompletable. This method is part of the flatMap family, a very powerful RX operator. It takes a function as parameter. This function is called for each item emitted by the observed stream. If the stream is a Single, it will be called either zero times (error case) or once (operation succeeded with a result). Unlike the function passed to the map operator, the function passed to flatMap returns a stream. For example, in our context, the flatMapCompletable function is called with an UpdateResult and returns a Completable:
private Completable update(SQLConnection connection, String id,
Article article) {
String sql = "UPDATE articles SET title = ?, url = ? WHERE id = ?";
JsonArray params = new JsonArray().add(article.getTitle())
.add(article.getUrl())
.add(Integer.valueOf(id));
return connection.rxUpdateWithParams(sql, params)
.flatMapCompletable(ur ->
ur.getUpdated() == 0 ?
Completable
.error(new NoSuchElementException(
"No article with id " + id))
: Completable.complete()
)
.doFinally(connection::close);
}
private Completable delete(SQLConnection connection, String id) {
String sql = "DELETE FROM Articles WHERE id = ?";
JsonArray params = new JsonArray().add(Integer.valueOf(id));
return connection.rxUpdateWithParams(sql, params)
.doFinally(connection::close)
.flatMapCompletable(ur ->
ur.getUpdated() == 0 ?
Completable
.error(new NoSuchElementException(
"No article with id " + id))
: Completable.complete()
);
}
In both cases, we check the number of updated rows, and, if 0, produce a failing Completable. So the subscriber receives either a success (Completable.complete) or the error (Completable.error). Notice that this code can also use the previous approach: using the map operator, throwing an exception, and discarding the result using toCompletable.
Obviously, we can also transform a Completable to a Single:
private Single<SQLConnection> createTableIfNeeded(
SQLConnection connection) {
return vertx.fileSystem().rxReadFile("tables.sql")
.map(Buffer::toString)
.flatMapCompletable(connection::rxExecute)
.toSingleDefault(connection);
}
rxExecute returns a Completable. But here we need to forward the SQLConnection. Fortunately, the toSingleDefault operator transforms the Completable to a Single emitting the given value.
Composing Asynchronous Actions
So far, we have been using rx methods and adapting the result. But how can we deal with sequential composition, that is, executing a first operation and then a second one with the result of the first? This can be done using the flatMap operator. As stated above, flatMap is a very powerful operator. It receives a function as parameter, and unlike the function given to the map operator, this function returns a stream (so Single, Maybe, Completable...). This function is called for each item from the observed stream, and the returned streams are flattened so the items are serialized into a single stream. Because streams are async constructs, calling flatMap creates a sequential composition. Let's see the createSomeDataIfNone method. The initial implementation is the following:
private Future<SQLConnection> createSomeDataIfNone(
    SQLConnection connection) {
  Future<SQLConnection> future = Future.future();
  connection.query("SELECT * FROM Articles", select -> {
    if (select.failed()) {
      future.fail(select.cause());
    } else {
      if (select.result().getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",
          "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
        Article article2 = new Article("Reactive Manifesto",
          "https://www.reactivemanifesto.org/");
        Future<Article> insertion1 = insert(connection, article1, false);
        Future<Article> insertion2 = insert(connection, article2, false);
        CompositeFuture.all(insertion1, insertion2)
          .setHandler(r -> future.handle(r.map(connection)));
      } else {
        future.complete(connection);
      }
    }
  });
  return future;
}
In this method, we execute a query and depending on the result we insert articles. The RX implementation is the following:
private Single<SQLConnection> createSomeDataIfNone(
    SQLConnection c) {
  return c.rxQuery("SELECT * FROM Articles")
    .flatMap(rs -> {
      if (rs.getResults().isEmpty()) {
        Article article1 = new Article("Fallacies of distributed computing",
          "https://en.wikipedia.org/wiki/Fallacies_of_distributed_computing");
        Article article2 = new Article("Reactive Manifesto",
          "https://www.reactivemanifesto.org/");
        return Single.zip(
          insert(c, article1, false),
          insert(c, article2, false),
          (a1, a2) -> c
        );
      } else {
        return Single.just(c);
      }
    });
}
First, we execute the query. Then, when we have the result, the function passed to the flatMap method is called, implementing the sequential composition. You may wonder about the error case. We don't need to handle it, because the error is propagated to the stream and the final observer receives it. The function is not called when an error happens.
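Both properties, "the second step runs with the result of the first" and "the function is skipped on error", can be checked with a small JDK-only sketch. It uses CompletableFuture.thenCompose as a stand-in for flatMap so it runs without RxJava; countArticles, seedIfEmpty, and the negative-count failure trigger are all hypothetical and exist only to drive the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class SequentialCompositionSketch {

    // Counts how often the second step's function was invoked.
    static final AtomicInteger secondStepCalls = new AtomicInteger();

    // Hypothetical first step: asynchronously "counts" the stored articles.
    // A negative value simulates a failing query.
    static CompletableFuture<Integer> countArticles(int stored) {
        return CompletableFuture.supplyAsync(() -> {
            if (stored < 0) {
                throw new IllegalStateException("database unavailable");
            }
            return stored;
        });
    }

    // Hypothetical second step: needs the result of the first one.
    static CompletableFuture<String> seedIfEmpty(int count) {
        secondStepCalls.incrementAndGet();
        return CompletableFuture.supplyAsync(
            () -> count == 0 ? "seeded" : "kept " + count);
    }

    static String run(int stored) {
        return countArticles(stored)
            // Plays the role of flatMap: called only if the first step succeeded.
            .thenCompose(SequentialCompositionSketch::seedIfEmpty)
            // The final observer sees the error without intermediate handling.
            .exceptionally(err -> "failed")
            .join();
    }

    public static void main(String[] args) {
        System.out.println(run(0));
        System.out.println(run(3));
        System.out.println(run(-1));
    }
}
```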
Asynchronous operations can happen concurrently. But sometimes you need to be aware of when they have completed. This is called parallel composition. The zip operator lets you do this. In the createSomeDataIfNone method, we are inserting two articles. This operation is done using insert (returning a Single). The zip operator observes the two given occurrences of Single and calls the method passed as last parameter when both have completed. In this case, we just forward the SQLConnection.
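For readers who want to experiment with parallel composition outside Vert.x, here is a rough JDK-only analogue. CompletableFuture.thenCombine stands in for Single.zip; the insert helper, its generated keys, and the String used as a "connection" are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;

final class ParallelCompositionSketch {

    // Hypothetical insert: completes asynchronously with a generated key.
    static CompletableFuture<Long> insert(String title, long generatedKey) {
        return CompletableFuture.supplyAsync(() -> generatedKey);
    }

    // Starts both inserts, then forwards the connection once both are done.
    static String seed(String connection) {
        CompletableFuture<Long> first =
            insert("Fallacies of distributed computing", 1L);
        CompletableFuture<Long> second =
            insert("Reactive Manifesto", 2L);
        return first
            // Like the zipper function: runs only when both results exist,
            // and here ignores them to pass the connection along.
            .thenCombine(second, (key1, key2) -> connection)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(seed("connection-1"));
    }
}
```

Both inserts are started before either result is awaited, which is what distinguishes this from the sequential case.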
Composing Everything to Get Ready
We have rewritten most of our functions, but we need to adapt the start method. Remember the start sequence we need to achieve:
// Start sequence:
// 1 - Retrieve the configuration
// |- 2 - Create the JDBC client
// |- 3 - Connect to the database (retrieve a connection)
// |- 4 - Create table if needed
// |- 5 - Add some data if needed
// |- 6 - Close connection when done
// |- 7 - Start HTTP server
// |- 8 - we are done!
This composition can be implemented using the flatMap operator:
retriever.rxGetConfig()
.doOnSuccess(config ->
jdbc = JDBCClient.createShared(vertx, config,
"My-Reading-List"))
.flatMap(config ->
connect()
.flatMap(connection ->
this.createTableIfNeeded(connection)
.flatMap(this::createSomeDataIfNone)
.doAfterTerminate(connection::close)
)
.map(x -> config)
)
.flatMapCompletable(c -> createHttpServer(c, router))
.subscribe(CompletableHelper.toObserver(fut));
The doOnSuccess operator is an action operator that receives the item from the observed stream and lets you implement a side effect. Here we assign the jdbc field.
Then we just use the flatMap operator to orchestrate our different actions. Look at the doAfterTerminate call. This operator lets us close the connection when the full stream is consumed, which is very useful for cleanup.
There is an important part in this code. So far, we returned RX types, but never called subscribe. If you don't subscribe, nothing will happen: streams are lazy. So never forget to subscribe. The subscription materializes the pipeline and triggers the emissions. In our code, it triggers the start sequence. The parameter passed to the subscribe method just reports failure and success to the Future object passed to the start method. Basically, CompletableHelper.toObserver adapts that Future to an RxJava observer.
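Laziness is easy to get wrong, so here is a toy illustration. LazySingle below is a hypothetical, synchronous, deliberately tiny type written for this sketch; it is not RxJava's Single and supports only map and subscribe. What it shares with the real thing is the property discussed above: building the pipeline runs nothing, and the work happens on subscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class LazinessSketch {

    /** Hypothetical lazy, single-value stream (not the RxJava type). */
    static final class LazySingle<T> {
        private final Supplier<T> source;

        private LazySingle(Supplier<T> source) {
            this.source = source;
        }

        static <T> LazySingle<T> fromSupplier(Supplier<T> source) {
            return new LazySingle<>(source);
        }

        // Only records the transformation; nothing is executed here.
        <R> LazySingle<R> map(Function<? super T, ? extends R> mapper) {
            return new LazySingle<>(() -> mapper.apply(source.get()));
        }

        // Subscribing is what actually runs the pipeline.
        void subscribe(Consumer<? super T> onSuccess,
                       Consumer<? super Throwable> onError) {
            T value;
            try {
                value = source.get();
            } catch (RuntimeException e) {
                onError.accept(e);
                return;
            }
            onSuccess.accept(value);
        }
    }

    /** Returns the order in which things happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySingle<Integer> pipeline = LazySingle
            .fromSupplier(() -> {
                log.add("query executed");
                return 2;
            })
            .map(n -> n * 21);
        log.add("pipeline assembled");
        pipeline.subscribe(
            value -> log.add("received " + value),
            err -> log.add("failed"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the log, "pipeline assembled" comes before "query executed": had subscribe been left out, the query would never have run.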
Implementing the HTTP Actions
We are almost done. We just need to update our HTTP actions, the methods called on HTTP requests. To simplify the code, let's modify the ActionHelper class. This class provides methods returning Handler<AsyncResult<T>>. But this type is not great with RX APIs where we need subscribers. Let's replace these methods with methods returning more-adequate types:
private static <T> BiConsumer<T, Throwable> writeJsonResponse(
RoutingContext context, int status) {
return (res, err) -> {
if (err != null) {
if (err instanceof NoSuchElementException) {
context.response().setStatusCode(404)
.end(err.getMessage());
} else {
context.fail(err);
}
} else {
context.response().setStatusCode(status)
.putHeader("content-type",
"application/json; charset=utf-8")
.end(Json.encodePrettily(res));
}
};
}
static <T> BiConsumer<T, Throwable> ok(RoutingContext rc) {
return writeJsonResponse(rc, 200);
}
static <T> BiConsumer<T, Throwable> created(RoutingContext rc) {
return writeJsonResponse(rc, 201);
}
static Action noContent(RoutingContext rc) {
return () -> rc.response().setStatusCode(204).end();
}
static Consumer<Throwable> onError(RoutingContext rc) {
return err -> {
if (err instanceof NoSuchElementException) {
rc.response().setStatusCode(404)
.end(err.getMessage());
} else {
rc.fail(err);
}
};
}
Now we are ready to implement our HTTP action methods. Back in the MyFirstVerticle class, replace the action methods with this:
private void getAll(RoutingContext rc) {
connect()
.flatMap(this::query)
.subscribe(ok(rc));
}
private void addOne(RoutingContext rc) {
Article article = rc.getBodyAsJson()
.mapTo(Article.class);
connect()
.flatMap(c -> insert(c, article, true))
.subscribe(created(rc));
}
private void deleteOne(RoutingContext rc) {
String id = rc.pathParam("id");
connect()
.flatMapCompletable(c -> delete(c, id))
.subscribe(noContent(rc), onError(rc));
}
private void getOne(RoutingContext rc) {
String id = rc.pathParam("id");
connect()
.flatMap(connection -> queryOne(connection, id))
.subscribe(ok(rc));
}
private void updateOne(RoutingContext rc) {
String id = rc.request().getParam("id");
Article article = rc.getBodyAsJson()
.mapTo(Article.class);
connect()
.flatMapCompletable(c -> update(c, id, article))
.subscribe(noContent(rc), onError(rc));
}
As you can see, these methods are implemented using the operators we saw before. They contain subscribe calls that write the HTTP response. It's as simple as that...
Conclusion
We are done! In this post, we have adapted our code to use reactive programming and RxJava 2. The combination of Vert.x and RxJava brings your reactiveness to another level. You can compose and process asynchronous operations and streams very easily.
Now, don't forget that nothing is free. RX can be hard to understand. It may look weird. And depending on your background, you may prefer Future and callbacks. Vert.x offers you choices, and you are free to choose the model you prefer.
If you want to go further, here are some resources:
The next post in this series will cover the deployment of our application on Kubernetes and OpenShift.
Stay tuned, and happy coding!
Last updated: January 12, 2024