Spring Webflux Mono Example

In Spring WebFlux, Mono is crucial for managing asynchronous data streams. Think of Mono like a reliable source that can provide either no data or just one piece of information. This makes it ideal for situations where you’re expecting either a single result or nothing at all. When we look at a practical “Spring Webflux Mono Example,” Mono’s significance becomes clearer. It shows how effectively it handles asynchronous data streams, which is essential for many real-world applications.

Mono: It can emit 0 or 1 item. Its like CompletableFuture with 0 or 1 result. It’s commonly used when you expect a single result or no result. For example, finding an entity by its ID or saving an entity.

Mono<User> monoUser = Mono.just(new User());

Create a Spring Boot project and include the following dependency.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Example 1: Using Mono with CoreSubscriber

package com.javadzone.webflux;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;

@SpringBootTest
class BootWebfluxApplicationTests {
    @Test
    public void test() {
        // Creating a Mono publisher with test data
        Mono<String> monoPublisher = Mono.just("Testdata");

        // Subscribing to the Mono publisher
        monoPublisher.subscribe(new CoreSubscriber<String>() {
            // Callback method invoked when subscription starts
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("on subscribe....");
                s.request(1);
            }

            // Callback method invoked when data is emitted
            @Override
            public void onNext(String data) {
                System.out.println("data: " + data);
            }

            // Callback method invoked when an error occurs
            @Override
            public void onError(Throwable t) {
                System.out.println("exception occured: " + t.getMessage());
            }

            // Callback method invoked when subscription is completed
            @Override
            public void onComplete() {
                System.out.println("completed the implementation....");
            }
        });
    }
}

This example demonstrates the usage of Mono with a CoreSubscriber, where we create a Mono publisher with test data and subscribe to it. We handle different callback methods such as onSubscribeonNextonError, and onComplete to manage the data stream.

Example 2: Using Mono with various operators 

package com.javadzone.webflux;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple4;

@SpringBootTest
public class MonoTest {

    @Test
    void testMono(){
        Mono<String> firstMono = Mono.just("First Mono");
        Mono<String> secondMono = Mono.just("Second Mono");
        Mono<String> thirdMono = Mono.just("Third Mono");
        Mono<String> fourthMono = Mono.just("Fourth Mono");

        // Subscribing to Monos and printing the data
        firstMono.subscribe(data -> {
            System.out.println("Subscribed to firstMono: "+data);
        });

        secondMono.subscribe(data -> {
            System.out.println("Subscribed to secondMono: "+ data);
        });
        

        // Combining Monos using zipWith and zip operators
        System.out.println("----------- zipWith() ------------ ");
        Mono<Tuple2<String, String>> tuple2Mono = firstMono.zipWith(secondMono);
        tuple2Mono.subscribe(data -> {
            System.out.println(data.getT1());
            System.out.println(data.getT2());
        });
        

        System.out.println("----------- zip() ------------ ");
        Mono<Tuple4<String, String, String, String>> zip = Mono.zip(firstMono, secondMono, thirdMono, fourthMono);
        zip.subscribe(data ->{
            System.out.println(data.getT1());
            System.out.println(data.getT2());
            System.out.println(data.getT3());
            System.out.println(data.getT4());
        });
        

        // Transforming Mono data using map and flatMap
        System.out.println("----------- map() ------------ ");
        Mono<String> map = firstMono.map(String::toUpperCase);
        map.subscribe(System.out:: println);
        
        

        System.out.println("----------- flatmap() ------------ ");
        //flatmap(): Transform the item emitted by this Mono asynchronously, 
         //returning the value emitted by another Mono (possibly changing the value type).
        Mono<String[]> flatMapMono = firstMono.flatMap(data -> Mono.just(data.split(" ")));
        flatMapMono.subscribe(data-> {
            for(String d: data) {
                System.out.println(d);
            }
            //or
            //Arrays.stream(data).forEach(System.out::println);
        });
        
        

        // Converting Mono into Flux using flatMapMany
        System.out.println("---------- flatMapMany() ------------- ");

        //flatMapMany(): Transform the item emitted by this Mono into a Publisher, 
        //then forward its emissions into the returned Flux.
        Flux<String> stringFlux = firstMono.flatMapMany(data -> Flux.just(data.split(" ")));
        stringFlux.subscribe(System.out::println);



        // Concatenating Monos using concatWith
        System.out.println("----------- concatwith() ------------ ");

        Flux<String> concatMono = firstMono.concatWith(secondMono);
        concatMono.subscribe(System.out::println);
    }

}

Output:

Spring Webflux Mono Example

This example showcases the usage of various Mono operators such as zipWith, zip, map, flatMap, flatMapMany, and concatWith. We create Monos with test data, subscribe to them, combine them using different operators, transform their data, and concatenate them. 

Example 3: Writing Mono examples in a Controller 

package com.javadzone.webflux.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

@RestController
public class WeatherController {

    @GetMapping("/getWeatherDataAsync")
    public Mono<String> getWeatherDataAsync() {
        System.out.println("Real-time Example with Mono:");

        Mono<String> weatherMono = fetchWeatherDataAsync(); // Fetch weather data asynchronously
        weatherMono.subscribe(weather -> System.out.println("Received weather data: " + weather));

        System.out.println("Continuing with other tasks...");

        // Sleep for 6 seconds to ensure weather data retrieval completes
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return weatherMono;
    }


    @GetMapping("getWeatherDataSync")
    public void getWeatherDataSync() {
        System.out.println("Simple Example without Mono:");
        fetchWeatherDataSync(); // Fetch weather data synchronously
        System.out.println("Continuing with other tasks...");

        // Sleep for 6 seconds to ensure weather data retrieval completes
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Mono<String> fetchWeatherDataAsync() {
        System.out.println("Fetching weather data...");
        return Mono.delay(Duration.ofSeconds(5))  // Simulate API call delay of 5 seconds
                .map(delay -> "Weather data: Sunny and 30°C") // Simulated weather data
                .subscribeOn(Schedulers.boundedElastic()); // Execute on separate thread
    }

    public static void fetchWeatherDataSync() {
        System.out.println("Fetching weather data...");
        // Simulate API call delay of 5 seconds
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Weather data: Sunny and 30°C");
    }
}

Example 4: Real-time Use Case with Spring WebFlux Mono Example:

Let’s consider a real-time example of fetching weather data from an external API using Mono, and then contrast it with a simple example without using Mono. 

package com.javadzone.webflux.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

@RestController
public class WeatherController {

    @GetMapping("/getWeatherDataAsync")
    public Mono<String> getWeatherDataAsync() {
        System.out.println("Real-time Example with Mono:");

        Mono<String> weatherMono = fetchWeatherDataAsync(); // Fetch weather data asynchronously
        weatherMono.subscribe(weather -> System.out.println("Received weather data: " + weather));

        System.out.println("Continuing with other tasks...");

        // Sleep for 6 seconds to ensure weather data retrieval completes
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return weatherMono;
    }


    @GetMapping("getWeatherDataSync")
    public void getWeatherDataSync() {
        System.out.println("Simple Example without Mono:");
        fetchWeatherDataSync(); // Fetch weather data synchronously
        System.out.println("Continuing with other tasks...");

        // Sleep for 6 seconds to ensure weather data retrieval completes
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Mono<String> fetchWeatherDataAsync() {
        System.out.println("Fetching weather data...");
        return Mono.delay(Duration.ofSeconds(5))  // Simulate API call delay of 5 seconds
                .map(delay -> "Weather data: Sunny and 30°C") // Simulated weather data
                .subscribeOn(Schedulers.boundedElastic()); // Execute on separate thread
    }

    public static void fetchWeatherDataSync() {
        System.out.println("Fetching weather data...");
        // Simulate API call delay of 5 seconds
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Weather data: Sunny and 30°C");
    }
}

When we access the synchronous endpoint at http://localhost:8080/getWeatherDataSync, the output will be displayed immediately.

Simple Example without Mono:
Fetching weather data...
Weather data: Sunny and 30°C
Continuing with other tasks...

When we access the asynchronous endpoint at http://localhost:8080/getWeatherDataAsync, we will receive the weather data after other tasks have been completed.

Real-time Example with Mono:
Fetching weather data...
Continuing with other tasks...
Received weather data: Weather data: Sunny and 30°C

Leave a Comment