New features of jdk11: http API and reactive streams
New features of jdk11: http API and reactive streams
brief introduction
In the new feature of jdk11: new HTTP API, we introduced that through the new HTTP API, we can send synchronous or asynchronous requests and obtain the returned results.
Today, we want to explore the relationship between these synchronous or asynchronous requests and responses and reactive streams.
How to use reactive streams in Java
For the introduction of reactive streams, you can refer to the reactive stream protocol for detailed explanation. The purpose of using reactive streams is to solve the communication problem between the sender and the consumer. The sender will not send information beyond the ability of the consumer.
Let's review some key concepts in reactive streams:
Further, if we want to implement a reactive streams ourselves, we need to do these things:
Example of post request
Remember the example we used when we talked about the new features of HTTP API in the last article?
In the example, we use an httprequest Bodypublisher is used to build post requests, and bodypublisher is a flow Publisher:
public interface BodyPublisher extends Flow.Publisher<ByteBuffer>
In other words, reactive streams has been used since bodypublisher.
In order to better understand the working principle of reactive streams, we create several wrapper classes to wrap publisher, subscriber and subscription, and output the corresponding logs.
There are a lot of codes, so we won't list them one by one. Here is only a specific implementation of custbodypublisher:
public class CustBodyPublisher implements HttpRequest.BodyPublisher {
private final HttpRequest.BodyPublisher bodyPublisher;
public CustBodyPublisher(HttpRequest.BodyPublisher bodyPublisher){
this.bodyPublisher=bodyPublisher;
}
@Override
public long contentLength() {
long contentLength=bodyPublisher.contentLength();
log.info("contentLength:{}",contentLength);
return contentLength;
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
log.info("CustBodyPublisher subscribe {}",subscriber);
bodyPublisher.subscribe(new CustSubscriber(subscriber));
}
}
The wrapper class is very simple, passing the constructor into the class that wants wrapper, and then calling the actual wrapper class in the corresponding method.
Finally, we will transform the example of calling HTTP API used earlier:
public void testCustPost() throws IOException,InterruptedException {
HttpClient client = HttpClient.newBuilder().build();
HttpRequest.BodyPublisher requestBody = HttpRequest.BodyPublishers
.ofString("{ 我是body }");
CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody);
HttpRequest postRequest = HttpRequest.newBuilder()
.POST(custBodyPublisher)
.uri(URI.create("http://www.flydean.com"))
.build();
HttpResponse<String> response = client
.send(postRequest,HttpResponse.BodyHandlers.ofString());
log.info("response {}",response);
}
Note that custbodypublisher custbodypublisher = new custbodypublisher (requestbody), we have created a new wrapper class.
Run it and observe the output:
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - contentLength:14
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - CustBodyPublisher subscribe jdk.internal.net.http.Http1Request$FixedContentSubscriber@672776b6
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onSubscribe jdk.internal.net.http.PullPublisher$Subscription@580ce038
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onNext length 14
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onComplete
[main] INFO com.flydean.ReactiveHttpUsage - response (POST http://www.flydean.com) 200
You can see the specific workflow of reactive stream. First, CustBodyPublisher is created, then the subscribe method is invoked.
Then custsubscriber calls onsubscribe to create a subscription.
Each time the request method of custsubscription causes the onnext method of custsubscriber to be called.
Finally, when custsubscription requests no result again, custsubscriber calls oncomplete method to end the whole process.