I have an external API which returns a List
of Student
along with count
and offset
parameters which represents number of students remaining in the database.
It is similar to a paginated response but it does not send Pageable
information in the response.
The response is in the below format.
{
"students":
[
{
"id":1,
"name":"Adam"
},
{
"id":2,
"name":"Alan"
}
],
"count":2,
"offset":10
}
Question:
I need to write a recursive function in Spring Reactive which calls this API and accumulates all Students Flux<Student>
and send it to the front end.
Limitations:
The maximum number of Students I can fetch per call is 2. There could be about 20 Students in the Database.
Rough algorithm:
do(getStudents(offset))
while(response.count <2);
I want to be able to perform this operation using Reactive Spring.
I am guessing I could use something like Flux.generate.takeUntil
etc.
However I am not sure of the right Syntax as takeUntil
accepts final value and not a variable.
Please suggest a way to achieve the above functionality in Reactive Spring
Any help is appreciated. Thanks in advance.
I have an external API which returns a List
of Student
along with count
and offset
parameters which represents number of students remaining in the database.
It is similar to a paginated response but it does not send Pageable
information in the response.
The response is in the below format.
{
"students":
[
{
"id":1,
"name":"Adam"
},
{
"id":2,
"name":"Alan"
}
],
"count":2,
"offset":10
}
Question:
I need to write a recursive function in Spring Reactive which calls this API and accumulates all Students Flux<Student>
and send it to the front end.
Limitations:
The maximum number of Students I can fetch per call is 2. There could be about 20 Students in the Database.
Rough algorithm:
do(getStudents(offset))
while(response.count <2);
I want to be able to perform this operation using Reactive Spring.
I am guessing I could use something like Flux.generate.takeUntil
etc.
However I am not sure of the right Syntax as takeUntil
accepts final value and not a variable.
Please suggest a way to achieve the above functionality in Reactive Spring
Any help is appreciated. Thanks in advance.
Share Improve this question asked Feb 6 at 6:29 Mohammed IdrisMohammed Idris 456 bronze badges 1- What's your use case? Why don't you use Rest API ? – Exotic Cut 5276 Commented Feb 6 at 21:07
1 Answer
Reset to default 0For recursion kind of use-cases, Reactor offers Mono.expand (and other variants like Flux.expand or Flux.expandDeep).
The expand operator is like flatMap, except that is it also re-applied on elements it produces (therefore, making a recursion effect).
For your use case, a pseudo-code could be:
int count = 2;
Mono<Response> firstPage = service.getFirstPage(count);
Flux<Response> allPages = firstPage.expand(response -> {
if (response.moreAvailable) return service.nextPage(response.offset, count);
else return Mono.empty();
});
Here is a complete minimal reproducible example that mock the service returning pages with in-memory records:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.List;
import static java.lang.Math.min;
public class ExpandPages {
// Data models
record Student(int id, String name) {}
record Page(int remaining, int nextOffset, List<Student> students) {}
/**
* Mock a service that sends pages of students
* @param all All available students
*/
record StudentRegistry(List<Student> all) {
StudentRegistry {
if (all == null || all.isEmpty()) throw new IllegalArgumentException("Null or empty student list");
all = Collections.unmodifiableList(all);
}
/**
* Request a page of students.
*
* @return A single page of students, starting at provided offset, with a maximum count of provided count.
*/
public Mono<Page> next(int offset, int count) {
if (offset < 0 || offset >= all.size()) throw new IllegalArgumentException("Bad offset");
if (count < 1) throw new IllegalArgumentException("Bad count");
count = min(count, all.size() - offset);
int nextOffset = offset + count;
int remaining = all.size() - nextOffset;
return Mono.just(new Page(remaining, nextOffset, all.subList(offset, offset + count)));
}
}
public static void main(String[] args) {
final var registry = new StudentRegistry(List.of(
new Student(1, "John"),
new Student(2, "Jane"),
new Student(3, "Jack"),
new Student(4, "Jules"),
new Student(5, "Julie"),
new Student(6, "James"),
new Student(7, "Joe"),
new Student(8, "Johanna"),
new Student(9, "Jolly Jumper")
));
final int queriedCount = 2;
Flux<Page> pages = registry
// Get first page
.next(0, queriedCount)
// Recurse on each received page: check if there's more, then ask for the next available page
.expand(response -> {
System.out.println("Received page "+response);
if (response.remaining() <= 0) {
System.out.println("No more page to fetch.");
return Mono.empty(); // Ends recursion
} else {
return registry.next(response.nextOffset(), min(queriedCount, response.remaining()));
}
});
// Trigger flow consumption: print all received students
pages.flatMapIterable(Page::students)
.doOnNext(System.out::println)
.blockLast();
}
}