Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。

创建线程

  1. Thread派生一个自定义类,然后覆写run()方法
1
2
3
4
5
6
7
8
9
10

Thread t = new MyThread();
t.start(); // 启动新线程

class MyThread extends Thread {
@Override
public void run() {
...
}
}
  1. 创建Thread实例时,传入一个Runnable实例
1
2
3
4
5
6
7
8
9
Thread t = new Thread(new MyRunnable());
t.start(); // 启动新线程

class MyRunnable implements Runnable {
@Override
public void run() {
...
}
}

查看Thread类的源代码,会看到start()方法内部调用了一个private native void start0()方法,native修饰符表示这个方法是由JVM虚拟机内部的C代码实现的,不是由Java代码实现的。

join()

通过对另一个线程对象调用join()方法可以等待其执行结束,然后继续往下执行自身线程。

可以指定等待时间join(long),超过等待时间线程仍然没有结束就不再等待。对已经运行结束的线程调用join()方法会立刻返回。

currentThread

Thread.currentThread()获取当前线程

中断线程

interrupt()

interrupt()方法可以发出“中断请求”,通过检测isInterrupted(),可以响应是否结束。

如果线程处于等待状态,例如t.join()会让main线程等待,如果对main线程调用interrupt()join()方法会立刻抛出InterruptedException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new MyThread();
t.start();
Thread.sleep(1000);
t.interrupt(); // 中断t线程
t.join(); // 等待t线程结束
System.out.println("end");
}
}

class MyThread extends Thread {
public void run() {
Thread hello = new HelloThread();
hello.start(); // 启动hello线程
try {
hello.join(); // 等待hello线程结束
} catch (InterruptedException e) {
System.out.println("interrupted!");
}
hello.interrupt();
}
}

class HelloThread extends Thread {
public void run() {
int n = 0;
while (!isInterrupted()) {
n++;
System.out.println(n + " hello!");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
}
}

标志位

用一个running标志位来标识线程是否应该继续运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Main {
public static void main(String[] args) throws InterruptedException {
HelloThread t = new HelloThread();
t.start();
Thread.sleep(1);
t.running = false; // 标志位置为false
}
}

class HelloThread extends Thread {
public volatile boolean running = true;
public void run() {
int n = 0;
while (running) {
n ++;
System.out.println(n + " hello!");
}
System.out.println("end!");
}
}

注意线程间共享变量需要使用volatile关键字标记,确保每个线程都能读取到更新后的变量值。

在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间并不确定。因此,如果另一个线程在JVM把修改后的变量写回主存前读取了该变量,就会导致多线程之间共享的变量不一致。

volatile关键字的目的就是告诉虚拟机:

  • 每次访问变量时,总是获取主内存的最新值;
  • 每次修改变量后,立刻回写到主内存。

守护线程

守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。

在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程即可。

在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

线程同步

synchronized(lockObject)

JVM保证同一个锁在任意时刻只能被一个线程获取。

加锁是为了令同步块内的语句变为原子操作。JVM规范定义了几种单个原子操作:

  • 基本类型(longdouble除外)赋值,例如:int n = m
  • 引用类型赋值,例如:List<String> list = anotherList

longdouble是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把longdouble的赋值作为原子操作实现的。

线程安全类

如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe)。Java标准库的java.lang.StringBuffer是线程安全的。

还有一些不变类,例如StringIntegerLocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

最后,类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

要设计线程安全类,就要将类的方法设计为线程安全的。用synchronized修饰方法可以把整个方法变为同步代码块:

1
2
3
4
5
6
public synchronized void func(int n) { // 相当于锁住this
...
}
public synchronized static void func(int n) { // 相当于锁住该类的Class实例
...
}

死锁

Java的线程锁是可重入的锁,即JVM允许同一个线程重复获取同一个锁。获取锁的时候,要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0时,才会真正释放锁。

而多线程各自持有不同的锁,并互相试图获取对方已持有的锁,就会导致无限等待,即死锁。

我们编写代码时,保证多线程获取锁的顺序一致,就可以避免这种死锁情况。

wait() & notifyAll()

两者配合可以用于多线程协调运行。

synchronized内部:

  • 已获得的锁对象调用wait()方法,线程进入等待状态,并释放锁。wait()方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait()方法才会返回。wait()方法返回后,线程又会重新试图获得锁。
  • 已获得的锁对象调用notify()notifyAll()方法,唤醒其他等待线程。已唤醒的线程需要重新获得锁后才能继续执行。

看一种情况:

1
2
3
4
5
6
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}

如果这里的判断条件是if,有A、B、C 3个线程在isEmpty()=true后进入了wait(),此时queue为空。D线程放入了一个task,唤醒所有等待线程。此时A、B、C都要从wait()返回,但只有其中一个能先获得锁先执行,假设是A,它立刻调用remove()然后释放锁,这个时候,queue又空了。随后,这个锁如果被B或C获得了,它们直接从wait()下一行执行,无需再次判断便remove(),就会出错。因此这里需要设置为while,从而到wait()下一行时,仍需判断,如果不为空,那么就不进入循环,直接remove就好;为空,那么再wait()

当然,如果调用remove的只有一个线程,那么用if也不会出错。

concurrent

java.util.concurrent包提供了许多并发相关功能实现。

线程安全集合

interface non-thread-safe thread-safe
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingDeque

ReentrantLock

1
2
3
public synchronized void func(int n) {
...
}

等价于

1
2
3
4
5
6
7
8
9
private final Lock lock = new ReentrantLock();
public synchronized void func(int n) {
lock.lock();
try {
...
} finally {
lock.unlock();
}
}

ReentrantLock可以尝试获取锁:

1
2
3
4
5
6
7
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。因此比直接使用synchronized更安全

condition

通过Condition,可以使用 ReentrantLock来实现waitnotify的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();

public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}

public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}

await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

1
2
3
if (condition.await(1, TimeUnit.SECOND)) {
...
}

ReadWriteLock

ReadWriteLock可以保证:

  • 只允许一个线程写入(其他线程既不能写入也不能读取);
  • 没有写入时,多个线程允许同时读(提高性能)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];

public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}

public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}

适合读多写少的场景。

StampedLock

乐观锁:乐观地估计读的过程中大概率不会有写入,读的过程中也允许获取写锁后写入。这样需要一点额外的代码来判断读的过程中是否有写入。显然乐观锁并发程度更高。

悲观锁:读的过程中拒绝有写入,也就是写入必须等待。

ReadWriteLock是一种悲观锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Point {
private final StampedLock stampedLock = new StampedLock();

private double x;
private double y;

public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}

public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁,返回版本号
double currentX = x;
double currentY = y;
if (!stampedLock.validate(stamp)) { // 验证版本号,检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 如果有写,获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}

StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。

StampedLock使不可重入的。

Atomic

使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

  • 原子操作实现了无锁的线程安全;
  • 适用于计数器,累加器等。

AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set

线程池

创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。

线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。线程池实现了线程的复用,效率更高。

Java标准库提供了ExecutorService接口表示线程池,典型用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
// 关闭线程池:
executor.shutdown();
// `shutdownNow()`会立刻停止正在执行的任务,`awaitTermination()`则会等待指定的时间让线程池关闭。

class Task implements Runnable {
...
}

ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池;
1
2
3
4
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
  • SingleThreadExecutor:仅单线程执行的线程池。

ScheduledThreadPool

放入ScheduledThreadPool的任务可以定期反复执行。

1
2
3
4
5
6
7
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
// 2秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间;而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务。

Future

通过实现Runnable接口编写的多线程没有返回值,如果任务需要返回结果,可以继承Callable接口:

1
2
3
4
5
6
7
8
class Task implements Callable<BigDecimal> {
@Override
public BigDecimal call() throws Exception {
Thread.sleep(1000);
double d = 5 + Math.random() * 20;
return new BigDecimal(d).setScale(2, RoundingMode.DOWN);
}
}

对线程池提交一个Callable任务,可以获得一个Future对象。可以用Future在将来某个时刻获取结果。

1
2
3
4
ExecutorService es = Executors.newFixedThreadPool(4);
Future<BigDecimal> future = es.submit(new Task());
// 从Future获取异步执行返回的结果:
System.out.println(future.get());

在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。

Future<V>接口定义的方法有:

  • get():获取结果
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

CompletableFuture

通过Future获得异步执行的结果时,调用阻塞方法get()或轮询isDone()都会导致主线程被迫等待。

CompletableFuture针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。主线程设置好回调后,不再关心异步任务的执行。

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

1
2
3
public interface Supplier<T> {
T get();
}
  • thenAccept()处理正常结果;
  • exceptional()处理异常结果;
  • thenApplyAsync()用于串行化另一个CompletableFuture
  • anyOf()allOf()用于并行化多个CompletableFuture

CompletableFuture的命名规则为:

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行。

Fork/Join

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

Fork/Join线程池可以把一个大任务拆成多个小任务并行执行,在多核CPU上就可以大大提高效率。任务类必须继承自RecursiveTaskRecursiveAction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class Main {
public static void main(String[] args) throws Exception {
// 创建2000个随机数组成的数组:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}

static Random random = new Random(0);

static long random() {
return random.nextInt(10000);
}
}

class SumTask extends RecursiveTask<Long> {

static final int THRESHOLD = 500;
long[] array;
int start;
int end;

SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢计算速度:
try {
Thread.sleep(2);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
// “分裂”子任务:
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
// invokeAll会并行运行两个子任务:
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}

ThreadLocal

ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

初始化 && 使用:(使用ThreadLocal要用try ... finally结构,并在finally中清除)

1
2
3
4
5
6
7
8
9
10
static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
void processUser(user) {
try {
threadLocalUser.set(user);
step1();
step2();
} finally {
threadLocalUser.remove();
}
}

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

1
2
3
4
5
void step1() {
User u = threadLocalUser.get();
log();
printUser();
}

为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。

为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {...}结构,让编译器自动为我们关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class UserContext implements AutoCloseable {

static final ThreadLocal<String> ctx = new ThreadLocal<>();

public UserContext(String user) {
ctx.set(user);
}

public static String currentUser() {
return ctx.get();
}

@Override
public void close() {
ctx.remove();
}
}
// use:
try (var ctx = new UserContext("Bob")) {
// 可任意调用UserContext.currentUser():
String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象