I am trying to use RecordParser for a large response from WebClient.
Vert.x documentation says:
When large response are expected, use the BodyCodec.pipe. This body codec pumps the response body buffers to a WriteStream and signals the success or the failure of the operation in the async result response
But I don't see an easy way to pass that WriteStream to RecordParser. The simplified code pasted below works, but implementing such a bridge by hand is a potential source of bugs, since async protocols are easy to get wrong. Does Vert.x offer this integration out of the box?
RecordParser parser = RecordParser.newDelimited("\n", b -> log.info("r={}", b.toString()));
RecordParserWriteStream bridge = new RecordParserWriteStream(parser);
client
    .get(sut.actualPort(), "localhost", "/stream?file=stream2.txt")
    .as(BodyCodec.pipe(bridge))
    .send(
        ar -> {
          if (ar.succeeded()) {
            ctx.completeNow();
          } else {
            ctx.failNow(ar.cause());
          }
        });
@Slf4j
@RequiredArgsConstructor
public class RecordParserWriteStream implements WriteStream<Buffer> {
  private final RecordParser recordParser;

  @Override
  public WriteStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
    log.info("write {}", data.length());
    recordParser.handle(data);
    Promise<Void> promise = Promise.promise();
    promise.complete();
    handler.handle(promise.future());
  }

  @Override
  public void end(Handler<AsyncResult<Void>> handler) {
    Promise<Void> promise = Promise.promise();
    promise.complete();
    handler.handle(promise.future());
  }

  @Override
  public boolean writeQueueFull() {
    return false;
  }

  @Override
  public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
    return this;
  }

  @Override
  public Future<Void> write(Buffer data) {
    throw new UnsupportedOperationException();
  }

  @Override
  public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
    throw new UnsupportedOperationException();
  }
}
I found a few older SO answers that suggest using HttpClient instead, but the official documentation still recommends WebClient with BodyCodec.pipe/WriteStream.
With HttpClient it looks like this:
RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });
What is the best way to call an HTTP service and pass the response to RecordParser?
- This question is similar to: Vert.x httpClient/webClient process response chunk by chunk or as stream. If you believe it’s different, please edit the question, make it clear how it’s different and/or how the answers on that question are not helpful for your problem. – tsegismont Commented Dec 12, 2024 at 11:43
- @tsegismont I found that question before posting this one (and even asked a question there). A "you can do" mindset sometimes relies on internal contracts, and to someone from the outside it is not obvious whether that comes with hidden bugs. Thus a recommendation from the Vert.x team along the lines of "do ... to connect RecordParser with HTTP streaming" is highly appreciated. Also, that SO answer is 4 years old and the latest docs suggested that I use BodyCodec.pipe. I understand that my second solution is now the preferred one for this problem. Thank you for a great framework! – kodstark Commented Mar 19 at 14:20
1 Answer
@tsegismont posted a comment saying that the answer to "Vert.x httpClient/webClient process response chunk by chunk or as stream" is still up to date, and that HttpClient should be used when HTTP streaming must be connected to RecordParser. This means the second solution from the question is preferred:
RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });
Ideally, a PR for the Vert.x documentation should clarify this.
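For readers wondering why a parser has to sit between the response and the record handler at all: body chunks of a streamed HTTP response arrive at arbitrary boundaries, so a single record can be split across two chunks. The following is only a minimal plain-Java sketch of delimiter-based splitting, written to illustrate the idea. DelimitedSplitter and pendingLength are hypothetical names, it works on String chunks rather than Vert.x Buffer, and it is not how RecordParser itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a delimited record parser; NOT part of Vert.x. */
final class DelimitedSplitter {
    private final String delimiter;
    private final Consumer<String> recordHandler;
    // Data received so far that does not yet end with a delimiter.
    private final StringBuilder pending = new StringBuilder();

    DelimitedSplitter(String delimiter, Consumer<String> recordHandler) {
        this.delimiter = delimiter;
        this.recordHandler = recordHandler;
    }

    /** Accepts one chunk and emits every complete record buffered so far. */
    void handle(String chunk) {
        pending.append(chunk);
        int idx;
        while ((idx = pending.indexOf(delimiter)) >= 0) {
            recordHandler.accept(pending.substring(0, idx));
            pending.delete(0, idx + delimiter.length());
        }
    }

    /** Number of buffered characters that do not yet form a complete record. */
    int pendingLength() {
        return pending.length();
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        DelimitedSplitter splitter = new DelimitedSplitter("\n", records::add);
        // Chunk boundaries deliberately fall in the middle of records.
        for (String chunk : new String[] {"alpha\nbe", "ta\ngam", "ma\n"}) {
            splitter.handle(chunk);
        }
        System.out.println(records); // prints [alpha, beta, gamma]
    }
}
```

In the HttpClient snippet above, response.handler(parser) plays the role of calling handle once per received chunk, which is why the record handler sees whole lines regardless of how the response was chunked on the wire.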