#### 试想一个场景:当某个线程存在无线循环执行,当达到某一临界点时,需要人为干预来达到线程终止。

1、线程终止的方法

终止线程的方法,在Thread中已经提供了一些操作方法,譬如:stop、suspend(如下图所示)。虽然这些方法都可终止或挂起一个线程。

线程终止

注意: 到这些方法都已经添加了删除线,为什么呢,原因很简单,这是一种较为霸道或强制中断的方式,可能会导致数据产生异常。假设一个场景:一个线程正在执行,此时调用stop方法强行中断,而当前线程的任务还没执行结束就要强制中断,就好比我们再linux os 下执行kill -9一样,是一种不友好或是不安全的操作。

这时我们就需要一种优雅的线程中断方式,何为优雅呢,主要突出在让线程知道任务执行结束后线程终止。

2、问题思考

引申出一个问题:一个线程在什么情况下是执行结束了呢? 执行结束

分析下这段代码,通过new 一个线程任务,调用start()启动线程,而线程主要执行的内容就是回调run()方法 print。这就是整个流程,那么在run()方法执行完之前,线程一直处于运行状态,直到执行结束,该线程才会被销毁/终止。

3、人为或外部干预实现

上述例子是一个正常情况下线程执行结束终止的流程。 那么本章开头提出的场景,需要人为或外部干预的呢?

先看场景代码:

public class ThreadDemo3 extends Thread{
    public static void main(String[] args) {
        ThreadDemo3 threadDemo3 = new ThreadDemo3();
        threadDemo3.start();
    }

    @Override
    public void run() {
        while (true){
            System.out.println("run--------");
        }
    }
}

针对上述场景代码,我们可以实现通过一个信号来优雅终止,简单调整的代码:

public class ThreadDemo3 extends Thread{
    private static volatile boolean flag = true;
    public static void main(String[] args) {
        ThreadDemo3 threadDemo3 = new ThreadDemo3();
        boolean condition = 1<2;
        if(condition){
            flag = false;
        }
        threadDemo3.start();
    }

    @Override
    public void run() {
        while (ThreadDemo3.flag){
            System.out.println("run--------");
        }
    }
}

其实,没有必要每个这样的场景都要这么写,因为JAVA给我们提供了一个interrupt()方法,该方法就是为了实现线程中断操作,它的原理和上述释放信号基本等同

添加了interrupt()方法的代码:

public class ThreadDemo3Interrupt extends Thread{
    public static void main(String[] args){
        System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
        ThreadDemo3Interrupt interrupt = new ThreadDemo3Interrupt();
        interrupt.start();
        interrupt.setName("interrupt");
        //调用方法,发送一个中断信息
        interrupt.interrupt();
        System.out.println(interrupt.getName()+"---"+interrupt.isInterrupted());

    }

    @Override
    public void run() {
        //优雅的终止,通过isInterrupted()获取中断状态/信号
        while (!Thread.currentThread().isInterrupted()){
            //TODO
            System.out.println("run--------");
        }
    }
}

调用interrupt方法 解析:

  • interrupt()isInterrupted都是实例方法,意味此线程。
  • interrupt.interrupt()调用方法,给interrupt线程发送一个中断信号(即false->true),通知它中断。
  • Thread.currentThread().isInterrupted()获取当前执行线程的中断信号,如果中断,则代表线程执行结束。

4、分析interrupt.interrupt()源码

①、JAVA源码分析:

源码在Thread类

/**
 * 中断此线程
 * 1、
 * 如果该线程在调用Object#wait()、Object#wait(long)、Object#wait(long, int)
 * 、Thread#join()、Thread#join(long)、Thread#join(long, int)、Thread#sleep(long)
 * 、Thread#sleep(long, int)方法,将会抛出InterruptedException,同时清除线程的中断状态;
 * 2、
 * 如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,
 * 线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;
 * 3、
 * 如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,
 * 类似调用wakeup的效果;
 */
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

分析:

  • 如果该线程在调用Object#wait()、Object#wait(long)、Object#wait(long, int)、Thread#join()、Thread#join(long)、Thread#join(long, int)、Thread#sleep(long)、Thread#sleep(long, int)方法,将会抛出InterruptedException,同时清除线程的中断状态;

  • 如果线程堵塞在java.nio.channels.InterruptibleChannel的IO上,Channel将会被关闭,线程被置为中断状态,并抛出java.nio.channels.ClosedByInterruptException;

  • 如果线程堵塞在java.nio.channels.Selector上,线程被置为中断状态,select方法会马上返回,类似调用wakeup的效果;

该方法内部调用interrupt0()

private native void interrupt0();

多线程_01_执行原理 中已经知道JAVA native 和 Hostpot映射关系

{"interrupt0",       "()V",        (void *)&JVM_Interrupt},

②、追踪JVM的JVM_Interrupt

源码在jvm.cpp

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);
  }
JVM_END

分析: 调用了Thread::interrupt(thr)

③、追踪Thread::interrupt(thr)

进入os_linux.cpp

void os::interrupt(Thread* thread) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");
  //获取系统native线程对象
  OSThread* osthread = thread->osthread();
  //判断中断标识位是否中断
  if (!osthread->interrupted()) {
    // 这里设置中断标识位
    osthread->set_interrupted(true);
    //内存屏障,使osthread的interrupted状态对其它线程立即可见
    OrderAccess::fence();
    //调用了Thread.sleep
    ParkEvent * const slp = thread->_SleepEvent ;
    //调用unpark 唤醒
    if (slp != NULL) slp->unpark() ;
  }

  // JUC的锁 通过unpark唤醒
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();
  // synchronized同步块和Object.wait() 通过unpark唤醒
  ParkEvent * ev = thread->_ParkEvent ;
  if (ev != NULL) ev->unpark() ;

}

解析:

  • interrupt() 内部是通过ParkEvent的unpark方法唤醒
  • interrupt() 内部通过set_interrupted设置中断标识位
  • Lock.lock()方法不会响应中断,Lock.lockInterruptibly()方法则会响应中断并抛出异常,区别在于park()等待被唤醒时lock会继续执行park()来等待锁,而 lockInterruptibly会抛出异常;
  • object.wait、Thread.sleep和Thread.join会抛出InterruptedException并清除中断状态;
  • synchronized被唤醒后会尝试获取锁,失败则会通过循环继续park()等待,因此实际上是不会被interrupt()中断的;
  • 一般情况下,抛出异常时,会清空Thread的interrupt状态,即复位,下面会提到。

④、追踪set_interruptedinterrupted

进入osThread.hpp

interrupt

解析: 看到这里,明白上述JAVA例子了吧。

  • hotspot源码也是通过一个volatile jint(0/1) 修饰的变量作为线程中断信号

⑤、分析4、①中的InterruptibleChannel的IO阻塞中断 进入Thread.java

private volatile Interruptible blocker;
private final Object blockerLock = new Object();

void blockedOn(Interruptible b) {
    synchronized (blockerLock) {
        blocker = b;
    }
}
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

解析:

  • 这段代码有一个声明Interruptible类型的变量blocker,可以通过blockedOn赋值。
  • 如果一个nio通道实现了InterruptibleChannel接口,就可以响应interrupt()中断,其原理就在InterruptibleChannel接口的抽象实现类AbstractInterruptibleChannelbegin方法

⑥探究AbstractInterruptibleChannel

进入AbstractInterruptibleChannel.java

public abstract class AbstractInterruptibleChannel
    implements Channel, InterruptibleChannel
{
    private Interruptible interruptor;
    private volatile Thread interrupted;
    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                    public void interrupt(Thread target) {
                        synchronized (closeLock) {
                            if (!open)
                                return;
                            open = false;
                            interrupted = target;
                            try {
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) { }
                        }
                    }};
        }
        blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }

    protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        blockedOn(null);
        Thread interrupted = this.interrupted;
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException();
        }
        if (!completed && !open)
            throw new AsynchronousCloseException();
    }

    static void blockedOn(Interruptible intr) {         // package-private
        sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),
                                                             intr);
    }
}

接着,探究beginend 方法在哪里调用,进入java.nio.channels.Channels

public final class Channels {
    private static class ReadableByteChannelImpl
        extends AbstractInterruptibleChannel
        implements ReadableByteChannel
    {
        public int read(ByteBuffer dst) throws IOException {
            ......
                synchronized (readLock) {
                while (totalRead < len) {
                    ......
                        try {
                        begin();
                        bytesRead = in.read(buf, 0, bytesToRead);
                    } finally {
                        end(bytesRead > 0);
                    }
                    ......
                }
                ......
            }
        }
    }
    private static class WritableByteChannelImpl
        extends AbstractInterruptibleChannel
        implements WritableByteChannel
    {......
        public int write(ByteBuffer src) throws IOException {
            int len = src.remaining();
            int totalWritten = 0;
            synchronized (writeLock) {
                while (totalWritten < len) {
                    int bytesToWrite = Math.min((len - totalWritten),
                                                TRANSFER_SIZE);
                    if (buf.length < bytesToWrite)
                        buf = new byte[bytesToWrite];
                    src.get(buf, 0, bytesToWrite);
                    try {
                        begin();
                        out.write(buf, 0, bytesToWrite);
                    } finally {
                        end(bytesToWrite > 0);
                    }
                    totalWritten += bytesToWrite;
                }
                return totalWritten;
            }
        }
        ......
    }
}

解析: ReadableByteChannel每次执行阻塞方法read()前,都会执行begin(),把Interruptible回调接口注册到当前线程上。当线程中断时,Thread.interrupt()触发回调接口Interruptible关闭io通道,导致read方法返回,最后在finally块中执行end()方法检查中断标记,抛出ClosedByInterruptException;

⑦、探究java.nio.channels.Selector

进入java.nio.channels.Selector,没有找到,去子类java.nio.channels.spi.AbstractSelector

public abstract class AbstractSelector
    extends Selector
{
    private Interruptible interruptor = null;
    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                    public void interrupt(Thread ignore) {
                        AbstractSelector.this.wakeup();
                    }};
        }
        AbstractInterruptibleChannel.blockedOn(interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted())
            interruptor.interrupt(me);
    }

    protected final void end() {
        AbstractInterruptibleChannel.blockedOn(null);
    }
}

解析: 找到调用beginend 的类,这里注意OS环境差异.

  • windows 是DefaultSelectorProvider->SelectorProvider->SelectorProviderImpl->WindowsSelectorProvider->WindowsSelectorImpl->doSelect()
  • linux 是DefaultSelectorProvider->SelectorProvider->SelectorProviderImpl->EPollSelectorProvider->EPollSelectorImpl->doSelect()

下面的代码是EPollSelectorImpl.doSelect()

protected int doSelect(long timeout) throws IOException {
        ......
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        ......
    }

解析:

  • begin()通过blockedOn方法将前线程的blocker变量设为Interruptible类实例
  • 当线程中断时,会通过Interruptible类的interrupt间接调用wakeup方法
  • 使得pollWrapper.poll方法调用epoll底层实现返回

至此 interrupt 分析结束

5、分析interrupted

public class ThreadDemo3Interrupt extends Thread{
    public static void main(String[] args){
        //1、获取当前线程 即main线程的中断状态,false
        System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
        ThreadDemo3Interrupt interrupt = new ThreadDemo3Interrupt();
        interrupt.start();
        interrupt.setName("interrupt");
        //调用方法,发送一个中断信号
        interrupt.interrupt();
        //2、当前线程 即main线程 发送中断信号
        Thread.currentThread().interrupt();
        //3、获取当前线程 即main线程的中断状态,true
        System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
        //4、获取当前线程 即main线程的中断状态 并且 复位
        System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
        //5、上面的复位已经清除了中断状态 为默认 false
        System.out.println(Thread.currentThread().getName()+"---"+Thread.currentThread().isInterrupted());
        //6、获取当前线程 即main线程的中断状态
        System.out.println(Thread.currentThread().getName()+"---"+Thread.interrupted());
        System.out.println(interrupt.getName()+"---"+interrupt.isInterrupted());

    }

    @Override
    public void run() {
        //优雅的终止,通过isInterrupted()获取中断状态/信号
        while (!Thread.currentThread().isInterrupted()){
            //TODO
            System.out.println("run--------");
        }
    }
}

解析:

  • interruptedstatic方法,并且返回值类型为boolean类型,并且会主动复位(将线程的中断状态设置为默认状态 false)

①、从JAVA源码层面了解interrupted

public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

public static native Thread currentThread();

private native boolean isInterrupted(boolean ClearInterrupted);

解析: interrupted方法内部调用的是native 封装的isInterrupted,接下来依据多线程_01_执行原理 中提到的JAVA native 和 Hostpot映射关系

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

②、深入JVM探究JVM_IsInterrupted

进入jvm.cpp

JVM_QUICK_ENTRY(jboolean, JVM_IsInterrupted(JNIEnv* env, jobject jthread, jboolean clear_interrupted))
  JVMWrapper("JVM_IsInterrupted");
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr == NULL) {
    return JNI_FALSE;
  } else {
    return (jboolean) Thread::is_interrupted(thr, clear_interrupted != 0);
  }
JVM_END

解析: JVM_IsInterrupted内部调用Thread::is_interrupted

②、探究Thread::is_interrupted 进入os_linux.cpp

bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();

  bool interrupted = osthread->interrupted();

  if (interrupted && clear_interrupted) {
    osthread->set_interrupted(false);
    // consider thread->_SleepEvent->reset() ... optional optimization
  }

  return interrupted;
}

解析: 可以看出其方法内部先是取出当前线程,然后调用interrupted()的值。判断是否中断状态,然后set_interrupted(false)复位(将中断状态设置为false),最后返回interrupted()的值。

6、分析InterruptedException

将上述 添加interrupt方法这一步骤的代码略作调整。

public class ThreadDemo3Interrupt extends Thread{
    public static void main(String[] args) throws InterruptedException {
        ThreadDemo3Interrupt interrupt = new ThreadDemo3Interrupt();
        interrupt.start();
        //先运行3s
        Thread.sleep(3000);
        //调用方法,发送一个中断信息
        interrupt.interrupt();

    }

    @Override
    public void run() {
        //优雅的终止,通过isInterrupted()获取中断状态/信号
        while (!Thread.currentThread().isInterrupted()){
            try {
                //1s后中断
                Thread.sleep(1000);
                //TODO
                System.out.println("run--------");
            } catch (InterruptedException e) {
                System.out.println(123);
                e.printStackTrace();
            }

        }
    }
}

执行结果如下图:

InterruptedException

解析:

  • InterruptedException是一个必须被捕获的异常
  • 从控制台的打印信息可以看出,线程执行了一段时间后收到了中断信息,sleep,然后被InterruptedException中断了sleep ,而不是中断线程,并且捕获到异常,打印了catch块内容。但是随后又继续执行线程,且不中断。这是为什么呢?

①、带着问题进入Thread.sleep(long)

进入Thread.java

public static native void sleep(long millis) throws InterruptedException;

②、根据nativejvm映射探究JVM_Sleep

进入jvm.cpp

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
  JVMWrapper("JVM_Sleep");

  if (millis < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
  //判断并清除线程中断状态,如果中断状态为true,抛出中断异常
  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
   //抛出InterruptedException异常
   THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }
......

解析: sleep 调用is_interrupted 和第五步 一样的原理,然后 抛出了InterruptedException异常。

故:得出结论,InterruptedExceptioninterrupted异曲同工之妙,只不过前者是被动,后者是主动