(1)是 Spring5 添加新的模塊,用于 web 開發的,功能和 SpringMVC 類似的,Webflux 使用
當前一種比較流程響應式編程出現的框架。
(2)使用傳統 web 框架,比如 SpringMVC,這些基于 Servlet 容器,Webflux 是一種異步非阻
塞的框架,異步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相關 API 實現
的。
(3)解釋什么是異步非阻塞
* 異步和同步
* 非阻塞和阻塞
** 上面都是針對對象不一樣
** 異步和同步針對調用者,調用者發送請求,如果等著對方回應之后才去做其他事情就是同
步,如果發送請求之后不等著對方回應就去做其他事情就是異步
** 阻塞和非阻塞針對被調用者,被調用者受到請求之后,做完請求任務之后才給出反饋就是阻
塞,受到請求之后馬上給出反饋然后再去做事情就是非阻塞
(4)Webflux 特點:
第一 非阻塞式:在有限資源下,提高系統吞吐量和伸縮性,以 Reactor 為基礎實現響應式編程
第二 函數式編程:Spring5 框架基于 java8,Webflux 使用 Java8 函數式編程方式實現路由請求
(5)比較 SpringMVC
Spring Web Flow,第一 兩個框架都可以使用注解方式,都運行在 Tomet 等容器中
第二 SpringMVC 采用命令式編程,Webflux 采用異步響應式編程
響應式編程是一種面向數據流和變化傳播的編程范式。這意味著可以在編程語言中很方便
地表達靜態或動態的數據流,而相關的計算模型會自動將變化的值通過數據流進行傳播。
電子表格程序就是響應式編程的一個例子。單元格可以包含字面值或類似"=B1+C1"的公
式,而包含公式的單元格的值會依據其他單元格的值的變化而變化。
* 提供的觀察者模式兩個類 Observer 和 Observable?
/*** A class can implement the <code>Observer</code> interface when it* wants to be informed of changes in observable objects.*/
public interface Observer {/*** This method is called whenever the observed object is changed. An* application calls an <tt>Observable</tt> object's*/void update(Observable o, Object arg);
}
public class Observable {private boolean changed = false;private Vector obs;/** Construct an Observable with zero Observers. */public Observable() {obs = new Vector();}/**將一個觀察者添加到觀察者聚集上面*/public synchronized void addObserver(Observer o) {if (o == null)throw new NullPointerException();if (!obs.contains(o)) {obs.addElement(o);}}/** 將一個觀察者從觀察者聚集上刪除 */public synchronized void deleteObserver(Observer o) {obs.removeElement(o);}public void notifyObservers() {notifyObservers(null);}/*** 如果本對象有變化(那時hasChanged 方法會返回true)* 調用本方法通知所有登記的觀察者,即調用它們的update()方法* 傳入this和arg作為參數*/public void notifyObservers(Object arg) {Object[] arrLocal;synchronized (this) {if (!changed)return;arrLocal = obs.toArray();clearChanged();}for (int i = arrLocal.length-1; i>=0; i--)((Observer)arrLocal[i]).update(this, arg);}/** 將觀察者聚集清空 */public synchronized void deleteObservers() {obs.removeAllElements();}/** 將“已變化”設置為true */protected synchronized void setChanged() {changed = true;}/** 將“已變化”重置為false */protected synchronized void clearChanged() {changed = false;}public synchronized boolean hasChanged() {return changed;}public synchronized int countObservers() {return obs.size();}
}
public class ObserverDemo extends Observable {public static void main(String[] args) {ObserverDemo observer = new ObserverDemo();//添加觀察者observer.addObserver((o,arg)->{System.out.println("發生變化");});observer.addObserver((o,arg)->{System.out.println("手動被觀察者通知,準備改變");});observer.setChanged(); //數據變化observer.notifyObservers(); //通知}
}
Spring boot,(1)響應式編程操作中,Reactor 是滿足 Reactive 規范框架
(2)Reactor 有兩個核心類,Mono 和 Flux,這兩個類實現接口 Publisher,提供豐富操作
符。Flux 對象實現發布者,返回 N 個元素;Mono 實現發布者,返回 0 或者 1 個元素
(3)Flux 和 Mono 都是數據流的發布者,使用 Flux 和 Mono 都可以發出三種數據信號:
元素值,錯誤信號,完成信號,錯誤信號和完成信號都代表終止信號,終止信號用于告訴
訂閱者數據流結束了,錯誤信號終止數據流同時把錯誤信息傳遞給訂閱者
第一步 引入依賴
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
??? <groupId>io.projectreactor</groupId>
??? <artifactId>reactor-core</artifactId>
??? <version>3.4.18</version>
</dependency>
第二步 編程代碼?
spring5?調用 just 或者其他方法只是聲明數據流,數據流并沒有發出,只有進行訂閱之后才會觸
發數據流,不訂閱什么都不會發生的
public class TestRector {public static void main(String[] args) {//聲明不同類型數據流//just 方法直接聲明元素Flux<Integer> flux=Flux.just(1,2,3,4);//訂閱輸出數據流flux.subscribe(System.out::println);Mono<Integer> mono=Mono.just(1);mono.subscribe(System.out::println);
// //聲明數組元素
// Integer[] array = {1,2,3,4};
// Flux.fromArray(array);
// //聲明集合元素
// List<Integer> list = Arrays.asList(array);
// Flux.fromIterable(list);
//
// Stream<Integer> stream = list.stream();
// Flux.fromStream(stream);}
}
* 錯誤信號和完成信號都是終止信號,不能共存的
* 如果沒有發送任何元素值,而是直接發送錯誤或者完成信號,表示是空數據流
* 如果沒有錯誤信號,沒有完成信號,表示是無限數據流
* 對數據流進行一道道操作,成為操作符,比如工廠流水線
Spring Framework、?
? 把每個元素轉換流,把轉換之后多個流合并大的流
SpringWebflux 基于 Reactor,默認使用容器是 Netty,Netty 是高性能的NIO 框架,異步非阻
塞的框架
spring,1.BIO (同步阻塞I/O模式)
數據的讀取寫入必須阻塞在一個線程內等待其完成。
這里使用那個經典的燒開水例子,這里假設一個燒開水的場景,有一排水壺在燒開水,BIO的工作模式就是, 叫一個線程停留在一個水壺那,直到這個水壺燒開,才去處理下一個水壺。但是實際上線程在等待水壺燒開的時間段什么都沒有做。
2.NIO(同步非阻塞)
SpringCloud。同時支持阻塞與非阻塞模式,但這里我們以其同步非阻塞I/O模式來說明,那么什么叫做同步非阻塞?如果還拿燒開水來說,NIO的做法是叫一個線程不斷的輪詢每個水壺的狀態,看看是否有水壺的狀態發生了改變,從而進行下一步的操作。
3.AIO (異步非阻塞I/O模型)
異步非阻塞與同步非阻塞的區別在哪里?異步非阻塞無需一個線程去輪詢所有IO操作的狀態改變,在相應的狀態改變后,系統會通知對應的線程來處理。對應到燒開水中就是,為每個水壺上面裝了一個開關,水燒開之后,水壺會自動通知我水燒開了。
* SpringWebflux 核心控制器 DispatchHandler,實現接口 WebHandler
* 接口 WebHandler 有一個方法
springwebflux、
?
* HandlerMapping:請求查詢到處理的方法
* HandlerAdapter:真正負責請求處理
* HandlerResultHandler:響應結果處理
SpringWebflux 實現方式有兩種:注解編程模型和函數式編程模型
使用注解編程模型方式,和之前 SpringMVC 使用相似的,只需要把相關依賴配置到項目中,
SpringBoot 自動配置相關運行容器,默認情況下使用 Netty 服務器
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId> </dependency>
實體類
@Data//lombok工具
@AllArgsConstructor
public class User {private String name;private String gender;private Integer age;
}
創建接口定義操作的方法
//用戶操作接口
public interface UserService {//根據 id 查詢用戶Mono<User> getUserById(int id);//查詢所有用戶Flux<User> getAllUser();//添加用戶Mono<Void> saveUserInfo(Mono<User> user);
}
接口實現類
@Service
public class UserServiceImpl implements UserService {//創建 map 集合存儲數據private final Map<Integer, User> users = new HashMap<>();public UserServiceImpl() {this.users.put(1,new User("lucy","nan",20));this.users.put(2,new User("mary","nv",30));this.users.put(3,new User("jack","nv",50));}//根據 id 查詢@Overridepublic Mono<User> getUserById(int id) {return Mono.justOrEmpty(this.users.get(id));}//查詢多個用戶@Overridepublic Flux<User> getAllUser() {return Flux.fromIterable(this.users.values());}//添加用戶@Overridepublic Mono<Void> saveUserInfo(Mono<User> userMono) {return userMono.doOnNext(person -> {//向 map 集合里面放值int id = users.size()+1;users.put(id,person);}).thenEmpty(Mono.empty());}
}
創建 controller
@RestController
public class UserController {//注入 service@Autowiredprivate UserService userService;//id 查詢@GetMapping("/user/{id}")public Mono<User> getUserId(@PathVariable int id) {return userService.getUserById(id);}//查詢所有@GetMapping("/user")public Flux<User> getUsers() {return userService.getAllUser();}//添加@PostMapping("/saveuser")public Mono<Void> saveUser(@RequestBody User user) {Mono<User> userMono = Mono.just(user);return userService.saveUserInfo(userMono);}
}
? 說明
SpringMVC 方式實現,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat
SpringWebflux 方式實現,異步非阻塞 方式,基于 SpringWebflux+Reactor+Netty
(1)在使用函數式編程模型操作時候,需要自己初始化服務器
(2)基于函數式編程模型時候,有兩個核心接口:RouterFunction(實現路由功能,請求轉發
給對應的 handler)和 HandlerFunction(處理請求生成響應的函數)。核心任務定義兩個函數
式接口的實現并且啟動需要的服務器。
( 3 ) SpringWebflux 請 求 和 響 應 不 再 是 ServletRequest 和 ServletResponse ,而是
ServerRequest 和 ServerResponse
創建handler文件夾
public class UserHandler {private final UserService userService;public UserHandler(UserService userService) {this.userService = userService;}//根據 id 查詢public Mono<ServerResponse> getUserById(ServerRequest request) {//獲取 id 值int userId = Integer.parseInt(request.pathVariable("id"));//空值處理Mono<ServerResponse> notFound = ServerResponse.notFound().build();//調用 service 方法得到數據Mono<User> userMono = this.userService.getUserById(userId);//把 userMono 進行轉換返回//使用 Reactor 操作符 flatMapreturnuserMono.flatMap(person ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromValue(person))).switchIfEmpty(notFound);}//查詢所有public Mono<ServerResponse> getAllUsers(ServerRequest request) {//調用 service 得到結果Flux<User> users = this.userService.getAllUser();returnServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users,User.class);}//添加public Mono<ServerResponse> saveUser(ServerRequest request) {//得到 user 對象Mono<User> userMono = request.bodyToMono(User.class);returnServerResponse.ok().build(this.userService.saveUserInfo(userMono));}}
創建Server類
public class Server {
}
在里面編寫創建路由的方法
//1 創建 Router 路由public RouterFunction<ServerResponse> routingFunction() {//創建 hanler 對象UserService userService = new UserServiceImpl();UserHandler handler = new UserHandler(userService);//設置路由return RouterFunctions.route(GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById).andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::getAllUsers);}
在里面創建服務器完成適配
//2 創建服務器完成適配
public void createReactorServer() { //路由和 handler 適配 RouterFunction<ServerResponse> route = routingFunction(); HttpHandler httpHandler = toHttpHandler(route); ReactorHttpHandlerAdapter adapter = new
ReactorHttpHandlerAdapter(httpHandler); //創建服務器 HttpServer httpServer = HttpServer.create(); httpServer.handle(adapter).bindNow();
}
最終調用
// 最終調用public static void main(String[] args) throws Exception{Server server = new Server();server.createReactorServer();System.out.println("enter to exit");System.in.read();}
?
創建Client類,服務器地址要與前面相同
public class Client {public static void main(String[] args) {//調用服務器地址WebClient webClient = WebClient.create("http://127.0.0.1:63192");//根據 id 查詢String id = "1";User userresult = webClient.get().uri("/users/{id}", id).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block();System.out.println(userresult.getName());//查詢所有Flux<User> results = webClient.get().uri("/users").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(User.class);results.map(stu -> stu.getName()).buffer().doOnNext(System.out::println).blockFirst();}
}
然后啟動服務器,啟動客戶端
?
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态