Spring WebFlux Flux Tutorial Examples

This tutorial walks through Spring WebFlux Flux with practical examples: creating Flux streams, filtering and transforming them, handling empty streams, and testing the results. Along the way it introduces the asynchronous, non-blocking programming style that reactive Spring applications rely on.

Flux: A reactive stream type from Project Reactor, the library Spring WebFlux builds on. It emits 0 to N items over time, optionally followed by a completion or error signal. It represents a sequence of data elements and is commonly used for streams that may contain multiple elements or continuous data flows.

Example:

Flux<User> fluxUsers = Flux.just(new User("John"), new User("Alice"), new User("Bob"));

Set up a Spring Boot project using Spring Initializr or any IDE of your choice, and make sure to include the following dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Example: Create FluxService.java class

This class, FluxService, offers various methods illustrating different functionalities of Flux streams.

  • getFlux(): Generates a Flux stream with predefined strings, serving as static data or example data.
  • getFluxList(): Converts collections of User objects into reactive streams, enabling integration with reactive programming.
  • filterFlux(): Filters elements in the Flux stream based on specific conditions, enabling selective processing.
  • flatMapExample(): Demonstrates asynchronous processing of each element in the Flux stream using flatMap.
  • tranformFluxExample(): Illustrates Flux transformation using the transform operator, promoting modularity and maintainability.
  • defaultIfEmptyExample(String str): Emits a default value when no element of the Flux contains the given string, so subscribers never receive an empty stream.
  • getBlankFlux(): Returns an empty Flux stream, useful as a placeholder or starting point for further processing.
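
import java.time.Duration;
import java.util.Arrays;

import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Service
public class FluxService {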

    public Flux<String> getFlux() {
        return Flux.just("First", "Second", "Third", "Fourth", "Fifth");
    }

    public Flux<User> getFluxList() {
        return Flux.fromIterable(Arrays.asList(new User("Pavan", "pavan@123"),
                        new User("Kiran", "kiran@123")));
    }
    
    public Flux<String> filterFlux() {
        // Emits only "Third"; every other element is dropped by the predicate.
        return getFlux().filter(data -> data.equals("Third"));
    }

    public Flux<String> flatMapExample() {
        return getFlux().flatMap(data -> Flux.just(data)).delayElements(Duration.ofMillis(3000));
    }

    /*
        You can get the same result without transform by chaining operators directly on the Flux.
        transform lets you package a chain of operators as a single function and apply it to a Flux,
        which promotes code reuse, readability, and maintainability.
     */

    public void tranformFluxExample() {
        Flux<Integer> originalFlux = Flux.range(1, 10);

        // Without transform: operators are chained inline.
        Flux<Integer> integerFlux = originalFlux.map(i -> i * 2).filter(i -> i % 3 != 0).publishOn(Schedulers.parallel());
        integerFlux.subscribe(data -> System.out.println(data)); // 2 4 8 10 14 16 20

        // With transform: the same chain is applied as one function.
        Flux<Integer> transformedFlux = originalFlux.transform(flux -> {
            return flux.map(i -> i * 2).filter(i -> i % 3 != 0).subscribeOn(Schedulers.parallel());
        });
        // Same values as above. Both pipelines hand work to the parallel scheduler,
        // so the two sets of printed values may interleave.
        transformedFlux.subscribe(data -> System.out.println(data));
    }

    public Flux<String> defaultIfEmptyExample(String str) {
        return getFlux().filter(data -> data.contains(str)).defaultIfEmpty("Doesn't contain: " + str);
    }

    public Flux<Object> getBlankFlux() {
        return Flux.empty();
    }

}
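
The idea behind transform, packaging a chain of operators as one reusable function, can be tried without Reactor on the classpath. The sketch below is only an analogy built on java.util.stream, which is not reactive; the class name TransformSketch and the constant name are made up for illustration. It applies the same "double, then drop multiples of 3" chain used in tranformFluxExample() to the numbers 1 to 10.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical analogy to Flux.transform using plain Java streams (not Reactor).
class TransformSketch {

    // A reusable pipeline fragment: double each value, then drop multiples of 3.
    static final Function<Stream<Integer>, Stream<Integer>> DOUBLE_AND_DROP_MULTIPLES_OF_THREE =
            stream -> stream.map(i -> i * 2).filter(i -> i % 3 != 0);

    // Mirrors Flux.range(start, count): 'count' consecutive integers beginning at 'start'.
    static List<Integer> apply(int start, int count) {
        Stream<Integer> source = IntStream.range(start, start + count).boxed();
        return DOUBLE_AND_DROP_MULTIPLES_OF_THREE.apply(source).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(apply(1, 10)); // [2, 4, 8, 10, 14, 16, 20]
    }
}
```

Because the fragment is an ordinary Function, it can be stored, named, and applied to any source, which is the same benefit transform gives a Flux pipeline.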

Test the FluxService class by running the following test methods. StepVerifier comes from the reactor-test artifact (io.projectreactor:reactor-test), which must be on the test classpath.

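import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@SpringBootTest
public class FluxServiceTest {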

    @Autowired
    private FluxService fluxService;

    @Test
    void testFlux() {
        fluxService.getFlux().subscribe(data -> {
            System.out.println(data);
        });
    }

    @Test
    void testGetFluxList() {
        fluxService.getFluxList().subscribe(data -> {
            System.out.println(data);
        });
    }

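    @Test
    void testFilter() {
        // subscribe() returns a Disposable, a handle for cancelling the subscription.
        // It does not hold the data, so the elements are printed inside the consumer.
        Disposable subscription = fluxService.filterFlux()
                .subscribe(data -> System.out.println("filtered text: " + data));
    }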

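    @Test
    void testFlatMap() {
        // delayElements emits on another thread, so a plain subscribe() would let the test
        // finish before anything is printed. StepVerifier blocks until the Flux completes
        // (about 15 seconds here: five elements, 3 seconds apart).
        StepVerifier.create(fluxService.flatMapExample()
                        .doOnNext(data -> System.out.println("FlatMap: " + data)))
                .expectNextCount(5)
                .verifyComplete();
    }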

    @Test
    void tranformFluxExample() {
        fluxService.tranformFluxExample();
    }

    @Test
    void ifExample() {
        Flux<String> flux = fluxService.defaultIfEmptyExample("Third");
        flux.subscribe(data->{
            System.out.println("data: "+data);
        });
        StepVerifier.create(flux).expectNext("Third").verifyComplete();
    }

    @Test
    void getBlankFlux() {
        Flux<Object> blankFlux = fluxService.getBlankFlux();
        // An empty Flux completes without emitting any element.
        StepVerifier.create(blankFlux).verifyComplete();
    }
}

Conclusion

In short, this article took a hands-on look at Flux in Spring WebFlux: creating streams, filtering and transforming them, supplying a default for empty streams, and checking the results with tests. With these building blocks, developers can write non-blocking Spring applications that stay responsive under load.

Spring Webflux Mono Example

In Spring WebFlux, Mono is the type for asynchronous results that produce at most one value. A Mono either emits a single item and completes, completes without emitting anything, or fails with an error, which makes it a good fit when you expect a single result or nothing at all. The examples in this “Spring Webflux Mono Example” show how a Mono is created, subscribed to, combined with other Monos, and returned from a controller.

Mono: It can emit 0 or 1 item. It’s like a CompletableFuture with 0 or 1 result, except that a Mono is lazy: nothing runs until it is subscribed to. It’s commonly used when you expect a single result or no result, for example when finding an entity by its ID or saving an entity.

Mono<User> monoUser = Mono.just(new User());
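
To make the CompletableFuture comparison concrete, here is a minimal sketch that uses only the JDK. It is an illustration of the "0 or 1 result" shape, not Reactor code: the class UserLookupSketch, the in-memory map, and the method names are all hypothetical, and an empty Optional stands in for a Mono that completes without a value.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory lookup used only for illustration; not part of the tutorial's code.
class UserLookupSketch {

    private static final Map<Integer, String> USERS = Map.of(1, "John", 2, "Alice");

    // Completes with the user name if the id exists, or with an empty Optional otherwise:
    // the "0 or 1 result" shape that Mono models.
    static CompletableFuture<Optional<String>> findNameById(int id) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(USERS.get(id)));
    }

    // Blocks until the future completes; acceptable in a demo, not in reactive code.
    static String describe(int id) {
        return findNameById(id)
                .thenApply(name -> name.map(n -> "found " + n).orElse("no user"))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(1));  // found John
        System.out.println(describe(42)); // no user
    }
}
```

Note that supplyAsync starts the lookup as soon as findNameById is called, whereas the equivalent Mono would do nothing until something subscribes to it.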

Create a Spring Boot project and include the following dependency.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Example 1: Using Mono with CoreSubscriber

package com.javadzone.webflux;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;

@SpringBootTest
class BootWebfluxApplicationTests {
    @Test
    public void test() {
        // Creating a Mono publisher with test data
        Mono<String> monoPublisher = Mono.just("Testdata");

        // Subscribing to the Mono publisher
        monoPublisher.subscribe(new CoreSubscriber<String>() {
            // Callback method invoked when subscription starts
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("on subscribe....");
                s.request(1);
            }

            // Callback method invoked when data is emitted
            @Override
            public void onNext(String data) {
                System.out.println("data: " + data);
            }

            // Callback method invoked when an error occurs
            @Override
            public void onError(Throwable t) {
                System.out.println("exception occurred: " + t.getMessage());
            }

            // Callback method invoked when subscription is completed
            @Override
            public void onComplete() {
                System.out.println("completed the implementation....");
            }
        });
    }
}

This example demonstrates the usage of Mono with a CoreSubscriber, where we create a Mono publisher with test data and subscribe to it. We handle the callback methods onSubscribe, onNext, onError, and onComplete to manage the data stream.
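
The four callbacks follow the Reactive Streams signal order: onSubscribe first, then onNext for each requested item, then exactly one of onComplete or onError. The JDK ships equivalent interfaces in java.util.concurrent.Flow, so the order can be observed without Reactor. The sketch below is an assumption-laden stand-in, not the tutorial's code: it uses the JDK's SubmissionPublisher in place of Mono, and the class SignalOrderSketch and its method are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: records the order in which the subscriber callbacks fire.
class SignalOrderSketch {

    static List<String> recordSignals(String item) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    signals.add("onSubscribe");
                    subscription.request(1); // ask for exactly one item, as in Example 1
                }

                @Override
                public void onNext(String data) {
                    signals.add("onNext:" + data);
                }

                @Override
                public void onError(Throwable t) {
                    signals.add("onError");
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    done.countDown();
                }
            });
            publisher.submit(item);
        } // close() signals onComplete once the pending item has been delivered

        try {
            // Callbacks run on another thread, so wait for the terminal signal.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(recordSignals("Testdata"));
    }
}
```

The request(1) call matters: without it the subscriber signals no demand and no item is delivered, which is the same backpressure rule the CoreSubscriber in Example 1 follows.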

Example 2: Using Mono with various operators 

package com.javadzone.webflux;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple4;

@SpringBootTest
public class MonoTest {

    @Test
    void testMono(){
        Mono<String> firstMono = Mono.just("First Mono");
        Mono<String> secondMono = Mono.just("Second Mono");
        Mono<String> thirdMono = Mono.just("Third Mono");
        Mono<String> fourthMono = Mono.just("Fourth Mono");

        // Subscribing to Monos and printing the data
        firstMono.subscribe(data -> {
            System.out.println("Subscribed to firstMono: "+data);
        });

        secondMono.subscribe(data -> {
            System.out.println("Subscribed to secondMono: "+ data);
        });
        

        // Combining Monos using zipWith and zip operators
        System.out.println("----------- zipWith() ------------ ");
        Mono<Tuple2<String, String>> tuple2Mono = firstMono.zipWith(secondMono);
        tuple2Mono.subscribe(data -> {
            System.out.println(data.getT1());
            System.out.println(data.getT2());
        });
        

        System.out.println("----------- zip() ------------ ");
        Mono<Tuple4<String, String, String, String>> zip = Mono.zip(firstMono, secondMono, thirdMono, fourthMono);
        zip.subscribe(data ->{
            System.out.println(data.getT1());
            System.out.println(data.getT2());
            System.out.println(data.getT3());
            System.out.println(data.getT4());
        });
        

        // Transforming Mono data using map and flatMap
        System.out.println("----------- map() ------------ ");
        Mono<String> map = firstMono.map(String::toUpperCase);
        map.subscribe(System.out::println);

        System.out.println("----------- flatMap() ------------ ");
        // flatMap(): Transform the item emitted by this Mono asynchronously,
        // returning the value emitted by another Mono (possibly changing the value type).
        Mono<String[]> flatMapMono = firstMono.flatMap(data -> Mono.just(data.split(" ")));
        flatMapMono.subscribe(data-> {
            for(String d: data) {
                System.out.println(d);
            }
            //or
            //Arrays.stream(data).forEach(System.out::println);
        });
        
        

        // Converting Mono into Flux using flatMapMany
        System.out.println("---------- flatMapMany() ------------- ");

        //flatMapMany(): Transform the item emitted by this Mono into a Publisher, 
        //then forward its emissions into the returned Flux.
        Flux<String> stringFlux = firstMono.flatMapMany(data -> Flux.just(data.split(" ")));
        stringFlux.subscribe(System.out::println);



        // Concatenating Monos using concatWith
        System.out.println("----------- concatWith() ------------ ");

        Flux<String> concatMono = firstMono.concatWith(secondMono);
        concatMono.subscribe(System.out::println);
    }

}

This example showcases the usage of various Mono operators such as zipWith, zip, map, flatMap, flatMapMany, and concatWith. We create Monos with test data, subscribe to them, combine them using different operators, transform their data, and concatenate them. 

Example 3: Writing Mono examples in a Controller 

package com.javadzone.webflux.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

@RestController
public class WeatherController {

    @GetMapping("/getWeatherDataAsync")
    public Mono<String> getWeatherDataAsync() {
        System.out.println("Real-time Example with Mono:");

        // Build the pipeline; the delay only starts when WebFlux subscribes to the returned Mono.
        // Logging with doOnNext avoids calling subscribe() here, which would run the fetch twice.
        Mono<String> weatherMono = fetchWeatherDataAsync()
                .doOnNext(weather -> System.out.println("Received weather data: " + weather));

        // Printed immediately: nothing above blocks the request thread.
        System.out.println("Continuing with other tasks...");

        // No Thread.sleep is needed, and blocking the request thread should be avoided.
        // WebFlux subscribes to the returned Mono and writes the response when it emits.
        return weatherMono;
    }


    @GetMapping("/getWeatherDataSync")
    public void getWeatherDataSync() {
        System.out.println("Simple Example without Mono:");
        fetchWeatherDataSync(); // Blocks the calling thread for 5 seconds
        // Reached only after the blocking call above has returned.
        System.out.println("Continuing with other tasks...");
    }

    public static Mono<String> fetchWeatherDataAsync() {
        System.out.println("Fetching weather data...");
        return Mono.delay(Duration.ofSeconds(5))  // Simulate API call delay of 5 seconds (timer runs on Reactor's parallel scheduler)
                .map(delay -> "Weather data: Sunny and 30°C") // Simulated weather data
                .subscribeOn(Schedulers.boundedElastic()); // Perform the subscription on a boundedElastic worker thread
    }

    public static void fetchWeatherDataSync() {
        System.out.println("Fetching weather data...");
        // Simulate API call delay of 5 seconds
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Weather data: Sunny and 30°C");
    }
}

Example 4: Real-time Use Case with Spring WebFlux Mono

This use case simulates fetching weather data from an external API with Mono (the /getWeatherDataAsync endpoint of WeatherController) and contrasts it with a blocking version that does not use Mono (the /getWeatherDataSync endpoint).

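When we access the synchronous endpoint at http://localhost:8080/getWeatherDataSync, the lines are printed strictly in order. The request thread blocks for the 5-second simulated call, so "Continuing with other tasks..." appears only after the weather data.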

Simple Example without Mono:
Fetching weather data...
Weather data: Sunny and 30°C
Continuing with other tasks...

When we access the asynchronous endpoint at http://localhost:8080/getWeatherDataAsync, "Continuing with other tasks..." is printed right away, and the weather data is received about 5 seconds later.

Real-time Example with Mono:
Fetching weather data...
Continuing with other tasks...
Received weather data: Weather data: Sunny and 30°C
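
The difference in ordering between the two outputs can be reproduced without starting a server. The sketch below is a JDK-only stand-in under stated assumptions: CompletableFuture plays the role of Mono, the class FetchOrderSketch and its methods are hypothetical, the weather string is shortened, and log lines are collected in a list instead of being printed by a controller.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two endpoints; it records log lines instead of printing them.
class FetchOrderSketch {

    // Blocking style: the caller waits for the fetch before it can continue.
    static List<String> blocking(long delayMillis) {
        List<String> log = new CopyOnWriteArrayList<>();
        log.add("Fetching weather data...");
        try {
            Thread.sleep(delayMillis); // simulated API latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.add("Weather data: Sunny");
        log.add("Continuing with other tasks...");
        return log;
    }

    // Non-blocking style: the fetch is scheduled, and the caller continues right away.
    static List<String> nonBlocking(long delayMillis) {
        List<String> log = new CopyOnWriteArrayList<>();
        log.add("Fetching weather data...");
        CompletableFuture<Void> pending = CompletableFuture
                .supplyAsync(() -> "Weather data: Sunny",
                        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS))
                .thenAccept(weather -> log.add("Received weather data: " + weather));
        log.add("Continuing with other tasks...");
        pending.join(); // wait only so the demo can return the complete log
        return log;
    }

    public static void main(String[] args) {
        System.out.println(blocking(200));
        System.out.println(nonBlocking(200));
    }
}
```

In the blocking variant the "Continuing" line always comes last. In the non-blocking variant it is recorded while the simulated fetch is still pending, which matches the asynchronous output above.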