WebFlux + Spring Boot Integration (the Most Complete Guide Ever)

2023-11-19


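Preface

WebMVC and WebFlux are two important modules of the Spring Framework, and they represent two I/O models: blocking and non-blocking.

WebMVC is a blocking model built on the Servlet API (commonly referred to as OIO). When a request reaches the server, a dedicated thread is assigned to handle it. If the request involves an I/O operation, that thread stays blocked until the I/O completes, so the time it spends waiting is wasted.

WebFlux is a non-blocking model built on Reactor (commonly referred to as NIO). A thread is likewise assigned when a request reaches the server, but if the request involves an I/O operation the thread does not block while waiting for it. It goes on to handle other work, and when the I/O completes it is notified (through operating-system mechanisms) and processing of the request resumes.

In this way the thread makes effective use of the time consumed by I/O.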
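The difference can be sketched with the JDK alone. The following is a minimal illustration, not WebFlux code: a CompletableFuture and a scheduler stand in for a non-blocking I/O call, and the names NonBlockingSketch, fetchAsync, and run are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class NonBlockingSketch {

    // Simulates a non-blocking I/O call: returns immediately and completes
    // the future about 200 ms later on the timer thread.
    static CompletableFuture<String> fetchAsync(ScheduledExecutorService timer, String name) {
        CompletableFuture<String> result = new CompletableFuture<>();
        timer.schedule(() -> result.complete("hello " + name), 200, TimeUnit.MILLISECONDS);
        return result;
    }

    // Returns the order in which the events happened.
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Void> done = fetchAsync(timer, "webflux")
                    .thenAccept(value -> events.add("I/O finished: " + value));
            // The calling thread is free to do other work while the "I/O" is pending.
            events.add("caller did other work");
            // Wait here only so that the demo can return the complete event list.
            done.join();
            return events;
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller registers a continuation with thenAccept instead of waiting for the result, which is the same idea WebFlux applies to request-handling threads.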

WebFlux CRUD: a complete hands-on demo

DAO layer (also called the repository layer)

Entity (also called a PO object)

Create the user entity, UserEntity, as follows:

package com.crazymaker.springcloud.reactive.user.info.entity;

import com.crazymaker.springcloud.reactive.user.info.dto.User;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name = "t_user")
public final class UserEntity extends User
{
    @Id
    @Column(name = "id")
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Override
    public long getUserId()
    {
        return super.getUserId();
    }

    @Column(name = "name")
    public String getName()
    {
        return super.getName();
    }
}

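DAO implementation class

@Repository marks a data-access component, that is, a DAO component. The implementation below uses a JPA EntityManager for persistence. JpaUserRepositoryImpl encapsulates the persistence-layer (data operation) logic for the PO and provides insert, query, and delete operations.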

package com.crazymaker.springcloud.reactive.user.info.dao.impl;

import com.crazymaker.springcloud.reactive.user.info.dto.User;
import org.springframework.stereotype.Repository;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.Query;
import javax.transaction.Transactional;
import java.util.List;

@Repository
@Transactional
public class JpaUserRepositoryImpl
{
    @PersistenceContext
    private EntityManager entityManager;

    public Long insert(final User user)
    {
        entityManager.persist(user);
        return user.getUserId();
    }

    public void delete(final Long userId)
    {
        Query query = entityManager.createQuery("DELETE FROM UserEntity o WHERE o.userId = ?1");
        query.setParameter(1, userId);
        query.executeUpdate();
    }

    @SuppressWarnings("unchecked")
    public List<User> selectAll()
    {
        return (List<User>) entityManager.createQuery("SELECT o FROM UserEntity o").getResultList();
    }

    @SuppressWarnings("unchecked")
    public User selectOne(final Long userId)
    {
        Query query = entityManager.createQuery("SELECT o FROM UserEntity o WHERE o.userId = ?1");
        query.setParameter(1, userId);
        return (User) query.getSingleResult();
    }
}

Service layer


package com.crazymaker.springcloud.reactive.user.info.service.impl;

import com.crazymaker.springcloud.common.util.BeanUtil;
import com.crazymaker.springcloud.reactive.user.info.dao.impl.JpaUserRepositoryImpl;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.entity.UserEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Service
@Transactional
public class JpaEntityServiceImpl
{
    @Resource
    private JpaUserRepositoryImpl userRepository;

    // Add a user
    @Transactional
    public User addUser(User dto)
    {
        User userEntity = new UserEntity();
        userEntity.setUserId(dto.getUserId());
        userEntity.setName(dto.getName());
        userRepository.insert(userEntity);
        BeanUtil.copyProperties(userEntity, dto);
        return dto;
    }

    // Delete a user
    @Transactional
    public User delUser(User dto)
    {
        userRepository.delete(dto.getUserId());
        return dto;
    }

    // Query all users
    public List<User> selectAllUser()
    {
        log.info("selectAllUser was called");
        return userRepository.selectAll();
    }

    // Query a single user
    public User selectOne(final Long userId)
    {
        log.info("selectOne was called");
        return userRepository.selectOne(userId);
    }
}

Controller layer

Spring Boot WebFlux also supports the annotation style for developing API endpoints.

package com.crazymaker.springcloud.reactive.user.info.controller;

import com.crazymaker.springcloud.common.result.RestOut;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

/**
 * Mono and Flux cover two cases:
 * Mono: a publisher that emits 0 or 1 element, i.e. a single object.
 * Flux: a publisher that emits N elements, i.e. a list of objects.
 * One might ask why the object is not returned directly, e.g. City/Long/List.
 * The reason is that returning Flux or Mono is the non-blocking style, comparable to callbacks.
 * The functional style reduces explicit callbacks, so no callback interfaces are visible.
 * This is exactly the benefit of WebFlux: non-blocking plus asynchronous.
 */
@Slf4j
@Api(value = "User info, basic learning DEMO", tags = {"User info DEMO"})
@RestController
@RequestMapping("/api/user")
public class UserReactiveController
{
    @ApiOperation(value = "Echo test", notes = "Notes for users of this endpoint", httpMethod = "GET")
    @RequestMapping(value = "/hello")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "query", dataType = "string", dataTypeClass = String.class, name = "name", value = "Name", required = true)
    })
    public Mono<RestOut<String>> hello(@RequestParam(name = "name") String name)
    {
        log.info("hello was called");
        return Mono.just(RestOut.succeed("hello " + name));
    }

    @Resource
    JpaEntityServiceImpl jpaEntityService;

    @PostMapping("/add/v1")
    @ApiOperation(value = "Insert a user")
    @ApiImplicitParams({
//            @ApiImplicitParam(paramType = "body", dataType="java.lang.Long", name = "userId", required = false),
//            @ApiImplicitParam(paramType = "body", dataType="User", name = "dto", required = true)
            @ApiImplicitParam(paramType = "body", dataTypeClass = User.class, dataType = "User", name = "dto", required = true),
    })
//    @ApiImplicitParam(paramType = "body", dataType="com.crazymaker.springcloud.reactive.user.info.dto.User",  required = true)
    public Mono<User> userAdd(@RequestBody User dto)
    {
        // Imperative style
//        jpaEntityService.addUser(dto);
        // Reactive style
        return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
    }

    @PostMapping("/del/v1")
    @ApiOperation(value = "Reactive delete")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "body", dataType = "User", dataTypeClass = User.class, name = "dto", required = true),
    })
    public Mono<User> userDel(@RequestBody User dto)
    {
        // Imperative style
//        jpaEntityService.delUser(dto);
        // Reactive style
        return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto)));
    }

    @PostMapping("/list/v1")
    @ApiOperation(value = "List users")
    public Flux<User> listAllUser()
    {
        log.info("listAllUser was called");
        // Imperative style converted to reactive style; the statement below has to run inside the stream
//        List<User> list = jpaEntityService.selectAllUser();
        // Reactive style
        Flux<User> userFlux = Flux.fromIterable(jpaEntityService.selectAllUser());
        return userFlux;
    }

    @PostMapping("/detail/v1")
    @ApiOperation(value = "Reactive detail view")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "body", dataTypeClass = User.class, dataType = "User", name = "dto", required = true),
    })
    public Mono<User> getUser(@RequestBody User dto)
    {
        log.info("getUser was called");
        // Build the stream
        Mono<User> userMono = Mono.justOrEmpty(jpaEntityService.selectOne(dto.getUserId()));
        return userMono;
    }

    @PostMapping("/detail/v2")
    @ApiOperation(value = "Imperative detail view")
    @ApiImplicitParams({
            @ApiImplicitParam(paramType = "body", dataType = "User", dataTypeClass = User.class, name = "dto", required = true),
    })
    public RestOut<User> getUserV2(@RequestBody User dto)
    {
        log.info("getUserV2 was called");
        User user = jpaEntityService.selectOne(dto.getUserId());
        return RestOut.success(user);
    }
}

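As the return types show, Mono and Flux cover two cases:

  • Mono: a publisher that emits 0 or 1 element, i.e. a single object
  • Flux: a publisher that emits N elements, i.e. a list of objects

One might ask why the method does not simply return the object itself, such as City, Long, or List. The reason is that returning Flux or Mono is the non-blocking style, comparable to using callbacks. The functional style reduces explicit callbacks, which is why no callback interfaces appear in the code. This is exactly the benefit of WebFlux: it combines non-blocking and asynchronous processing.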

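Mono

What is Mono? The official description reads: A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.

In other words, Mono is a Reactive Streams Publisher with basic Rx operators. It completes either by emitting an element or by signalling an error.

[Figure: Mono diagram]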

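Commonly used Mono methods:

  • Mono.create(): creates a Mono using a MonoSink
  • Mono.justOrEmpty(): creates a Mono from an Optional or from a value that may be null
  • Mono.error(): creates a Mono that only signals an error
  • Mono.never(): creates a Mono that never emits any signal
  • Mono.delay(): creates a Mono that emits the number 0 as its only value after the specified delay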

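Flux

What is Flux? The official description reads: A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).

In other words, Flux is a Reactive Streams Publisher with basic Rx operators. It emits 0 to N elements and then completes, either successfully or with an error. Flux is essentially the complement of Mono.

[Figure: Flux diagram]

So keep in mind: if you know the Publisher emits 0 or 1 element, use Mono.

The Flux method most worth mentioning is fromIterable. fromIterable(Iterable<? extends T> it) publishes the elements of an Iterable. Flux also provides the basic operators, such as map, merge, concat, flatMap, and take, which are not covered here.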
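To make the idea behind fromIterable concrete without pulling in Reactor, here is a simplified sketch on top of the JDK's java.util.concurrent.Flow API. It is not Reactor's implementation: IterablePublisherSketch, its fromIterable, and collect are hypothetical names, and details such as rejecting non-positive demand are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class IterablePublisherSketch {

    // Simplified, hypothetical counterpart of Flux.fromIterable built on the JDK Flow API.
    static <T> Flow.Publisher<T> fromIterable(Iterable<? extends T> source) {
        return subscriber -> {
            Iterator<? extends T> it = source.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n elements, then complete once the source is exhausted.
                    for (long i = 0; i < n && !done && it.hasNext(); i++) {
                        subscriber.onNext(it.next());
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    // Subscribes, requests `demand` elements, and returns whatever was received.
    static <T> List<T> collect(Flow.Publisher<T> publisher, long demand) {
        List<T> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand);
            }

            @Override
            public void onNext(T item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable error) {
                throw new IllegalStateException(error);
            }

            @Override
            public void onComplete() {
                // Nothing to do: the received list is already complete.
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(fromIterable(List.of("a", "b", "c")), Long.MAX_VALUE));
        System.out.println(collect(fromIterable(List.of("a", "b", "c")), 2));
    }
}
```

Nothing is emitted until the subscriber calls request, and only as many elements as were requested are delivered; this demand-driven flow is what distinguishes a publisher from simply returning the List.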

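Developing WebFlux endpoints in configuration mode

1. Write a Handler class to replace the Controller; the Service and DAO layers stay unchanged.

2. Configure the routes for the requests.

Handler class

The Handler class has to parse the parameters from the request and wrap the response itself. The code is as follows: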

package com.crazymaker.springcloud.reactive.user.info.config.handler;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.reactive.user.info.dto.User;
import com.crazymaker.springcloud.reactive.user.info.service.impl.JpaEntityServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Slf4j
@Component
public class UserReactiveHandler
{
    @Resource
    private JpaEntityServiceImpl jpaEntityService;

    /**
     * Get all users
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request)
    {
        log.info("getAllUser was called");
        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(Flux.fromIterable(jpaEntityService.selectAllUser()), User.class);
    }

    /**
     * Create a user
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> createUser(ServerRequest request)
    {
        // This works on 2.0.0, but on 2.0.1 the pattern below throws an exception
        Mono<User> user = request.bodyToMono(User.class);
        /*
         * In reactive code a Mono is always a stream, i.e. a publisher.
         * Never call the publisher's subscribe method yourself, that is, never consume it;
         * the final consumption is left to Spring Boot.
         * Never call user.subscribe() and never call block().
         * Handle exceptions in one central place.
         */
        return user.flatMap(dto ->
        {
            // Validation code belongs here
            if (StringUtils.isBlank(dto.getName()))
            {
                throw new BusinessException("The user name must not be empty");
            }
            return ok().contentType(APPLICATION_JSON_UTF8)
                    .body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))), User.class);
        });
    }

    /**
     * Delete a user by id
     *
     * @param request
     * @return
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request)
    {
        String id = request.pathVariable("id");
        // Validation code belongs here
        if (StringUtils.isBlank(id))
        {
            throw new BusinessException("The id must not be empty");
        }
        User dto = new User();
        dto.setUserId(Long.parseLong(id));
        return ok().contentType(APPLICATION_JSON_UTF8)
                .body(Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.delUser(dto))), User.class);
    }
}

Route configuration

package com.crazymaker.springcloud.reactive.user.info.config;

import com.crazymaker.springcloud.reactive.user.info.config.handler.UserReactiveHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.WebFilter;

import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;

@Configuration
public class RoutersConfig
{
    @Bean
    RouterFunction<ServerResponse> routes(UserReactiveHandler handler)
    {
        // The routes below play the role of @RequestMapping inside a controller class
        // Get all users
        return RouterFunctions.route(GET("/user"), handler::getAllUser)
                // Create a user
                .andRoute(POST("/user").and(accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
                // Delete a user
                .andRoute(DELETE("/user/{id}"), handler::deleteUserById);
    }

    @Value("${server.servlet.context-path}")
    private String contextPath;

    // Handles the context path; if there is no context path, this function can be ignored
    @Bean
    public WebFilter contextPathWebFilter()
    {
        return (exchange, chain) ->
        {
            ServerHttpRequest request = exchange.getRequest();
            String requestPath = request.getURI().getPath();
            if (requestPath.startsWith(contextPath))
            {
                return chain.filter(exchange.mutate()
                        .request(request.mutate().contextPath(contextPath).build())
                        .build());
            }
            return chain.filter(exchange);
        };
    }
}
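Conceptually, a route pairs a request predicate with a handler, and andRoute chains a fallback onto an existing route. The sketch below models that idea with plain JDK types. It is an illustration under assumptions, not Spring's implementation: MiniRouterSketch, Request, Router, match, and dispatch are hypothetical names, and the handlers return only the name of the handler method they stand for.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class MiniRouterSketch {

    // Hypothetical stand-in for ServerRequest: just an HTTP method and a path.
    static final class Request {
        final String method;
        final String path;

        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    // A router yields a handler when its predicate matches the request.
    interface Router {
        Optional<Function<Request, String>> route(Request request);

        // Tries this router first and falls back to the added route.
        default Router andRoute(Predicate<Request> predicate, Function<Request, String> handler) {
            return request -> {
                Optional<Function<Request, String>> found = this.route(request);
                return found.isPresent() ? found : MiniRouterSketch.route(predicate, handler).route(request);
            };
        }
    }

    static Router route(Predicate<Request> predicate, Function<Request, String> handler) {
        return request -> predicate.test(request) ? Optional.of(handler) : Optional.empty();
    }

    // Exact match on method and path.
    static Predicate<Request> match(String method, String path) {
        return request -> request.method.equals(method) && request.path.equals(path);
    }

    // Mirrors the three routes of RoutersConfig; "/user/{id}" is reduced to a prefix check.
    static Router userRoutes() {
        return route(match("GET", "/user"), request -> "getAllUser")
                .andRoute(match("POST", "/user"), request -> "createUser")
                .andRoute(request -> request.method.equals("DELETE") && request.path.startsWith("/user/"),
                        request -> "deleteUserById");
    }

    // Runs the matching handler, or reports 404 when no route matches.
    static String dispatch(Router router, Request request) {
        return router.route(request).map(handler -> handler.apply(request)).orElse("404");
    }

    public static void main(String[] args) {
        System.out.println(dispatch(userRoutes(), new Request("DELETE", "/user/7")));
    }
}
```

Because routes are ordinary values composed by functions, the whole routing table lives in one bean instead of being spread across annotations.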

Integrating Swagger

This section shows how to use a Swagger version that supports WebFlux.

Maven dependencies

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-spring-webflux</artifactId>
            <version>${swagger.version}</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>${swagger.version}</version>
        </dependency>

  • swagger.version is currently 3.0.0. Spring 5 introduced WebFlux, and SpringFox Swagger2 2.9.2 does not support WebFlux, so version 3.0.0 is required.

Swagger configuration

package com.crazymaker.springcloud.reactive.user.info.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.util.UriComponentsBuilder;
import springfox.documentation.PathProvider;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.paths.DefaultPathProvider;
import springfox.documentation.spring.web.paths.Paths;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebFlux;

@Configuration
@EnableSwagger2WebFlux
public class SwaggerConfig
{
    @Bean
    public Docket createRestApi()
    {
//        return new Docket(DocumentationType.OAS_30)
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                // Note: WebFlux has no context-path setting; without this line
                // the paths used when testing the endpoints have no prefix
                .pathMapping(servletContextPath)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.crazymaker.springcloud.reactive.user.info.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    @Value("${server.servlet.context-path}")
    private String servletContextPath;

    // Builds the detailed information for the API documentation
    private ApiInfo apiInfo()
    {
        return new ApiInfoBuilder()
                // Page title
                .title("瘋狂創客圈 springcloud + Nginx 高并發核心編程")
                // Description
                .description("Build RESTful APIs with Zuul+Swagger2")
                // Terms of service URL
                .termsOfServiceUrl("https://www.cnblogs.com/crazymakercircle/")
                .contact(new Contact("瘋狂創客圈", "https://www.cnblogs.com/crazymakercircle/", ""))
                .version("1.0")
                .build();
    }

    /**
     * Overrides PathProvider to solve the duplicated context-path problem
     *
     * @return
     */
    @Order(Ordered.HIGHEST_PRECEDENCE)
    @Bean
    public PathProvider pathProvider()
    {
        return new DefaultPathProvider()
        {
            @Override
            public String getOperationPath(String operationPath)
            {
                operationPath = operationPath.replaceFirst(servletContextPath, "/");
                UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromPath("/");
                return Paths.removeAdjacentForwardSlashes(uriComponentsBuilder.path(operationPath).build().toString());
            }

            @Override
            public String getResourceListingPath(String groupName, String apiDeclaration)
            {
                apiDeclaration = super.getResourceListingPath(groupName, apiDeclaration);
                return apiDeclaration;
            }
        };
    }
}

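Testing

Testing the configuration-mode WebFlux REST endpoints

The configuration-mode WebFlux REST endpoints cannot be exercised from the Swagger UI and have to be tested with an HTTP client such as Postman, for example:

[Figure: Postman request screenshot]

Note that the context path must not be included:

http://192.168.68.1:7705/uaa-react-provider/user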
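The same requests can be built with the JDK's java.net.http API. The sketch below only constructs the three requests that correspond to the routes in RoutersConfig. The base URL is copied from the example above, and the JSON body assumes that the User DTO is serialized with a name field; both are assumptions about the deployment, and UserRouteRequests is a hypothetical helper class.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class UserRouteRequests {

    // Base URL copied from the article's example; adjust it for your environment.
    static final String BASE = "http://192.168.68.1:7705/uaa-react-provider";

    // GET /user -> getAllUser
    static HttpRequest listUsers() {
        return HttpRequest.newBuilder(URI.create(BASE + "/user")).GET().build();
    }

    // POST /user -> createUser
    static HttpRequest createUser(String name) {
        // Assumes the User DTO is JSON with a "name" field; the name is not escaped here.
        String json = "{\"name\":\"" + name + "\"}";
        return HttpRequest.newBuilder(URI.create(BASE + "/user"))
                .header("Content-Type", "application/json")
                // The POST route also checks the Accept header.
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // DELETE /user/{id} -> deleteUserById
    static HttpRequest deleteUser(long id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/user/" + id)).DELETE().build();
    }

    public static void main(String[] args) {
        // Only prints the requests; nothing is sent over the network.
        System.out.println(listUsers());
        System.out.println(createUser("tom"));
        System.out.println(deleteUser(7));
    }
}
```

To actually send one of them, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running instance of the service.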

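Testing the annotation-mode WebFlux REST endpoints

The Swagger page for adding a user:

[Figure: Swagger UI screenshot]

The pages for the other CRUD operations are omitted.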

Configuration reference

Static resource configuration

@Configuration
@EnableWebFlux    // use the @EnableWebFlux annotation
public class WebFluxConfig implements WebFluxConfigurer {    // implement WebFluxConfigurer

    // Configure static resources
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry.addResourceHandler("/static/**").addResourceLocations("classpath:/static/");
        registry.addResourceHandler("/file/**").addResourceLocations("file:" + System.getProperty("user.dir") + File.separator + "file" + File.separator);
        registry.addResourceHandler("/swagger-ui.html**").addResourceLocations("classpath:/META-INF/resources/");
        registry.addResourceHandler("/webjars/**").addResourceLocations("classpath:/META-INF/resources/webjars/");
    }

    // Configure interceptors
    // Configure codecs
    ...
}

WebFluxSecurity configuration

For comparison, the corresponding Spring WebMVC security configuration:

@Configuration
@EnableWebSecurity
public class WebMvcSecurityConfig extends WebSecurityConfigurerAdapter implements
        AuthenticationEntryPoint,        // callback when not authenticated
        AuthenticationSuccessHandler,    // callback on authentication success
        AuthenticationFailureHandler,    // callback on authentication failure
        LogoutSuccessHandler {           // callback on logout success

    @Override
    public void commence(HttpServletRequest request, HttpServletResponse response, AuthenticationException authException) throws IOException, ServletException {
        sendJson(response, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Unauthorized"));
    }

    @Override
    public void onAuthenticationFailure(HttpServletRequest request, HttpServletResponse response, AuthenticationException exception) throws IOException, ServletException {
        sendJson(response, new Response<>(1, "Incorrect"));
    }

    @Override
    public void onAuthenticationSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
        sendJson(response, new Response<>(0, authentication.getClass().getSimpleName()));
    }

    @Override
    public void onLogoutSuccess(HttpServletRequest request, HttpServletResponse response, Authentication authentication) throws IOException, ServletException {
        sendJson(response, new Response<>(0, "Success"));
    }

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable()
                .authorizeRequests()
                .antMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")
                .permitAll()
                .and().authorizeRequests()
                .antMatchers("/static/**", "/file/**")
                .permitAll()
                .and().authorizeRequests()
                .anyRequest()
                .authenticated()
                .and().logout()
                .logoutUrl("/user/logout")           // virtual path, not a path defined by a controller
                .logoutSuccessHandler(this)
                .permitAll()
                .and().exceptionHandling()
                .authenticationEntryPoint(this)
                .and().formLogin()
                .usernameParameter("username")
                .passwordParameter("password")
                .loginProcessingUrl("/user/login")   // virtual path, not a path defined by a controller
                .successForwardUrl("/user/login")    // a path defined by a controller
                .failureHandler(this)
                .and().httpBasic()
                .authenticationEntryPoint(this);
    }

    @Override
    protected void configure(AuthenticationManagerBuilder auth) throws Exception {
        auth.userDetailsService(userDetailService);
    }
}

In WebFlux, authentication depends on a user-details service, so a Bean that implements ReactiveUserDetailsService must be defined.

@Configuration
@EnableWebFluxSecurity    // use the @EnableWebFluxSecurity annotation
public class WebFluxSecurityConfig implements
        WebFilter,                             // filter
        ServerLogoutSuccessHandler,            // callback on logout success
        ServerAuthenticationEntryPoint,        // authentication entry point
        ServerAuthenticationFailureHandler,    // callback on authentication failure
        ServerAuthenticationSuccessHandler {   // callback on authentication success

    // Implementations of the interface methods
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // Configure the WebFlux context path
        ServerHttpRequest request = exchange.getRequest();
        if (request.getURI().getPath().startsWith(contextPath)) {
            exchange = exchange.mutate().request(request.mutate().contextPath(contextPath).build()).build();
        }
        // Move the query parameters into the form data; otherwise the authentication
        // filter (ServerFormLoginAuthenticationConverter) does not receive them
        if (exchange.getRequest().getMethod() == HttpMethod.POST && exchange.getRequest().getQueryParams().size() > 0) {
            ServerWebExchange finalExchange = exchange;
            ServerWebExchange realExchange = new Decorator(exchange) {
                @Override
                public Mono<MultiValueMap<String, String>> getFormData() {
                    return super.getFormData().map(new Function<MultiValueMap<String, String>, MultiValueMap<String, String>>() {
                        @Override
                        public MultiValueMap<String, String> apply(MultiValueMap<String, String> stringStringMultiValueMap) {
                            if (stringStringMultiValueMap.size() == 0) {
                                return finalExchange.getRequest().getQueryParams();
                            } else {
                                return stringStringMultiValueMap;
                            }
                        }
                    });
                }
            };
            return chain.filter(realExchange);
        }
        return chain.filter(exchange);
    }

    @Override
    public Mono<Void> onLogoutSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
        return sendJson(webFilterExchange.getExchange(), new Response<>("Logged out"));
    }

    @Override
    public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException e) {
        return sendJson(exchange, new Response<>(HttpStatus.UNAUTHORIZED.value(), "Not authenticated"));
    }

    @Override
    public Mono<Void> onAuthenticationFailure(WebFilterExchange webFilterExchange, AuthenticationException exception) {
        return sendJson(webFilterExchange.getExchange(), new Response<>(1, "Authentication failed"));
    }

    @Override
    public Mono<Void> onAuthenticationSuccess(WebFilterExchange webFilterExchange, Authentication authentication) {
        return webFilterExchange.getChain().filter(
                webFilterExchange.getExchange().mutate()
                        // Forward to the custom controller
                        .request(t -> t.method(HttpMethod.POST).path("/user/login"))
                        .build());
    }

    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
        http.addFilterAfter(this, SecurityWebFiltersOrder.FIRST)
                .csrf().disable()
                .authorizeExchange()
                .pathMatchers("/swagger*/**", "/webjars/**", "/v2/api-docs")    // swagger
                .permitAll()
                .and().authorizeExchange()
                .pathMatchers("/static/**", "/file/**")    // static resources
                .permitAll()
                .and().authorizeExchange()
                .anyExchange()
                .authenticated()
                .and().logout()    // logout
                .logoutUrl("/user/logout")
                .logoutSuccessHandler(this)
                .and().exceptionHandling()    // callback when not authenticated
                .authenticationEntryPoint(this)
                .and().formLogin()
                .loginPage("/user/login")
                .authenticationFailureHandler(this)    // callback on authentication failure
                .authenticationSuccessHandler(this)    // callback on authentication success
                .and().httpBasic()
                .authenticationEntryPoint(this);    // basic authentication, typically used by mobile clients
        return http.build();
    }
}

WebSession configuration

@Configuration
// Use the @EnableRedisWebSession annotation; maxInactiveIntervalInSeconds sets the
// expiry time of the data (spring.session.timeout has no effect)
@EnableRedisWebSession(maxInactiveIntervalInSeconds = 60)
public class RedisWebSessionConfig {    // for distributed systems, sessions are usually stored in Redis

    @Bean
    public LettuceConnectionFactory lettuceConnectionFactory() {
        return new LettuceConnectionFactory();
    }
}

// For single sign-on, use ReactiveRedisSessionRepository.getSessionRedisOperations().scan
// to find the sessions with the same user name, then delete the other sessions
public Mono<Map<String, String>> findByPrincipalName(String name) {
    return reactiveSessionRepository.getSessionRedisOperations()
            .scan(ScanOptions.scanOptions().match(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:*").build())
            .flatMap(new Function<String, Publisher<Tuple2<String, Map.Entry<Object, Object>>>>() {
                @Override
                public Publisher<Tuple2<String, Map.Entry<Object, Object>>> apply(String s) {
                    return reactiveSessionRepository.getSessionRedisOperations().opsForHash().entries(s)
                            .map(new Function<Map.Entry<Object, Object>, Tuple2<String, Map.Entry<Object, Object>>>() {
                                @Override
                                public Tuple2<String, Map.Entry<Object, Object>> apply(Map.Entry<Object, Object> objectObjectEntry) {
                                    return Tuples.of(s, objectObjectEntry);
                                }
                            });
                }
            })
            .filter(new Predicate<Tuple2<String, Map.Entry<Object, Object>>>() {
                @Override
                public boolean test(Tuple2<String, Map.Entry<Object, Object>> rule) {
                    Map.Entry<Object, Object> t = rule.getT2();
                    String key = "sessionAttr:" + HttpSessionSecurityContextRepository.SPRING_SECURITY_CONTEXT_KEY;
                    if (key.equals(t.getKey())) {
                        User sci = (User) ((SecurityContextImpl) t.getValue()).getAuthentication().getPrincipal();
                        return sci.getUsername().equals(name);
                    }
                    return false;
                }
            })
            .collectMap(new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {
                @Override
                public String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {
                    return name;
                }
            }, new Function<Tuple2<String, Map.Entry<Object, Object>>, String>() {
                @Override
                public String apply(Tuple2<String, Map.Entry<Object, Object>> rule) {
                    return rule.getT1().replace(ReactiveRedisSessionRepository.DEFAULT_NAMESPACE + ":sessions:", "");
                }
            });
}

The corresponding Spring WebMVC configuration

@Configuration
@EnableRedisHttpSession    // use the @EnableRedisHttpSession annotation
public class RedisHttpSessionConfig {    // for distributed systems, sessions are usually stored in Redis

    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory();
    }
}

// For single sign-on, use FindByIndexNameSessionRepository to look up the sessions
// by user name, then delete the other sessions
Map<String, Session> map = findByIndexNameSessionRepository.findByPrincipalName(name);

File upload configuration

// Uploading via parameters
// Define the parameter bean
@Setter
@Getter
@ToString
@ApiModel
public class QueryBean {
    @ApiModelProperty(value = "Ordinary parameter", required = false, example = "")
    private String query;

    @ApiModelProperty(value = "File parameter", required = false, example = "")
    private FilePart image;    // important: WebFlux uses FilePart as the type for received files
}

// Define the endpoint
@ApiOperation("An endpoint")
@PostMapping("/path")
// @ApiImplicitParam must be used here to declare the file parameter explicitly,
// otherwise the Swagger page does not show the file upload button
@ApiImplicitParams({
        @ApiImplicitParam(
                paramType = "form",      // form parameter
                dataType = "__file",     // the latest version uses __file for files; earlier versions used file
                name = "image",          // same name as the file parameter image in QueryBean
                value = "File")          // description
})
public Mono<Response> bannerAddOrUpdate(QueryBean q) {}

WebFlux execution flow

The userAdd method looks like this:

    public Mono<User> userAdd(@RequestBody User dto)
    {
        // Imperative style
//        jpaEntityService.addUser(dto);
        // Reactive style
        return Mono.create(cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto)));
    }

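Because only a single item is returned, Mono is used as the return type. The Mono object is created with the static create method of the Mono class, which looks like this: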

public abstract class Mono<T> implements Publisher<T> {
    static final BiPredicate EQUALS_BIPREDICATE = Object::equals;

    public Mono() {
    }

    public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
        return onAssembly(new MonoCreate(callback));
    }
    ...
}

As you can see, create takes a single parameter, a Consumer. Its name, callback, indicates that a callback is used here. The Consumer interface is defined as follows:


@FunctionalInterface
public interface Consumer<T> {

    /**
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    void accept(T t);

    /**
     * Returns a composed {@code Consumer} that performs, in sequence, this
     * operation followed by the {@code after} operation. If performing either
     * operation throws an exception, it is relayed to the caller of the
     * composed operation.  If performing this operation throws an exception,
     * the {@code after} operation will not be performed.
     *
     * @param after the operation to perform after this operation
     * @return a composed {@code Consumer} that performs in sequence this
     * operation followed by the {@code after} operation
     * @throws NullPointerException if {@code after} is null
     */
    default Consumer<T> andThen(Consumer<? super T> after) {
        Objects.requireNonNull(after);
        return (T t) -> { accept(t); after.accept(t); };
    }
}

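As the code above shows, the interface has two methods: the default method andThen and the abstract method accept.

Mono.create() needs an argument that implements the Consumer interface; the object passed to Mono.create is the one that implements accept.

In the example, the lambda expression below is the implementation of accept. The argument has the type Consumer<MonoSink<User>>, and accept is implemented as follows: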

cityMonoSink -> cityMonoSink.success(jpaEntityService.addUser(dto))
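The two Consumer methods can be tried out in isolation with the JDK alone. This is a small sketch unrelated to Reactor; ConsumerSketch and the event strings are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ConsumerSketch {

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        // A lambda with one parameter and no result implements accept(T).
        Consumer<String> save = name -> events.add("saved " + name);
        // andThen composes two consumers; both receive the same argument, in order.
        Consumer<String> saveThenAudit = save.andThen(name -> events.add("audited " + name));
        saveThenAudit.accept("tom");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Nothing happens when the lambdas are defined; the bodies run only when accept is called, which is the property Mono.create relies on.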

Now look again at the implementation of the create method:

    public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
        return onAssembly(new MonoCreate(callback));
    }

Inside the method, onAssembly is called with a MonoCreate object as its argument. The MonoCreate class looks like this:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable.Attr;
import reactor.core.publisher.FluxCreate.SinkDisposable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

final class MonoCreate<T> extends Mono<T> {
    final Consumer<MonoSink<T>> callback;

    MonoCreate(Consumer<MonoSink<T>> callback) {
        this.callback = callback;
    }

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
        actual.onSubscribe(emitter);

        try {
            this.callback.accept(emitter);
        } catch (Throwable var4) {
            emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
        }
    }

    static final class DefaultMonoSink<T> extends AtomicBoolean implements MonoSink<T>, InnerProducer<T> {
        final CoreSubscriber<? super T> actual;
        volatile Disposable disposable;
        static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, Disposable> DISPOSABLE = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, Disposable.class, "disposable");
        volatile int state;
        static final AtomicIntegerFieldUpdater<MonoCreate.DefaultMonoSink> STATE = AtomicIntegerFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, "state");
        volatile LongConsumer requestConsumer;
        static final AtomicReferenceFieldUpdater<MonoCreate.DefaultMonoSink, LongConsumer> REQUEST_CONSUMER = AtomicReferenceFieldUpdater.newUpdater(MonoCreate.DefaultMonoSink.class, LongConsumer.class, "requestConsumer");
        T value;
        static final int NO_REQUEST_HAS_VALUE = 1;
        static final int HAS_REQUEST_NO_VALUE = 2;
        static final int HAS_REQUEST_HAS_VALUE = 3;

        DefaultMonoSink(CoreSubscriber<? super T> actual) {
            this.actual = actual;
        }

        public Context currentContext() {
            return this.actual.currentContext();
        }

        @Nullable
        public Object scanUnsafe(Attr key) {
            if (key != Attr.TERMINATED) {
                return key == Attr.CANCELLED ? OperatorDisposables.isDisposed(this.disposable) : super.scanUnsafe(key);
            } else {
                return this.state == 3 || this.state == 1;
            }
        }

        public void success() {
            if (STATE.getAndSet(this, 3) != 3) {
                try {
                    this.actual.onComplete();
                } finally {
                    this.disposeResource(false);
                }
            }
        }

        public void success(@Nullable T value) {
            if (value == null) {
                this.success();
            } else {
                int s;
                do {
                    s = this.state;
                    if (s == 3 || s == 1) {
                        Operators.onNextDropped(value, this.actual.currentContext());
                        return;
                    }

                    if (s == 2) {
                        if (STATE.compareAndSet(this, s, 3)) {
                            try {
                                this.actual.onNext(value);
                                this.actual.onComplete();
                            } finally {
                                this.disposeResource(false);
                            }
                        }

                        return;
                    }

                    this.value = value;
                } while(!STATE.compareAndSet(this, s, 1));
            }
        }

        public void error(Throwable e) {
            if (STATE.getAndSet(this, 3) != 3) {
                try {
                    this.actual.onError(e);
                } finally {
                    this.disposeResource(false);
                }
            } else {
                Operators.onOperatorError(e, this.actual.currentContext());
            }
        }

        public MonoSink<T> onRequest(LongConsumer consumer) {
            Objects.requireNonNull(consumer, "onRequest");
            if (!REQUEST_CONSUMER.compareAndSet(this, (Object)null, consumer)) {
                throw new IllegalStateException("A consumer has already been assigned to consume requests");
            } else {
                return this;
            }
        }

        public CoreSubscriber<? super T> actual() {
            return this.actual;
        }

        public MonoSink<T> onCancel(Disposable d) {
            Objects.requireNonNull(d, "onCancel");
            SinkDisposable sd = new SinkDisposable((Disposable)null, d);
            if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
                Disposable c = this.disposable;
                if (c instanceof SinkDisposable) {
                    SinkDisposable current = (SinkDisposable)c;
                    if (current.onCancel == null) {
                        current.onCancel = d;
                    } else {
                        d.dispose();
                    }
                }
            }

            return this;
        }

        public MonoSink<T> onDispose(Disposable d) {
            Objects.requireNonNull(d, "onDispose");
            SinkDisposable sd = new SinkDisposable(d, (Disposable)null);
            if (!DISPOSABLE.compareAndSet(this, (Object)null, sd)) {
                Disposable c = this.disposable;
                if (c instanceof SinkDisposable) {
                    SinkDisposable current = (SinkDisposable)c;
                    if (current.disposable == null) {
                        current.disposable = d;
                    } else {
                        d.dispose();
                    }
                }
            }

            return this;
        }

        public void request(long n) {
            if (Operators.validate(n)) {
                LongConsumer consumer = this.requestConsumer;
                if (consumer != null) {
                    consumer.accept(n);
                }

                int s;
                do {
                    s = this.state;
                    if (s == 2 || s == 3) {
                        return;
                    }

                    if (s == 1) {
                        if (STATE.compareAndSet(this, s, 3)) {
                            try {
                                this.actual.onNext(this.value);
                                this.actual.onComplete();
                            } finally {
                                this.disposeResource(false);
                            }
                        }

                        return;
                    }
                } while(!STATE.compareAndSet(this, s, 2));
            }
        }

        public void cancel() {
            if (STATE.getAndSet(this, 3) != 3) {
                this.value = null;
                this.disposeResource(true);
            }
        }

        void disposeResource(boolean isCancel) {
            Disposable d = this.disposable;
            if (d != OperatorDisposables.DISPOSED) {
                d = (Disposable)DISPOSABLE.getAndSet(this, OperatorDisposables.DISPOSED);
                if (d != null && d != OperatorDisposables.DISPOSED) {
                    if (isCancel && d instanceof SinkDisposable) {
                        ((SinkDisposable)d).cancel();
                    }

                    d.dispose();
                }
            }
        }
    }
}

That is a lot of code. The parts to focus on are the following two members:

    MonoCreate(Consumer<MonoSink<T>> callback) {
        this.callback = callback;
    }

    public void subscribe(CoreSubscriber<? super T> actual) {
        MonoCreate.DefaultMonoSink<T> emitter = new MonoCreate.DefaultMonoSink(actual);
        actual.onSubscribe(emitter);

        try {
            this.callback.accept(emitter);
        } catch (Throwable var4) {
            emitter.error(Operators.onOperatorError(var4, actual.currentContext()));
        }
    }

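As the code shows, the first member is the constructor, which takes a Consumer and simply stores it. The subscribe method then contains the statement this.callback.accept(emitter). This is where the callback takes place: it invokes the accept method of the Consumer, which was implemented when Mono.create() was called. Looking more closely at subscribe, it also calls actual.onSubscribe, which, as the name suggests, is where the subscription is established. WebFlux is built on the Reactor model, which is event-driven and asynchronous, and the deferred callback is one place where this shows: the work passed to Mono.create() does not run when the Mono is created, only when it is subscribed.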
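The store-then-call-back mechanism can be reduced to a few lines of plain Java. The sketch below is a deliberately tiny model and not Reactor code: MiniMonoSketch, MiniMono, and Sink are hypothetical names, and error handling, demand, and cancellation are all left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniMonoSketch {

    // Hypothetical, minimal counterpart of MonoSink: it can only signal success.
    interface Sink<T> {
        void success(T value);
    }

    // Hypothetical, minimal counterpart of MonoCreate.
    static final class MiniMono<T> {
        private final Consumer<Sink<T>> callback;

        private MiniMono(Consumer<Sink<T>> callback) {
            // As in the MonoCreate constructor, the callback is only stored.
            this.callback = callback;
        }

        static <T> MiniMono<T> create(Consumer<Sink<T>> callback) {
            return new MiniMono<>(callback);
        }

        void subscribe(Consumer<T> onNext) {
            // As in MonoCreate.subscribe, the stored callback runs here,
            // receiving a sink that forwards the value to the subscriber.
            callback.accept(onNext::accept);
        }
    }

    // Records the order of events to show that the callback is deferred.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        MiniMono<String> mono = MiniMono.create(sink -> {
            events.add("callback ran");
            sink.success("user-1");
        });
        events.add("created");
        mono.subscribe(value -> events.add("received " + value));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, as in the MonoCreate code above, the callback runs on the thread that calls subscribe. Creating the Mono defers the work; it does not by itself move the work to another thread.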

The other uses of Mono and Flux can be traced through the source code in the same way and are not covered in detail here.


Original link: https://808629.com/181713.html
