Java基础知识-线程

多线程

1. 并发并行

并行:两个以上事件同一时刻发生

并发:两个以上事件同一时段发生

2. 程序

为完成特定任务,用某种语言编写的一组指令的集合。指一段静态代码

3. 进程(QQ.exe)

处于运行过程中的运行在内存中的应用程序。占用内存,进入到内存的程序。

拥有独立空间,动态性,并发性

是系统运行程序的基本单位。

系统运行一个程序是一个进程从创建,运行到消亡的过程。

4. 线程(QQ多人聊天)

进程的基本执行单位。

负责当前进程中程序的执行。

一个进程可以有n个线程。

JVM执行main方法,栈开辟一条main方法通向cpu的路径,创建线程会开辟一条通向cpu的新路径,用来执行run方法。对于两条路径,cpu自行选择。抢占式调用。

5. 创建线程

调用的是start方法,使其创建一个执行run方法的新线程

5.1. extends Thread

1
2
3
4
5
6
7
8
9
10
public class test extends Thread{

@override
public void run(){}

public static void main(String[] args){
test t = new test;
t.start();
}
}

通过new Thread()方式,创造了新的实例变量,线程之间不存在资源共享

5.2. implements Runnable

1
2
3
4
5
6
7
8
9
public class test implements Runable{

public void run(){ }

public static void main(String[] args){
test t = new test;
new Thread(t,"Test").start();
}
}

避免了单继承的局限性

线程之间资源共享

5.3. implements Callable

  • call()可以有返回值

  • call()可以抛出异常

  • 支持泛型的返回值

  • 通过FutureTask获取返回结果

    FutureTask类实现了Futrue接口,通过get()方法得到call()方法返回值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {}
    }

    public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
    }

    public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
    V get() throws InterruptedException, ExecutionException;
    }

    用FutureTask实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    FutureTask task = new FutureTask(new Callable() {
    int sum = 0;
    @Override
    public Object call() throws Exception {
    for (int i = 1; i < 10; i++) {
    System.out.println(sum);
    sum+=i;
    }
    return sum;
    }
    });
    new Thread(task).start();

    try {
    Object result = task.get();
    System.out.println("final:"+result);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }

    用线程池实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class CallableTest implements Callable<String>{
    private int i=10;
    @Override
    public String call() throws Exception {
    i--;
    return Thread.currentThread().getName()+" "+i;
    }

    public static void main(String[] args) {

    //创建线程
    ExecutorService ser = Executors.newFixedThreadPool(1);
    //提交执行
    Future<String> result1 = ser.submit(new CallableTest());
    //获取结果
    try {
    String r1 = result1.get();
    System.out.println(r1);
    } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
    }
    //关闭服务
    ser.shutdown();
    }
    }

6. 线程状态

  • New

    1
    2
    3
    4
    /**
    * Thread state for a thread which has not yet started.
    */
    NEW,

    jvm分配内存,初始化成员变量值,此时的线程等同于普通类

    调用.start() 线程进入就绪状态,等待cpu的调度

  • Runnable

    1
    2
    3
    4
    5
    6
    7
    /**
    * Thread state for a runnable thread. A thread in the runnable
    * state is executing in the Java virtual machine but it may
    * be waiting for other resources from the operating system
    * such as processor.
    */
    RUNNABLE;

    线程是可运行的状态,但是否运行还是取决于cpu的分配,看是否拥有cpu的时间片。

    线程争取cpu,执行run()内的代码,执行时长也为时间片长

  • Blocked

    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * Thread state for a thread blocked waiting for a monitor lock.
    * A thread in the blocked state is waiting for a monitor lock
    * to enter a synchronized block/method or
    * reenter a synchronized block/method after calling
    * {@link Object#wait() Object.wait}.
    */
    BLOCKED,

    时间片用完

    发生阻塞式IO

    sleep()方法主动放弃处理器资源

    同步资源被锁定

    等待通知notify

    线程进入阻塞状态

  • Waiting

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * Thread state for a waiting thread.
    * A thread is in the waiting state due to calling one of the
    * following methods:
    * <ul>
    * <li>{@link Object#wait() Object.wait} with no timeout</li>
    * <li>{@link #join() Thread.join} with no timeout</li>
    * <li>{@link LockSupport#park() LockSupport.park}</li>
    * </ul>
    *
    * <p>A thread in the waiting state is waiting for another thread to
    * perform a particular action.
    *
    * For example, a thread that has called <tt>Object.wait()</tt>
    * on an object is waiting for another thread to call
    * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
    * that object. A thread that has called <tt>Thread.join()</tt>
    * is waiting for a specified thread to terminate.
    */
    WAITING,

    解除了阻塞原因后,线程重新进入就绪状态

  • Timed waiting

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * Thread state for a waiting thread with a specified waiting time.
    * A thread is in the timed waiting state due to calling one of
    * the following methods with a specified positive waiting time:
    * <ul>
    * <li>{@link #sleep Thread.sleep}</li>
    * <li>{@link Object#wait(long) Object.wait} with timeout</li>
    * <li>{@link #join(long) Thread.join} with timeout</li>
    * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
    * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
    * </ul>
    */
    TIMED_WAITING,
  • Terminated

    1
    2
    3
    4
    5
    /**
    * Thread state for a terminated thread.
    * The thread has completed execution.
    */
    TERMINATED;

    线程结束运行,或调用了stop方法,被error/exception

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.lang.Thread.State;

public class ThreadState{

public static void main(String[] args) {

State state = null;
Thread thread= new Thread(()->{
for(int i=0;i<3;i++)
{
try {
Thread.sleep(1000); //TIMED_WAITING

} catch (InterruptedException e) {
e.printStackTrace();
}
}

});

state=thread.getState();
System.out.println(state);//NEW

thread.start(); //RUNNABLE
state = thread.getState();
System.out.println(state);

while(state!=Thread.State.TERMINATED)
{
state = thread.getState();
System.out.println(state);
}

//TERMINATED
state = thread.getState();
System.out.println(state);
}
}

7. 控制线程

7.1. interrupt()

请求终止线程,线程的中断状态将被置位

1
boolean flag = Thread.currentThread().isInterrupted();

void interrupt()向线程发送中断请求,线程中断状态被设置为true。如果目前该线程被一个sleep调用阻塞,InterruptedException异常被抛出。

boolean isInterrupted() 判断中断状态,不改变中断状态

static boolean interrupted()测试当前线程是否被中断。会清除线程的中断状态,将中断状态设置为false

7.2. join() 插队

new Thread().join() 等待该线程执行完才执行其他线程

7.3. 守护线程

为其他线程提供服务

new Thread().setDeamon(true)

jvm不会等待该线程运行结束。当整个虚拟机中只有这种线程时,虚拟机选择退出

7.4. sleep()

Thread.sleep(long millis) 暂停当前线程进入阻塞状态,不释放资源

7.5. yield() 礼让

Thread.yield() 该线程主动让出时间片进入就绪状态

7.6. wait()

该方法来自于Object类

1
public final native void wait(long timeout) throws InterruptedException;

阻塞线程,让出cpu时间片及资源,等待notify/notifyAll方法的唤醒

7.7. notify()

该方法来自于Object类

1
public final native void notify();

唤醒除当前线程以外的线程

7.8. 改变线程优先级

优先级也不能保证线程执行的先后顺序

new Thread().setPriority(Priority.MIN_PRORITY) 1

Priority.MAX_PRORITY 10

Priority.NORM_PRORITY 5

8. Volatile域

声明一个域为volatile,编译器和虚拟机就知道该域可以被多个线程并发更新

1
private volatile boolean done;

volatile并不保证域中值的一致性

9. 竞争

9.1. ReentrantLock 可重入锁

锁可以延续使用并拥有计数器

一旦线程进入就封锁锁对象,其他线程无法通过lock语句,被阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.locks.ReentrantLock;

ReentrantLock lock = new ReentrantLock();
lock.lock();
//...
lock.unlock();

public class ReentrantLock{
private boolean isLocked=false;
public Thread lockedBy=null;
private int holdCount=0;

public synchronized void lock(){
Thread t = Thread.currentThread();
while(isLocked&&lockedBy!=t){
this.wait();
}
isLocked=true;
lockedBy=t;
holdCount++;
}

public synchronized void unlock(){
if(Thread.currentThread()==lockBy){
holdCount--;
if(holdCount==0){
isLocked=false;
this.notify();
}
}
}
}

9.2. synchronized 同步

可以是同步块,同步方法

1
2
3
4
5
6
7
8
synchronized(obj){

}

//锁的是对象的资源
private synchronized void method(){

}

一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法,其他线程都只能等待,锁的是当前的this,实例对象

而类中的普通方法和同步锁无关

当有多个对象时,就不是同一把锁了,每个对象的方法可以并发运行

静态同步方法中,锁的是整个类 .class

同步方法块中,synchronized锁的是括号里的代码

锁在退出或异常退出后必须释放锁,防止其他线程饥饿等待

代码例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//代码段一
public Bank method(){
/*多个线程必须阻塞等待,即使资源没有后,线程都需要等待判断资源情况*/
synchronized(this){
if(instance==null){
instance = new Bank();
}
}
return instance;
}

//代码段二-double check
public Bank method(){
/*多个线程直接判断资源情况,减少了等待时间*/
if(instance==null){
synchronized(this){
/*每个线程判断资源情况*/
if(instance==null){
instance = new Bank();
}
}
}
return instance;
}

9.3. 影院购票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.util.ArrayList;
import java.util.List;

public class Movie {

public static void main(String[] args) {
List<Integer> seats = new ArrayList<Integer>();
seats.add(1);
seats.add(3);
seats.add(6);
seats.add(4);
Cinema cinema = new Cinema(seats,"Happy Cinema");

List<Integer> need = new ArrayList<Integer>();
need.add(1);
need.add(3);
Customer c1 = new Customer(cinema,need);
new Thread(c1,"A").start();

List<Integer> need2 = new ArrayList<Integer>();
need2.add(6);
//need2.add(4);
Customer c2 = new Customer(cinema,need2);
new Thread(c2,"B").start();

}

}
class Cinema{
private List<Integer> seats;
String name;
public Cinema(List<Integer> seats,String name)
{
this.seats = seats;
this.name=name;
}

//购票
public boolean book(List<Integer> need)
{
System.out.println("可用位置:"+seats);
List<Integer> copyList = new ArrayList<Integer>();
//拷贝
copyList.addAll(seats);

//差
copyList.removeAll(need);
//System.out.println(seats+" "+copyList);
int seatNum = seats.size()-copyList.size();
if(seatNum!=need.size())
{
return false;
}
seats=copyList;
return true;
}

public String getName() {
return name;
}

public List<Integer> getSeats() {
return seats;
}

}
class Customer implements Runnable{
private Cinema cinema;
List<Integer> need;
public Customer(Cinema cinema,List<Integer> need)
{
this.cinema=cinema;
this.need=need;
}
public void run()
{
synchronized(cinema)
{
boolean flag = cinema.book(need);
if(flag)
{
System.out.println("购"+cinema.getName()+"票成功,购票人"
+Thread.currentThread().getName()+",位置为"+need
+" 剩余位置 "+cinema.getSeats());
}else {
System.out.println("购"+cinema.getName()+"票失败,购票人"+Thread.currentThread().getName()+",位置不够");
}

}
}

}

9.4. 购火车票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class web12306 {

public static void main(String[] args) {
web w = new web(3,"火车票网");
new Passanger(w,"A",2).start();
new Passanger(w,"B",2).start();

}

}
class Passanger extends Thread{
private int seats;
public Passanger(Runnable target,String name,int seats)
{
super(target,name);
this.seats=seats;
}
public int getSeats() {
return seats;
}
}
class web implements Runnable{
private int ticket;
private String name;
public web(int ticket,String name)
{
this.ticket=ticket;
this.name=name;
}
private boolean book(int need)
{
System.out.println("可用票 "+ticket);
if(need>ticket)
{
return false;
}
ticket-=need;
return true;
}
public void run()
{
synchronized(this) {
Passanger p=(Passanger) Thread.currentThread();
boolean flag = book(p.getSeats());
if(flag)
{
System.out.println(Thread.currentThread().getName()+ " 购"+name+" 票成功 ");
}
else {
System.out.println(Thread.currentThread().getName()+" 购"+name+" 票失败");
}
}
}
}

9.5. 死锁处理-生产者消费者

9.6. 死锁问题

1
2
3
4
5
6
7
synchronized(A){
synchronized(B){}
}

synchronized(B){
synchronized(A){}
}

互占资源

9.6.1. 管程法

利用一个缓冲区暂存数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

public class siShuo {

public static void main(String[] args) {
Container c = new Container();
new Productor(c).start();
new Customers(c).start();
}

}
//生产者
class Productor extends Thread{
Container container;

public Productor(Container container) {
this.container = container;
}

public void run()
{
for(int i=1;i<=100;i++)
{
System.out.println("生产第 "+i+" 个馒头");
container.push(new Steam(i));
}
}
}
//消费者
class Customers extends Thread{
Container container;

public Customers(Container container) {
this.container = container;
}
public void run()
{
for(int i=1;i<=100;i++)
{
System.out.println("消费第 "+container.pop().id+" 个馒头");

}
}
}
//缓冲区
class Container{
Steam[] steam = new Steam[10];
int count=0;
public synchronized void push(Steam s)
{
if(count==steam.length)
{
try {
this.wait();//线程阻塞 消费者通知生产解除
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产
steam[count++]=s;
this.notifyAll();//通知消费
}

public synchronized Steam pop()
{
if(count==0)
{
try {
this.wait(); //线程阻塞 生产者通知消费解除
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费
Steam sm=steam[--count];
this.notifyAll();//通知生产
return sm;
}
}
//馒头
class Steam{
int id;

public Steam(int id) {
this.id = id;
}

}

9.6.2. 信号灯法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

public class siShuo2 {

public static void main(String[] args) {
TV tv=new TV();
new Productors2(tv).start();
new Customers2(tv).start();
}

}
class Productors2 extends Thread{
TV tv;

public Productors2(TV tv) {
this.tv = tv;
}
public void run() {
for(int i=0;i<10;i++)
{
if(i%2==0)
{
tv.play("综艺");
}else {
tv.play("电影");
}
}
}

}
class Customers2 extends Thread{
TV tv;
public Customers2(TV tv) {
this.tv = tv;
}
public void run() {
for(int i=0;i<10;i++)
{
tv.watch();
}
}
}
class TV{
String voice;
//信号灯
//T-演员表演 观众等待
//F-观众观看 演员等待
boolean flag = true;

public synchronized void play(String voice) {
if(!flag)
{
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("表演了:"+voice);
this.voice=voice;
this.notifyAll();
this.flag=!flag;
}
public synchronized void watch()
{
if(flag)
{
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("听到了:"+voice);
this.notifyAll();
this.flag=!flag;
}

}

10. 调度任务Timer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Timer timer = new Timer();

//运行n次
timer.schedule(TimerTask task,long delay, long period);
timer.schedule(TimerTask task,Date firstTime, long period);
//仅运行一次
timer.schedule(TimerTask task,long delay);

//example
import java.util.Timer;
import java.util.TimerTask;
public class TimeTest extends TimerTask{

public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimeTest(),1000);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Hello");
}
}
}

11. ThreadLocal

1
2
3
4
5
6
7
8
9
10
11
private static ThreadLocal<Integer> tLocal = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 20;
}
};

threadLocal = ThreadLocal.withInitial(()->20);

threadLocal.get();
threadLocal.set(10);

12. 线程池

经常创建和销毁线程,对性能影响很大。使用线程池可以实现重复利用。

  • 减少创建线程的时间,提高响应速度
  • 重复利用线程池中的线程,降低资源消耗
  • 便于线程管理

12.1. 案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void testSystem() throws ExecutionException, InterruptedException {

ExecutorService service = Executors.newFixedThreadPool(2);

Future<String> submit = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Thread";
}
});

String s = submit.get();
System.out.println(s);
}

提交指定的任务去执行:

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

12.2. ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadPoolExecutor extends AbstractExecutorService {
//池大小
private int corePoolSize;

//线程没有任务时最多保持多长时间终止
private volatile long keepAliveTime;

//最大线程数
private volatile int maximumPoolSize;

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}

12.3. newFixedThreadPool

该池包含固定数量的线程,空闲线程会一直被保留

1
ExecutorService pool = Executors.newFixedThreadPool(2);

源码

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

12.4. newCachedThreadPool

必要时创建新线程,空闲线程会被保留60秒

1
ExecutorService pool = Executors.newCachedThreadPool();

源码

1
2
3
4
5
6
7
8

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, //corePoolSize
Integer.MAX_VALUE,//maximumPoolSize
60L, //keepAliveTime
TimeUnit.SECONDS,//TimeUnit
new SynchronousQueue<Runnable>());
}

12.5. newSingleThreadExecutor

只有一个线程的”池”,该线程顺序执行每一个提交的任务

1
ExecutorService ex = Executors.newSingleThreadExecutor();

源码

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));

12.6. newScheduledThreadPool

用于预定执行而构建的固定线程池

1
ExecutorService pool = Executors.newScheduledThreadPool(2);

源码

1
2
3
4
5
6
7
8
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

使用ScheduledThreadPoolExecutor需要注意的问题

ExecutorService的十个使用技巧

从零单排 Java Concurrency, SkipList&ConcurrnetSkipListMap

13. CompletableFuture

13.1. thenApply(T->U)

对结果应用一个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

13.2. thenCompose(T->U)

对结果调用函数并执行返回的future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}

private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
if (f == null) throw new NullPointerException();
Object r; Throwable x;
if (e == null && (r = result) != null) {
// try to return function result directly
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
return new CompletableFuture<V>(encodeThrowable(x, r));
}
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
Object s = g.result;
if (s != null)
return new CompletableFuture<V>(encodeRelay(s));
CompletableFuture<V> d = new CompletableFuture<V>();
UniRelay<V> copy = new UniRelay<V>(d, g);
g.push(copy);
copy.tryFire(SYNC);
return d;
} catch (Throwable ex) {
return new CompletableFuture<V>(encodeThrowable(ex));
}
}
CompletableFuture<V> d = new CompletableFuture<V>();
UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
return d;
}

13.3. handle((T,Throwable)->U)

处理结果或错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}

private <V> CompletableFuture<V> uniHandleStage(
Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniHandle(this, f, null)) {
UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

13.4. thenAccept(T->void)

类似于thenApply方法,结果为void

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e,Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

13.5. whenComplete((T,Throwable)->void)

类似于handle,结果为void

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

private CompletableFuture<T> uniWhenCompleteStage(
Executor e, BiConsumer<? super T, ? super Throwable> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<T> d = new CompletableFuture<T>();
if (e != null || !d.uniWhenComplete(this, f, null)) {
UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

13.6. thenRun(Runnable)

执行Runnable,结果为void

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniRun(this, f, null)) {
UniRun<T> c = new UniRun<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}

14. 同步器

14.1. Semaphore信号量

允许线程等待直到被允许继续运行为止,限制访问资源的线程总数,如果许可数为1,常常阻塞线程直到另一个线程给出许可为止。

14.2. CountDownLatch倒计时门栓

允许线程集等待直到计数器减为0,当一个或多个线程需要等待直到指定数目的事件发生

14.3. CyclicBarrier障珊

允许线程集等待直至其中预定数目的线程到达一个公共障珊,染灰可以选择执行一个处理障珊的动作

14.4. Exchanger交换器

允许两个线程在要交换的对象准备好时交换对象,当两个线程工作在同一个数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据

14.5. SynchronousQueue同步队列

允许一个线程把对象交给另一个对象,在没有显式同步情况下,当两个线程准备好将一个对象从一个线程传递到另一个时

本文结束  感谢您的阅读
  • 本文作者: Wang Ting
  • 本文链接: /zh-CN/2019/09/14/Java基础知识-线程/
  • 发布时间: 2019-09-14 00:11
  • 更新时间: 2021-10-29 14:21
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!