人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

后端框架-异步请求

有些请求业务处理流程可能比较耗时,比如 IO 操作、长查询、远程调用等,主线程会被一直占用,而 Tomcat 线程池线程有限,处理量就会下降。

Servlet 3.0 以后提供了对异步处理的支持,目的就是将容器线程池和业务线程池分离开,将耗时较长的操作移动到业务线程池中进行,释放容器线程,使得容器线程可以处理其他任务,在业务逻辑执行完毕之后,再通知 Tomcat 容器线程池来继续执行后面的操作。

原始模型在处理业务逻辑的过程中会一直占有容器线程池,而异步 Servlet 模型在业务线程池处理的过程中,有一段时间容器线程池中的那个线程是空闲的,这种设计大大提高了容器的处理请求的能力。

Spring MVC 封装了异步处理,满足用户请求后,主线程很快结束,同时开启其它线程处理任务,并将处理结果异步地响应用户,而主线程就可以接收更多请求。

如果要返回单个异步值,可以在 Controller中 返回 Callable、WebAsyncTask 或 DeferredResult,如果要生成多个异步值并将这些值写入响应,则可以在 Controller 中返回 ResponseBodyEmitter、SseEmitter或 StreamingResponseBody。

Callable

控制器可以用 Callable 包装任何受支持的返回值。返回值由配置的 TaskExecutor 运行给定的任务来获得。

示例:

1
2
3
4
5
6
7
8
9
@GetMapping("callable")
public Callable<String> callable() {
log.info("Main thread name:{}",Thread.currentThread().getName());

return () -> {
log.info("Execution thread name:{}",Thread.currentThread().getName());
return "Hello,World!";
};
}

WebAsyncTask

如果需要超时处理的回调或者错误处理的回调,可以使用 WebAsyncTask 代替 Callable,它包装了 Callable,功能更强大些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@GetMapping("asyncTask")
public WebAsyncTask<String> asyncTask() {
log.info("Main thread name:{}",Thread.currentThread().getName());

WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000,() -> {
log.info("Execution thread name:{}",Thread.currentThread().getName());
return "Hello,World!";
});

// 成功回调
webAsyncTask.onCompletion(() -> System.out.println("Finish!"));

// 超时回调
webAsyncTask.onTimeout(() -> "Time out!");

// 错误回调
webAsyncTask.onError(() -> "Error!");

return webAsyncTask;
}

DeferredResult

DeferredResult 使用方式与 Callable 类似,但在返回结果上不一样,它返回的时候实际结果可能还没有生成,实际的结果可能会由另外的线程里面设置到 DeferredResult 中。这个特性非常重要,要实现复杂的功能(比如服务端推技术、订单过期时间处理、长轮询、模拟MQ的功能等等高级应用)都会用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@GetMapping("/deferred")
public DeferredResult<String> deferredResult() {
log.info("Main thread name:{}",Thread.currentThread().getName());

DeferredResult<String> deferredResult = new DeferredResult<>();

// 实际应用中,可以由消息队列、定时任务或其它事件触发
CompletableFuture
.supplyAsync(() -> {
log.info("Execution thread name:{}",Thread.currentThread().getName());
return "Hello,World!";
})
.whenCompleteAsync((result,throwable) -> {
// 重点:将异步结果赋值到 deferredResult 中
deferredResult.setResult(result);
});

// 成功回调
deferredResult.onCompletion(() -> log.info("Finish!"));

// 超时回调
deferredResult.onTimeout(() -> log.warn("Time out!"));

// 错误回调
deferredResult.onError((e) -> log.error("Error!"));

// 虽然这里有 return,但如果一直没有调用 setResult 设置值,线程就会一直 hold 在这里
return deferredResult;
}

ResponseBodyEmitter

我们可以使用 ResponseBodyEmitter 返回值来生成对象流,其中每个对象都使用 HttpMessageConverter 序列化并写入响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@GetMapping("/emitter")
public ResponseBodyEmitter emitter() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();

// 线程 1 输出
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
emitter.send("Hello,World!");
} catch (IOException | InterruptedException e) {
log.error("Error!",e);
}
});

// 线程 2 输出
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
emitter.send("Hello,World again!");
} catch (IOException | InterruptedException e) {
log.error("Error!",e);
}
});

// 线程 3 标记结束
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
emitter.complete();
} catch (InterruptedException e) {
log.error("Error!",e);
}
});

// 一直阻塞,直到调用 emitter.complete()
return emitter;
}

我们还可以使用 ResponseBodyEmitter 作为 ResponseEntity 中的主体,从而自定义响应的状态和标题。

SseEmitter

SseEmitter (ResponseBodyEmitter 的子类) 提供对服务器发送事件的支持,从服务器发送的事件根据 W3C SSE 规范进行格式化。要从控制器生成 SSE 流,返回 SseEmitter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@GetMapping("/sseEmitter")
public SseEmitter sseEmitter() {
SseEmitter emitter = new SseEmitter ();

// 线程 1 输出
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
emitter.send("Hello,World!");
} catch (IOException | InterruptedException e) {
log.error("Error!",e);
}
});

// 线程 2 输出
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
emitter.send("Hello,World again!");
} catch (IOException | InterruptedException e) {
log.error("Error!",e);
}
});

// 线程 3 标记结束
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
emitter.complete();
} catch (InterruptedException e) {
log.error("Error!",e);
}
});

// 一直阻塞,直到调用 emitter.complete()
return emitter;
}

虽然 SSE 是流媒体到浏览器的主要选项,但请注意 Internet Explorer 不支持服务器发送的事件。考虑使用 Spring 的 WebSocket 消息传递和 SockJS 回退传输(包括 SSE),这些回退传输针对广泛的浏览器。

StreamingResponseBody

有时,绕过消息转换并直接将流发送到响应 OutputStream 是很有用的(例如,对于文件下载)。我们可以使用 StreamingResponseBody 返回值类型来完成。

1
2
3
4
5
6
7
8
@GetMapping("/download")
public StreamingResponseBody download() {
log.info("Main thread name:{}",Thread.currentThread().getName());
return outputStream -> {
log.info("Execution thread name:{}",Thread.currentThread().getName());
// write...
};
}

我们可以使用 StreamingResponseBody 作为 ResponseEntity 中的主体来定制响应的状态和标题。

配置 AsyncTaskExecutor

Spring MVC 执行异步处理需要用到 AsyncTaskExecutor,这个可以在 WebMvcConfigurationSupport.configureAsyncSupport 方法中提供。如果不提供,则使用 SimpleAsyncTaskExecutor,SimpleAsyncTaskExecutor 不使用线程池,因此推荐提供自定义的 AsyncTaskExecutor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class WebMvcConfig extends WebMvcConfigurationSupport {
/**
* 默认的线程池
*/
private final Executor executor;

public WebMvcConfig(@Qualifier("applicationTaskExecutor") Executor executor) {
this.executor = executor;
}

@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor((AsyncTaskExecutor) executor);
super.configureAsyncSupport(configurer);
}
}

处理流程

Servlet 异步处理简述

可以通过调用 request.startAsync() 将 ServletRequest 设置为异步模式。这样做的主要效果是 Servlet (以及任何过滤器)可以退出,但是响应仍然是开放的,以便稍后完成处理。

对 request.startAsync() 的调用将返回 AsyncContext,我们可以使用它进一步控制异步处理。例如,它提供分派方法,该方法类似于 Servlet API 的转发,只是它允许应用程序在 Servlet 容器线程上恢复请求处理。

ServletRequest 提供对当前 DispatcherType 的访问,我们可以使用它来区分当前处理的是原始请求、异步分发请求、转发或是其他类型的请求分发类型。

Callable 处理流程

  • Controller 返回一个 Callable。
  • Spring MVC 调用 request.startAsync() 并将调用提交给 TaskExecutor,以便在单独的线程中进行处理。
  • 同时,DispatcherServlet 和所有过滤器退出 Servlet 容器线程,但是响应保持打开状态。
  • 最后,Callable 产生一个结果,Spring MVC 将请求发送回 Servlet 容器以完成处理。
  • 再次调用 DispatcherServlet,然后处理从 Callable异步生成的返回值。

DeferredResult 处理流程

  • Controller 返回一个 DeferredResult,并将其保存在某个可以访问它的内存队列或列表中。
  • Spring MVC 调用 request.startAsync()。
  • 同时,DispatcherServlet 和所有配置的过滤器退出请求处理线程,但是响应保持打开状态。
  • 应用程序从某个线程设置 DeferredResult,Spring MVC 将请求发送回 Servlet 容器。
  • 再次调用 DispatcherServlet,然后继续处理异步生成的返回值。

对比 WebFlux

Servlet API 最初是为通过 Filter-Servlet 链进行单次传递而构建的。在 Servlet 3.0 中添加的异步请求处理允许应用程序退出 Filter-Servlet 链,但保持响应以供进一步处理。Spring MVC 异步支持就是围绕这种机制构建的。当 Controller 返回一个 DeferredResult 时,Filter-Servlet 链被退出,Servlet 容器线程被释放。稍后,当设置 DeferredResult 时,将进行异步分发(到相同的 URL),在此期间将再次映射 Controller,但不是调用它,而是使用 DeferredResult 值(就像 Controller 返回它一样)来恢复处理。

相比之下,Spring WebFlux 既不是基于 Servlet API 构建的,也不需要这样的异步请求处理特性,因为它在设计上就是异步的。异步处理构建在所有框架契约中,并在请求处理的所有阶段得到本质上的支持。

从编程模型的角度来看,Spring MVC 和 Spring WebFlux 都支持异步和 Reactive Types 作为 Controller 方法的返回值。Spring MVC 甚至支持流,包括 reactive back pressure.。但是,对响应的各个写操作仍然是阻塞的(并且是在单独的线程上执行的),这与 WebFlux 不同,后者依赖于非阻塞 IO,并且每次写操作都不需要额外的线程。

另一个基本区别是,Spring MVC 在 Controller 方法参数中不支持异步或 reactive types(例如,@RequestBody、@RequestPart等),也不支持将异步和响应类型作为模型属性。而 Spring WebFlux 却支持所有这些。

小礼物走一走,来 Github 关注我