「Java」多线程
Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()
方法,在main()
方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。
创建线程
- 从
Thread
派生一个自定义类,然后覆写run()
方法
1 |
|
- 创建
Thread
实例时,传入一个Runnable
实例
1 | Thread t = new Thread(new MyRunnable()); |
查看Thread
类的源代码,会看到start()
方法内部调用了一个private native void start0()
方法,native
修饰符表示这个方法是由JVM虚拟机内部的C代码实现的,不是由Java代码实现的。
join()
通过对另一个线程对象调用join()
方法可以等待其执行结束,然后继续往下执行自身线程。
可以指定等待时间join(long)
,超过等待时间线程仍然没有结束就不再等待。对已经运行结束的线程调用join()
方法会立刻返回。
currentThread
Thread.currentThread()
获取当前线程
中断线程
interrupt()
interrupt()
方法可以发出“中断请求”,通过检测isInterrupted()
,可以响应是否结束。
如果线程处于等待状态,例如t.join()
会让main
线程等待,如果对main
线程调用interrupt()
,join()
方法会立刻抛出InterruptedException
。
1 | public class Main { |
标志位
用一个running
标志位来标识线程是否应该继续运行。
1 | public class Main { |
注意线程间共享变量需要使用volatile
关键字标记,确保每个线程都能读取到更新后的变量值。
在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间并不确定。因此,如果另一个线程在JVM把修改后的变量写回主存前读取了该变量,就会导致多线程之间共享的变量不一致。
volatile
关键字的目的就是告诉虚拟机:
- 每次访问变量时,总是获取主内存的最新值;
- 每次修改变量后,立刻回写到主内存。
守护线程
守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。
在调用start()
方法前,调用setDaemon(true)
把该线程标记为守护线程即可。
在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。
线程同步
synchronized(lockObject)
JVM保证同一个锁在任意时刻只能被一个线程获取。
加锁是为了令同步块内的语句变为原子操作。JVM规范定义了几种单个原子操作:
- 基本类型(
long
和double
除外)赋值,例如:int n = m
; - 引用类型赋值,例如:
List<String> list = anotherList
。
long
和double
是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把long
和double
的赋值作为原子操作实现的。
线程安全类
如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe)。Java标准库的java.lang.StringBuffer
是线程安全的。
还有一些不变类,例如String
,Integer
,LocalDate
,它们的所有成员变量都是final
,多线程同时访问时只能读不能写,这些不变类也是线程安全的。
最后,类似Math
这些只提供静态方法,没有成员变量的类,也是线程安全的。
要设计线程安全类,就要将类的方法设计为线程安全的。用synchronized
修饰方法可以把整个方法变为同步代码块:
1 | public synchronized void func(int n) { // 相当于锁住this |
死锁
Java的线程锁是可重入的锁,即JVM允许同一个线程重复获取同一个锁。获取锁的时候,要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized
块,记录-1,减到0时,才会真正释放锁。
而多线程各自持有不同的锁,并互相试图获取对方已持有的锁,就会导致无限等待,即死锁。
我们编写代码时,保证多线程获取锁的顺序一致,就可以避免这种死锁情况。
wait() & notifyAll()
两者配合可以用于多线程协调运行。
在synchronized
内部:
- 已获得的锁对象调用
wait()
方法,线程进入等待状态,并释放锁。wait()
方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait()
方法才会返回。wait()
方法返回后,线程又会重新试图获得锁。 - 已获得的锁对象调用
notify()
或notifyAll()
方法,唤醒其他等待线程。已唤醒的线程需要重新获得锁后才能继续执行。
看一种情况:
1 | public synchronized String getTask() throws InterruptedException { |
如果这里的判断条件是if
,有A、B、C 3个线程在isEmpty()=true
后进入了wait()
,此时queue
为空。D线程放入了一个task,唤醒所有等待线程。此时A、B、C都要从wait()
返回,但只有其中一个能先获得锁先执行,假设是A,它立刻调用remove()
然后释放锁,这个时候,queue又空了。随后,这个锁如果被B或C获得了,它们直接从wait()
下一行执行,无需再次判断便remove()
,就会出错。因此这里需要设置为while
,从而到wait()
下一行时,仍需判断,如果不为空,那么就不进入循环,直接remove
就好;为空,那么再wait()
。
当然,如果调用remove
的只有一个线程,那么用if
也不会出错。
concurrent
java.util.concurrent
包提供了许多并发相关功能实现。
线程安全集合
interface | non-thread-safe | thread-safe |
---|---|---|
List | ArrayList | CopyOnWriteArrayList |
Map | HashMap | ConcurrentHashMap |
Set | HashSet / TreeSet | CopyOnWriteArraySet |
Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |
Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |
ReentrantLock
1 | public synchronized void func(int n) { |
等价于
1 | private final Lock lock = new ReentrantLock(); |
ReentrantLock
可以尝试获取锁:
1 | if (lock.tryLock(1, TimeUnit.SECONDS)) { |
上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()
返回false
,程序就可以做一些额外处理,而不是无限等待下去。因此比直接使用synchronized
更安全
condition
通过Condition
,可以使用 ReentrantLock
来实现wait
和notify
的功能
1 | class TaskQueue { |
await()
可以在等待指定时间后,如果还没有被其他线程通过signal()
或signalAll()
唤醒,可以自己醒来:
1 | if (condition.await(1, TimeUnit.SECOND)) { |
ReadWriteLock
ReadWriteLock
可以保证:
- 只允许一个线程写入(其他线程既不能写入也不能读取);
- 没有写入时,多个线程允许同时读(提高性能)。
1 | public class Counter { |
适合读多写少的场景。
StampedLock
乐观锁:乐观地估计读的过程中大概率不会有写入,读的过程中也允许获取写锁后写入。这样需要一点额外的代码来判断读的过程中是否有写入。显然乐观锁并发程度更高。
悲观锁:读的过程中拒绝有写入,也就是写入必须等待。
ReadWriteLock
是一种悲观锁。
1 | public class Point { |
StampedLock
把读锁细分为乐观读和悲观读,能进一步提升并发效率。
但StampedLock
使不可重入的。
Atomic
使用java.util.concurrent.atomic
提供的原子操作可以简化多线程编程:
- 原子操作实现了无锁的线程安全;
- 适用于计数器,累加器等。
以AtomicInteger
为例,它提供的主要操作有:
- 增加值并返回新值:
int addAndGet(int delta)
- 加1后返回新值:
int incrementAndGet()
- 获取当前值:
int get()
- 用CAS方式设置:
int compareAndSet(int expect, int update)
Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set
。
线程池
创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。
线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。线程池实现了线程的复用,效率更高。
Java标准库提供了ExecutorService
接口表示线程池,典型用法如下:
1 | // 创建固定大小的线程池: |
ExecutorService
只是接口,Java标准库提供的几个常用实现类有:
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池;
1 | int min = 4; |
- SingleThreadExecutor:仅单线程执行的线程池。
ScheduledThreadPool
放入ScheduledThreadPool
的任务可以定期反复执行。
1 | ScheduledExecutorService ses = Executors.newScheduledThreadPool(4); |
FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间;而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务。
Future
通过实现Runnable
接口编写的多线程没有返回值,如果任务需要返回结果,可以继承Callable
接口:
1 | class Task implements Callable<BigDecimal> { |
对线程池提交一个Callable
任务,可以获得一个Future
对象。可以用Future
在将来某个时刻获取结果。
1 | ExecutorService es = Executors.newFixedThreadPool(4); |
在调用get()
时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
Future<V>
接口定义的方法有:
get()
:获取结果get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间;cancel(boolean mayInterruptIfRunning)
:取消当前任务;isDone()
:判断任务是否已完成。
CompletableFuture
通过Future
获得异步执行的结果时,调用阻塞方法get()
或轮询isDone()
都会导致主线程被迫等待。
CompletableFuture
针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。主线程设置好回调后,不再关心异步任务的执行。
创建一个CompletableFuture
是通过CompletableFuture.supplyAsync()
实现的,它需要一个实现了Supplier
接口的对象:
1 | public interface Supplier<T> { |
thenAccept()
处理正常结果;exceptional()
处理异常结果;thenApplyAsync()
用于串行化另一个CompletableFuture
;anyOf()
和allOf()
用于并行化多个CompletableFuture
。
CompletableFuture
的命名规则为:
xxx()
:表示该方法将继续在已有的线程中执行;xxxAsync()
:表示将异步在线程池中执行。
Fork/Join
Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
Fork/Join线程池可以把一个大任务拆成多个小任务并行执行,在多核CPU上就可以大大提高效率。任务类必须继承自RecursiveTask
或RecursiveAction
。
1 | public class Main { |
ThreadLocal
ThreadLocal
表示线程的“局部变量”,它确保每个线程的ThreadLocal
变量都是各自独立的;
ThreadLocal
适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);
初始化 && 使用:(使用ThreadLocal
要用try ... finally
结构,并在finally
中清除)
1 | static ThreadLocal<User> threadLocalUser = new ThreadLocal<>(); |
通过设置一个User
实例关联到ThreadLocal
中,在移除之前,所有方法都可以随时获取到该User
实例:
1 | void step1() { |
为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal
没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal
关联的实例,我们可以通过AutoCloseable
接口配合try (resource) {...}
结构,让编译器自动为我们关闭。
1 | public class UserContext implements AutoCloseable { |