注册

Java “优雅”地中断线程(原理篇)

前言

之前有分析过如何优雅地中断线程,秉着"既要知其然,也要知其所以然"精神,本篇将从底层源码分析中断是如何工作的。 通过本篇文章,你将了解到:

1、线程底层源码入口
2、中断的作用
3、Thread.sleep/Object.join/Object.wait 对中断的处理
4、究竟该如何停止线程
5、疑难点分析

1、线程底层源码入口

Java 层的Thread

以Thread.java里的方法Thread.interrupt()为例,最终调用了interrupt0()方法:

    private native void interrupt0();

可以看出,是native方法,接下来看看怎么找到其JNI实现。

JNI 入口

在Thread.c里定义了JNI 方法: image.png interrupt0()方法对应JVM_Interrupt()函数。 在jvm.cpp里:

#jvm.cpp
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");

oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
//调用Thread.cpp里的函数
Thread::interrupt(thr);
}
JVM_END

继续跟进Thread.cpp:

#Thread.cpp
void Thread::interrupt(Thread* thread) {
//调用os里的函数
os::interrupt(thread);
}

最终调用了os里的函数。

2、中断的作用

中断源码分析

入口找到了,接着继续深入分析。上面分析到了os::interrupt(thread),os是区分系统的,此处以Linux系统为例:

#os_linux.cpp
void os::interrupt(Thread* thread) {
...
OSThread* osthread = thread->osthread();

//中断标记位没有设置
if (!osthread->interrupted()) {
//则设置中断标记位为true
osthread->set_interrupted(true);
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
//唤醒线程,对应sleep挂起
if (slp != NULL) slp->unpark() ;
}

//唤醒线程,对应wait/join 操作挂起等
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();

//唤醒线程,对应synchronized 获取锁挂起
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}

显然,在Java 层调用Thread.interrupt()方法,最终底层完成了两件事:

1、将中断标记位设置为true。
2、将挂起的线程唤醒。

image.png

中断状态查询

Java 层的Thread.java里提供了两个方法来查询中断标记位的值,分别是:

#Thread.java
//成员方法
public boolean isInterrupted() {
return isInterrupted(false);
}

//静态方法
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

无论是成员方法还是静态方法,最终都调用了Thread.isInterrupted(xx)方法:

#Thread.java
//ClearInterrupted 表示是否清空中断标记位
private native boolean isInterrupted(boolean ClearInterrupted);

可以看出:

1、成员方法isInterrupted()没有清空中断标记位。
2、静态方法interrupted()清空了中断标记位。

继续跟进isInterrupted(xx)方法,由上面跟踪的入口经验最终有如下代码:

#Thread.cpp
bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
return os::is_interrupted(thread, clear_interrupted);
}

#os_linux.cpp
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
OSThread* osthread = thread->osthread();
//查询当前中断值
bool interrupted = osthread->interrupted();

if (interrupted && clear_interrupted) {
//如果参数clear_interrupted 为true,表示要清空标记位
//则设置标记位为false
osthread->set_interrupted(false);
}

//返回查询到的中断标记位的值
return interrupted;
}

因此,Thread.isInterrupted(xx)方法的作用是:

1、查询当前线程的中断标记位的值。
2、根据参数决定是否重置中断标记。

而Thread.isInterrupted(xx) 是私有方法,因此Thread.java 对外提供了两种方法:Thread.isInterrupted()(成员方法)和Thread.interrupted(静态方法)。

3、Thread.sleep/Object.join/Object.wait 对中断的处理

中断线程Demo

既然知道了中断的作用,接着来看看如何使用Thread.interrupt()来中断线程,先来看看个Demo。

public class TestThread {
public static void main(String args[]) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println("t1 is alive " + System.currentTimeMillis());
}
}
});

t1.start();

try {
//保证t1运行一会
Thread.sleep(2000);
//中断t1
t1.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}

while (true) {
System.out.println("t1 interrupt status:" + t1.isInterrupted() + " t1 isAlive:" + t1.isAlive() + " " + System.currentTimeMillis());
}
}
}

该Demo的目的是:

1、先开启线程t1,让t1不断循环打印时间。
2、在另一个线程(主线程)中断t1,并不断查询t1中断标记位的值。

如果没有分析上面的中断原理,你可能会认为t1应该被中断退出了循环。实际上从打印结果可知,t1的中断标记位被设置为true,然而t1并没有退出循环。这结果符合我们的原理分析,因为中断线程的操作底层只做了两件事:设置中断标记位和唤醒线程。 在上面的例子里,t1并没有被挂起,因此唤醒线程没啥实际意义。 总而言之:

"中断线程"这个词听起来比较霸气,让人误以为就是让一个线程停止运行。实际上线程的停止与否并不是它控制的,而是线程执行过程中主动退出或是有异常抛出。

将上面的Demo稍加修改如下:

public class TestThread {
public static void main(String args[]) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println("t1 is alive " + System.currentTimeMillis());
//睡眠一会再执行
Thread.sleep(1000);
}
} catch (InterruptedException e) {
System.out.println("t1 catch exception:" + e.getMessage());
}
}
});

t1.start();

try {
//保证t1运行一会
Thread.sleep(2000);
//中断t1
t1.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}

while (true) {
System.out.println("t1 interrupt status:" + t1.isInterrupted() + " t1 isAlive:" + t1.isAlive() + " " + System.currentTimeMillis());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

相比上一个Demo,仅仅是在t1里增加1s的睡眠时间,打印结果如下: image.png 可以看出,t1被中断后因为抛出了异常而退出了循环,其中断标记位为false,线程已经停止了运行。

Thread.sleep 源码解析

通过比对上面两个Demo可知,引起结果不同是因为增加了Thread.sleep(xx)方法,因此来看看它内部到底做了什么。

#jvm.cpp
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
//在睡眠之前先检查是否已经发生了中断,若是则抛出中断异常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
...

if (millis == 0) {
...
} else {
ThreadState old_state = thread->osthread()->get_state();
thread->osthread()->set_state(SLEEPING);
if (os::sleep(thread, millis, true) == OS_INTRPT) {
//睡眠结束后,判断是否已经发生了中断,若是则抛出中断异常
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
}
thread->osthread()->set_state(old_state);
}
...
JVM_END

#os_linux.cpp
int os::sleep(Thread* thread, jlong millis, bool interruptible) {
assert(thread == Thread::current(), "thread consistency check");

//_SleepEvent 是ParkEvent类型
ParkEvent * const slp = thread->_SleepEvent ;
...
//interruptible 表示是否支持中断,默认支持
if (interruptible) {
...
for (;;) {
//此处是死循环,退出依靠return或者break
if (os::is_interrupted(thread, true)) {
//再次判断是否已经发生中断,若是则返回OS_INTRPT
//该值在外层判断
return OS_INTRPT;
}

//时间耗尽,则退出
if(millis <= 0) {
return OS_OK;
}
...
{
//挂起线程
slp->park(millis);
//线程被唤醒后继续循环
}
}
} else {
for (;;) {
...
//时间耗尽,则退出
if(millis <= 0) break ;

prevtime = newtime;
slp->park(millis);
}
return OS_OK ;
}
}

可以看出,Thread.sleep(xx)方法作用如下:

1、线程挂起一定的时间,时间到达后继续执行循环。
2、interruptible==true场景下,在循环里判断是否发生了中断,若是,则抛出中断异常。

再来分析上面的Demo:

1、线程调用Thread.sleep(xx)进行睡眠,此时线程被挂起。
2、外部调用Thread.interrupt()中断该线程,此时中断标记位值为true。
3、线程被唤醒,唤醒后判断是否发生了中断(通过中断标记位),若是则抛出异常。

虽然Thread.interrupt()没有直接停止线程,但是可以利用中断标记位来查看是否发生过中断的动作,根据这个动作来决定是否停止线程的执行。 而对于Thread.sleep(xx)来说,作为一个公共方法,当检测到中断时,抛出中断异常让外部处理。

以上分析了Thread.sleep(xx)原理,还有个小细节: 当外界调用Thread.interrupt()并捕获了中断异常的时候,此时线程的中断标记位的值位false,这个在哪修改的呢? 判断中断标记时使用了如下代码:

os::is_interrupted(thread, true)

第二个参数表示重置当前中断标记位的值,该函数上面已经分析过。

调用Object.join/Object.wait 挂起线程,此时线程若被中断,表现与Thread.sleep差不多,此处就不再展开分析了。

4、究竟该如何停止线程

还是来看Demo:

public class TestThread {
static volatile boolean isCancelThread = false;
public static void main(String args[]) {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
while(!isCancelThread || !Thread.currentThread().isInterrupted()) {
try {
doSomething1();
doSomething2();
} catch (Exception e) {
if (e instanceof InterruptedException) {
isCancelThread = true;
}
}
}
}
});

t1.start();

//另一个线程两种方式停止线程
//1、设置isCancelThread = true
//2、调用t1.interrupt()
}

private static void doSomething1() {
//具体逻辑
}

private static void doSomething2() {
//具体逻辑
}
}

来分析Demo逻辑: doSomething1() 、doSomething2() 可能是自己写的方法,也可能是其它人提供的方法,它们内部可能会使得线程阻塞。

第一: 外部通过设置isCancelThread 来标记是否让线程停止运行。这种写法有个缺陷,若是线程在调用doSomething1() 或doSomething2()时阻塞,那么将不会立即检测isCancelThread的值,也即是不能立即停止线程。

第二 针对doSomething1() 或doSomething2()可能阻塞的问题,外部通过使用 Thread.interrupt()中断线程,此时需要捕获异常,捕获到了中断异常意味着可以停止线程运行了。 当然,如果你不想额外使用isCancelThread标记,可以直接判断中断标记位:Thread.currentThread().isInterrupted(),此时Demo再改改:

    while(!Thread.currentThread().isInterrupted()) {
try {
doSomething1();
doSomething2();
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}

为什么要额外在catch里增加中断动作呢?原因是中断时可能会遇到sleep/wait/join 等方法将中断标记位的值置为false,此处再次中断是为了让while感知到中断已经发生过了,从而退出循环。

5、疑难点分析

网上有些文章将线程能不能中断总结为:

1、若是线程处在RUNNABLE状态,则interrupt不能中断线程。 2、若是线程处在WAITING/WAITING 则interrupt能中断线程。

通过上面的分析,相信你已经知道了这种说法并不严谨。 来看个Demo,Thread处在RUNNABLE时调用interrupt()中断:

public class TestThread {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
doSomething();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();


Thread.sleep(2000);

t1.interrupt();
System.out.println("main thread interrupt Thread t1");

}

private static void doSomething() throws InterruptedException{
while(true) {
System.out.println("thread state:" + Thread.currentThread().getState() + " " + System.currentTimeMillis());
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
}
}
}

显然Thread1 一直处在RUNNABLE 状态,但是调用interrupt()后Thread1停止了。 再来看来看个Demo,Thread处在WAITING时调用interrupt()中断:

public class TestThread {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
doSomething();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();

long count = 100000;
while(count >= 0) {
System.out.println("thread state:" + t1.getState() + " " + System.currentTimeMillis());
count--;
}

t1.interrupt();
System.out.println("main thread interrupt Thread t1");
while (true) {
Thread.sleep(1000);
System.out.println("after interrupt thread state:" + t1.getState() + " " + System.currentTimeMillis());
}

}

private static void doSomething() throws InterruptedException{
while(true) {
LockSupport.park();
}
}
}

显然Thread1 一直处在WAITING 状态,但是调用interrupt()后Thread1却没停止。 问题的根源是:

线程停止与否要看线程执行体里的代码(方法/代码块)有没有检测中断,并且检测之后是否有处理中断(抛出异常/return 出正常流程)。

因此更严谨的说法是:线程是否能被中断取决于它是否检测并处理了中断状态。 这在AQS里实现可中断锁/不可中断锁时会充分体现。

本文基于JDK 1.8,运行环境也是jdk1.8。

0 个评论

要回复文章请先登录注册