In Spring WebFlux, Mono is crucial for managing asynchronous data streams. Think of Mono like a reliable source that can provide either no data or just one piece of information. This makes it ideal for situations where you’re expecting either a single result or nothing at all. When we look at a practical “Spring Webflux Mono Example,” Mono’s significance becomes clearer. It shows how effectively it handles asynchronous data streams, which is essential for many real-world applications.
Mono: It can emit 0 or 1 item. Its like CompletableFuture with 0 or 1 result. It’s commonly used when you expect a single result or no result. For example, finding an entity by its ID or saving an entity.
Mono<User> monoUser = Mono.just(new User());
Create a Spring Boot project and include the following dependency.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
Example 1: Using Mono with CoreSubscriber
package com.javadzone.webflux;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
@SpringBootTest
class BootWebfluxApplicationTests {
@Test
public void test() {
// Creating a Mono publisher with test data
Mono<String> monoPublisher = Mono.just("Testdata");
// Subscribing to the Mono publisher
monoPublisher.subscribe(new CoreSubscriber<String>() {
// Callback method invoked when subscription starts
@Override
public void onSubscribe(Subscription s) {
System.out.println("on subscribe....");
s.request(1);
}
// Callback method invoked when data is emitted
@Override
public void onNext(String data) {
System.out.println("data: " + data);
}
// Callback method invoked when an error occurs
@Override
public void onError(Throwable t) {
System.out.println("exception occured: " + t.getMessage());
}
// Callback method invoked when subscription is completed
@Override
public void onComplete() {
System.out.println("completed the implementation....");
}
});
}
}
This example demonstrates the usage of Mono with a CoreSubscriber, where we create a Mono publisher with test data and subscribe to it. We handle different callback methods such as onSubscribe, onNext, onError, and onComplete to manage the data stream.
Example 2: Using Mono with various operators
package com.javadzone.webflux;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple4;
@SpringBootTest
public class MonoTest {
@Test
void testMono(){
Mono<String> firstMono = Mono.just("First Mono");
Mono<String> secondMono = Mono.just("Second Mono");
Mono<String> thirdMono = Mono.just("Third Mono");
Mono<String> fourthMono = Mono.just("Fourth Mono");
// Subscribing to Monos and printing the data
firstMono.subscribe(data -> {
System.out.println("Subscribed to firstMono: "+data);
});
secondMono.subscribe(data -> {
System.out.println("Subscribed to secondMono: "+ data);
});
// Combining Monos using zipWith and zip operators
System.out.println("----------- zipWith() ------------ ");
Mono<Tuple2<String, String>> tuple2Mono = firstMono.zipWith(secondMono);
tuple2Mono.subscribe(data -> {
System.out.println(data.getT1());
System.out.println(data.getT2());
});
System.out.println("----------- zip() ------------ ");
Mono<Tuple4<String, String, String, String>> zip = Mono.zip(firstMono, secondMono, thirdMono, fourthMono);
zip.subscribe(data ->{
System.out.println(data.getT1());
System.out.println(data.getT2());
System.out.println(data.getT3());
System.out.println(data.getT4());
});
// Transforming Mono data using map and flatMap
System.out.println("----------- map() ------------ ");
Mono<String> map = firstMono.map(String::toUpperCase);
map.subscribe(System.out:: println);
System.out.println("----------- flatmap() ------------ ");
//flatmap(): Transform the item emitted by this Mono asynchronously,
//returning the value emitted by another Mono (possibly changing the value type).
Mono<String[]> flatMapMono = firstMono.flatMap(data -> Mono.just(data.split(" ")));
flatMapMono.subscribe(data-> {
for(String d: data) {
System.out.println(d);
}
//or
//Arrays.stream(data).forEach(System.out::println);
});
// Converting Mono into Flux using flatMapMany
System.out.println("---------- flatMapMany() ------------- ");
//flatMapMany(): Transform the item emitted by this Mono into a Publisher,
//then forward its emissions into the returned Flux.
Flux<String> stringFlux = firstMono.flatMapMany(data -> Flux.just(data.split(" ")));
stringFlux.subscribe(System.out::println);
// Concatenating Monos using concatWith
System.out.println("----------- concatwith() ------------ ");
Flux<String> concatMono = firstMono.concatWith(secondMono);
concatMono.subscribe(System.out::println);
}
}
Output:
This example showcases the usage of various Mono operators such as zipWith, zip, map, flatMap, flatMapMany, and concatWith. We create Monos with test data, subscribe to them, combine them using different operators, transform their data, and concatenate them.
Example 3: Writing Mono examples in a Controller
package com.javadzone.webflux.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
@RestController
public class WeatherController {
@GetMapping("/getWeatherDataAsync")
public Mono<String> getWeatherDataAsync() {
System.out.println("Real-time Example with Mono:");
Mono<String> weatherMono = fetchWeatherDataAsync(); // Fetch weather data asynchronously
weatherMono.subscribe(weather -> System.out.println("Received weather data: " + weather));
System.out.println("Continuing with other tasks...");
// Sleep for 6 seconds to ensure weather data retrieval completes
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return weatherMono;
}
@GetMapping("getWeatherDataSync")
public void getWeatherDataSync() {
System.out.println("Simple Example without Mono:");
fetchWeatherDataSync(); // Fetch weather data synchronously
System.out.println("Continuing with other tasks...");
// Sleep for 6 seconds to ensure weather data retrieval completes
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Mono<String> fetchWeatherDataAsync() {
System.out.println("Fetching weather data...");
return Mono.delay(Duration.ofSeconds(5)) // Simulate API call delay of 5 seconds
.map(delay -> "Weather data: Sunny and 30°C") // Simulated weather data
.subscribeOn(Schedulers.boundedElastic()); // Execute on separate thread
}
public static void fetchWeatherDataSync() {
System.out.println("Fetching weather data...");
// Simulate API call delay of 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Weather data: Sunny and 30°C");
}
}
Example 4: Real-time Use Case with Spring WebFlux Mono Example:
Let’s consider a real-time example of fetching weather data from an external API using Mono, and then contrast it with a simple example without using Mono.
package com.javadzone.webflux.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
@RestController
public class WeatherController {
@GetMapping("/getWeatherDataAsync")
public Mono<String> getWeatherDataAsync() {
System.out.println("Real-time Example with Mono:");
Mono<String> weatherMono = fetchWeatherDataAsync(); // Fetch weather data asynchronously
weatherMono.subscribe(weather -> System.out.println("Received weather data: " + weather));
System.out.println("Continuing with other tasks...");
// Sleep for 6 seconds to ensure weather data retrieval completes
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return weatherMono;
}
@GetMapping("getWeatherDataSync")
public void getWeatherDataSync() {
System.out.println("Simple Example without Mono:");
fetchWeatherDataSync(); // Fetch weather data synchronously
System.out.println("Continuing with other tasks...");
// Sleep for 6 seconds to ensure weather data retrieval completes
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Mono<String> fetchWeatherDataAsync() {
System.out.println("Fetching weather data...");
return Mono.delay(Duration.ofSeconds(5)) // Simulate API call delay of 5 seconds
.map(delay -> "Weather data: Sunny and 30°C") // Simulated weather data
.subscribeOn(Schedulers.boundedElastic()); // Execute on separate thread
}
public static void fetchWeatherDataSync() {
System.out.println("Fetching weather data...");
// Simulate API call delay of 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Weather data: Sunny and 30°C");
}
}
When we access the synchronous endpoint at http://localhost:8080/getWeatherDataSync, the output will be displayed immediately.
Simple Example without Mono:
Fetching weather data...
Weather data: Sunny and 30°C
Continuing with other tasks...
When we access the asynchronous endpoint at http://localhost:8080/getWeatherDataAsync, we will receive the weather data after other tasks have been completed.
Real-time Example with Mono:
Fetching weather data...
Continuing with other tasks...
Received weather data: Weather data: Sunny and 30°C