Spring Cloud + WebFlux

Crazy maker circle 2021-01-22 09:41:19


Preface

WebMVC and WebFlux are two important modules of the Spring Framework. They represent two I/O models: blocking and non-blocking.

WebMVC is built on the blocking Servlet model (commonly referred to as OIO). When a request arrives at the server, a dedicated thread is allocated to process it. If the request involves an I/O operation, the thread blocks until that operation finishes, so the time the thread spends waiting for I/O is wasted.

WebFlux is built on the non-blocking Reactor model (commonly referred to as NIO). Here too, a thread is allocated to process a request when it arrives at the server. If the request involves an I/O operation, however, the thread does not block while the operation is in progress; it goes on to handle other work. When the I/O operation finishes, the thread is notified (through a mechanism provided by the system) and continues processing the request.

In this way, threads can make effective use of the time that would otherwise be spent waiting on I/O.
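The difference between the two models can be illustrated without any web framework. The following is a minimal sketch that uses only the JDK's CompletableFuture. The method slowIo is a hypothetical stand-in for a real I/O call, and the sleep inside it still occupies a pool thread, so this is an analogy for the programming model rather than true non-blocking I/O.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class BlockingVsNonBlocking {

    // Hypothetical slow I/O call, simulated with a short sleep.
    static String slowIo(String key) {
        try {
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key;
    }

    // Blocking style: the calling thread waits inside slowIo until it returns.
    static String blockingCall(String key) {
        return slowIo(key);
    }

    // Callback style: the call returns a future immediately; the continuation
    // passed to thenApply runs later, once the result is available.
    static CompletableFuture<String> nonBlockingCall(String key) {
        return CompletableFuture.supplyAsync(() -> slowIo(key))
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        System.out.println(blockingCall("a"));
        CompletableFuture<String> future = nonBlockingCall("b");
        System.out.println("caller is free to do other work");
        // join() is used here only so that the demo can print the result.
        System.out.println(future.join());
    }
}
```

In the second variant the caller gets control back right away, which is the property WebFlux relies on to keep request threads busy with useful work.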

A complete hands-on WebFlux CRUD demo

DAO layer (also called the repository layer)

Entity (also called the PO object)

Create the UserEntity object. The code is as follows:


package com.crazymaker.springcloud.reactive.user.info.entity;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
@Entity
@Table(name = "t_user")
public final class UserEntity extends User
{
@Id
@Column(name = "id")
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Override
public long getUserId()
{
return super.getUserId();
}
@Column(name = "name")
public String getName()
{
return super.getName();
}
}

DAO implementation class

@Repository marks a data access component, that is, a DAO component. The implementation below uses a JPA EntityManager, injected with @PersistenceContext, as the data store. JpaUserRepositoryImpl wraps the persistence operations (data manipulation) for the PO: insert, query, and delete.


package com.crazymaker.springcloud.reactive.user.info.dao.impl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List;
@Repository
@Transactional
public class JpaUserRepositoryImpl
{
@PersistenceContext
private EntityManager entityManager;
public Long insert(final User user)
{
entityManager.persist(user);
return user.getUserId();
}
public void delete(final Long userId)
{
Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
query.executeUpdate();
}
@SuppressWarnings("unchecked")
public List<User> selectAll()
{
return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();
}
@SuppressWarnings("unchecked")
public User selectOne(final Long userId)
{
Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");
query.setParameter(1, userId);
return (User) query.getSingleResult();
}
}

Service layer


package com.crazymaker.springcloud.reactive.user.info.service.impl;
import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{
@Resource
private JpaUserRepositoryImpl userRepository;
@Transactional
// Add user
public User addUser(User dto)
{
User userEntity = new UserEntity();
userEntity.setUserId(dto.getUserId());
userEntity.setName(dto.getName());
userRepository.insert(userEntity);
BeanUtil.copyProperties(userEntity,dto);
return dto;
}
@Transactional
// Delete user
public User delUser(User dto)
{
userRepository.delete(dto.getUserId());
return dto;
}
// Query all users
public List<User> selectAllUser()
{
log.info("Method selectAllUser was called");
return userRepository.selectAll();
}
// Query a user
public User selectOne(final Long userId)
{
log.info("Method selectOne was called");
return userRepository.selectOne(userId);
}
}

Controller layer

Spring Boot WebFlux also supports annotation-based API development.

package com.crazymaker.springcloud.reactive.user.info.controller;
import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
/**
* Mono and Flux cover two scenarios:
* Mono: implements Publisher and emits 0 or 1 element, that is, a single object.
* Flux: implements Publisher and emits N elements, that is, a list of objects.
* Some will ask why the methods do not return the object directly, such as City/Long/List.
* The reason is that returning Flux and Mono is a non-blocking style, equivalent to callbacks.
* The functional style reduces explicit callbacks, so no callback interfaces appear in the code.
* This is exactly the benefit of WebFlux: non-blocking and asynchronous in one.
*/
@Slf4j
@Api(value = "User information: basic learning demo", tags = {"User information demo"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{
@ApiOperation(value = "Echo test", notes = "Notes for interface users", httpMethod = "GET")
@RequestMapping(value = "/hello")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "query", dataType="string",dataTypeClass = String.class, name = "name",value = "name", required = true)})
public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name)
{
log.info("Method hello was called");
return Mono.just(RestOut.succeed("hello " + name));
}
@Resource
JpaEntityServiceImpl jpaEntityService;
@PostMapping("/add/v1")
@ApiOperation(value = " Insert user " )
@ApiImplicitParams({
// @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
// @ApiImplicitParam(paramType = "body", dataType=" user ", name = "dto", required = true)
@ApiImplicitParam(paramType = "body",dataTypeClass = User.class, dataType="User", name = "dto", required = true),
})
// @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User", required = true)
public Mono<User> userAdd(@RequestBody User dto)
{
// Imperative style
// jpaEntityService.addUser(dto);
// Reactive style
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}
@PostMapping("/del/v1")
@ApiOperation(value = "Reactive delete")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class,name = "dto", required = true),
})
public Mono<User> userDel(@RequestBody User dto)
{
// Imperative style
// jpaEntityService.delUser(dto);
// Reactive style
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));
}
@PostMapping("/list/v1")
@ApiOperation(value = " Query the user ")
public Flux<User> listAllUser()
{
log.info("Method listAllUser was called");
// Imperative style; to make it reactive, the call below has to run inside the stream
// List<User> list = jpaEntityService.selectAllUser();
// Reactive style
Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());
return userFlux;
}
@PostMapping("/detail/v1")
@ApiOperation(value = "Reactive query")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataTypeClass = User.class,dataType="User", name = "dto", required = true),
})
public Mono<User> getUser(@RequestBody User dto)
{
log.info("Method getUser was called");
// Build the stream
Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));
return userMono;
}
@PostMapping("/detail/v2")
@ApiOperation(value = "Imperative query")
@ApiImplicitParams({
@ApiImplicitParam(paramType = "body", dataType="User",dataTypeClass = User.class, name = "dto", required = true),
})
public RestOut<User> getUserV2(@RequestBody User dto)
{
log.info("Method getUserV2 was called");
User user = jpaEntityService.selectOne(dto.getUserId());
return RestOut.success(user);
}
}

As the return values show, Mono and Flux cover two scenarios:

  • Mono: implements Publisher and emits 0 or 1 element, that is, a single object
  • Flux: implements Publisher and emits N elements, that is, a list of objects

Some will ask why the methods do not return the object directly, such as City/Long/List. The reason is that returning Flux and Mono is a non-blocking style, equivalent to callbacks. The functional style reduces explicit callbacks, so no callback interfaces appear in the code. This is exactly the benefit of WebFlux: non-blocking and asynchronous in one.
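To make the publisher idea concrete without pulling in Reactor, here is a rough sketch built on the JDK's java.util.concurrent.Flow interfaces (Java 9+). ListPublisher and collectSignals are hypothetical names invented for this illustration. This is not how Flux is implemented, and the sketch skips parts of the Reactive Streams rules (thread safety, re-entrant requests). It only shows that elements reach the subscriber through callbacks (onNext, onComplete) instead of a return value.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class ListPublisherDemo {

    // Hypothetical publisher that emits the elements of a list, then completes.
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            Iterator<T> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n elements; nothing flows until the subscriber asks.
                    for (long i = 0; i < n && it.hasNext() && !done; i++) {
                        subscriber.onNext(it.next());
                    }
                    if (!it.hasNext() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes to a ListPublisher and records every signal the callbacks receive.
    static <T> List<String> collectSignals(List<T> items) {
        List<String> signals = new ArrayList<>();
        new ListPublisher<T>(items).subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE); // ask for everything
            }

            @Override
            public void onNext(T item) {
                signals.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable t) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(collectSignals(List.of("a", "b")));
        System.out.println(collectSignals(List.<String>of()));
    }
}
```

A list with N elements produces N onNext signals followed by onComplete, which is the Flux-like case; an empty list produces only onComplete, which matches the "0 elements" case that Mono also allows.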

Mono

What is Mono? The official description is: A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

In other words, Mono is a Reactive Streams Publisher with basic rx operators. It completes either by successfully emitting an element or with an error.

[Figure: Mono]

Commonly used Mono methods:

  • Mono.create(): creates a Mono through a MonoSink
  • Mono.justOrEmpty(): creates a Mono from an Optional or from an object that may be null
  • Mono.error(): creates a Mono that only emits an error signal
  • Mono.never(): creates a Mono that never emits any signal
  • Mono.delay(): creates a Mono that, after the specified delay, emits the number 0 as its only value

Flux

What is Flux? The official description is: A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

In other words, Flux is a Reactive Streams Publisher with rx operators. It emits 0 to N elements and then completes, either successfully or with an error. Flux complements Mono.

[Figure: Flux]

Note: if you know that the Publisher emits 0 or 1 element, use Mono.

The most notable Flux method here is fromIterable. fromIterable(Iterable<? extends T> it) publishes the elements of an Iterable. Flux also provides the basic operators map, merge, concat, flatMap, and take, which are not covered here.

Developing WebFlux interfaces in configuration mode

1 Write a handler class (Handler) to replace the Controller; the Service and DAO layers remain the same.

2 Configure the routes for the requests.

Handler class

The handler class parses the parameters from the request and wraps the response. The code is as follows:

package com.crazymaker.springcloud.reactive.user.info.config.handler;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Slf4j
@Component
public class UserReactiveHandler
{
@Resource
private JpaEntityServiceImpl jpaEntityService;
/**
* Get all users
*
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request)
{
log.info("Method getAllUser was called");
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);
}
/**
* Create user
*
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request)
{
// Works in 2.0.0; in 2.0.1 the pattern below reports an exception
Mono<User> user = request.bodyToMono(User.class);
// In the reactive style a Mono is always a stream, that is, a publisher.
// Do not call the publisher's subscribe method yourself, i.e. do not consume it:
// user.subscribe();
// The final consumption is left to Spring Boot. Do not call block() either.
// Exceptions are handled in one central place.
return user.flatMap(dto ->
{
// Validation code goes here
if (StringUtils.isBlank(dto.getName()))
{
throw new BusinessException("The username cannot be empty");
}
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);
});
}
/**
* Delete a user by id
*
* @param request
* @return
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request)
{
String id = request.pathVariable("id");
// Validation code goes here
if (StringUtils.isBlank(id))
{
throw new BusinessException("id cannot be empty");
}
User dto = new User();
dto.setUserId(Long.parseLong(id));
return ok().contentType(APPLICATION_JSON_UTF8)
.body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);
}
}

Routing configuration

package com.crazymaker.springcloud.reactive.user.info.config;
import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
@Configuration
public class RoutersConfig
{
@Bean
RouterFunction<ServerResponse> routes(UserReactiveHandler handler)
{
// The following is equivalent to @RequestMapping
// Get all users
return RouterFunctions.route(GET("/user"), handler::getAllUser)
// Create user
.andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
// Delete user
.andRoute(DELETE("/user/{id}"), handler::deleteUserById);
}
@Value("${server.servlet.context-path}")
private String contextPath;
// Handles the context path; if there is no context path, this bean can be omitted
@Bean
public WebFilter contextPathWebFilter()
{
return (exchange, chain) ->
{
ServerHttpRequest request = exchange.getRequest();
String requestPath = request.getURI().getPath();
if (requestPath.startsWith(contextPath))
{
return chain.filter(
exchange.mutate()
.request(request.mutate().contextPath(contextPath).build())
.build());
}
return chain.filter(exchange);
};
}
}

Integrating Swagger

This section shows how to use Swagger with WebFlux support.

Maven dependencies

 <dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-spring-webflux</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>${swagger.version}</version>
</dependency>
  • swagger.version is currently 3.0.0. Spring 5 introduced WebFlux, but the current SpringFox Swagger2 release (2.9.2) does not support WebFlux yet, so 3.0.0 is required.

Swagger configuration

package com.crazymaker.springcloud.reactive.user.info.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;
@Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{
@Bean
public Docket createRestApi()
{
// return new Docket(DocumentationType.OAS_30)
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.pathMapping(servletContextPath) // Note: WebFlux has no context-path setting; without this line, the paths lack the prefix when testing the interface
.select()
.apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller"))
.paths(PathSelectors.any())
.build();
}
@Value("${server.servlet.context-path}")
private String servletContextPath;
// Builds the API documentation details
private ApiInfo apiInfo()
{
return new ApiInfoBuilder()
// Page title
.title("Crazy Maker Circle: Spring Cloud + Nginx High-Concurrency Core Programming")
// Description
.description("Build RESTful APIs with Zuul + Swagger2")
// Terms of service URL
.termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/")
.contact(new Contact(" Crazy maker circle ", "https://www.cnblogs.com/crazymakercircle/", ""))
.version("1.0")
.build();
}
/**
* Override PathProvider to fix the duplicated context-path problem
* @return
*/
@Order(Ordered.HIGHEST_PRECEDENCE)
@Bean
public PathProvider pathProvider() {
return new DefaultPathProvider() {
@Override
public String getOperationPath(String operationPath) {
operationPath = operationPath.replaceFirst(servletContextPath, "/");
UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");
return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString());
}
@Override
public String getResourceListingPath(String groupName, String apiDeclaration) {
apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration);
return apiDeclaration;
}
};
}
}

Testing

Testing the configuration-mode WebFlux REST interface

The configuration-mode WebFlux REST interface can only be tested with Postman. For example:

[Image: Postman request example]

Note that the context path cannot be included:

http://192.168.68.1:7705/uaa-react-provider/user

Testing the annotation-mode WebFlux REST interface

The add interface in Swagger:

[Image: Swagger add interface]

The other CRUD interfaces are omitted.

Configuration reference

Static resource configuration

@Configuration
@EnableWebFlux // Using annotations @EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer { // Implement WebFluxConfigurer
// Configuring static resources
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/static/**")
.addResourceLocations("classpath:/static/");
registry.addResourceHandler("/file/**")
.addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);
registry.addResourceHandler("/swagger-ui.html**")
.addResourceLocations("classpath:/META-INF/resources/");
registry.addResourceHandler("/webjars/**")
.addResourceLocations("classpath:/META-INF/resources/webjars/");
}
// Configure interceptors
// Configure codec
...
}

WebFlux Security configuration

@Configuration
@EnableWebFluxSecurity // Using annotations @EnableWebFluxSecurity
public class WebFluxSecurityConfig implements
WebFilter, // Filter
ServerLogoutSuccessHandler, // Logout success callback
ServerAuthenticationEntryPoint, // Authentication entry point
ServerAuthenticationFailureHandler, // Authentication failure callback
ServerAuthenticationSuccessHandler { // Authentication success callback
// Interface method implementations
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
// Configure the WebFlux context-path
ServerHttpRequest request = exchange.getRequest();
if (request.getURI().getPath().startsWith(contextPath)) {
exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build();
}
// Copy the query parameters into FormData; otherwise the authentication filter (ServerFormLoginAuthenticationConverter) does not receive the parameters
if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) {
ServerWebExchange finalExchange = exchange;
ServerWebExchange realExchange = new Decorator(exchange) {
@Override
public Mono<MultiValueMap<String, String>> getFormData() {
return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() {
@Override
public MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) {
if (stringStringMultiValueMap.size() == 0) {
return finalExchange.getRequest().getQueryParams();
} else {
return stringStringMultiValueMap;
}
}
});
}
};
return chain.filter(realExchange);
}
return chain.filter(exchange);
}
@Override
public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return sendJson(webFilterExchange.getExchange(), new Response<>("Logged out successfully"));
}
@Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Not authenticated"));
}
@Override
public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {
return sendJson(webFilterExchange.getExchange(), new Response<>(1, "Authentication failed"));
}
@Override
public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
return webFilterExchange.getChain().filter(
webFilterExchange.getExchange().mutate()
.request(t -> t.method(HttpMethod.POST).path("/user/login")) // Forward to custom controller
.build()
);
}
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST)
.csrf().disable()
.authorizeExchange()
.pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs") //swagger
.permitAll()
.and()
.authorizeExchange()
.pathMatchers("/static/**", "/file/**") // Static resources
.permitAll()
.and()
.authorizeExchange()
.anyExchange()
.authenticated()
.and()
.logout() // Log out
.logoutUrl("/user/logout")
.logoutSuccessHandler(this)
.and()
.exceptionHandling() // Unauthenticated callback
.authenticationEntryPoint(this)
.and()
.formLogin()
.loginPage("/user/login")
.authenticationFailureHandler(this) // Verification failed callback
.authenticationSuccessHandler(this) // Verify successful callback
.and()
.httpBasic()
.authenticationEntryPoint(this); // Basic authentication, generally used by mobile clients
return http.build();
}
}

WebSession configuration

@Configuration
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60) // Use @EnableRedisWebSession; maxInactiveIntervalInSeconds sets the session expiration time (spring.session.timeout has no effect)
public class RedisWebSessionConfig { // For distributed systems, sessions are generally stored in Redis
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
return new LettuceConnectionFactory();
}
}

File upload configuration

// Uploading with parameters
// Define the parameter bean
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean{
@ApiModelProperty(value = " General parameter ", required = false, example = "")
private String query;
@ApiModelProperty(value = " File parameters ", required = false, example = "")
private FilePart image; // Note: WebFlux uses FilePart as the type for receiving files
}
// Define the interface
@ApiOperation("An interface")
@PostMapping("/path")
// @ApiImplicitParam must explicitly declare the file parameter so that the Swagger UI shows a file-upload button
@ApiImplicitParams({
@ApiImplicitParam(
paramType = "form", // Form parameter
dataType = "__file", // The latest version uses __file to denote a file; it used to be file
name = "image", // Same name as the file parameter image in QueryBean
value = "file") // Description
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) {
}

WebFlux execution flow

The userAdd method code is as follows:

 public Mono<User> userAdd(@RequestBody User dto)
{
// Imperative style
// jpaEntityService.addUser(dto);
// Reactive style
return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
}

Because only one item is returned, Mono is used as the return type. The Mono object is created with the static create method of the Mono class, whose code is as follows:

public abstract class Mono<T> implements Publisher<T> {
static final BiPredicate EQUALS_BIPREDICATE = Object::equals;
public Mono() {
}
public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}
...
}

The create method receives one parameter, a Consumer object. The parameter name callback shows that a callback is used here. Below is the definition of the Consumer interface:


@FunctionalInterface
public interface Consumer<T> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
*/
void accept(T t);
/**
* Returns a composed {@code Consumer} that performs, in sequence, this
* operation followed by the {@code after} operation. If performing either
* operation throws an exception, it is relayed to the caller of the
* composed operation. If performing this operation throws an exception,
* the {@code after} operation will not be performed.
*
* @param after the operation to perform after this operation
* @return a composed {@code Consumer} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default Consumer<T> andThen(Consumer<? super T> after) {
Objects.requireNonNull(after);
return (T t) -> { accept(t); after.accept(t); };
}
}

As the code above shows, the interface has two methods: the default method andThen and the abstract method accept.

Mono.create() needs an instance of a class that implements the Consumer interface. In other words, the object passed to Mono.create must implement the accept method.
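As a small illustration of these two methods, the sketch below uses only java.util.function.Consumer. The class name ConsumerDemo and the method run are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConsumerDemo {

    // Runs a composed consumer and returns what each step recorded, in order.
    static List<String> run(String input) {
        List<String> log = new ArrayList<>();
        // A lambda with one parameter and no return value implements accept(T).
        Consumer<String> first = s -> log.add("first:" + s);
        Consumer<String> second = s -> log.add("second:" + s);
        // andThen is the default method: it calls first.accept, then second.accept.
        first.andThen(second).accept(input);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run("x")); // prints [first:x, second:x]
    }
}
```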

In the example, the lambda expression below is the implementation of accept. The argument passed to Mono.create has the type Consumer<MonoSink<T>>, and accept is implemented as follows:

cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))
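The role of this callback can be sketched with a tiny stand-in for Mono. MiniMono and Sink below are hypothetical, heavily simplified types written only for this illustration; they are not Reactor classes and they ignore backpressure, cancellation, and thread safety. The sketch assumes, as in Reactor, that the callback is stored at creation time and only invoked when someone subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniMonoDemo {

    // Hypothetical, heavily simplified counterpart of MonoSink.
    interface Sink<T> {
        void success(T value);
        void error(Throwable e);
    }

    // Hypothetical, heavily simplified counterpart of Mono.
    static final class MiniMono<T> {
        private final Consumer<Sink<T>> callback;

        private MiniMono(Consumer<Sink<T>> callback) {
            this.callback = callback;
        }

        // Only stores the callback; nothing runs yet.
        static <T> MiniMono<T> create(Consumer<Sink<T>> callback) {
            return new MiniMono<>(callback);
        }

        // The callback's accept method is invoked here, at subscription time.
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
            Sink<T> sink = new Sink<T>() {
                @Override
                public void success(T value) {
                    onNext.accept(value);
                }

                @Override
                public void error(Throwable e) {
                    onError.accept(e);
                }
            };
            try {
                callback.accept(sink);
            } catch (Throwable t) {
                sink.error(t);
            }
        }
    }

    // Records the order of events to show that creation is lazy.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        MiniMono<String> mono = MiniMono.create(sink -> {
            events.add("callback runs");
            sink.success("user-1"); // stands in for jpaEntityService.addUser(dto)
        });
        events.add("created");
        mono.subscribe(v -> events.add("received " + v), e -> events.add("error"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The trace shows "created" before "callback runs": the lambda passed to create is just an object until subscribe calls its accept method, which is also why the service call inside the lambda does not execute when the controller method returns.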

To recap, here is the implementation of the create method:

 public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
return onAssembly(new MonoCreate(callback));
}

Inside, the method calls onAssembly with a MonoCreate object as the parameter. The MonoCreate class looks like this:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
final class MonoCreate<T> extends Mono<T> {
final Consumer<MonoSink<T>> callback;
MonoCreate(Consumer<MonoSink<T>> callback) {
this.callback = callback;
}
public void subscribe(CoreSubscriber<? super T> actual) {
MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
actual.onSubscribe(emitter);
try {
this.callback.accept(emitter);
} catch (Throwable var4) {
emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
}
}
static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {
final CoreSubscriber<? super T> actual;
volatile Disposable disposable;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");
volatile int state;
static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");
volatile LongConsumer requestConsumer;
static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");
T value;
static final int NO_REQUEST_HAS_VALUE = 1;
static final int HAS_REQUEST_NO_VALUE = 2;
static final int HAS_REQUEST_HAS_VALUE = 3;
DefaultMonoSink(CoreSubscriber<? super T> actual) {
this.actual = actual;
}
public Context currentContext() {
return this.actual.currentContext();
}
@Nullable
public Object scanUnsafe(Attr key) {
if (key != Attr.TERMINATED) {
return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);
} else {
return this.state == 3 || this.state == 1;
}
}
public void success() {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
}
public void success(@Nullable T value) {
if (value == null) {
this.success();
} else {
int s;
do {
s = this.state;
if (s == 3 || s == 1) {
Operators.onNextDropped(value, this.actual.currentContext());
return;
}
if (s == 2) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
return;
}
this.value = value;
} while(!STATE.compareAndSet(this, s, 1));
}
}
public void error(Throwable e) {
if (STATE.getAndSet(this, 3) != 3) {
try {
this.actual.onError(e);
} finally {
this.disposeResource(false);
}
} else {
Operators.onOperatorError(e, this.actual.currentContext());
}
}
public MonoSink<T> onRequest(LongConsumer consumer) {
Objects.requireNonNull(consumer, "onRequest");
if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {
throw new IllegalStateException("A consumer has already been assigned to consume requests");
} else {
return this;
}
}
public CoreSubscriber<? super T> actual() {
return this.actual;
}
public MonoSink<T> onCancel(Disposable d) {
Objects.requireNonNull(d, "onCancel");
SinkDisposable sd = new SinkDisposable((Disposable)null, d);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.onCancel == null) {
current.onCancel = d;
} else {
d.dispose();
}
}
}
return this;
}
public MonoSink<T> onDispose(Disposable d) {
Objects.requireNonNull(d, "onDispose");
SinkDisposable sd = new SinkDisposable(d, (Disposable)null);
if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
Disposable c = this.disposable;
if (c instanceof SinkDisposable) {
SinkDisposable current = (SinkDisposable)c;
if (current.disposable == null) {
current.disposable = d;
} else {
d.dispose();
}
}
}
return this;
}
public void request(long n) {
if (Operators.validate(n)) {
LongConsumer consumer = this.requestConsumer;
if (consumer != null) {
consumer.accept(n);
}
int s;
do {
s = this.state;
if (s == 2 || s == 3) {
return;
}
if (s == 1) {
if (STATE.compareAndSet(this, s, 3)) {
try {
this.actual.onNext(this.value);
this.actual.onComplete();
} finally {
this.disposeResource(false);
}
}
return;
}
} while(!STATE.compareAndSet(this, s, 2));
}
}
public void cancel() {
if (STATE.getAndSet(this, 3) != 3) {
this.value = null;
this.disposeResource(true);
}
}
void disposeResource(boolean isCancel) {
Disposable d = this.disposable;
if (d != OperatorDisposables.DISPOSED) {
d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);
if (d != null && d != OperatorDisposables.DISPOSED) {
if (isCancel && d instanceof SinkDisposable) {
((SinkDisposable)d).cancel();
}
d.dispose();
}
}
}
}
}

That is a lot of code. We only need to focus on the following two methods:

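// The constructor only stores the callback; nothing is executed yet.
MonoCreate(Consumer<MonoSink<T>> callback) {
    this.callback = callback;
}

public void subscribe(CoreSubscriber<? super T> actual) {
    // Wrap the subscriber in a sink that the callback will use to emit.
    MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
    // Hand the sink to the subscriber as its Subscription.
    actual.onSubscribe(emitter);
    try {
        // Run the lambda that was passed to Mono.create().
        this.callback.accept(emitter);
    } catch (Throwable var4) {
        // Route exceptions thrown by the callback to the subscriber as an error.
        emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
    }
}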

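Two things stand out in this code. The first is the constructor: its parameter is a Consumer, and all it does is save that Consumer in the callback field. The second is the subscribe method, which contains the line this.callback.accept(emitter). This is where the callback happens: it invokes the Consumer's accept method, and that Consumer is the lambda we passed in when calling Mono.create(). The body of the lambda therefore does not run when the Mono is created, only when something subscribes to it. Looking further at subscribe, there is also a call to actual.onSubscribe(emitter). As the method name suggests, this notifies the subscriber that the subscription has started and hands it the sink as its Subscription. WebFlux is built on the Reactor model, which is event-driven and asynchronous, and this is where that shows up: the sink can be completed later, possibly from another thread, and the subscriber is notified when that happens.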
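The store-then-invoke pattern described above can be sketched with plain JDK types. This is a simplified illustration, not Reactor's implementation: MiniMono, MiniSink and MiniMonoDemo are hypothetical names, and the sketch leaves out onSubscribe, request handling, error routing and the atomic state machine that the real DefaultMonoSink has. It only shows that the callback is saved at creation time and runs when subscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for Reactor's MonoSink: only success(value) is supported.
interface MiniSink<T> {
    void success(T value);
}

// Hypothetical, simplified stand-in for MonoCreate.
final class MiniMono<T> {
    private final Consumer<MiniSink<T>> callback;

    private MiniMono(Consumer<MiniSink<T>> callback) {
        this.callback = callback; // saved here, not executed
    }

    static <T> MiniMono<T> create(Consumer<MiniSink<T>> callback) {
        return new MiniMono<>(callback);
    }

    void subscribe(Consumer<? super T> onNext) {
        // Plays the role of new DefaultMonoSink(actual): wrap the subscriber in a sink.
        MiniSink<T> emitter = value -> onNext.accept(value);
        // Plays the role of this.callback.accept(emitter): the user lambda runs here.
        callback.accept(emitter);
    }
}

final class MiniMonoDemo {
    // Records the order in which things happen.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniMono<String> mono = MiniMono.<String>create(sink -> {
            events.add("callback invoked");
            sink.success("hello");
        });
        events.add("mono created");
        mono.subscribe(value -> events.add("received " + value));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints "mono created" before "callback invoked", which matches what the MonoCreate source shows: the lambda is deferred until subscription.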

The other Mono and Flux operators can be traced through the source in the same way, so I won't go into the details here.

Copyright notice
This article was written by [Crazy maker circle]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/01/20210122094025170H.html
