Netty优雅退出机制shutdownGracefully源码分析

 2023-09-06 阅读 27 评论 0

摘要:使用Netty开发的小伙伴肯定对下面这两句代码非常熟悉了 bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();那就是Netty中大名鼎鼎的优雅退出,顾名思义它的作用就是使线程池退出,用我们都用过,那么它到底是如何工作的呢? 由于Net

 使用Netty开发的小伙伴肯定对下面这两句代码非常熟悉了

bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();
那就是Netty中大名鼎鼎的优雅退出,顾名思义它的作用就是使线程池退出,用我们都用过,那么它到底是如何工作的呢?

由于Netty处理的是线程池,线程池的关闭要求其中的每一个线程关闭。而线程的实现实在SingleThreadEventExecutor类,

所以我们将再次回到这个类,首先看其中的shutdownGracefully()方法,其中的参数quietPeriod为静默时间,timeout为截至时间,

第三个参数是TimeUnit unit为时间单位,其中该类还定义了一个变量gracefulShutdownStartTime,即优雅关闭开始时间,代码如下:

    @Overridepublic Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {if (isShuttingDown()) {return terminationFuture(); // 正在关闭阻止其他线程}boolean inEventLoop = inEventLoop();boolean wakeup;int oldState;for (;;) {if (isShuttingDown()) {return terminationFuture(); // 正在关闭阻止其他线程}int newState;wakeup = true;oldState = STATE_UPDATER.get(this);if (inEventLoop) {newState = ST_SHUTTING_DOWN;} else {switch (oldState) {case ST_NOT_STARTED:case ST_STARTED:newState = ST_SHUTTING_DOWN;break;default: // 一个线程已修改好线程状态,此时这个线程才执行16行代码newState = oldState;wakeup = false; // 已经有线程唤醒,所以不用再唤醒}}if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {break;  // 保证只有一个线程将oldState修改为newState}// 隐含STATE_UPDATER已被修改,则在下一次循环返回}// 在default情况下会更新这两个值gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);gracefulShutdownTimeout = unit.toNanos(timeout);if (oldState == ST_NOT_STARTED) {thread.start();}if (wakeup) {wakeup(inEventLoop);}return terminationFuture();}
这段代码考虑了多线程同时调用关闭的情况,我们抓住其中的关键点,该方法只是将线程状态修改为ST_SHUTTING_DOWN

并不执行具体的关闭操作(类似shutdown方法将线程状态修改为ST_SHUTDOWN)。for()循环是为了保证修改state的线程(原生

线程或者外部线程)有且只有一个,并且通过CAS操作来确保线程安全。所以这段代码的关键在于修改STATE_UPDATER状态,

这个状态有什么作用呢?

想想如果我们需要让阻塞的线程,从阻塞状态退出,使其从一个EventLoop循环中退出,有什么办法来通知它?

就是通过设置一个状态变量,来告诉它现在线程池的状态,从而它判断接下来是退出还是等待执行。

本人上一篇博文讲解NioEventLoop中讲到,run()方法中有这么一段代码

f (this.isShuttingDown()) {this.closeAll();if (this.confirmShutdown()) {this.cleanupAndTerminate(true);return;}}
其实查询线程状态的方法有三个:

    public boolean isShuttingDown() {return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;}public boolean isShutdown() {return STATE_UPDATER.get(this) >= ST_SHUTDOWN;}public boolean isTerminated() {return STATE_UPDATER.get(this) == ST_TERMINATED;}
正是使用了刚才的那个 STATE_UPDATER状态变量。

需要注意的是调用shutdownGracefully()方法后线程状态为ST_SHUTTING_DOWN,调用shutdown()方法后线程状态为

ST_SHUTDOWN,isShuttingDown()可以一并判断这两种状态。closeAll()方法关闭注册到NioEventLoop的所有Channel,

代码不在列出,在博主上一篇博文有。confirmShutdown()方法在SingleThreadEventExecutor类,确定是否可以关闭或者

说是否可以从NioEventLoop循环中跳出。代码如下:

    protected boolean confirmShutdown() {if (!isShuttingDown()) {return false;   // 没有调用shutdown相关的方法直接返回}if (!inEventLoop()) {   // 必须是原生线程throw new IllegalStateException("must be invoked from an event loop");}cancelScheduledTasks(); // 取消调度任务if (gracefulShutdownStartTime == 0) {   // 优雅关闭开始时间,这也是一个标记gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();}// 执行完普通任务或者没有普通任务时执行完shutdownHook任务if (runAllTasks() || runShutdownHooks()) {if (isShutdown()) {return true;    // 调用shutdown()方法直接退出}if (gracefulShutdownQuietPeriod == 0) {return true;    // 优雅关闭静默时间为0也直接退出}wakeup(true);   // 优雅关闭但有未执行任务,唤醒线程执行return false;}final long nanoTime = ScheduledFutureTask.nanoTime();// shutdown()方法调用直接返回,优雅关闭截止时间到也返回if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {return true;}// 在静默期间每100ms唤醒线程执行期间提交的任务if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {wakeup(true);try {Thread.sleep(100);} catch (InterruptedException e) {// Ignore}return false;}// 静默时间内没有任务提交,可以优雅关闭,此时若用户又提交任务则不会被执行return true;}
总结一下,调用shutdown()方法只要满足下列任一条件既能从循环跳出:

1.执行完普通任务

2.没有普通任务,执行完shutdownHook任务

3.既没有普通任务,也没有shutdownHook任务

调用shutdownGracefully()方法只要满足下列任一条件既能从循环跳出:

1.执行完普通任务且静默时间为0

2.没有普通任务,执行完shutdownHook任务且静默时间为0

3.静默期间没有任务提交

4.优雅关闭截至时间已到

我们可以将静默时间看作为一段观察期,在此期间如果没有任务执行,说明可以跳出循环;如果此期间有任务执行,

执行完后立即进入下一个观察期继续观察;如果连续多个观察期一直有任务执行,那么截止时间到则跳出循环。

我们看看shutdownGracefully()的默认参数

    public Future<?> shutdownGracefully() {return shutdownGracefully(2, 15, TimeUnit.SECONDS);}

可知,Netty默认为,在2秒的静默时间内如果没有任务,则关闭;否则15秒截止时间到达时关闭。

至此,我们已经清楚了从EventLoop循环中跳出的机制,最后,我们抵达终点站:线程结束机制。这一部分的代码实现

在线程工厂的生成方法中:

    thread = threadFactory.newThread(new Runnable() {@Overridepublic void run() {boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();   // 模板方法,EventLoop实现success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);// 用户调用了关闭的方法或者抛出异常if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;  // 抛出异常也将状态置为ST_SHUTTING_DOWN}}if (success && gracefulShutdownStartTime == 0) {// time=0,说明confirmShutdown()方法没有调用,记录日志}try {for (;;) {// 抛出异常时,将普通任务和shutdownHook任务执行完毕// 正常关闭时,结合前述的循环跳出条件if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {// 线程状态设置为ST_TERMINATED,线程终止STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {//  关闭时,任务队列中添加了任务,记录日志}terminationFuture.setSuccess(null); // 异步结果设置为成功}}}}});
20-22行代码说明子类在实现模板方法run()时,须调用confirmShutdown()方法,不调用的话会有错误日志。25-31行的

for()循环主要是对异常情况的处理,但同时也兼顾了正常调用关闭方法的情况。可以将抛出异常的情况视为静默时间为0的

shutdownGracefully()方法,这样便于理解循环跳出条件。34行代码cleanup()的默认实现什么也不做,NioEventLoop覆盖了基类。

实现关闭NioEventLoop持有的selector:

    protected void cleanup() {try {selector.close();} catch (IOException e) {logger.warn("Failed to close a selector.", e);}}
关于Netty优雅关闭的机制,还有最后一点细节,那就是runShutdownHooks()方法:

    private boolean runShutdownHooks() {boolean ran = false;while (!shutdownHooks.isEmpty()) {// 使用copy是因为shutdwonHook任务中可以添加或删除shutdwonHook任务List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);shutdownHooks.clear();for (Runnable task: copy) {try {task.run();} catch (Throwable t) {logger.warn("Shutdown hook raised an exception.", t);} finally {ran = true;}}}if (ran) {lastExecutionTime = ScheduledFutureTask.nanoTime();}return ran;}
此外,还有threadLock.release()方法,threadLock是一个初始值为0的信号量。一个初值为0的信号量,当线程请求锁时只会阻塞,

这又什么作用呢?从awaitTermination()方法揭晓答案,用来使其它线程阻塞等待原生线程关闭:

    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {// 由于tryAcquire()永远不会成功,所以必定阻塞timeout时间if (threadLock.tryAcquire(timeout, unit)) {threadLock.release();}return isTerminated();}
关于shutdownGracefully的分析就到此,欢迎一起讨论

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/15304.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息