futuretaskget阻塞,JCU-futuretask如何實現

 2023-10-21 阅读 20 评论 0

摘要:一. Future是什么? 1. Future是什么? JDK 的 Future 就類似于我們網購買東西的訂單號,當我們執行某一耗時的任務時,我們可以另起一個線程異步去執行這個耗時的任務,同時我們可以干點其他事情。當事情干完后我們再根據 future 這個"訂單

一. Future是什么?

1. Future是什么?

JDK 的 Future 就類似于我們網購買東西的訂單號,當我們執行某一耗時的任務時,我們可以另起一個線程異步去執行這個耗時的任務,同時我們可以干點其他事情。當事情干完后我們再根據 future 這個"訂單號"去提取耗時任務的執行結果即可。因此 Future 也是多線程中的一種應用模式。

擴展: 說起多線程,那么 Future 又與 Thread 有什么區別呢?最重要的區別就是 Thread 是沒有返回結果的,而 Future 模式是有返回結果的。

2. 如何使用Future

前面搞明白了什么是Future,下面我們再來舉個簡單的例子看看如何使用Future。

假如現在我們要打火鍋,首先我們要準備兩樣東西:把水燒開和準備食材。因為燒開水是一個比較漫長的過程(相當于耗時的業務邏輯),因此我們可以一邊燒開水(相當于另起一個線程),一邊準備火鍋食材(主線程),等兩者都準備好了我們就可以開始打火鍋了。

public class DaHuoGuo {public static void main(String[] args) throws Exception {FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {@Overridepublic String call() throws Exception {println(Thread.currentThread().getName() + ":" + "開始燒開水...");// 模擬燒開水耗時Thread.sleep(2000);println(Thread.currentThread().getName() + ":"  + "開水已經燒好了...");return "開水";}});Thread thread = new Thread(futureTask);thread.start();// do other thingprintln(Thread.currentThread().getName() + ":"  + " 此時開啟了一個線程執行future的邏輯(燒開水),此時我們可以干點別的事情(比如準備火鍋食材)...");// 模擬準備火鍋食材耗時Thread.sleep(3000);println(Thread.currentThread().getName() + ":"  + "火鍋食材準備好了");String shicai = "火鍋食材";// 開水已經稍好,我們取得燒好的開水String boilWater = futureTask.get();println(Thread.currentThread().getName() + ":"  + boilWater + "和" + shicai + "已經準備好,我們可以開始打火鍋啦");}public static void println(String content){SimpleDateFormat sdf = new SimpleDateFormat();// 格式化時間 sdf.applyPattern("HH:mm:ss");// a為am/pm的標記Date date = new Date();// 獲取當前時間 System.out.println("["+sdf.format(date)+"] "+content);}
}// [14:46:51] main: 此時開啟了一個線程執行future的邏輯(燒開水),此時我們可以干點別的事情(比如準備火鍋食材)...
// [14:46:51] Thread-0:開始燒開水...
// [14:46:53] Thread-0:開水已經燒好了...
// [14:46:54] main:火鍋食材準備好了
// [14:46:54] main:開水和火鍋食材已經準備好,我們可以開始打火鍋啦

futuretaskget阻塞、從以上代碼中可以看到,我們使用Future主要有以下步驟:

  1. 新建一個 Callable 匿名函數實現類對象,我們的業務邏輯在 Callablecall 方法中實現,其中 Callable 的泛型是返回結果類型;
  2. 然后把 Callable 匿名函數對象作為 FutureTask 的構造參數傳入,構建一個 futureTask 對象;
  3. 然后再把 futureTask 對象作為 Thread 構造參數傳入并開啟這個線程執行去執行業務邏輯;
  4. 最后我們調用 futureTask 對象的 get 方法得到業務邏輯執行結果。

可以看到跟 Future 使用有關的 JDK 類主要有 FutureTaskCallable 兩個,下面主要對 FutureTask 進行源碼分析。

擴展:還有一種使用 Future 的方式是將 Callable 實現類提交給線程池執行的方式,這里不再介紹,自行百度即可。

二. FutureTask源碼分析

(一) FutureTask的成員變量和成員方法

  1. 我們先來看下FutureTask的類結構:
    futuretask.png

可以看到 FutureTask 實現了 RunnableFuture 接口,而RunnableFuture接口又繼承了 FutureRunnable 接口。因為FutureTask間接實現了Runnable接口,因此可以作為任務被線程Thread執行;此外,最重要的一點就是FutureTask還間接實現了Future接口,因此還可以獲得任務執行的結果。

  1. 成員變量
    我們首先來看下FutureTask的成員變量有哪些,理解這些成員變量對后面的源碼分析非常重要。
    /** 封裝的Callable對象,其call方法用來執行異步任務 */
    private Callable<V> callable;
    /** 用來裝異步任務的執行結果 */
    private Object outcome;
    /** 執行callable任務的線程 */
    private volatile Thread runner;
    /** 線程等待節點,reiber stack的一種實現 */
    private volatile WaitNode waiters;
    /** 任務執行狀態 */
    private volatile int state;private static final sun.misc.Unsafe UNSAFE;
    // 使用 Unsafe 執行 cas 修改成員變量時, 用到的字段偏移量
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;// 靜態塊
    static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = FutureTask.class;stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));} catch (Exception e) {throw new Error(e);}
    }
    

(二) FutureTask的狀態變化

前面講了FutureTask的成員變量,有一個表示狀態的成員變量state我們要重點關注下,state變量表示任務執行的狀態。

private volatile int state;
/** 任務新建狀態 */
private static final int NEW          = 0;
/** 任務正在完成狀態,是一個瞬間過渡狀態 */
private static final int COMPLETING   = 1;
/** 任務正常結束狀態 */
private static final int NORMAL       = 2;
/** 任務執行異常狀態 */
private static final int EXCEPTIONAL  = 3;
/** 任務被取消狀態,對應cancel(false) */
private static final int CANCELLED    = 4;
/** 任務中斷狀態,是一個瞬間過渡狀態 */
private static final int INTERRUPTING = 5;
/** 任務被中斷狀態,對應cancel(true) */
private static final int INTERRUPTED  = 6;

futuretask阻塞、可以看到任務狀態變量state有以上7種狀態,0-6分別對應著每一種狀態。任務狀態一開始是NEW,然后由FutureTask的三個方法set,setExceptioncancel來設置狀態的變化,其中狀態變化有以下四種情況:

  • NEW -> COMPLETING -> NORMAL:
    這個狀態變化表示異步任務的正常結束,其中COMPLETING是一個瞬間臨時的過渡狀態,由set方法設置狀態的變化;
  • NEW -> COMPLETING -> EXCEPTIONAL:
    這個狀態變化表示異步任務執行過程中拋出異常,由setException方法設置狀態的變化;
  • NEW -> CANCELLED:
    這個狀態變化表示被取消,即調用了cancel(false),由cancel方法來設置狀態變化;
  • NEW -> INTERRUPTING -> INTERRUPTED:
    這個狀態變化表示被中斷,即調用了cancel(true),由cancel方法來設置狀態變化。

(三) run() 方法

public void run() {// 為了確保只有1個線程在執行futureTask, 需要確保兩個提交同時滿足, 否則直接從run()方法返回//  (1) futureTask 的狀態是 new//  (2) futureTask 此時的執行線程為 null, 即還沒有線程執行該 futureTask// 什么樣的調用方式會讓多個線程執行痛經一個 futureTask 呢? //  答: 實例化了一個 futureTask 對象, 然后調用了多次 new Thread(futureTask).start()if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {// 代碼執行到這里, 已經確保只有1個線程可以執行 futureTask, // 所以直接在當前線程中調用 callable.call() 即可; 調用中: //  (1) 如果發生異常: 更新狀態為 EXCEPTIONAL , 通過方法 setException()?//  (2) 如果沒有發生異常, 更新狀態為 NORMAL, 通過方法 set()Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// 代碼執行到這里, 還是已經確保了只有1個線程可以執行 futureTask// 無論當前線程執行是否拋出異常, 執行后都應該把 futureTask 的 runner 屬性置 null// 表示當前線程已執行完畢runner = null;// 后面3行是在處理執行過程中被 interrupt 的情況, 因為 run() 方法并不能實時響應中斷,// 只是通過代碼邏輯檢測中斷(參考while(!Thread.currentThread.isInterrupted())循環),// 因此, 在代碼執行后響應中斷, s >= INTERRUPTING 的情形, 處理方法為: //  private void handlePossibleCancellationInterrupt(int s) {//      if (s == INTERRUPTING)//          while (state == INTERRUPTING)//              Thread.yield();//  }if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield();int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}

這里值得注意的是判斷線程滿不滿足執行異步任務條件時, runner 是否為 null 是調用 UNSAFE 的 CAS 方法 compareAndSwapObject 來判斷和設置的,同時 compareAndSwapObject 是通過成員變量 runner 的偏移地址 runnerOffset 來給 runner 賦值的,此外,成員變量 runner 被修飾為 volatile 是在多線程的情況下, 一個線程的 volatile 修飾變量的設值能夠立即刷進主存,因此值便可被其他線程可見。

(四)FutureTask的狀態更改方法: set()setException()

protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 將run()最后的執行結果保存到 outcome 成員outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state NORMALfinishCompletion();}
}protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 將run()最后的執行結果保存到 outcome 成員outcome = t;UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state EXCEPTIONALfinishCompletion();}
}

(五)FutureTask的喚醒等待線程方法

因為 set(V v)setException(Throwable t) 方法最后都調用了 finishCompletion() , 就是表示異步任務不管正常還是異常結束, 都要執行一部分統一的操作, 這些操作主要是來喚醒所有因為 “調用 get() 方法時因異步任務還未執行完而阻塞” 的線程. 這些阻塞線程會被包裝成 WaitNode 類形成棧存儲. 因此喚醒(移除)的順序是"后進先出"即后面先來的線程先被先喚醒(移除),關于這個線程等待鏈表是如何成鏈的,后面再繼續分析。

private void finishCompletion() {// waiters 是 FutureTask 的成員變量, 每個因調用 get() 而阻塞的線程, 都會被// 包裝為 WaitNode 對象(定義見下方), 所有的阻塞線程會組成一個鏈表存儲. 首先看到的這個外層// for 循環其實是一種 "徹底清空所有WaitNode" 的保證, 真正遍歷鏈表進行喚醒的是// 內部的 for (;;) 循環; 需要這個保證是因為動作的起點是: //     if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) // 判斷, 這個判斷只能確保當時沒有新的線程因get()被加入等待隊列, 所以需要加上外層的for檢測for (WaitNode q; (q = waiters) != null;) {// 判斷沒有新線程加入get()的等待隊列if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {// 如下所有代碼都是普通的遍歷鏈表, 執行喚醒 WaitNode 內部線程的操作for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}// 無意義, done()的方法體1.8版本中為空 {}done();  // 因為異步任務已經執行完且結果已經保存到outcome中,因此此時可以將callable對象置空了callable = null;
}

[注]: WaitNode 定義:

static final class WaitNode {volatile Thread thread;   // 包裝線程// 成鏈表的標志 (實際為棧, 用棧頂元素執行cas判斷, 確定是否有新線程加入get()等待隊列)volatile WaitNode next;   WaitNode() { thread = Thread.currentThread(); }
}

(六)FutureTask.get方法,獲取任務執行結果

線程池和futuretask?前面我們起一個線程在其run方法中執行異步任務后,此時我們可以調用FutureTask.get方法來獲取異步任務執行的結果。

public V get() throws InterruptedException, ExecutionException {int s = state;// (1) 如果任務狀態state<=COMPLETING,說明異步任務正在執行過程中,//     此時會調用awaitDone方法阻塞等待if (s <= COMPLETING)s = awaitDone(false, 0L);// (2) 代碼執行到這里, 說明等待的線程已被喚醒, 任務執行完畢: //     任務可能執行成功也可能執行失敗, report() 會根據執行的狀態//     選擇正常返回還是拋異常. 定義詳見下面return report(s);
}
  1. awaitDone( )方法
// 2個參數出現的原因是: 有的線程指調用 get() 只想等待有限時間
// 等到任務結束返回的普通 get(), timed 參數為false   
private int awaitDone(boolean timed, long nanos)throws InterruptedException {// 計算最大等待的時間點. 不限制等待時長的時間點取0final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;// 還未入棧boolean queued = false;for (;;) {// (1) 等待線程被執行中斷, 拋異常退出if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}// 任務執行狀態int s = state;// (2) s > COMPLETING 表示任務執行完畢, 返回最終狀態退出. //     任務可能正常結束(NORMAL),可能拋出異常(EXCEPTIONAL) ,//     或任務被取消(CANCELLED,INTERRUPTING或INTERRUPTED狀態的一種)if (s > COMPLETING) {// 【問】run()方法在任務結束時,也會調用finishCompletion(), 諸個將等待棧中的//      WaitNode節點的thread置空,這里為什么又要再調用一次 q.thread = null 清空呢?// 【答】因為若很多線程來獲取任務執行結果,在任務執行完的那一刻,此時獲取任務的線程//      要么已經在線程等待鏈表中; 要么此時還是一個孤立的WaitNode節點。//      (1)在線程等待鏈表中的的所有WaitNode節點將由finishCompletion來移除(同時喚醒)所有//         等待的WaitNode節點,以便垃圾回收;//      (2)而孤立的線程WaitNode節點此時還未阻塞,因此不需要被喚醒,此時只要把其屬性置為//         null,然后其有沒有被誰引用,因此可以被GC。if (q != null)q.thread = null;return s;}// 任務還在執行中, 繼續等待else if (s == COMPLETING) Thread.yield();// 如果節點還未構造, 構造節點else if (q == null)q = new WaitNode();// 將構造的節點加入該線程等待棧的頭部// [問]: 為什么節點加入棧的動作要寫在循環內呢? // [答]: 這是多線程下cas節點入棧的標準寫法. 因為入棧動作可能失敗, 所以寫在//       死循環內持續入棧; 這也是循環內判斷 else if (q == null) 分支的//       原因: 這個分支是保證節點只構造一次, 但入棧動作可執行無數次知道成功else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 處理get()線程限時等待的情況: else if (timed) {nanos = deadline - System.nanoTime();// 等待已超時if (nanos <= 0L) {removeWaiter(q);return state;}// 等待未超時, 繼續等待預期時間LockSupport.parkNanos(this, nanos);}// 處理不限時get()的情況: else// 線程進入阻塞等待狀態LockSupport.park(this);}
}

總的來說, 將本來可以寫在一起的代碼邏輯, 比如構造節點后入棧, 然后將節點中的線程阻塞這3個先后動作, 拉平成同一等級的分支寫在死循環里的做法, 是一種兼顧 cas 操作失敗的寫法. 即保證無限次 cas 嘗試, 又保證無需 cas 的連貫動作可以在下一次 for 循環中like執行.

  1. report( )方法
private V report(int s) throws ExecutionException {// 執行結果Object x = outcome;// (1) 正常返回if (s == NORMAL)return (V)x;// (2) 因取消任務而拋異常退出if (s >= CANCELLED)throw new CancellationException();// (3) 任務失敗退出throw new ExecutionException((Throwable)x);
}

(七) FutureTask.cancel方法,取消執行任務

下面可以看到, 只有當執行cancel動作時, 還沒有線程執行任務時才能執行取消

public boolean cancel(boolean mayInterruptIfRunning) {// 狀態 != NEW, 則已有現成在執行任務, 不能取消// cas修改狀態時發現狀態不是 NEW 了, 說明有新線程執行任務了, 也不能取消任務if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;try {    // 如果潤徐中斷的話, 對線程中斷if (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {// 最終喚醒阻塞棧中的等待線程finishCompletion();}return true;
}

三. 總結

總的來說, 最簡單的實現 future 模式, 只要:

  • ( 1 ) 聲明一個 volatile 的標記變量, 標記任務是否執行完畢
  • ( 2 ) 未執行完畢時, 調用 get() 的線程執行 flag.wait() 即可. 利用了jvm內部的條件等待隊列
  • ( 3 ) 用線程執行run()方法

futuretask。反觀 javaSE 的實現, 有幾方面擴展:

  • ( 1 ) 標記變量不止是 true/false ,取而代之的是一系列狀態: new, completing, NORMAL, EXCEPTIONAL等. 這主要是為了配合run(),get(),cancel()在多線程下的邏輯
  • ( 2 ) get()線程阻塞的問題上, javaSE沒有使用synchronized的條件等待隊列, 而是用 cas 操作等待棧的方法. 當新線程執行get()阻塞時, 其它線程感值到新線程是通過 cas 查看棧頂節點是否發生變化得來的
  • ( 3 ) 對于run()方法上, 通過javaSE的實現通過設置成員變量 volatile Thread runner , 來限制同一時刻最多只有一個線程執行run()方法
  • ( 4 ) 除此之外, javaSE版本還實現了 cancel 等方法

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/164264.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息