SpringCloud + WebFlux 史上最全

疯狂创客圈 2021-01-22 09:40:33
SpringCloud 史上 最全 webflux


前言

webmvc和webflux作为spring framework的两个重要模块,代表了两个IO模型,阻塞式和非阻塞式的。

webmvc是基于servlet的阻塞式模型(一般称为oio),一个请求到达服务器后会单独分配一个线程去处理请求,如果请求包含IO操作,线程在IO操作结束之前一直处于阻塞等待状态,这样线程在等待IO操作结束的时间就浪费了。

webflux是基于reactor的非阻塞模型(一般称为nio),同样,请求到达服务器后也会分配一个线程去处理请求,如果请求包含IO操作,线程在IO操作结束之前不再是处于阻塞等待状态,而是去处理其他事情,等到IO操作结束之后,再通知(得益于系统的机制)线程继续处理请求。

这样线程就有效地利用了IO操作所消耗的时间。

WebFlux 增删改查完整实战 demo

Dao层 (又称 repository 层)

entity(又称 PO对象)

新建User 对象 ,代码如下:


package com.crazymaker.springcloud.reactive.user.info.entity;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table(name = "t_user")
public final class UserEntity extends User
{
@Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Override
public long getUserId()
{
return super.getUserId();
}
@Column(name = "name")
public String getName()
{
return super.getName();
}
}

Dao 实现类

@Repository 用于标注数据访问组件,即 DAO 组件。实现代码中使用名为 repository 的 Map 对象作为内存数据存储,并对对象具体实现了具体业务逻辑。JpaUserRepositoryImpl 负责将 PO 持久层(数据操作)相关的封装组织,完成新增、查询、删除等操作。


package com.crazymaker.springcloud.reactive.user.info.dao.impl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List;
@Repository
@Transactional
public class JpaUserRepositoryImpl
{
@PersistenceContext
private EntityManager entityManager;
public Long insert(final User user)
{
entityManager.persist(user);
return user.getUserId();
}
public void delete(final Long userId)
{
Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
query.executeUpdate();
}
@SuppressWarnings("unchecked")
public List<User> selectAll()
{
return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();
}
@SuppressWarnings("unchecked")
public User selectOne(final Long userId)
{
Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
return (User) query.getSingleResult();
}
}

Service服务层


package com.crazymaker.springcloud.reactive.user.info.service.impl;
import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{
@Resource
private JpaUserRepositoryImpl userRepository;
@Transactional
//增加用户
public User addUser(User dto)
{
User userEntity = new UserEntity();
userEntity.setUserId(dto.getUserId());
userEntity.setName(dto.getName());
userRepository.insert(userEntity);
BeanUtil.copyProperties(userEntity,dto);
return dto;
}
@Transactional
//删除用户
public User delUser(User dto)
{
userRepository.delete(dto.getUserId());
return dto;
}
//查询全部用户
public List<User> selectAllUser()
{
log.info("方法 selectAllUser 被调用了");
return userRepository.selectAll();
}
//查询一个用户
public User selectOne(final Long userId)
{
log.info("方法 selectOne 被调用了");
return userRepository.selectOne(userId);
}
}

Controller控制层

Spring Boot WebFlux也可以使用注解模式来进行API接口开发。

package com.crazymaker.springcloud.reactive.user.info.controller;
import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
/**
* Mono 和 Flux 适用于两个场景,即:
* Mono:实现发布者,并返回 0 或 1 个元素,即单对象。
* Flux:实现发布者,并返回 N 个元素,即 List 列表对象。
* 有人会问,这为啥不直接返回对象,比如返回 City/Long/List。
* 原因是,直接使用 Flux 和 Mono 是非阻塞写法,相当于回调方式。
* 利用函数式可以减少了回调,因此会看不到相关接口。这恰恰是 WebFlux 的好处:集合了非阻塞 + 异步
*/
@Slf4j
@Api(value = "用户信息、基础学习DEMO", tags = {"用户信息DEMO"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{
@ApiOperation(value = "回显测试", notes = "提示接口使用者注意事项", httpMethod = "GET")
@RequestMapping(value = "/hello")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "名称", required = true)})
public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name)
{
log.info("方法 hello 被调用了");
return Mono.just(RestOut.succeed("hello " + name));
}
@Resource
JpaEntityServiceImpl jpaEntityService;
@PostMapping("/add/v1")
@ApiOperation(value = "插入用户" )
@ApiImplicitParams({
// @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
// @ApiImplicitParam(paramType = "body", dataType="用户", name = "dto", required = true)
@ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto", required = true),
})
// @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User", required = true)
public Mono<User> userAdd(@RequestBody User dto)
{
//命令式写法
// jpaEntityService.delUser(dto);
//响应式写法
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}
@PostMapping("/del/v1")
@ApiOperation(value = "响应式的删除")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto", required = true),
})
public Mono<User> userDel(@RequestBody User dto)
{
//命令式写法
// jpaEntityService.delUser(dto);
//响应式写法
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));
}
@PostMapping("/list/v1")
@ApiOperation(value = "查询用户")
public Flux<User> listAllUser()
{
log.info("方法 listAllUser 被调用了");
//命令式写法 改为响应式 以下语句,需要在流中执行
// List<User> list = jpaEntityService.selectAllUser();
//响应式写法
Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());
return userFlux;
}
@PostMapping("/detail/v1")
@ApiOperation(value = "响应式的查看")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto", required = true),
})
public Mono<User> getUser(@RequestBody User dto)
{
log.info("方法 getUser 被调用了");
//构造流
Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));
return userMono;
}
@PostMapping("/detail/v2")
@ApiOperation(value = "命令式的查看")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto", required = true),
}) public RestOut<User> getUserV2(@RequestBody User dto)
{
log.info("方法 getUserV2 被调用了");
User user = jpaEntityService.selectOne(dto.getUserId());
return RestOut.success(user);
}
}

从返回值可以看出,Mono 和 Flux 适用于两个场景,即:

  • Mono:实现发布者,并返回 0 或 1 个元素,即单对象
  • Flux:实现发布者,并返回 N 个元素,即 List 列表对象

有人会问,这为啥不直接返回对象,比如返回 City/Long/List。原因是,直接使用 Flux 和 Mono 是非阻塞写法,相当于回调方式。利用函数式可以减少了回调,因此会看不到相关接口。这恰恰是 WebFlux 的好处:集合了非阻塞 + 异步。

Mono

Mono 是什么? 官方描述如下:A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

Mono 是响应流 Publisher 具有基础 rx 操作符。可以成功发布元素或者错误。如图所示:

img

file

Mono 常用的方法有:

  • Mono.create():使用 MonoSink 来创建 Mono
  • Mono.justOrEmpty():从一个 Optional 对象或 null 对象中创建 Mono。
  • Mono.error():创建一个只包含错误消息的 Mono
  • Mono.never():创建一个不包含任何消息通知的 Mono
  • Mono.delay():在指定的延迟时间之后,创建一个 Mono,产生数字 0 作为唯一值

Flux

Flux 是什么? 官方描述如下:A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

Flux 是响应流 Publisher 具有基础 rx 操作符。可以成功发布 0 到 N 个元素或者错误。Flux 其实是 Mono 的一个补充。如图所示:

img

file

所以要注意:如果知道 Publisher 是 0 或 1 个,则用 Mono。

Flux 最值得一提的是 fromIterable 方法。 fromIterable(Iterable<? extends T> it) 可以发布 Iterable 类型的元素。当然,Flux 也包含了基础的操作:map、merge、concat、flatMap、take,这里就不展开介绍了。

使用配置模式进行WebFlux 接口开发

1 可以编写一个处理器类 Handler代替 Controller , Service 、dao层保持不变。

2 配置请求的路由

处理器类 Handler

处理器类 Handler需要从请求解析参数,并且封装响应,代码如下:

package com.crazymaker.springcloud.reactive.user.info.config.handler;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Slf4j
@Component
public class UserReactiveHandler
{
@Resource
private JpaEntityServiceImpl jpaEntityService;
/**
* 得到所有用户
*
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request)
{
log.info("方法 getAllUser 被调用了");
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);
}
/**
* 创建用户
*
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request)
{
// 2.0.0 是可以工作, 但是2.0.1 下面这个模式是会报异常
Mono<User> user = request.bodyToMono(User.class);
/**Mono 使用响应式的,时候都是一个流,是一个发布者,任何时候都不能调用发布者的订阅方法
也就是不能消费它, 最终的消费还是交给我们的Springboot来对它进行消费,任何时候不能调用它的
user.subscribe();
不能调用block
把异常放在统一的地方来处理
*/
return user.flatMap(dto ->
{
// 校验代码需要放在这里
if (StringUtils.isBlank(dto.getName()))
{
throw new BusinessException("用户名不能为空");
}
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);
});
}
/**
* 根据id删除用户
*
* @param request
* @return
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request)
{
String id = request.pathVariable("id");
// 校验代码需要放在这里
if (StringUtils.isBlank(id))
{
throw new BusinessException("id不能为空");
}
User dto = new User();
dto.setUserId(Long.parseLong(id));
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);
}
}

路由配置

package com.crazymaker.springcloud.reactive.user.info.config;
import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
@Configuration
public class RoutersConfig
{
@Bean
RouterFunction<ServerResponse> routes(UserReactiveHandler handler)
{
// 下面的相当于类里面的 @RequestMapping
// 得到所有用户
return RouterFunctions.route(GET("/user"), handler::getAllUser)
// 创建用户
.andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
// 删除用户
.andRoute(DELETE("/user/{id}"), handler::deleteUserById);
}
@Value("${server.servlet.context-path}")
private String contextPath;
//处理上下文路径,没有上下文路径,此函数可以忽略
@Bean
public WebFilter contextPathWebFilter()
{
return (exchange, chain) ->
{
ServerHttpRequest request = exchange.getRequest();
String requestPath = request.getURI().getPath();
if (requestPath.startsWith(contextPath))
{
return chain.filter(
exchange.mutate()
.request(request.mutate().contextPath(contextPath).build())
.build());
}
return chain.filter(exchange);
};
}
}

集成Swagger

本文主要展示一下如何使用支持WebFlux的Swagger

maven依赖

 <dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-spring-webflux</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
  • swagger.version目前是3.0.0,Spring 5引入了WebFlux,而当前版本的SpringFox Swagger2(2.9.2)还不支持WebFlux,得使用3.0.0才支持

swagger 配置

package com.crazymaker.springcloud.reactive.user.info.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;
@Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{
@Bean
public Docket createRestApi()
{
// return new Docket(DocumentationType.OAS_30)
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.pathMapping(servletContextPath) //注意webflux没有context-path配置,如果不加这句话的话,接口测试时路径没有前缀
.select()
.apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller"))
.paths(PathSelectors.any())
.build();
}
@Value("${server.servlet.context-path}")
private String servletContextPath;
//构建 api文档的详细信息函数
private ApiInfo apiInfo()
{
return new ApiInfoBuilder()
//页面标题
.title("疯狂创客圈 springcloud + Nginx 高并发核心编程")
//描述
.description("Zuul+Swagger2 构建 RESTful APIs")
//条款地址
.termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/")
.contact(new Contact("疯狂创客圈", "https://www.cnblogs.com/crazymakercircle/", ""))
.version("1.0")
.build();
}
/**
* 重写 PathProvider ,解决 context-path 重复问题
* @return
*/
@Order(Ordered.HIGHEST_PRECEDENCE)
@Bean
public PathProvider pathProvider() {
return new DefaultPathProvider() {
@Override
public String getOperationPath(String operationPath) {
operationPath = operationPath.replaceFirst(servletContextPath, "/");
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");
return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString());
}
@Override
public String getResourceListingPath(String groupName, String apiDeclaration) {
apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration);
return apiDeclaration;
}
};
}
}

测试

配置模式的 WebFlux Rest接口测试

配置模式的 WebFlux Rest接口只能使用PostMan测试,例子如下:

在这里插入图片描述

注意,不能带上下文路径:

http://192.168.68.1:7705/uaa-react-provider/user

注解模式的WebFlux Rest接口测试

swagger 增加界面

在这里插入图片描述

CRUD其他的界面,略过

配置大全

静态资源配置

@Configuration
@EnableWebFlux //使用注解@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer { //继承WebFluxConfigurer
//配置静态资源
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/static/**")
.addResourceLocations("classpath:/static/");
registry.addResourceHandler("/file/**")
.addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);
registry.addResourceHandler("/swagger-ui.html**")
.addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
}
//配置拦截器
//配置编解码
...
}

WebFluxSecurity配置

@Configuration
@EnableWebFluxSecurity //使用注解@EnableWebFluxSecurity
public class WebFluxSecurityConfig implements
WebFilter, //拦截器
ServerLogoutSuccessHandler, //登出成功回调
ServerAuthenticationEntryPoint, //验证入口
ServerAuthenticationFailureHandler, //验证成功回调
ServerAuthenticationSuccessHandler { //验证失败回调
//实现接口的方法
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
//配置webflux的context-path
ServerHttpRequest request = exchange.getRequest();
if (request.getURI().getPath().startsWith(contextPath)) {
exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build();
}
//把查询参数转移到FormData中,不然验证过滤器(ServerFormLoginAuthenticationConverter)接受不到参数
if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) {
ServerWebExchange finalExchange = exchange;
ServerWebExchange realExchange = new Decorator(exchange) {
@Override
public Mono<MultiValueMap<String, String>> getFormData() {
return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() {
@Override
public MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) {
if (stringStringMultiValueMap.size() == 0) {
return finalExchange.getRequest().getQueryParams();
} else {
return stringStringMultiValueMap;
}
}
});
}
};
return chain.filter(realExchange);
}
return chain.filter(exchange);
}
@Override
public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return sendJson(webFilterExchange.getExchange(), new Response<>("登出成功"));
}
@Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "未验证"));
}
@Override
public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {
return sendJson(webFilterExchange.getExchange(), new Response<>(1, "验证失败"));
}
@Override
public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return webFilterExchange.getChain().filter(
webFilterExchange.getExchange().mutate()
.request(t -> t.method(HttpMethod.POST).path("/user/login")) //转发到自定义控制器
.build()
);
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST)
.csrf().disable()
.authorizeExchange()
.pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs") //swagger
.permitAll()
.and()
.authorizeExchange()
.pathMatchers("/static/**", "/file/**") //静态资源
.permitAll()
.and()
.authorizeExchange()
.anyExchange()
.authenticated()
.and()
.logout() //登出
.logoutUrl("/user/logout")
.logoutSuccessHandler(this)
.and()
.exceptionHandling() //未验证回调
.authenticationEntryPoint(this)
.and()
.formLogin()
.loginPage("/user/login")
.authenticationFailureHandler(this) //验证失败回调
.authenticationSuccessHandler(this) //验证成功回调
.and()
.httpBasic()
.authenticationEntryPoint(this); //basic验证,一般用于移动端
return http.build();
}
}

WebSession配置

@Configuration
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) //使用注解@EnableRedisWebSession ,maxInactiveIntervalInSeconds设置数据过期时间,spring.session.timeout不管用
public class RedisWebSessionConfig { //考虑到分布式系统,一般使用redis存储session
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory();
}
}

文件上传配置

//参数上传
//定义参数bean
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean{
@ApiModelProperty(value = "普通参数", required = false, example = "")
private String query;
@ApiModelProperty(value = "文件参数", required = false, example = "")
private FilePart image; //强调,webflux中使用FilePart作为接收文件的类型
}
//定义接口
@ApiOperation("一个接口")
@PostMapping("/path")
//这里需要使用@ApiImplicitParam显示配置【文件参数】才能使swagger界面显示上传文件按钮
@ApiImplicitParams({
@ApiImplicitParam(
paramType = "form", //表单参数
dataType = "__file", //最新版本使用__file表示文件,以前用的是file
name = "image", //和QueryBean里面的【文件参数image】同名
value = "文件") //注释
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) {
}

WebFlux 执行流程

userAdd方法代码如下:

 public Mono<User> userAdd(@RequestBody User dto)
{
//命令式写法
// jpaEntityService.delUser(dto);
//响应式写法
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}

由于返回的数据只有一个所以使用的是Mono作为返回数据,使用Mono类静态create方法创建Mono对象,代码如下:

public abstract class Mono<T> implements Publisher<T> {
static final BiPredicate EQUALS_BIPREDICATE = Object::equals;
public Mono() {
}
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}
...
}

​ 可以到create方法接收一个参数,参数是Consumer对象,通过callback可以看出,这里使用的是callback回调,下面看看Consumer接口的定义:


@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
/**
* Returns a composed {@code Consumer} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code Consumer} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}

通过上面的代码可以看出,有两个方法,一个是默认的方法andThen,还有一个accept方法,

Mono.create()方法的参数需要一个实现类,实现Consumer接口;Mono.create方法的参数指向的实例对象, 就是要实现这个accept方法。

例子中,下面的lambda表达式,就是accept方法的实现,实参的类型为 Consumer<MonoSink > , accept的实现为 如下:

cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))

来来来,重复看一下,create方法的实现:

 public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}

​ 在方法内部调用了onAssembly方法,参数是MonoCreate对象,然后我们看看MonoCreate类,代码如下:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
final class MonoCreate<T> extends Mono<T> {
final Consumer<MonoSink<T>> callback;
MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
}
public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter);
try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
}
}
static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {
final CoreSubscriber<? super T> actual;
volatile Disposable disposable;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");
volatile int state;
static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");
volatile LongConsumer requestConsumer;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");
T value;
static final int NO_REQUEST_HAS_VALUE = 1;
static final int HAS_REQUEST_NO_VALUE = 2;
static final int HAS_REQUEST_HAS_VALUE = 3;
DefaultMonoSink(CoreSubscriber<? super T> actual) {
this.actual = actual;
}
public Context currentContext() {
return this.actual.currentContext();
}
@Nullable
public Object scanUnsafe(Attr key) {
if (key != Attr.TERMINATED) {
return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);
} else {
return this.state == 3 || this.state == 1;
}
}
public void success() {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
}
public void success(@Nullable T value) {
if (value == null) {
this.success();
} else {
int s;
do {
s = this.state;
if (s == 3 || s == 1) {
Operators.onNextDropped(value, this.actual.currentContext());
return;
}
if (s == 2) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
return;
}
this.value = value;
} while(!STATE.compareAndSet(this, s, 1));
}
}
public void error(Throwable e) {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onError(e);
} finally {
this.disposeResource(false);
}
} else {
Operators.onOperatorError(e, this.actual.currentContext());
}
}
public MonoSink<T> onRequest(LongConsumer consumer) {
Objects.requireNonNull(consumer, "onRequest");
if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {
throw new IllegalStateException("A consumer has already been assigned to consume requests");
} else {
return this;
}
}
public CoreSubscriber<? super T> actual() {
return this.actual;
}
public MonoSink<T> onCancel(Disposable d) {
Objects.requireNonNull(d, "onCancel");
SinkDisposable sd = new SinkDisposable((Disposable)null, d);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.onCancel == null) {
current.onCancel = d;
} else {
d.dispose();
}
}
}
return this;
}
public MonoSink<T> onDispose(Disposable d) {
Objects.requireNonNull(d, "onDispose");
SinkDisposable sd = new SinkDisposable(d, (Disposable)null);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.disposable == null) {
current.disposable = d;
} else {
d.dispose();
}
}
}
return this;
}
public void request(long n) {
if (Operators.validate(n)) {
LongConsumer consumer = this.requestConsumer;
if (consumer != null) {
consumer.accept(n);
}
int s;
do {
s = this.state;
if (s == 2 || s == 3) {
return;
}
if (s == 1) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(this.value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
return;
}
} while(!STATE.compareAndSet(this, s, 2));
}
}
public void cancel() {
if (STATE.getAndSet(this, 3) != 3) {
this.value = null;
this.disposeResource(true);
}
}
void disposeResource(boolean isCancel) {
Disposable d = this.disposable;
if (d != OperatorDisposables.DISPOSED) {
d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);
if (d != null && d != OperatorDisposables.DISPOSED) {
if (isCancel && d instanceof SinkDisposable) {
((SinkDisposable)d).cancel();
}
d.dispose();
}
}
}
}
}

上面的代码比较多,我们主要关注下面两个函数:

MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
}
public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter);
try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
}
}

通过上面的代码可以看出,一个是构造器,参数是Consumer,里面进行操作保存了Consumer对象,然后在subscribe方法里面有一句代码是this.callback.accept(emitter),就是在这里进行了接口的回调,回调Consumer的accept方法,这个方法是在调用Mono.create()方法的时候实现了。然后在细看subscribe方法,这里面有一个actual.onSubscribe方法,通过方法名可以知道,这里是订阅了消息。webflux是基于reactor模型,基于事件消息和异步,这里也体现了一个异步。

Mono和Flux的其他用法可以参照上面的源码流程自己看看,就不细说了。

版权声明
本文为[疯狂创客圈]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/crazymakercircle/p/14311570.html

  1. 【计算机网络 12(1),尚学堂马士兵Java视频教程
  2. 【程序猿历程,史上最全的Java面试题集锦在这里
  3. 【程序猿历程(1),Javaweb视频教程百度云
  4. Notes on MySQL 45 lectures (1-7)
  5. [computer network 12 (1), Shang Xuetang Ma soldier java video tutorial
  6. The most complete collection of Java interview questions in history is here
  7. [process of program ape (1), JavaWeb video tutorial, baidu cloud
  8. Notes on MySQL 45 lectures (1-7)
  9. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  10. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  11. 精进 Spring Boot 03:Spring Boot 的配置文件和配置管理,以及用三种方式读取配置文件
  12. Refined spring boot 03: spring boot configuration files and configuration management, and reading configuration files in three ways
  13. 【递归,Java传智播客笔记
  14. [recursion, Java intelligence podcast notes
  15. [adhere to painting for 386 days] the beginning of spring of 24 solar terms
  16. K8S系列第八篇(Service、EndPoints以及高可用kubeadm部署)
  17. K8s Series Part 8 (service, endpoints and high availability kubeadm deployment)
  18. 【重识 HTML (3),350道Java面试真题分享
  19. 【重识 HTML (2),Java并发编程必会的多线程你竟然还不会
  20. 【重识 HTML (1),二本Java小菜鸟4面字节跳动被秒成渣渣
  21. [re recognize HTML (3) and share 350 real Java interview questions
  22. [re recognize HTML (2). Multithreading is a must for Java Concurrent Programming. How dare you not
  23. [re recognize HTML (1), two Java rookies' 4-sided bytes beat and become slag in seconds
  24. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  25. RPC 1: how to develop RPC framework from scratch
  26. 造轮子系列之RPC 1:如何从零开始开发RPC框架
  27. RPC 1: how to develop RPC framework from scratch
  28. 一次性捋清楚吧,对乱糟糟的,Spring事务扩展机制
  29. 一文彻底弄懂如何选择抽象类还是接口,连续四年百度Java岗必问面试题
  30. Redis常用命令
  31. 一双拖鞋引发的血案,狂神说Java系列笔记
  32. 一、mysql基础安装
  33. 一位程序员的独白:尽管我一生坎坷,Java框架面试基础
  34. Clear it all at once. For the messy, spring transaction extension mechanism
  35. A thorough understanding of how to choose abstract classes or interfaces, baidu Java post must ask interview questions for four consecutive years
  36. Redis common commands
  37. A pair of slippers triggered the murder, crazy God said java series notes
  38. 1、 MySQL basic installation
  39. Monologue of a programmer: despite my ups and downs in my life, Java framework is the foundation of interview
  40. 【大厂面试】三面三问Spring循环依赖,请一定要把这篇看完(建议收藏)
  41. 一线互联网企业中,springboot入门项目
  42. 一篇文带你入门SSM框架Spring开发,帮你快速拿Offer
  43. 【面试资料】Java全集、微服务、大数据、数据结构与算法、机器学习知识最全总结,283页pdf
  44. 【leetcode刷题】24.数组中重复的数字——Java版
  45. 【leetcode刷题】23.对称二叉树——Java版
  46. 【leetcode刷题】22.二叉树的中序遍历——Java版
  47. 【leetcode刷题】21.三数之和——Java版
  48. 【leetcode刷题】20.最长回文子串——Java版
  49. 【leetcode刷题】19.回文链表——Java版
  50. 【leetcode刷题】18.反转链表——Java版
  51. 【leetcode刷题】17.相交链表——Java&python版
  52. 【leetcode刷题】16.环形链表——Java版
  53. 【leetcode刷题】15.汉明距离——Java版
  54. 【leetcode刷题】14.找到所有数组中消失的数字——Java版
  55. 【leetcode刷题】13.比特位计数——Java版
  56. oracle控制用户权限命令
  57. 三年Java开发,继阿里,鲁班二期Java架构师
  58. Oracle必须要启动的服务
  59. 万字长文!深入剖析HashMap,Java基础笔试题大全带答案
  60. 一问Kafka就心慌?我却凭着这份,图灵学院vip课程百度云