可以去了解一下什么是并发和并行及进程与多线程。
并发和并行
进程与线程
Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()
方法,在main()
方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。
因此,对于大多数Java程序来说,我们说多任务,实际上是说如何使用多线程 实现多任务。
创建新线程
Java语言内置了多线程支持。当Java程序启动的时候,实际上是启动了一个JVM进程,然后,JVM启动主线程来执行main()
方法。在main()
方法中,我们又可以启动其他线程。推荐使用Runable()
。
要创建一个新线程非常容易,我们需要实例化一个Thread
实例,然后调用它的start()
或者使用下方代码块提供的其他两种方法。
1 2 3 4 public static void main (String[] args) { Thread t = new Thread (); t.start(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class CreatThread { public static void main (String[] args) throws InterruptedException { System.out.println("线程main启动" ); Thread thread = new MyThread (); Thread thread1 = new Thread (new MyRunnable ()); Thread thread2 = new Thread (() -> { System.out.println("线程thread启动" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } System.out.println("thread end..." ); }); thread.start(); thread1.start(); thread2.start(); Thread.sleep(1000 ); System.out.println("main end..." ); } } class MyThread extends Thread { @Override public void run () { System.out.println("线程thread已启动" ); System.out.println("thread end..." ); } } class MyRunnable implements Runnable { @Override public void run () { System.out.println("线程thread已启动" ); } }
我们无法确定线程main
启动之后会先输出哪个,但是我给main
线程休眠了1秒,所以main end...
会是最后输出。
需要注意:
直接调用run()
方法,相当于调用了一个普通的Java方法,当前线程并没有任何改变,也不会启动新线程。上述代码实际上是在main()
方法内部又调用了run()
方法,打印hello
语句是在main
线程中执行的,没有任何新线程被创建 。
必须调用Thread
实例的start()
方法才能启动新线程 ,如果我们查看Thread
类的源代码,会看到start()
方法内部调用了一个private native void start0()
方法,native
修饰符表示这个方法是由JVM虚拟机内部的C代码实现的,不是由Java代码实现的。
Callable
如果我们的线程需要返回一个值 ,我们需要使用另一个接口Callable
。和Runnable
接口比,它多了一个返回值。
1 2 3 4 5 6 class Task implements Callable <String> { @Override public String call () throws Exception { return "test" ; } }
1 2 3 4 5 6 7 ExecutorService executor = Executors.newFixedThreadPool(5 ); Callable<String> task = new Task (); Future<String> future = executor.submit(task); String result = future.get();
返回结果是什么类型就写什么类型。获取到future对象之后,可以调用get()
方法获得异步执行的结果。在调用get()
时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
一个Future<V>
接口表示一个未来可能会返回的结果,它定义的方法有:
get()
:获取结果(可能会等待)
get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间,超过会报错然后结束线程;
cancel(boolean mayInterruptIfRunning)
:取消当前任务;
isDone()
:判断任务是否已完成。
CompletableFuture
因为当调用future的get()方法时,当前主线程是堵塞的,这不太合理,可能在某些场景不太适用。于是Java提供了CompletableFuture
改善这问题。CompletableFuture
通过回调的方式计算处理结果,并且提供了函数式编程能力,让代码更美观。
CompletableFuture
源码中有四个静态方法用来执行异步任务:
1 2 3 4 5 6 7 8 9 10 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) {..}public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier,Executor executor) {..}public static CompletableFuture<Void> runAsync (Runnable runnable) {..}public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) {..}
显然Run开头的没有返回值,因为Runable()
接口并不提供返回值。
执行异步任务:
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> test = CompletableFuture.supplyAsync(() -> { System.out.println("执行" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 999 ; });
返回值,我们可以利用以下这几个方法获取执行结果:
1 2 3 4 V get () ; V get (long timeout,Timeout unit) ; T getNow (T defaultValue) ; T join () ;
get()
获取执行结果,但是会使当前线程堵塞,如果执行线程一直没有返回数据,get()
会一直等待下去。
第二个get(long timeout,Timeout unit)
,可以自己设置等待时间,超过等待时间会报错TimeoutException – if the wait timed out
,然后结束线程。
getNow(T defaultValue)
表示当有了返回结果时会返回结果,如果异步线程抛了异常会返回自己设置的默认值。
join()
返回结果。如果异步线程抛出异常则它抛出 CompletionException
异常。源码中说抛出以下这两种异常
1 2 CancellationException – if the computation was cancelled CompletionException – if this future completed exceptionally or a completion computation threw an exception
thenAccept()
功能 :当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数,无返回值.
1 2 3 4 5 6 public static void main (String[] args) { CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A" ); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任务B" ); CompletableFuture<Void> futureC = futureB.thenAccept(System.out::println); }
执行任务A,同时异步执行任务B,待任务B正常返回之后,用B的返回值执行任务C,任务C无返回值。
thenRun(…)
功能 :对不关心上一步的计算结果,执行下一个操作
1 2 3 CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A" ); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任务B" ); CompletableFuture<Void> test = futureA.thenRun(() -> System.out.println("执行任务B" ));
执行任务A,任务A执行完以后,执行任务B,任务B不接受任务A的返回值(不管A有没有返回值),test无返回值。
可以跟thenAccept()
比较一下:前者需要利用到上一步的返回结果,后者不需要。
thenApply(…)
功能 :当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数,有返回值
1 2 3 4 5 6 CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A" ); CompletableFuture<String> futureC = futureA.thenApply(a -> { System.out.println(a + "test" ); return "successful" ; }); System.out.println(futureC.join());
就相当于有返回值了而已,跟前面两个的区别就在于它有返回值了。它跟thenAccept()
比较相似,是需要上次执行结果作为参数。
我趣,CompletableFuture
还有好多内容,我懒,我不看了。
CompletableFuture
还有许多方法优化代码效率,可以去看看(会不会是因为我没看才这样说的呢)。
CompletableFuture
的命名规则:
xxx()
:表示该方法将继续在已有的线程中执行;
xxxAsync()
:表示将异步在线程池中执行。
线程的优先级
可以对线程设定优先级,设定优先级的方法是:
1 Thread.setPriority(int n) // 1~10, 默认值5
优先级高的线程被操作系统调度的优先级较高,操作系统对高优先级线程可能调度更频繁,但我们不能通过设置优先级来确保高优先级的线程一定会先执行。
线程的状态
在Java程序中,一个线程对象只能调用一次 start()
方法启动新线程,并在新线程中执行run()
方法。一旦run()
方法执行完毕,线程就结束了。因此,Java线程的状态有以下几种:
New:新创建的线程,尚未执行;
Runnable:运行中的线程,正在执行run()
方法的Java代码;
Blocked:运行中的线程,因为某些操作被阻塞而挂起;
Waiting:运行中的线程,因为某些操作在等待中;
Timed Waiting:运行中的线程,因为执行sleep()
方法正在计时等待;
Terminated:线程已终止,因为run()
方法执行完毕。
用一个状态转移图表示如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ┌─────────────┐ │ New │ └─────────────┘ │ ▼ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌─────────────┐ ┌─────────────┐ ││ Runnable │ │ Blocked ││ └─────────────┘ └─────────────┘ │┌─────────────┐ ┌─────────────┐│ │ Waiting │ │Timed Waiting│ │└─────────────┘ └─────────────┘│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ ▼ ┌─────────────┐ │ Terminated │ └─────────────┘
当线程启动后,它可以在Runnable
、Blocked
、Waiting
和Timed Waiting
这几个状态之间切换,直到最后变成Terminated
状态,线程终止。除了新建和终止外还有四个状态处于同一层。
可以使用getState()
查看线程状态。
线程终止的原因有:
线程正常终止:run()
方法执行到return
语句返回;
线程意外终止:run()
方法因为未捕获的异常导致线程终止;
一个线程可以等待 另一个线程直到它运行结束。使用join()
方法。
代码:
1 2 3 4 5 6 7 8 9 public static void main (String[] args) throws InterruptedException { Thread thread = new Thread (() -> { System.out.println("hello" ); }); System.out.println("start" ); thread.start(); thread.join(); System.out.println("end" ); }
输出:
如果把thread.join()
去掉,则输出:
等待thread这个线程结束再继续运行main线程。join
就是等待该线程结束,然后才继续往下执行自身线程。
中断线程
调用interrupt()
方法
对目标线程调用interrupt()
方法,目标线程需要反复检测自身状态是否是interrupted状态,如果是,就立刻结束运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void main (String[] args) throws InterruptedException { MyThread1 myThread = new MyThread1 (); myThread.start(); Thread.sleep(1 ); myThread.interrupt(); myThread.join(); System.out.println("end" ); } } class MyThread1 extends Thread { public void run () { int n = 0 ; while (! isInterrupted()) { n ++; System.out.println(n + " hello!" ); } }
输出结果:
这里如果把sleep去掉,代码直接中断,来不及输出东西了。这里sleep把休眠的时间改长的话会输出更多,因为有更多的时间。
如果我们把调换一下代码的位置:
1 2 3 4 5 6 7 8 public static void main (String[] args) throws InterruptedException { MyThread1 myThread = new MyThread1 (); myThread.start(); Thread.sleep(1 ); myThread.join(); myThread.interrupt(); System.out.println("end" ); }
如果换成这样,把interrupt()
方法放到后面再调用,但是前面有一个join()
方法,就表示我们只能等待myThread这个线程结束才能往下走,又因为MyThread1里有一个while循环语句,永远都是false,结束不了,所以会一直输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public static void main (String[] args) throws InterruptedException { Thread t = new MyThread2 (); t.start(); Thread.sleep(1000 ); t.interrupt(); t.join(); System.out.println("end" ); } } class MyThread2 extends Thread { public void run () { Thread hello = new HelloThread (); hello.start(); try { hello.join(); } catch (InterruptedException e) { System.out.println("interrupted!" ); } hello.interrupt(); } } class HelloThread extends Thread { public void run () { int n = 0 ; while (!isInterrupted()) { n++; System.out.println(n + " hello!" ); try { Thread.sleep(100 ); } catch (InterruptedException e) { break ; } } }
t
线程启动,执行run()
然后hello
线程启动,开始输出hello
直到睡眠时间到,t
线程调用interrupt()
,而此时,t
线程正位于hello.join()
的等待中,hello.join()
就会报错,try{...} catch(){...}
捕获到这个异常,输出interrupted!
。在t
线程结束前,对hello
线程也进行了interrupt()
调用通知其中断。如果去掉这一行代码,可以发现hello
线程仍然会继续运行,且JVM不会退出。
设置标志位 中断
对目标线程调用interrupt()
方法是中断线程的一种方法,还有另一种通过设置标志位 中断方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static void main (String[] args) throws InterruptedException { HelloThread t = new HelloThread (); t.start(); Thread.sleep(1 ); t.running = false ; } } class HelloThread extends Thread { public volatile boolean running = true ; public void run () { int n = 0 ; while (running) { n ++; System.out.println(n + " hello!" ); } System.out.println("end!" ); }
HelloThread
的标志位boolean running
是一个线程间共享的变量。线程间共享变量需要使用volatile
关键字标记,确保每个线程都能读取到更新后的变量值。
对线程间共享的变量用关键字volatile
声明涉及到Java的内存模型。在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定 的!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ Main Memory │ │ ┌───────┐┌───────┐┌───────┐ │ │ var A ││ var B ││ var C │ │ └───────┘└───────┘└───────┘ │ │ ▲ │ ▲ │ ─ ─ ─│─│─ ─ ─ ─ ─ ─ ─ ─│─│─ ─ ─ │ │ │ │ ┌ ─ ─ ┼ ┼ ─ ─ ┐ ┌ ─ ─ ┼ ┼ ─ ─ ┐ ▼ │ ▼ │ │ ┌───────┐ │ │ ┌───────┐ │ │ var A │ │ var C │ │ └───────┘ │ │ └───────┘ │ Thread 1 Thread 2 └ ─ ─ ─ ─ ─ ─ ┘ └ ─ ─ ─ ─ ─ ─ ┘
这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。例如,主内存的变量a = true
,线程1执行a = false
时,它在此刻仅仅是把变量a
的副本变成了false
,主内存的变量a
还是true
,在JVM把修改后的a
回写到主内存之前,其他线程读取到的a
的值仍然是true
,这就造成了多线程之间共享的变量不一致。
因此,volatile
关键字的目的是告诉虚拟机:
每次访问变量时,总是获取主内存的最新值;
每次修改变量后,立刻 回写到主内存。
volatile
关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。
针对多线程使用的变量如果不是volatile或者final修饰的,很有可能产生不可预知的结果(另一个线程修改了这个值,但是之后在某线程看到的是修改之前的值)。其实道理上讲同一实例的同一属性本身只有一个副本。但是多线程是会缓存值的,本质上,volatile就是不去缓存,直接取值。在线程安全的情况下加volatile会牺牲性能。
中断是一个状态!interrupt()
方法只是将这个状态置为true而已。所以说正常运行的程序不去检测状态,就不会终止,而wait()
等阻塞方法会去检查并抛出异常。如果在正常运行的程序中添加while(!Thread.interrupted())
,则同样可以在中断后离开代码体。
守护线程
守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。
因此,JVM退出时,不必关心守护线程是否已结束。
不是字面看着守护这个线程不让它结束,而是可以直接结束这个线程从而可以结束JVM。不要看表面意思啊喂
1 2 3 Thread t = new MyThread ();t.setDaemon(true ); t.start();
在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。
线程同步(加锁解锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Main { public static void main (String[] args) throws Exception { var add = new AddThread (); var dec = new DecThread (); add.start(); dec.start(); add.join(); dec.join(); System.out.println(Counter.count); } } class Counter { public static int count = 0 ; } class AddThread extends Thread { public void run () { for (int i=0 ; i<10000 ; i++) { Counter.count += 1 ; } } } class DecThread extends Thread { public void run () { for (int i=0 ; i<10000 ; i++) { Counter.count -= 1 ; } } }
当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行。比如上方这个代码块的执行结果就不一定是0,对数据的处理不是原子操作,导致多个线程读写一个变量,数据不一致。
这个时候,有个单线程模型下不存在的问题就来了:如果多个线程同时读写共享变量,会出现数据不一致的问题。这个结果每次执行都不一样,不会是我们想象的那样得到执行加一次后执行减一次。
🌰:两个人分别去银行向同一张卡取钱、排队干饭。
多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ┌───────┐ ┌───────┐ │Thread1│ │Thread2│ └───┬───┘ └───┬───┘ │ │ │-- lock -- │ │ILOAD (100) │ │IADD │ │ISTORE (101) │ │-- unlock -- │ │ │-- lock -- │ │ILOAD (101) │ │IADD │ │ISTORE (102) │ │-- unlock -- ▼ ▼
通过加锁和解锁的操作,就能保证3条指令总是在一个线程执行期间,不会有其他线程会进入此指令区间。即使在执行期线程被操作系统中断执行,其他线程也会因为无法获得锁导致无法进入此指令区间。只有执行线程将锁释放后,其他线程才有机会获得锁并执行。这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行。
可见,保证一段代码的原子性就是通过加锁和解锁实现的。Java程序使用synchronized
关键字对一个对象进行加锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class Main { public static void main (String[] args) throws Exception { var add = new AddThread (); var dec = new DecThread (); add.start(); dec.start(); add.join(); dec.join(); System.out.println(Counter.count); } } class Counter { public static int count = 0 ; public static final Object lock = new Object (); } class AddThread extends Thread { public void run () { for (int i = 0 ; i < 10000 ; i++) { synchronized (Counter.lock) { Counter.count += 1 ; } } } } class DecThread extends Thread { public void run () { for (int i = 0 ; i < 10000 ; i++) { synchronized (Counter.lock) { Counter.count -= 1 ; } } } }
它表示用Counter.lock
实例作为锁,两个线程在执行各自的synchronized(Counter.lock) { ... }
代码块时,必须先获得锁,才能进入代码块进行 。执行结束后,在synchronized
语句块结束会自动释放锁。这样一来,对Counter.count
变量进行读写就不可能同时进行。上述代码无论运行多少次,最终结果都是0。
同步方法跟同步块差不多,这里写的是同步块。不同的是同步方法默认锁住的对象是本身这个方法的类,默认锁的是this.
而同步块可以锁任何对象,只要你传这个对象过去就行了,就是上面这个栗子。
使用synchronized
解决了多线程同步访问共享变量的正确性问题。但是,它的缺点是带来了性能下降。因为synchronized
代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized
会降低程序的执行效率。
在使用synchronized
的时候,不必担心抛出异常。因为无论是否有异常,都会在synchronized
结束处正确释放锁。
需要注意的是线程各自的synchronized
锁住的必须是同一个对象 ,因为JVM只保证同一个锁在任意时刻只能被一个线程获取,但是两个不同的锁在同一时刻可以被两个线程分别获取。
使用锁的时候根据共享实例选择不同的锁,避免好几个线程使用的都是同一个锁降低了效率。
JVM规范定义了几种原子操作:
基本类型(long
和double
除外)赋值,例如:int n = m
;
引用类型赋值,例如:List<String> list = anotherList
。
原子操作 是指不难被中断的一个或者一系列操作。
这两种都不需要synchronized的操作。
但是,如果是多行赋值,就必须保证是同步操作。
有时候赋值操作可以转换成利用指针引用进行赋值从而达到把非原子操作变为原子操作。
多行赋值,必须进行同步操作:
1 2 3 4 5 6 7 8 9 10 class Pair { int first; int last; public void set (int first, int last) { synchronized (this ) { this .first = first; this .last = last; } } }
可以改造成这样:
1 2 3 4 5 6 7 class Pair { int [] pair; public void set (int first, int last) { int [] ps = new int [] { first, last }; this .pair = ps; } }
这里不再需要同步,因为this.pair = ps
是引用赋值的原子操作。
这里的ps
是方法内部定义的局部变量,每个线程都会有各自的局部变量,互不影响,并且互不可见,并不需要同步。
死锁
多个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package lock;public class DeadLock { public static void main (String[] args) { test t1 = new test ("工藤新一" ); test t2 = new test ("怪盗基德" ); t1.start(); t2.start(); } } class LockA {} class LockB {} class test extends Thread { static final LockA lockA = new LockA (); static final LockB lockB = new LockB (); String obj; test (String obj) { this .obj = obj; } @Override public void run () { try { fight(); } catch (InterruptedException e) { throw new RuntimeException (e); } } public void fight () throws InterruptedException { if (this .obj.equals("工藤新一" )) { synchronized (lockA) { System.out.println(this .obj + "获得lockA" ); Thread.sleep(100 ); synchronized (lockB) { System.out.println(this .obj + "获得lockB" ); } } } else { synchronized (lockB) { System.out.println(this .obj + "获得lockB" ); Thread.sleep(100 ); synchronized (lockA) { System.out.println(this .obj + "获得lockA" ); } } } } }
如果是按这样写的话
线程1:工藤新一进入fight()
,获得lockA
;
线程2:怪盗基德进入fight()
,获得lockB
。
随后
线程1:工藤新一准备获得lockB
,失败,等待中;
线程2:怪盗基德准备获得lockA
,失败,等待中。
此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。
如果我们把获取第二把锁的位置换一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void fight () throws InterruptedException { if (this .obj.equals("工藤新一" )) { synchronized (lockA) { System.out.println(this .obj + "获得lockA" ); Thread.sleep(100 ); }synchronized (lockB) { System.out.println(this .obj + "获得lockB" ); } } else { synchronized (lockB) { System.out.println(this .obj + "获得lockB" ); Thread.sleep(100 ); } synchronized (lockA) { System.out.println(this .obj + "获得lockA" ); } } }
再拿到一个锁之后马上解锁,然后拿下一把锁,不抱死,让两个人都能拿到下一把锁。
产生死锁的四个条件:
互斥条件:一个资源每次只能被一个进程使用。
请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件:进程已获得的资源,在未使用完之前,不能强行剥夺。
循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。
Java的线程锁是可重入的锁。对同一个线程,在获取到锁以后继续获取同一个锁。JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁 (ReentrantLock)。由于Java的线程锁是可重入锁,所以,获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized
块,记录-1,减到0的时候,才会真正释放锁。
ReentrantLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class TestLock extends Thread { int ticketNums = 10 ; private final ReentrantLock lock = new ReentrantLock (); @Override public void run () { while (true ) { try { lock.lock(); if (ticketNums > 0 ) { try { Thread.sleep(100 ); }catch (InterruptedException e) { e.printStackTrace(); } System.out.println(ticketNums--); }else { break ; } } finally { lock.unlock(); } } } public static void main (String[] args) { TestLock t1 = new TestLock (); new Thread (t1).start(); new Thread (t1).start(); new Thread (t1).start(); } }
保证线程安全,好像跟synchronized
用处一样,只是使用更方便了。
使用ReentrantLock
比直接使用synchronized
更安全,可以替代synchronized
进行线程同步。和synchronized
不同的是,ReentrantLock
可以尝试获取锁:
1 2 3 4 5 6 7 if (lock.tryLock(1, TimeUnit.SECONDS)) { try { ... } finally { lock.unlock(); } }
上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()
返回false
,程序就可以做一些额外处理,而不是无限等待下去。
所以,使用ReentrantLock
比直接使用synchronized
更安全,线程在tryLock()
失败的时候不会导致死锁。
wait()和notify()
这两个方法就是睡眠线程和唤醒线程。基于synchronized
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 package PC;public class TestPC { public static void main (String[] args) { SynContainer container = new SynContainer (); new Productor (container).start(); new Consumer (container).start(); } } class Productor extends Thread { SynContainer container; public Productor (SynContainer container) { this .container = container; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { try { container.push(new Chicken (i)); } catch (InterruptedException e) { throw new RuntimeException (e); } System.out.println("生产了" + i + "只鸡" ); } } } class Consumer extends Thread { SynContainer container; public Consumer (SynContainer container) { this .container = container; } @Override public void run () { for (int i = 0 ; i < 100 ; i++) { try { System.out.println("消费了-->" + container.pop().id + "只鸡" ); } catch (InterruptedException e) { throw new RuntimeException (e); } } } } class Chicken { int id; public Chicken (int id) { this .id = id; } } class SynContainer { Chicken[] chickens = new Chicken [10 ]; int count = 0 ; public synchronized void push (Chicken chicken) throws InterruptedException { if (count == chickens.length) { this .wait(); } chickens[count] = chicken; count++; this .notifyAll(); } public synchronized Chicken pop () throws InterruptedException { if (count == 0 ) { this .wait(); } count--; this .notifyAll(); return chickens[count]; } }
wait
和notify
用于多线程协调运行:
在synchronized
内部可以调用wait()
使线程进入等待状态;
必须在已获得的锁对象上调用wait()
方法;
在synchronized
内部可以调用notify()
或notifyAll()
唤醒其他等待线程;
必须在已获得的锁对象上调用notify()
或notifyAll()
方法;
已唤醒的线程还需要重新获得锁后才能继续执行。
必须写在synchronized(锁对象){......}
代码块中。
wait()
定义在Object
类中的一个方法,wait()
方法在调用时,会释放线程获得的锁 ,直到wait()
方法返回后,线程才重新试图获得锁。
notifyAll()
将唤醒所有 当前正在this
锁等待的线程,而notify()
只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。
多线程编程好好好难。
Condition
synchronized
可以配合wait
和notify
实现线程在条件不满足时等待,条件满足时唤醒。而当我们使用ReentrantLock
的时候就需要使用Condition
对象来实现wait
和notify
的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private final Condition condition = lock.newCondition(); public void add (String s) { lock.lock(); try { queue.add(s); condition.signalAll(); } finally { lock.unlock(); } } public String get () throws InterruptedException { lock.lock(); try { while (queue.isEmpty()) { condition.await(); } return queue.remove(); }finally { lock.unlock(); } }
使用Condition
时,引用的Condition
对象必须从Lock
实例的newCondition()
返回,这样才能获得一个绑定了Lock
实例的Condition
实例。
await()
会释放当前锁,进入等待状态;
signal()
会唤醒某个等待线程;
signalAll()
会唤醒所有等待线程;
唤醒线程从await()
返回后需要重新获得锁。
ReadWriteLock
只允许一个线程写入(其他线程既不能写入也不能读取);
没有写入时,多个线程允许同时读(提高性能)。
1 2 3 4 5 6 7 private final ReadWriteLock rwlock = new ReentrantReadWriteLock (); private final Lock rlock = rwlock.readLock(); private final Lock wlock = rwlock.writeLock(); wlock.lock(); wlock.unlock(); rlock.lock(); rlock.unlock();
分别用读锁和写锁去加锁读写操作,读取时可以多个线程同时获取读锁。注意,如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写 。这是悲观的读锁。
StampedLock
乐观的读锁,读的过程中也允许获取写锁后写入,写入后我们的数据可能会发生变化,所以需要判断我们读入的时候是否有写入数据。StampedLock
和ReadWriteLock
区别在读锁不同。StampedLock
是不可重入锁 ,不能在一个线程中反复获取同一个锁。
1 private final StampedLock stampedLock = new StampedLock ();
1 2 long stamp = stampedLock.writeLock(); stampedLock.unlockWrite(stamp);
1 long stamp = stampedLock.tryOptimisticRead();
通过validate()
去验证版本号,如果读取过程没有写入,版本号不变,验证成功。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。
1 stampedLock.validate(stamp)
1 2 stamp = stampedLock.readLock(); stampedLock.unlockRead(stamp);
Concurrent集合
java.util.concurrent
包提供了几种并发集合类。
interface
non-thread-safe
thread-safe
List
ArrayList
CopyOnWriteArrayList
Map
HashMap
ConcurrentHashMap
Set
HashSet / TreeSet
CopyOnWriteArraySet
Queue
ArrayDeque / LinkedList
ArrayBlockingQueue / LinkedBlockingQueue
Deque
ArrayDeque / LinkedList
LinkedBlockingDeque
当我们需要多线程访问时,我们可以把对应的集合换成并发集合类。
Atomic原子类
这个更是重量级 ,内容有点多,我都没细学,大概看了看,用到再说。
Atomic包下所有的原子类都只适用于单个元素,即只能保证一个基本数据类型、对象、或者数组的原子性。根据使用范围,可以将这些类分为四种类型,分别为原子更新基本类型 、原子更新数组 、原子更新引用 、原子更新属性 。
原子更新基本类型
atomic包下原子更新基本数据类型包括AtomicInteger(原子更新整数类型)、AtomicLong(原子更新长整数类型)、AtomicBoolean(原子更新布尔类型)三个类。
这里只举AtomicInteger
栗子。
AtomicInteger
中提供许多方法给我们调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 get(); set(int newValue); getAndSet(int newValue); getAndIncrement() getAndDecrement() incrementAndGet() decrementAndGet() getAndAdd(int delta)
可以去Java源码中查看。
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class AtomicTest { public static void main (String[] args) throws Exception { var add = new AddThread (); var dec = new DecThread (); add.start(); dec.start(); add.join(); dec.join(); System.out.println(AtomicTest.getCount()); } public static int getCount () { return atomicInteger.get(); } } class Counter { public static int count = 0 ; public static AtomicInteger atomicInteger = new AtomicInteger (); } class AddThread extends Thread { public void run () { for (int i = 0 ; i < 10000 ; i++) { atomicInteger.getAndIncrement(); } } } class DecThread extends Thread { public void run () { for (int i = 0 ; i < 10000 ; i++) { atomicInteger.getAndDecrement(); } } }
这个栗子使用的是线程同步那里的栗子,我们这里使用getAndIncrement()
和getAndDecrement()
确保原子操作。线程同步那边使用的是synchronized
保证原子性。
原子更新引用类型
基本类型的原子类只能更新一个变量,如果需要原子更新多个变量,则需要使用引用类型原子类。引用类型的原子类包括AtomicReference、AtomicStampedReference、AtomicMarkableReference三个。
AtomicReference 引用原子类
AtomicStampedReference 原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。(CAS是一个算法,无锁算法)
AtomicMarkableReference 原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来。
以AtomicReference
为例:
AtomicReference
同样提供了许多方法:
1 2 3 4 5 6 7 8 9 get(); set(V newValue) getAndSet(V newValue) compareAndSet(V expectedValue, V newValue)
可以去源码中查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class Evangelion { public String pilots; public String series; public Evangelion (String series, String pilots) { this .pilots = pilots; this .series = series; } public static void main (String[] args) { AtomicReference<Evangelion> atomicReference = new AtomicReference <>(); Evangelion unit0 = new Evangelion ("Zerogōki" , "Rei Ayanami" ); Evangelion unit1 = new Evangelion ("Shogōki" , "Shinji Ikari" ); Evangelion unit2 = new Evangelion ("Nigōki" , "Asuka Langley Soryu" ); atomicReference.set(unit0); String series0 = atomicReference.get().series; String pilots0 = atomicReference.get().pilots; System.out.println(series0 + "驾驶员为" + pilots0); boolean res = atomicReference.compareAndSet(unit0, unit2); System.out.println(res); String series2 = atomicReference.get().series; String pilots2 = atomicReference.get().pilots; System.out.println(series2 + "驾驶员为" + pilots2); } }
输出结果:
1 2 3 Zerogōki驾驶员为Rei Ayanami true Nigōki驾驶员为Asuka Langley Soryu
原子更新数组
原子更新数组并不是对数组本身的原子操作,而是对数组中的元素。主要包括3个类:AtomicIntegerArray
(原子更新整数数组的元素)、AtomicLongArray
(原子更新长整数数组的元素)及AtomicReferenceArray
(原子更新引用类型数组的元素)
以 AtomicIntegerArray
为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class AtomicIntegerArray implements java .io.Serializable { private final int [] array; public final int get (int i) { return (int )AA.getVolatile(array, i); } public final void set (int i, int newValue) { AA.setVolatile(array, i, newValue); } public final boolean compareAndSet (int i, int expectedValue, int newValue) { return AA.compareAndSet(array, i, expectedValue, newValue); } public final int getAndIncrement (int i) { return (int )AA.getAndAdd(array, i, 1 ); } public final int getAndDecrement (int i) { return (int )AA.getAndAdd(array, i, -1 ); } public final int incrementAndGet (int i) { return (int )AA.getAndAdd(array, i, 1 ) + 1 ; } public final int decrementAndGet (int i) { return (int )AA.getAndAdd(array, i, -1 ) - 1 ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Array { public static void main (String[] args) { int [] array = new int [5 ]; AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray (5 ); AtomicIntegerArray atomicIntegerArray1 = new AtomicIntegerArray (array); atomicIntegerArray.set(0 , 100 ); System.out.println(atomicIntegerArray.get(0 )); } }
原子更新对象属性
只选择更新某个对象中的字段,可以使用更新对象字段的原子类。包括三个类,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater以及AtomicReferenceFieldUpdater。需要注意的是这些类的使用需要满足以下条件才可。
被操作的字段不能是static类型;
被操纵的字段不能是final类型;
被操作的字段必须是volatile修饰的;
属性必须对于当前的Updater所在区域是可见的。
以AtomicReferenceFieldUpdater
为栗:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Evangelion { public volatile String pilot; public String series; public Evangelion (String series, String pilot) { this .pilot = pilot; this .series = series; } public static void main (String[] args) { AtomicReferenceFieldUpdater<Evangelion,String> updater = AtomicReferenceFieldUpdater.newUpdater(Evangelion.class, String.class, "pilot" ); Evangelion unit1 = new Evangelion ("Shogōki" , "Shinji Ikari" ); updater.set(unit1, "Rei Ayanami" ); System.out.println(unit1.pilot); } }
输出结果
AtomicReferenceFieldUpdater
是一个抽象类。直接调用里面的newUpdater(...)
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public abstract class AtomicReferenceFieldUpdater <T,V>public static <U,W> AtomicReferenceFieldUpdater<U,W> newUpdater (Class<U> tclass, Class<W> vclass, String fieldName)
CAS
CAS是Compare And Swap的简称,即比较并交换的意思。CAS是一种无锁算法,其算法思想如下:
CAS的函数公式:compareAndSwap(V,E,N); 其中V表示要更新的变量,E表示预期值,N表示期望更新的值。调用compareAndSwap函数来更新变量V,如果V的值等于期望值E,那么将其更新为N,如果V的值不等于期望值E,则说明有其它线程跟新了这个变量,此时不会执行更新操作,而是重新读取该变量的值再次尝试调用compareAndSwap来更新。
可见CAS其实存在一个循环的过程,如果有多个线程在同时修改这一个变量V,在修改之前会先拿到这个变量的值,再和变量对比看是否相等,如果相等,则说明没有其它线程修改这个变量,自己更新变量即可。如果发现要修改的变量和期望值不一样,则说明再读取变量V的值后,有其它线程对变量V做了修改,那么,放弃本次更新,重新读变量V的值,并再次尝试修改,直到修改成功为止。这个循环过程一般也称作自旋 ,CAS操作的整个过程如下图所示:
![CAS流程图](https://cos.asuka-xun.cc/blog/background pictureCAS.jpg)
应用
在应用中CAS可以用于实现无锁数据结构,常见的有无锁队列(先入先出 以及无锁栈(先入后出)。对于可在任意位置插入数据的链表以及双向链表,实现无锁操作的难度较大。
缺陷
ABA问题
ABA问题是无锁结构实现中常见的一种问题,可基本表述为:
进程P1读取了一个数值A
P1被挂起(时间片耗尽、中断等),进程P2开始执行
P2修改数值A为数值B,然后又修改回A
P1被唤醒,比较后发现数值A没有变化,程序继续执行。
对于P1来说,数值A未发生过改变,但实际上A已经被变化过了,继续使用可能会出现问题。在CAS操作中,由于比较的多是指针,这个问题将会变得更加严重。试想如下情况:
1 2 3 4 5 top | V 0x0014 | Node A | --> | Node X | --> ……
有一个栈(先入后出)中有top和节点A,节点A目前位于栈顶top指针指向A。现在有一个进程P1想要pop一个节点,因此按照如下无锁操作进行
1 2 3 4 5 6 7 8 pop() { do{ ptr = top; // ptr = top = NodeA next_prt = top->next; // next_ptr = NodeX } while(CAS(top, ptr, next_ptr) != true); return ptr; }
而进程P2在执行CAS操作之前打断了P1,并对栈进行了一系列的pop和push操作,使栈变为如下结构:
1 2 3 4 5 top | V 0x0014 | Node C | --> | Node B | --> | Node X | --> ……
进程P2首先pop出NodeA,之后又push了两个NodeB和C,由于内存管理机制中广泛使用的内存重用机制,导致NodeC的地址与之前的NodeA一致。
这时P1又开始继续运行,在执行CAS操作时,由于top依旧指向的是NodeA的地址(实际上已经变为NodeC),因此将top的值修改为了NodeX,这时栈结构如下:
1 2 3 4 top | 0x0014 V | Node C | --> | Node B | --> | Node X | --> ……
经过CAS操作后,top指针错误地指向了NodeX而不是NodeB。
直接cv维基百科 。
线程池
使用线程池的好处:
提高响应速度(减少了创建新线程的时间)
降低资源消耗(重复利用线程池中线程,不需要每次都创建)
便于线程管理()
corePoolSize:核心池的大小
maximumPoolSize:最大线程数
keepAliveTime:线程没有任务时最多保持多长时间后会终止。
Java提供ExecutorService
接口表示线程池。
详细用法看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class TestPool { public static void main (String[] args) { ExecutorService service = Executors.newFixedThreadPool(10 ); service.execute(new MyThread ()); service.execute(new MyThread ()); service.execute(new MyThread ()); service.execute(new MyThread ()); service.execute(new MyThread ()); service.shutdown(); } } class MyThread implements Runnable { @Override public void run () { System.out.println(Thread.currentThread().getName()); } }
输出结果:
1 2 3 4 5 pool-1 -thread-1 pool-1 -thread-4 pool-1 -thread-3 pool-1 -thread-2 pool-1 -thread-5
线程池在程序结束的时候要关闭。使用shutdown()
方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()
会立刻停止正在执行的任务,awaitTermination()
则会等待指定的时间让线程池关闭。
Java提供ExecutorService
接口的几个实现类有:
FixedThreadPool:线程数固定的线程池;(上面那个代码就使用这个实现类)
CachedThreadPool:线程数根据任务动态调整的线程池;
SingleThreadExecutor:仅单线程执行的线程池。
动态限制线程池的大小CachedThreadPool
,可以根据Executors.newCachedThreadPool()
方法的源码:
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
第一个参数是创建最小的线程、第二个是最大进程、第三个是未使用60秒的线程将被终止并从缓存中删除。
利用ScheduledThreadPool实现定时任务
需要反复执行的任务使用ScheduledThreadPool
。
1 ScheduledExecutorService ses = Executors.newScheduledThreadPool(4 );
我们可以提交一次性任务,它会在指定延迟后只执行一次:
1 2 3 4 5 6 7 8 9 // 1秒后执行一次性任务: ses.schedule(new Mythread(), 1, TimeUnit.SECONDS); // 参数 schedule(Runnable command, long delay, TimeUnit unit); // 另一种写法 schedule(Callable<V> callable, long delay, TimeUnit unit);
如果任务以固定的每3秒执行,我们可以这样写:
1 2 3 4 5 6 7 8 // 2秒后开始执行定时任务,每3秒执行: ses.scheduleAtFixedRate(new Mythread(), 2, 3, TimeUnit.SECONDS); // 参数 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
如果任务以固定的3秒为间隔执行,我们可以这样写:
1 2 3 4 5 6 7 8 // 2秒后开始执行定时任务,以3秒为间隔执行: ses.scheduleWithFixedDelay(new Mythread(), 2, 3, TimeUnit.SECONDS); // 参数 scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
注意scheduleAtFixedRate
和scheduleWithFixedDelay
的区别。scheduleAtFixedRate
是指任务总是以固定时间间隔触发,不管任务执行多长时间(不管前面任务是否执行完毕) (我还不确定) :
1 2 3 │░░░░ │░░░░░░ │░░░ │░░░░░ │░░░ ├───────┼───────┼───────┼───────┼────> │<─────>│<─────>│<─────>│<─────>│
而scheduleWithFixedDelay
是指,上一次任务执行完毕后(上一次的任务执行完毕后,再开始计时) ,等待固定的时间间隔,再执行下一次任务:
1 2 3 │░░░│ │░░░░░│ │░░│ │░ └───┼───────┼─────┼───────┼──┼───────┼──> │<─────>│ │<─────>│ │<─────>│
这有个坑,但是不知道原因。
我自己测试scheduleAtFixedRate
的时候是出现执行完任务才进行下一次任务的执行,而不是以固定时间间隔触发。
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class TestPool { public static void main (String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10 ); ses.scheduleAtFixedRate(new MyThread (), 2 , 3 , TimeUnit.SECONDS); } } class MyThread implements Runnable { public static int count = 0 ; @Override public void run () { count++; System.out.println(Thread.currentThread().getName() + "现在时间是" + LocalDateTime.now()); try { Thread.sleep(5000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } } }
ForkJoin
这是另一种线程池,它主要功能是把一个大任务拆成多个小任务并行执行 。
利用分治的思想:通过分解任务,并行执行,最后合并结果得到最终结果。
摆烂了,不写了。之后一定看。