026、线程基础(Thinking in Java)

Par @Martin dans le
Tags :

进程和线程:

  • 进程, 线程的容器, 有独立的运行空间
  • 线程, CPU 最小执行单元, 通过 CPU 的时间片切换来实现多线程”并行”

在 Java 中, 最底层地创建线程有两种方案:

  • 继承 Thread 类
  • 实现 Runable 接口

实际开发中, 我们其实很少用这两种方案, 而是使用 java.util.concurrent (并发包), 主要包含消息队列、原子量、并发集合、同步器、可重入锁、线程池等.

java.util.concurrent (并发包)

Executor

Executor 是 Java 中启动线程的优选方案.
它 是 Java SE5的 java.util.concurrent 包中的执行器, 它可以用来管理 Thread 对象, 相当于一个中间件.

Java 通过 java.util.concurrent.Executors 提供四种线程池, 分别为:

  • newCachedThreadPool 创建一个可缓存线程池, 如果线程池长度超过处理需要, 可灵活回收空闲线程, 若无可回收, 则新建线程.
  • newFixedThreadPool 创建一个定长线程池, 可控制线程最大并发数, 超出的线程会在队列中等待.
  • newScheduledThreadPool 创建一个定长线程池, 支持定时及周期性任务执行 (多数情况下可用来替代 Timer 类).
  • newSingleThreadExecutor 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务, 保证所有任务按照指定顺序 (FIFO, LIFO, 优先级) 执行.

线程的数量通常和当前 CPU 核心数一致比较好, 通过 Runtime.getRuntime().availableProcessors() 可以获得当前 CPU 核心数 (注意不是 CPU 线程数, 有的 CPU 是什么 4 核 8 线程).

要使用 Executor, 首先要定义任务, 然后用 Executor 去”驱动”它.

定义任务

// 显示 LiftOff(发射) 前的倒计时
class LiftOff implements Runnable {
    private int countDown = 10; // default 倒数次数
    private static int taskCount = 0;
    private final int id = taskCount++; // 记录线程 id

    public LiftOff() {
    }

    public LiftOff(int countDown) {
        this.countDown = countDown;
    }

    public String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "). ";
    }

    public void run() {
        for (; countDown >= 0;  countDown--) {
            System.out.print(status());
            Thread.yield();
        }
    }
}


稍微解释下, id 属性用来区分 LiftOff 任务的多个实例(线程), 所以它被定义成 final 的.

Thread.yield() 静态方法的作用是对 线程调度器 的一种建议, 它是在声明此刻我(当前线程)放弃对 CPU 的占用权, 现在正是你(CPU)切换时间片的大好时机.

注意, Thread.yield() 虽然让 CPU 立即执行一次切换, 去执行其他线程, 但是这个”其他”也包含当前线程…

当然, LiftOff 虽然实现了 Runnable, 但并不意味着它必须用新线程去驱动, 它仍然可以单独的使用.

public class MainThread {
    public static void main(String[] args) {
        LiftOff launch = new LiftOff();
        launch.run();
    }
}


输出结果:

del *.class

javac MainThread.java

java MainThread

#0(10). #0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(Liftoff!).


使用 Executor 驱动

import java.util.concurrent.*;

public class MainThread {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();

        for (int i = 0; i < 5; i++) {
            exec.execute(new LiftOff());
        }
        exec.shutdown();
    }
}


执行结果如下:

javac MainThread.java

java MainThread

#0(10). #1(10). #2(10). #3(10). #2(9). #2(8). #2(7). #2(6).
#2(5). #1(9). #4(10). #0(9). #4(9). #1(8). #2(4). #3(9). #2(3).
#1(7). #1(6). #4(8). #4(7). #4(6). #4(5). #0(8). #0(7). #4(4).
#1(5). #1(4). #2(2). #3(8). #3(7). #2(1). #1(3). #1(2). #1(1).
#1(Liftoff!). #4(3). #0(6). #4(2). #2(Liftoff!). #3(6). #4(1).
#0(5). #4(Liftoff!). #3(5). #3(4). #3(3). #3(2). #0(4). #3(1).
#0(3). #3(Liftoff!). #0(2). #0(1). #0(Liftoff!).


单个的 Executor 就可以被用来创建和管理系统中所有的任务.

ExecutorService 接口提供了管理终止的方法(具有生命周期的 Executor), 它知道如何构建恰当的上下文来执行 Runnable 对象.
一个 ExecutorService 的生命周期有三种状态: 运行、关闭、终止.
Executor 创建时处于运行状态, 当调用 shutdown() 后,处于关闭状态, isShutdown() 方法返回 true,这时, 不应该再向 Executor 中添加任务, 所有已添加的任务执行完毕后, Executor 处于终止状态, isTerminated() 返回 true.
如果 Executor 处于关闭状态, 往 Executor 提交任务会抛出 unchecked exception RejectedExecutionException.

shutdown() 方法防止新任务被提交给 Executor, 但允许执行以前提交的任务.
另外还有个 shutdownNow() 方法, 它阻止等待任务的启动并试图停止当前正在执行的任务.

另一种提交任务方法

上面的例子, 我们实现了一个 Runable 接口, 然后用 ExecutorService.execute 来执行任务, Executor 还提供另一种执行任务的方法来满足不同情况下的需求, 那就是 ExecutorService.submit, 与 execute 不同, submit 的任务需要实现的是 Callable 接口而不是 Runable.

  • ExecutorService.execute 任务无返回值, 如果需要知道线程的情况, 就需要另外的共享变量.
  • ExecutorService.submit 任务返回一个 Future 实例表示任务的状态, 通过 Future.get() 可以取到 (阻塞), 可以通过 Future.isDone() 来判断线程是否完成.

线程休眠

让线程休眠的方法就是调用 Thread.sleep().

继续刚才的 LiftOff 的例子, 我们只需要更新它的 run() 方法.

public void run() {
    try {
        for (; countDown >= 0;  countDown--) {
            System.out.print(status());
            // Thread.sleep(100);
            // java SE5 style:
            TimeUnit.MILLISECONDS.sleep(100);
        }
    } catch (InterruptedException e) {
        System.out.println("Interrupted Exception");
    }

}


sleep() 与 yield() 的区别就在于, sleep() 除了拥有 yield() 的功能外, 还可以让 cpu 在指定时间内不再切换回自身.

sleep() 的调用会抛出 InterruptedException 异常, 因为异常不能跨线程传播, 所以我们必须在线程本地处理掉.

Java SE5 引入了更加显示的 sleep(), 作为 TimeUnit 类的一部分, 这个方法允许指定 sleep() 的时间单位, 如上面的就是 MILLISECONDS(毫秒).

线程优先级

简单的说, 线程调试器会让优先级高的线程先执行, 但并不是意味着优先级低的线程就得不到执行了, 它只是执行的频率低一些而已.

可以使用 Thread.getPriority() 来获取现有线程的优先级, 使用 Thread.setPriority() 来修改它.

import java.util.concurrent.*;

class SimplePriorities implements Runnable {
    private int countDown = 5; // 倒计时
    private double d;
    private int priority; // 优先级

    public SimplePriorities(int priority) {
        this.priority = priority;
    }

    public String toString() {
        return Thread.currentThread() + ": " + countDown;
    }

    public void run() {
        Thread.currentThread().setPriority(priority);

        for (; countDown > 0; countDown--) {
            // 进行一个稍大开销的运算, 让效果明显点
            for (int i = 1; i < 100000; i++) {
                d += (Math.PI + Math.E) / (double)i;
                if (i % 1000 == 0) {
                    Thread.yield();
                }
            }

            System.out.println(this);
        }
    }
}

public class MainThread {
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++) {
            exec.execute(new SimplePriorities(Thread.MIN*PRIORITY));
        }
        exec.execute(new SimplePriorities(Thread.MAX*PRIORITY));
        exec.shutdown();
    }
}


执行结果如下:

Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-3,1,main]: 5
Thread[pool-1-thread-1,1,main]: 5
Thread[pool-1-thread-4,1,main]: 5
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-5,1,main]: 5
Thread[pool-1-thread-3,1,main]: 4
Thread[pool-1-thread-2,1,main]: 4
...


在 SimplePriorities 中完成了 toString() 的覆盖用来打印线程的名称优先级及线程所属线程组, Thread.currentThread() 方法可以用来获取当前任务所在线程的 Thread 对象引用.

main 中, 我们创建 5 个低优化级的线程和 1 个高优先级的线程, 因为优化级与操作系统有依赖, 所以唯一的可移植设置是使用:

  • MIN*PRIORITY
  • NORM*PRIORITY
  • MAX*PRIORITY

现在说说为什么要在 run() 中设计那么一个稍大开销的运算?
先看看注释掉那段运算后执行的结果:

Thread[pool-1-thread-2,1,main]: 5
Thread[pool-1-thread-2,1,main]: 4
Thread[pool-1-thread-2,1,main]: 3
Thread[pool-1-thread-2,1,main]: 2
Thread[pool-1-thread-2,1,main]: 1
Thread[pool-1-thread-6,10,main]: 5
Thread[pool-1-thread-6,10,main]: 4
...


可以看到, 这些输出几乎是连续的(前几个都是 pool-1-thread-2).
如果没有加入这些运算的话, 因为 CPU 执行是非常快速的, 没有开销大的运算, 线程还没发生切换就被执行完了, 这样也就看不到设置的优先级效果.

后台线程(也称守护线程)

所谓 Daemon 线程, 是指在程序运行时在后台提供一种通用服务的线程, 当所有的非后台线程线束时, 程序也就终止了, 同时会杀死进程中的所有后台线程.

setDaemon() 方法可以将一个线程设置为后台线程, 但是注意, 它必须要在线程启动前设置.

Thread daemon = new Thread();
daemon.setDaemon(true);
daemon.start();


这里我们没有使用 Executor, 而是通过 Thread 显示的创建线程, 以便可以设置线程的后台标志.

通过编写定制的 ThreadFactory 也可以定制由 Executor 创建的线程的属性(后台、优先级及名称), 这里就不演示了.

isDaemon() 方法返回一个线程是否是后台线程.

由后台线损创建的任何其它线程也将自动地被设置为后台线程.

还有一个情况就是 finally 语句, 如果所有的非后台线程都结束, 那么后台线程会在不执行 finally 语句的情况下被终止.

所以这几乎是一种不好的思想, 因为非后台的 Executor 通常是一种更好的守护方式, 因为 Executor 控制的所有线程可以同时被关闭, 在后面将会提及.

另外的创建线程方法

虽然使用 Executor 是优选的方案, 但还是要了解一下另外两种”原始”的创建新线程方法.

直接继承 Thread

class LiftOff extends Thread {
    private int countDown = 10; // default 倒数次数
    private static int taskCount = 0;
    private final int id = taskCount++; // 记录线程 id

    public LiftOff() {
    }

    public LiftOff(int countDown) {
        this.countDown = countDown;
    }

    public String status() {
        return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "). ";
    }

    public void run() {
        for (; countDown >= 0;  countDown--) {
            System.out.print(status());
            Thread.yield();
        }
    }
}

public class MainThread {
    public static void main(String[] args) {
        LiftOff launch = new LiftOff();
        launch.start();
    }
}


实现 Runnable 接口

实现 Runnable 接口后, 要实现线程驱动, 还必须显示地创建一个新线程, 并将 Runnable 附着在其上, Thread 类就是用来完成这个动作的, 我们只需要将 Runnable 对象传给 Thread 的构造函数即可.

public class MainThread {
    public static void main(String[] args) {
        Thread t = new Thread(new LiftOff());
        t.start();
        System.out.println("Waiting for Liftoff.");
    }
}


创建 Thread 对象后, 调用该对象的 start() 方法为新线程执行必需的初始化操作, 然后新线程开始去”跑动”(即调用 Runnable 的 run() 方法), 执行结果如下:

javac MainThread.java

java MainThread

Waiting for Liftoff.
#0(10). #0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(Liftoff).


我们可以看到, start() 方法是立即返回的, 创建运行新线程并不会阻塞当前线程的继续执行.

当然你可以创建更多的线程:

public class MainThread {
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new LiftOff());
            t.start();
        }
        System.out.println("Waiting for Liftoff.");
    }
}


执行结果如下:

del *.class

javac MainThread.java

java MainThread

#0(10). #0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2).
#0(1). Waiting for Liftoff. #2(10). #1(10). #1(9). #2(9). #0(Liftoff!).
#4(10). #3(10). #4(9). #2(8). #1(8). #2(7). #4(8). #3(9). #4(7). #2(6).
#1(7). #2(5). #4(6). #4(5). #3(8). #4(4). #2(4). #1(6). #2(3). #4(3).
#3(7). #4(2). #2(2). #1(5). #2(1). #4(1). #3(6). #4(Liftoff!). #2(Liftoff!).
#1(4). #3(5). #1(3). #3(4). #1(2). #3(3). #1(1). #3(2). #1(Liftoff!).
#3(1). #3(Liftoff!).


可见, 多线程是轮流交换着去执行的.

加入线程(join)

一个线程可以在其他线程之上调用 join() 方法, 这个方法是阻塞的, 它的含义在于等到被 join 的线程执行结束之后再执行新线程.

join() 方法还可以加一个超时参数, 其意义在于如果等待超时后, join() 方法也能返回.

join() 方法还可以被中断, 在被 join 的线程上调用 interrupt() 方法, 不过这个方法会抛出 InterruptedException 异常.