多线程_03_线程终止
1、线程终止的方法
终止线程的方法,在Thread中已经提供了一些操作方法,譬如:(如下图所示)。虽然这些方法都可终止或挂起一个线程。stop、suspend
注意:
到这些方法都已经添加了删除线,为什么呢,原因很简单,这是一种较为霸道或强制中断的方式,可能会导致数据产生异常。假设一个场景:一个线程正在执行,此时调用stop方法强行中断,而当前线程的任务还没执行结束就要强制中断,就好比我们再linux os 下执行kill -9
一样,是一种不友好或是不安全的操作。
这时我们就需要一种优雅的线程中断方式,何为优雅呢,主要突出在让线程知道任务执行结束后线程终止。
2、问题思考
引申出一个问题:一个线程在什么情况下是执行结束了呢?
分析下这段代码,通过new 一个线程任务,调用start()
启动线程,而线程主要执行的内容就是回调run()
方法 print。这就是整个流程,那么在run()
方法执行完之前,线程一直处于运行状态,直到执行结束,该线程才会被销毁/终止。
3、人为或外部干预实现
上述例子是一个正常情况下线程执行结束终止的流程。 那么本章开头提出的场景,需要人为或外部干预的呢?
先看场景代码:
public class ThreadDemo3 extends Thread{
public static void main(String[] args) {
ThreadDemo3 threadDemo3 = new ThreadDemo3();
threadDemo3.start();
}
@Override
public void run() {
while (true){
System.out.println("run--------");
}
}
}
针对上述场景代码,我们可以实现通过一个信号来优雅终止,简单调整的代码:
public class ThreadDemo3 extends Thread{
private static volatile boolean flag = true;
public static void main(String[] args) {
ThreadDemo3 threadDemo3 = new ThreadDemo3();
boolean condition = 1<2;
if(condition){
flag = false;
}
threadDemo3.start();
}
@Override
public void run() {
while (ThreadDemo3.flag){
System.out.println("run--------");
}
}
}
其实,没有必要每个这样的场景都要这么写,因为JAVA给我们提供了一个interrupt()
方法,该方法就是为了实现线程中断操作,它的原理和上述释放信号基本等同。
添加了interrupt()
方法的代码:
public class ThreadDemo3Interrupt extends Thread{
public static void main(String[] args){
System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
ThreadDemo3Interrupt interrupt = new ThreadDemo3Interrupt();
interrupt.start();
interrupt.setName("interrupt");
//调用方法,发送一个中断信息
interrupt.interrupt();
System.out.println(interrupt.getName()+"---"+interrupt.isInterrupted());
}
@Override
public void run() {
//优雅的终止,通过isInterrupted()获取中断状态/信号
while (!Thread.currentThread().isInterrupted()){
//TODO
System.out.println("run--------");
}
}
}
解析:
interrupt()
和isInterrupted
都是实例方法,意味此线程。interrupt.interrupt()
调用方法,给interrupt
线程发送一个中断信号(即false->true),通知它中断。Thread.currentThread().isInterrupted()
获取当前执行线程的中断信号,如果中断,则代表线程执行结束。
4、分析interrupt.interrupt()
源码
①、JAVA源码分析:
源码在Thread类
/**
* 中断此线程
* 1、
* 如果该线程在调用Object#wait()、Object#wait(long)、Object#wait(long, int)
* 、Thread#join()、Thread#join(long)、Thread#join(long, int)、Thread#sleep(long)
* 、Thread#sleep(long, int)方法,将会抛出InterruptedException,同时清除线程的中断状态;
* 2、
* 如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,
* 线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;
* 3、
* 如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,
* 类似调用wakeup的效果;
*/
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
分析:
-
如果该线程在调用Object#wait()、Object#wait(long)、Object#wait(long, int)、Thread#join()、Thread#join(long)、Thread#join(long, int)、Thread#sleep(long)、Thread#sleep(long, int)方法,将会抛出InterruptedException,同时清除线程的中断状态;
-
如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;
-
如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,类似调用wakeup的效果;
该方法内部调用interrupt0()
,
private native void interrupt0();
在多线程_01_执行原理 中已经知道JAVA native 和 Hostpot映射关系
{"interrupt0", "()V", (void *)&JVM_Interrupt},
②、追踪JVM的JVM_Interrupt
源码在jvm.cpp
JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
// Ensure that the C++ Thread and OSThread structures aren't freed before we operate
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
// We need to re-resolve the java_thread, since a GC might have happened during the
// acquire of the lock
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
JVM_END
分析:
调用了Thread::interrupt(thr)
③、追踪Thread::interrupt(thr)
进入os_linux.cpp
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
//获取系统native线程对象
OSThread* osthread = thread->osthread();
//判断中断标识位是否中断
if (!osthread->interrupted()) {
// 这里设置中断标识位
osthread->set_interrupted(true);
//内存屏障,使osthread的interrupted状态对其它线程立即可见
OrderAccess::fence();
//调用了Thread.sleep
ParkEvent * const slp = thread->_SleepEvent ;
//调用unpark 唤醒
if (slp != NULL) slp->unpark() ;
}
// JUC的锁 通过unpark唤醒
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
// synchronized同步块和Object.wait() 通过unpark唤醒
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
解析:
interrupt()
内部是通过ParkEvent的unpark方法唤醒interrupt()
内部通过set_interrupted
设置中断标识位- Lock.lock()方法不会响应中断,Lock.lockInterruptibly()方法则会响应中断并抛出异常,区别在于park()等待被唤醒时lock会继续执行park()来等待锁,而 lockInterruptibly会抛出异常;
- object.wait、Thread.sleep和Thread.join会抛出InterruptedException并清除中断状态;
- synchronized被唤醒后会尝试获取锁,失败则会通过循环继续park()等待,因此实际上是不会被interrupt()中断的;
- 一般情况下,抛出异常时,会清空Thread的interrupt状态,即复位,下面会提到。
④、追踪set_interrupted
和interrupted
进入osThread.hpp
解析: 看到这里,明白上述JAVA例子了吧。
- hotspot源码也是通过一个
volatile jint(0/1)
修饰的变量作为线程中断信号
⑤、分析4、①中的InterruptibleChannel的IO阻塞中断
进入Thread.java
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
void blockedOn(Interruptible b) {
synchronized (blockerLock) {
blocker = b;
}
}
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0();
b.interrupt(this);
return;
}
}
interrupt0();
}
解析:
- 这段代码有一个声明
Interruptible
类型的变量blocker
,可以通过blockedOn
赋值。 - 如果一个nio通道实现了
InterruptibleChannel
接口,就可以响应interrupt()
中断,其原理就在InterruptibleChannel
接口的抽象实现类AbstractInterruptibleChannel
的begin
方法
⑥探究AbstractInterruptibleChannel
进入AbstractInterruptibleChannel.java
public abstract class AbstractInterruptibleChannel
implements Channel, InterruptibleChannel
{
private Interruptible interruptor;
private volatile Thread interrupted;
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end(boolean completed)
throws AsynchronousCloseException
{
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
static void blockedOn(Interruptible intr) { // package-private
sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),
intr);
}
}
接着,探究begin
和 end
方法在哪里调用,进入java.nio.channels.Channels
public final class Channels {
private static class ReadableByteChannelImpl
extends AbstractInterruptibleChannel
implements ReadableByteChannel
{
public int read(ByteBuffer dst) throws IOException {
......
synchronized (readLock) {
while (totalRead < len) {
......
try {
begin();
bytesRead = in.read(buf, 0, bytesToRead);
} finally {
end(bytesRead > 0);
}
......
}
......
}
}
}
private static class WritableByteChannelImpl
extends AbstractInterruptibleChannel
implements WritableByteChannel
{......
public int write(ByteBuffer src) throws IOException {
int len = src.remaining();
int totalWritten = 0;
synchronized (writeLock) {
while (totalWritten < len) {
int bytesToWrite = Math.min((len - totalWritten),
TRANSFER_SIZE);
if (buf.length < bytesToWrite)
buf = new byte[bytesToWrite];
src.get(buf, 0, bytesToWrite);
try {
begin();
out.write(buf, 0, bytesToWrite);
} finally {
end(bytesToWrite > 0);
}
totalWritten += bytesToWrite;
}
return totalWritten;
}
}
......
}
}
解析:
ReadableByteChannel
每次执行阻塞方法read()
前,都会执行begin()
,把Interruptible
回调接口注册到当前线程上。当线程中断时,Thread.interrupt()
触发回调接口Interruptible
关闭io
通道,导致read
方法返回,最后在finally
块中执行end()
方法检查中断标记,抛出ClosedByInterruptException
;
⑦、探究java.nio.channels.Selector
进入java.nio.channels.Selector,没有找到,去子类java.nio.channels.spi.AbstractSelector
public abstract class AbstractSelector
extends Selector
{
private Interruptible interruptor = null;
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
AbstractSelector.this.wakeup();
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end() {
AbstractInterruptibleChannel.blockedOn(null);
}
}
解析:
找到调用begin
和 end
的类,这里注意OS环境差异.
- windows
是DefaultSelectorProvider->SelectorProvider->SelectorProviderImpl->WindowsSelectorProvider->WindowsSelectorImpl->doSelect()
- linux
是DefaultSelectorProvider->SelectorProvider->SelectorProviderImpl->EPollSelectorProvider->EPollSelectorImpl->doSelect()
下面的代码是EPollSelectorImpl.doSelect()
protected int doSelect(long timeout) throws IOException {
......
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
......
}
解析:
begin()
通过blockedOn
方法将前线程的blocker变量设为Interruptible
类实例- 当线程中断时,会通过
Interruptible
类的interrupt
间接调用wakeup
方法 - 使得
pollWrapper.poll
方法调用epoll
底层实现返回
至此 interrupt 分析结束
5、分析interrupted
public class ThreadDemo3Interrupt extends Thread{
public static void main(String[] args){
//1、获取当前线程 即main线程的中断状态,false
System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
ThreadDemo3Interrupt interrupt = new ThreadDemo3Interrupt();
interrupt.start();
interrupt.setName("interrupt");
//调用方法,发送一个中断信号
interrupt.interrupt();
//2、当前线程 即main线程 发送中断信号
Thread.currentThread().interrupt();
//3、获取当前线程 即main线程的中断状态,true
System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
//4、获取当前线程 即main线程的中断状态 并且 复位
System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
//5、上面的复位已经清除了中断状态 为默认 false
System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
//6、获取当前线程 即main线程的中断状态
System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
System.out.println(interrupt.getName()+"---"+interrupt.isInterrupted());
}
@Override
public void run() {
//优雅的终止,通过isInterrupted()获取中断状态/信号
while (!Thread.currentThread().isInterrupted()){
//TODO
System.out.println("run--------");
}
}
}
解析:
interrupted
是static
方法,并且返回值类型为boolean
类型,并且会主动复位(将线程的中断状态设置为默认状态false
)
①、从JAVA源码层面了解interrupted
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
public static native Thread currentThread();
private native boolean isInterrupted(boolean ClearInterrupted);
解析:
interrupted
方法内部调用的是native
封装的isInterrupted
,接下来依据多线程_01_执行原理 中提到的JAVA native 和 Hostpot映射关系
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};
②、深入JVM探究JVM_IsInterrupted
进入jvm.cpp
JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
JVMWrapper("JVM_IsInterrupted");
oop java_thread = JNIHandles::resolve_non_null(jthread);
MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
if (thr == NULL) {
return JNI_FALSE;
} else {
return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
}
JVM_END
解析:
JVM_IsInterrupted
内部调用Thread::is_interrupted
②、探究Thread::is_interrupted
进入os_linux.cpp
bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
bool interrupted = osthread->interrupted();
if (interrupted && clear_interrupted) {
osthread->set_interrupted(false);
// consider thread->_SleepEvent->reset() ... optional optimization
}
return interrupted;
}
解析:
可以看出其方法内部先是取出当前线程,然后调用interrupted()
的值。判断是否中断状态,然后set_interrupted(false)
复位(将中断状态设置为false),最后返回interrupted()
的值。
6、分析InterruptedException
将上述 添加interrupt
方法这一步骤的代码略作调整。
public class ThreadDemo3Interrupt extends Thread{
public static void main(String[] args) throws InterruptedException {
ThreadDemo3Interrupt interrupt = new ThreadDemo3Interrupt();
interrupt.start();
//先运行3s
Thread.sleep(3000);
//调用方法,发送一个中断信息
interrupt.interrupt();
}
@Override
public void run() {
//优雅的终止,通过isInterrupted()获取中断状态/信号
while (!Thread.currentThread().isInterrupted()){
try {
//1s后中断
Thread.sleep(1000);
//TODO
System.out.println("run--------");
} catch (InterruptedException e) {
System.out.println(123);
e.printStackTrace();
}
}
}
}
执行结果如下图:
解析:
InterruptedException
是一个必须被捕获的异常- 从控制台的打印信息可以看出,线程执行了一段时间后收到了中断信息,sleep,然后被
InterruptedException
中断了sleep ,而不是中断线程,并且捕获到异常,打印了catch块内容。但是随后又继续执行线程,且不中断。这是为什么呢?
①、带着问题进入Thread.sleep(long)
进入Thread.java
public static native void sleep(long millis) throws InterruptedException;
②、根据native
和jvm
映射探究JVM_Sleep
进入jvm.cpp
JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//判断并清除线程中断状态,如果中断状态为true,抛出中断异常
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
//抛出InterruptedException异常
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
......
解析:
sleep
调用is_interrupted
和第五步 一样的原理,然后 抛出了InterruptedException
异常。
故:得出结论,InterruptedException
与interrupted
异曲同工之妙,只不过前者是被动,后者是主动。
- 感谢你赐予我前进的力量