Example of Real-Time Event Streaming Using Spring WebFlux

1. Overview

In this article, we’re going to implement a short example of real-time events streaming using Spring WebFlux. A dashboard showing real-time pricing information about the stock market would be a good example to go and start with, as it shall cover many aspects of the problem that we’re discussing here. Over the coming sections, we’ll be exploring a step by step guide to implementing WebFlux solution to the mentioned problem.
In case you are unfamiliar with the concepts of reactive programming or server-sent events, you should consider bushing your knowledge about them before you move on, as the article expects you to have a little background about them.
Stocks Example

2. Maven Dependencies

Spring WebFlux is the key dependency required by our example:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

And optionally, we’ll use Lombok to keep our code concise:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>compile</scope>
</dependency>

3. Data Model

Let’s start by creating a model for the data we’re going to be streaming. We’ll hold four pieces of information for every stock we’ll maintain:

  • symbol: corporation’s shortcode
  • name: corporation’s name
  • currentPrice: in the stock market, in USD
  • lastUpdated: when the price was last updated
@Getter @Setter
@AllArgsConstructor
@NoArgsConstructor
public class Stock {
    String symbol;
    String name;
    BigDecimal currentPrice;
    long lastUpdated;
}

4. REST Controller

Now, let’s create a REST controller to serve real-time content for our example. Our controller will contain a hashmap field defined to serve as an in-memory database for our content:

public class StocksController {
    private static Map<String, Stock> stocksDatabase = 
        new ConcurrentHashMap<>();
}

This controller has mainly two roles:

  • Accepting content updates, which are stock objects in our example, from authorized tiers
  • Real-time streaming of content to the consumer, which is the dashboard in our example.

4.1. Accepting Content Updates

Now, let’s allow tiers in charge to update stocks pricing by performing PUT request to /stocks:

@PutMapping("/stocks")
public void updateStocks(@RequestBody List<Stock> stocks) {
    stocks.forEach(stock -> {
        stocksDatabase.put(stock.getSymbol(), stock);
        stock.setLastUpdated(System.currentTimeMillis());
    });
}

The method expects a request body with JSON array of updated stocks. We’ve just stored each updated stock object we received directly into the hashmap database, using stock’s symbol as its corresponding key. This’ll either add a new stock entry to the map if the key doesn’t already exist or update the existing stock’s data otherwise.

We’ve finally updated the stock’s last updated time to the host system’s current one, in order to make it detectable as updated by the streaming method, as per shown in the next section.

4.2. Streaming Real-Time Content

This is the key part of this article, which would make us able to satisfy the main requirement of our example. We’d create a Flux REST operation mapped to GET /stocks that does two things:

  • Returning updates about changes happened in the content (the stock market) using a reactive programming methodology
  • Keeping pushing real-time updates to the client continuously using server-sent events API

In other words, we’re going to mix Spring WebFlux together with Server-Sent Events to achieve our final goal. Therefore, we’ll create the REST method with a return of a Flux of Server-Sent events produced in text/event-stream MIME-type format, each event will include a collection of stocks objects, representing the data of newly updated ones:

@GetMapping(value = "/stocks", produces = 
    MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Collection<Stock>>> liveStockChanges() {
    ...
}

Now, let’s go on with the implementation part:

return Flux.interval(Duration.ofSeconds(1))
    .map(tick -> tick == 0 ? stocksDatabase.values() : 
        getUpdatedStocks())
    .map(stocks -> ServerSentEvent.<Collection<Stock>> builder()
        .event("stock-changed")
        .data(stocks)
        .build());

And now, let’s see how this implementation works:

  • We’ll create a flux with interval() that generates an increasing long value starting from zero on every second:
Flux.interval(Duration.ofSeconds(1))
  • At the first second, we’ll map the returned flux content to all the existing stocks found in our database. Then starting from the later second, we’ll map only stocks that are changed since the last second, by calling getUpdatedStocks() method, that we are going to implement soon:
.map(sec -> sec == 0 ? stocksDatabase.values() : 
    getUpdatedStocks())
  • We’ll finally remap the changed stock objects returned from the previous step and wrapped them inside a server-event object, to become ready for pushing into any SSE compliant client:
.map(stocks -> ServerSentEvent.<Stock[]> builder()
    .event("stocks-changed")
    .data(stocks)
    .build());

Now, let’s go on with the implementation of the getUpdatedStocks():

private List<Stock> getUpdatedStocks() {
    LinkedList<Stock> updatedStocks = new LinkedList<>();
    stocksDatabase.values()
        .stream()
        .filter(stock -> stock.getLastUpdated() > 
            System.currentTimeMillis() - 1000)
        .forEach(stock -> updatedStocks.add(stock));
    return updatedStocks;
}

The implementation is simple enough that it just looks up for all stock objects with last updated date value less than or equal the last second, then return them.

5. The Client

Now, we’ll go on to the front-end part. Let’s create dashboard page /stocks/index.html with a stocks container div, where stock widgets are be going to be displayed:

<div class="stocks-container"></div>

Our main task now is connecting to our streaming REST operation, and keep receiving stocks updates and displaying them on the dashboard. To do this in JavaScript, we’ll use an EvenSource object to connect to our endpoint, then we’ll use addEventListener() to keep track of all updates sent on the events channel stocks-changed:

var source = new EventSource('http://localhost:8080/stocks');
source.addEventListener('stocks-changed', function(e) {
  var updatedStocks = JSON.parse(e.data);
  ...
}, false);

We’ve passed an inline function as the second parameter to addEventListener(), which obviously will be called upon receiving any new updates from this channel. Once we’ve any, we’d move on with looping over received stocks information and process them:

updatedStocks.forEach(function(updatedStock) {
    updateStock(updatedStock);
});

We’ve made a call to updateStock(), this function will either add a new widget to the stock widgets container if the stock information received for a new encountered stock’s symbol, or update an existing widget if we already have a stock with a similar symbol:

function updateStock(stock) {
    var stockDivContainer = $('.stocks-container');
    var stockDiv = $('#stock-' + stock.symbol);
    var stockDivInnerHtml = '<span class="current-price">'
        + '<span class="currency-symbol">$</span>'
      + stock.currentPrice
      + '</span>\n'
      + '<h1>'
      + stock.name
      + '</h1>\n' + '<h2>' + stock.symbol + '</h2>';

    if (stockDiv.length) {
      stockDiv.html(stockDivInnerHtml);
    } else {
      var stockDivHtml = '<div id="stock-' + stock.symbol
        + '" class="stock-widget">\n' + stockDivInnerHtml
        + '\n</div>';
      $('.stocks-container').append($(stockDivHtml));
    }
}

6. Running the Example

  • Build and run the project using maven
  • Navigate to http://localhost:8080/stocks/index.html
  • You’d see a dashboard with initial arbitrary stock market data
  • To trigger a live update, call PUT /update with the following curl command:
curl --header "Content-Type: application/json" \
  --request PUT \
  --data '[{"symbol":"FB", "name":"Facebook", "currentPrice":273}]' \
  http://localhost:8080/stocks
  • For the data parameter, use a symbol that exists currently on the dashboard to issue an update, or a new symbol if you need to push more stock widgets to it
  • If you need to push many updates at once, you can add more elements to the passed array
  • You should see your changes reflected on the dashboard, right once you run this command

7. Source Code

The source code of the examples mentioned here is available on GitHub.

Java Software Architect | Author

Leave a reply:

Your email address will not be published.

Site Footer