多线程
1. 并发并行 并行:两个以上事件同一时刻发生
并发:两个以上事件同一时段发生
2. 程序 为完成特定任务,用某种语言编写的一组指令的集合。指一段静态代码
3. 进程(QQ.exe) 处于运行过程中的运行在内存中的应用程序。占用内存,进入到内存的程序。
拥有独立空间,动态性,并发性
是系统运行程序的基本单位。
系统运行一个程序是一个进程从创建,运行到消亡的过程。
4. 线程(QQ多人聊天) 进程的基本执行单位。
负责当前进程中程序的执行。
一个进程可以有n个线程。
JVM执行main方法,栈开辟一条main方法通向cpu的路径,创建线程会开辟一条通向cpu的新路径,用来执行run方法。对于两条路径,cpu自行选择。抢占式调用。
5. 创建线程 调用的是start方法,使其创建一个执行run方法的新线程
5.1. extends Thread 1 2 3 4 5 6 7 8 9 10 public class test extends Thread { @override public void run () {} public static void main (String[] args) { test t = new test ; t.start(); } }
通过new Thread()方式,创造了新的实例变量,线程之间不存在 资源共享
5.2. implements Runnable 1 2 3 4 5 6 7 8 9 public class test implements Runable { public void run () { } public static void main (String[] args) { test t = new test ; new Thread (t,"Test" ).start(); } }
避免了单继承的局限性
线程之间资源共享
5.3. implements Callable
6. 线程状态
New
jvm分配内存,初始化成员变量值,此时的线程等同于普通类
调用.start() 线程进入就绪状态,等待cpu的调度
Runnable
线程是可运行的状态,但是否运行还是取决于cpu的分配,看是否拥有cpu的时间片。
线程争取cpu,执行run()内的代码,执行时长也为时间片长
Blocked
时间片用完
发生阻塞式IO
sleep()方法主动放弃处理器资源
同步资源被锁定
等待通知notify
线程进入阻塞状态
Waiting
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 WAITING,
解除了阻塞原因后,线程重新进入就绪状态
Timed waiting
1 2 3 4 5 6 7 8 9 10 11 12 13 TIMED_WAITING,
Terminated
线程结束运行,或调用了stop方法,被error/exception
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import java.lang.Thread.State;public class ThreadState { public static void main (String[] args) { State state = null ; Thread thread= new Thread (()->{ for (int i=0 ;i<3 ;i++) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); state=thread.getState(); System.out.println(state); thread.start(); state = thread.getState(); System.out.println(state); while (state!=Thread.State.TERMINATED) { state = thread.getState(); System.out.println(state); } state = thread.getState(); System.out.println(state); } }
7. 控制线程 7.1. interrupt() 请求终止线程,线程的中断状态将被置位
1 boolean flag = Thread.currentThread().isInterrupted();
void interrupt()
向线程发送中断请求,线程中断状态被设置为true。如果目前该线程被一个sleep调用阻塞,InterruptedException异常被抛出。
boolean isInterrupted()
判断中断状态,不改变中断状态
static boolean interrupted()
测试当前线程是否被中断。会清除线程的中断状态,将中断状态设置为false
7.2. join() 插队 new Thread().join() 等待该线程执行完才执行其他线程
7.3. 守护线程 为其他线程提供服务
new Thread().setDeamon(true)
jvm不会等待该线程运行结束。当整个虚拟机中只有这种线程时,虚拟机选择退出
7.4. sleep() Thread.sleep(long millis) 暂停当前线程进入阻塞状态,不释放资源
7.5. yield() 礼让 Thread.yield() 该线程主动让出时间片进入就绪状态
7.6. wait() 该方法来自于Object类
1 public final native void wait (long timeout) throws InterruptedException;
阻塞线程,让出cpu时间片及资源,等待notify/notifyAll方法的唤醒
7.7. notify() 该方法来自于Object类
1 public final native void notify () ;
唤醒除当前线程以外的线程
7.8. 改变线程优先级 优先级也不能保证线程执行的先后顺序
new Thread().setPriority(Priority.MIN_PRORITY) 1
Priority.MAX_PRORITY 10
Priority.NORM_PRORITY 5
8. Volatile域 声明一个域为volatile,编译器和虚拟机就知道该域可以被多个线程并发更新
1 private volatile boolean done;
volatile并不保证域中值的一致性
9. 竞争 9.1. ReentrantLock 可重入锁 锁可以延续使用并拥有计数器
一旦线程进入就封锁锁对象,其他线程无法通过lock语句,被阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.util.concurrent.locks.ReentrantLock;ReentrantLock lock = new ReentrantLock ();lock.lock(); lock.unlock(); public class ReentrantLock { private boolean isLocked=false ; public Thread lockedBy=null ; private int holdCount=0 ; public synchronized void lock () { Thread t = Thread.currentThread(); while (isLocked&&lockedBy!=t){ this .wait(); } isLocked=true ; lockedBy=t; holdCount++; } public synchronized void unlock () { if (Thread.currentThread()==lockBy){ holdCount--; if (holdCount==0 ){ isLocked=false ; this .notify(); } } } }
9.2. synchronized 同步 可以是同步块,同步方法
1 2 3 4 5 6 7 8 synchronized (obj){ } private synchronized void method () { }
一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法,其他线程都只能等待,锁的是当前的this,实例对象
而类中的普通方法和同步锁无关
当有多个对象时,就不是同一把锁了,每个对象的方法可以并发运行
静态同步方法中,锁的是整个类 .class
同步方法块中,synchronized锁的是括号里的代码
锁在退出或异常退出后必须释放锁,防止其他线程饥饿等待
代码例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public Bank method () { synchronized (this ){ if (instance==null ){ instance = new Bank (); } } return instance; } public Bank method () { if (instance==null ){ synchronized (this ){ if (instance==null ){ instance = new Bank (); } } } return instance; }
9.3. 影院购票 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 import java.util.ArrayList;import java.util.List;public class Movie { public static void main (String[] args) { List<Integer> seats = new ArrayList <Integer>(); seats.add(1 ); seats.add(3 ); seats.add(6 ); seats.add(4 ); Cinema cinema = new Cinema (seats,"Happy Cinema" ); List<Integer> need = new ArrayList <Integer>(); need.add(1 ); need.add(3 ); Customer c1 = new Customer (cinema,need); new Thread (c1,"A" ).start(); List<Integer> need2 = new ArrayList <Integer>(); need2.add(6 ); Customer c2 = new Customer (cinema,need2); new Thread (c2,"B" ).start(); } } class Cinema { private List<Integer> seats; String name; public Cinema (List<Integer> seats,String name) { this .seats = seats; this .name=name; } public boolean book (List<Integer> need) { System.out.println("可用位置:" +seats); List<Integer> copyList = new ArrayList <Integer>(); copyList.addAll(seats); copyList.removeAll(need); int seatNum = seats.size()-copyList.size(); if (seatNum!=need.size()) { return false ; } seats=copyList; return true ; } public String getName () { return name; } public List<Integer> getSeats () { return seats; } } class Customer implements Runnable { private Cinema cinema; List<Integer> need; public Customer (Cinema cinema,List<Integer> need) { this .cinema=cinema; this .need=need; } public void run () { synchronized (cinema) { boolean flag = cinema.book(need); if (flag) { System.out.println("购" +cinema.getName()+"票成功,购票人" +Thread.currentThread().getName()+",位置为" +need +" 剩余位置 " +cinema.getSeats()); }else { System.out.println("购" +cinema.getName()+"票失败,购票人" +Thread.currentThread().getName()+",位置不够" ); } } } }
9.4. 购火车票 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class web12306 { public static void main (String[] args) { web w = new web (3 ,"火车票网" ); new Passanger (w,"A" ,2 ).start(); new Passanger (w,"B" ,2 ).start(); } } class Passanger extends Thread { private int seats; public Passanger (Runnable target,String name,int seats) { super (target,name); this .seats=seats; } public int getSeats () { return seats; } } class web implements Runnable { private int ticket; private String name; public web (int ticket,String name) { this .ticket=ticket; this .name=name; } private boolean book (int need) { System.out.println("可用票 " +ticket); if (need>ticket) { return false ; } ticket-=need; return true ; } public void run () { synchronized (this ) { Passanger p=(Passanger) Thread.currentThread(); boolean flag = book(p.getSeats()); if (flag) { System.out.println(Thread.currentThread().getName()+ " 购" +name+" 票成功 " ); } else { System.out.println(Thread.currentThread().getName()+" 购" +name+" 票失败" ); } } } }
9.5. 死锁处理-生产者消费者 9.6. 死锁问题 1 2 3 4 5 6 7 synchronized (A){ synchronized (B){} } synchronized (B){ synchronized (A){} }
互占资源
9.6.1. 管程法 利用一个缓冲区暂存数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 public class siShuo { public static void main (String[] args) { Container c = new Container (); new Productor (c).start(); new Customers (c).start(); } } class Productor extends Thread { Container container; public Productor (Container container) { this .container = container; } public void run () { for (int i=1 ;i<=100 ;i++) { System.out.println("生产第 " +i+" 个馒头" ); container.push(new Steam (i)); } } } class Customers extends Thread { Container container; public Customers (Container container) { this .container = container; } public void run () { for (int i=1 ;i<=100 ;i++) { System.out.println("消费第 " +container.pop().id+" 个馒头" ); } } } class Container { Steam[] steam = new Steam [10 ]; int count=0 ; public synchronized void push (Steam s) { if (count==steam.length) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } steam[count++]=s; this .notifyAll(); } public synchronized Steam pop () { if (count==0 ) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Steam sm=steam[--count]; this .notifyAll(); return sm; } } class Steam { int id; public Steam (int id) { this .id = id; } }
9.6.2. 信号灯法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public class siShuo2 { public static void main (String[] args) { TV tv=new TV (); new Productors2 (tv).start(); new Customers2 (tv).start(); } } class Productors2 extends Thread { TV tv; public Productors2 (TV tv) { this .tv = tv; } public void run () { for (int i=0 ;i<10 ;i++) { if (i%2 ==0 ) { tv.play("综艺" ); }else { tv.play("电影" ); } } } } class Customers2 extends Thread { TV tv; public Customers2 (TV tv) { this .tv = tv; } public void run () { for (int i=0 ;i<10 ;i++) { tv.watch(); } } } class TV { String voice; boolean flag = true ; public synchronized void play (String voice) { if (!flag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("表演了:" +voice); this .voice=voice; this .notifyAll(); this .flag=!flag; } public synchronized void watch () { if (flag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("听到了:" +voice); this .notifyAll(); this .flag=!flag; } }
10. 调度任务Timer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Timer timer = new Timer ();timer.schedule(TimerTask task,long delay, long period); timer.schedule(TimerTask task,Date firstTime, long period); timer.schedule(TimerTask task,long delay); import java.util.Timer;import java.util.TimerTask;public class TimeTest extends TimerTask { public static void main (String[] args) { Timer timer = new Timer (); timer.schedule(new TimeTest (),1000 ); } @Override public void run () { for (int i = 0 ; i < 10 ; i++) { System.out.println("Hello" ); } } }
11. ThreadLocal 1 2 3 4 5 6 7 8 9 10 11 private static ThreadLocal<Integer> tLocal = new ThreadLocal <Integer>() { @Override protected Integer initialValue () { return 20 ; } }; threadLocal = ThreadLocal.withInitial(()->20 ); threadLocal.get(); threadLocal.set(10 );
12. 线程池 经常创建和销毁线程,对性能影响很大。使用线程池可以实现重复利用。
减少创建线程的时间,提高响应速度
重复利用线程池中的线程,降低资源消耗
便于线程管理
12.1. 案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test void testSystem () throws ExecutionException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(2 ); Future<String> submit = service.submit(new Callable <String>() { @Override public String call () throws Exception { return "Thread" ; } }); String s = submit.get(); System.out.println(s); }
提交指定的任务去执行:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
12.2. ThreadPoolExecutor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ThreadPoolExecutor extends AbstractExecutorService { private int corePoolSize; private volatile long keepAliveTime; private volatile int maximumPoolSize; public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {} public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } }
12.3. newFixedThreadPool 该池包含固定数量的线程,空闲线程会一直被保留
1 ExecutorService pool = Executors.newFixedThreadPool(2 );
源码
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
12.4. newCachedThreadPool 必要时创建新线程,空闲线程会被保留60秒
1 ExecutorService pool = Executors.newCachedThreadPool();
源码
1 2 3 4 5 6 7 8 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
12.5. newSingleThreadExecutor 只有一个线程的”池”,该线程顺序执行每一个提交的任务
1 ExecutorService ex = Executors.newSingleThreadExecutor();
源码
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()));
12.6. newScheduledThreadPool 用于预定执行而构建的固定线程池
1 ExecutorService pool = Executors.newScheduledThreadPool(2 );
源码
1 2 3 4 5 6 7 8 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }
使用ScheduledThreadPoolExecutor需要注意的问题
ExecutorService的十个使用技巧
从零单排 Java Concurrency, SkipList&ConcurrnetSkipListMap
13. CompletableFuture 13.1. thenApply(T->U) 对结果应用一个函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <U> CompletableFuture<U> thenApply ( Function<? super T,? extends U> fn) { return uniApplyStage(null , fn); } private <V> CompletableFuture<V> uniApplyStage ( Executor e, Function<? super T,? extends V> f) { if (f == null ) throw new NullPointerException (); CompletableFuture<V> d = new CompletableFuture <V>(); if (e != null || !d.uniApply(this , f, null )) { UniApply<T,V> c = new UniApply <T,V>(e, d, this , f); push(c); c.tryFire(SYNC); } return d; }
13.2. thenCompose(T->U) 对结果调用函数并执行返回的future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public <U> CompletableFuture<U> thenCompose ( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null , fn); } private <V> CompletableFuture<V> uniComposeStage ( Executor e, Function<? super T, ? extends CompletionStage<V>> f) { if (f == null ) throw new NullPointerException (); Object r; Throwable x; if (e == null && (r = result) != null ) { if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null ) { return new CompletableFuture <V>(encodeThrowable(x, r)); } r = null ; } try { @SuppressWarnings("unchecked") T t = (T) r; CompletableFuture<V> g = f.apply(t).toCompletableFuture(); Object s = g.result; if (s != null ) return new CompletableFuture <V>(encodeRelay(s)); CompletableFuture<V> d = new CompletableFuture <V>(); UniRelay<V> copy = new UniRelay <V>(d, g); g.push(copy); copy.tryFire(SYNC); return d; } catch (Throwable ex) { return new CompletableFuture <V>(encodeThrowable(ex)); } } CompletableFuture<V> d = new CompletableFuture <V>(); UniCompose<T,V> c = new UniCompose <T,V>(e, d, this , f); push(c); c.tryFire(SYNC); return d; }
13.3. handle((T,Throwable)->U) 处理结果或错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public <U> CompletableFuture<U> handle ( BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(null , fn); } private <V> CompletableFuture<V> uniHandleStage ( Executor e, BiFunction<? super T, Throwable, ? extends V> f) { if (f == null ) throw new NullPointerException (); CompletableFuture<V> d = new CompletableFuture <V>(); if (e != null || !d.uniHandle(this , f, null )) { UniHandle<T,V> c = new UniHandle <T,V>(e, d, this , f); push(c); c.tryFire(SYNC); } return d; }
13.4. thenAccept(T->void) 类似于thenApply方法,结果为void
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public CompletableFuture<Void> thenAccept (Consumer<? super T> action) { return uniAcceptStage(null , action); } private CompletableFuture<Void> uniAcceptStage (Executor e,Consumer<? super T> f) { if (f == null ) throw new NullPointerException (); CompletableFuture<Void> d = new CompletableFuture <Void>(); if (e != null || !d.uniAccept(this , f, null )) { UniAccept<T> c = new UniAccept <T>(e, d, this , f); push(c); c.tryFire(SYNC); } return d; }
13.5. whenComplete((T,Throwable)->void) 类似于handle,结果为void
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public CompletableFuture<T> whenComplete ( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null , action); } private CompletableFuture<T> uniWhenCompleteStage ( Executor e, BiConsumer<? super T, ? super Throwable> f) { if (f == null ) throw new NullPointerException (); CompletableFuture<T> d = new CompletableFuture <T>(); if (e != null || !d.uniWhenComplete(this , f, null )) { UniWhenComplete<T> c = new UniWhenComplete <T>(e, d, this , f); push(c); c.tryFire(SYNC); } return d; }
13.6. thenRun(Runnable) 执行Runnable,结果为void
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } private CompletableFuture<Void> uniRunStage (Executor e, Runnable f) { if (f == null ) throw new NullPointerException (); CompletableFuture<Void> d = new CompletableFuture <Void>(); if (e != null || !d.uniRun(this , f, null )) { UniRun<T> c = new UniRun <T>(e, d, this , f); push(c); c.tryFire(SYNC); } return d; }
14. 同步器 14.1. Semaphore信号量 允许线程等待直到被允许继续运行为止,限制访问资源的线程总数,如果许可数为1,常常阻塞线程直到另一个线程给出许可为止。
14.2. CountDownLatch倒计时门栓 允许线程集等待直到计数器减为0,当一个或多个线程需要等待直到指定数目的事件发生
14.3. CyclicBarrier障珊 允许线程集等待直至其中预定数目的线程到达一个公共障珊,染灰可以选择执行一个处理障珊的动作
14.4. Exchanger交换器 允许两个线程在要交换的对象准备好时交换对象,当两个线程工作在同一个数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据
14.5. SynchronousQueue同步队列 允许一个线程把对象交给另一个对象,在没有显式同步情况下,当两个线程准备好将一个对象从一个线程传递到另一个时