JUC概述

创建线程的四种方式

  1. 实现 Runnable 接口;
  2. 实现 Callable 接口;
  3. 继承 Thread 类(很少使用,单继承原因)。
  4. 线程池

实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用

Runnable

使用 Runnable 实例再创建一个 Thread 实例,然后调用 Thread 实例的 start() 方法来启动线程。

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // ...
    }
}
public static void main(String[] args) {
    MyRunnable instance = new MyRunnable();
    Thread thread = new Thread(instance);
    thread.start();
}

Callable

与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask 进行封装,多个FutureTask 进行同样的计算,只会运行一次
若线程运行没有结束,get()方法会导致阻塞

public class MyCallable implements Callable<Integer> {
    public Integer call() {
        return 123;
    }
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
    MyCallable mc = new MyCallable();
    FutureTask<Integer> ft = new FutureTask<>(mc);
    Thread thread = new Thread(ft);
    thread.start();
    System.out.println(ft.get());
}

Thread

当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法

public class MyThread extends Thread {
    public void run() {
        // ...
    }
}


public static void main(String[] args) {
    MyThread mt = new MyThread();
    mt.start();
}

进程和线程

死锁

class resourse implements Runnable {
    private Object lockA;
    private Object lockB;

    public resourse(Object lockA, Object lockB) {
        this.lockA = lockA;
        this.lockB = lockB;
    }

    @Override
    public  void run() {
        synchronized (lockA) {
            System.out.println(Thread.currentThread().getName() + "持有"+lockA+"尝试"+lockB);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            synchronized (lockB) {
                System.out.println(Thread.currentThread().getName() + "持有"+lockB+"尝试"+lockA);
            }
        }
    }
}

public class DeadLock {
    public static void main(String[] args) {
        new Thread(new resourse("A", "B"), "线程A").start();
        new Thread(new resourse("B", "A"), "线程B").start();
    }
}

死锁条件

多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。

  1. 互斥条件:该资源任意一个时刻只由一个线程占用。
  2. 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 不剥夺条件:线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。
  4. 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

预防死锁

  1. 破坏请求与保持条件 :一次性申请所有的资源。
  2. 破坏不剥夺条件 :占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源。
  3. 破坏循环等待条件 :靠按序申请资源来预防。按某一顺序申请资源,释放资源则反序释放。破坏循环等待条件。

进程

进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。

在 Java 中,当启动 main 函数时其实就是启动了一个 JVM 的进程,而 main 函数所在的线程就是这个进程中的一个线程,也称主线程。

线程

线程与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中可以产生多个线程。与进程不同的是同类的多个线程共享进程的堆和方法区资源,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程。

线程分类

  1. 用户线程
    开发者自定义使用的线程
    主线程结束,用户线程还在运行,jvm存活
  2. 守护线程
    系统使用的线程,如垃圾回收

用户线程和主线程都结束,只剩守护线程运行,jvm结束

线程的不同状态

线程在运行的生命周期中的指定时刻只可能处于下面 6 种不同状态的其中一个状态
Java线程状态变迁.png

  1. NEW(新建)
    初始状态,线程被创建,但是还没有调用start()方法

  2. RUNABLE(准备就绪)
    可运行状态,正在 Java 虚拟机中运行。但是在操作系统层面,它可能处于运行状态,也可能等待资源调度(例如处理器资源),资源调度完成就进入运行状态。所以该状态的可运行是指可以被运行,具体有没有运行要看底层操作系统的资源调度

  3. BLOCKED(阻塞)
    阻塞状态,表明线程阻塞于锁.要结束该状态从而进入 RUNABLE 需要其他线程释放锁

  4. WAITING(不见不散)
    等待状态.进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断)
    阻塞和等待的区别在于,阻塞是被动的,它是在等待获取 monitor lock。而等待是主动的,通过调用 Object.wait() 等方法进入

进入方法退出方法
没有设置 Timeout 参数的 Object.wait() 方法Object.notify() / Object.notifyAll()
没有设置 Timeout 参数的 Thread.join() 方法被调用的线程执行完毕
LockSupport.park() 方法LockSupport.unpark(Thread)
  1. TIMED_WAITING(过期不候)
    超时等待状态,无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒
进入方法退出方法
Thread.sleep() 方法时间结束
设置了 Timeout 参数的 Object.wait() 方法时间结束 / Object.notify() / Object.notifyAll()
设置了 Timeout 参数的 Thread.join() 方法时间结束 / 被调用的线程执行完毕
LockSupport.parkNanos() 方法LockSupport.unpark(Thread)
LockSupport.parkUntil() 方法LockSupport.unpark(Thread)
调用 Thread.sleep() 方法使线程进入超时等待状态时,常常用“使一个线程睡眠”进行描述。调用 Object.wait() 方法使线程进入超时等待或者无限期等待时,常常用“挂起一个线程”进行描述。睡眠和挂起是用来描述行为,而阻塞和等待用来描述状态
  1. TERMINATED(终结)
    终止状态,是线程结束任务之后自己结束,或者产生了异常而结束

进程和线程的关系

Java运行时数据区域JDK1.8

线程是进程划分成的更小的运行单位。线程和进程最大的不同在于基本上各进程是独立的,而各线程则不一定,因为同一进程中的线程极有可能会相互影响。线程执行开销小,但不利于资源的管理和保护;而进程正相反。

并行和并发

  • 并发:同一时刻多个线程在访问同一个资源,多个线程对一个点(秒杀商品)
  • 并行:多项工作一起执行,之后汇总(cpu运行多个程序)

线程间通信

join()

在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束

对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出

public class JoinExample {

    private class A extends Thread {
        @Override
        public void run() {
            System.out.println("A");
        }
    }

    private class B extends Thread {

        private A a;

        B(A a) {
            this.a = a;
        }

        @Override
        public void run() {
            try {
                a.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
        }
    }

    public void test() {
        A a = new A();
        B b = new B(a);
        b.start();
        a.start();
    }
}

wait() notify() notifyAll()

调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程(虚假唤醒问题,在哪里等待在哪里唤醒,因此wait需要放在循环中)。

它们都属于 Object 的一部分,而不属于 Thread。

只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。

使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

public class WaitNotifyExample {

    public synchronized void before() {
        System.out.println("before");
        notifyAll();
    }

    public synchronized void after() {
        try {
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after");
    }
}
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    WaitNotifyExample example = new WaitNotifyExample();
    executorService.execute(() -> example.after());
    executorService.execute(() -> example.before());
}
before
after

await() signalAll()

    private int num = 0;
    private Lock lock=new ReentrantLock();
    private Condition condition = lock.newCondition();

    lock.lock();
    while (num != 0) {
            condition.await();
    }
    num++;
    condition.signalAll();
    lock.unlock();

await(), signal(),signalAll() 的功能和 wait(), notify(), notifyAll() 基本相同, 区别是,基于 Condition 的 await(), signal(), signalAll() 使得我们可以在同一个锁的代码块内,优雅地实现基于多个条件的线程间挂起与唤醒操作

wait() 和 sleep() 的区别

  • wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
  • wait() 会释放锁,sleep() 不会释放锁
  • 可以被interruped方法中断

JUC三大辅助类

减少计数CountDownLatch

CountDownLatch类可以设置一个计数器,然后通过countDown()方法来进行减1的操作,使用await()方法等待计数器为0,然后继续执行await()方法之后的语句

  • CountDownLatch主要有两个方法,当一个或多个线程调用await()方法时,这些线程会阻塞
  • 其他线程调用countDown()方法会将计数器减1(调用countDown()方法的线程不会阻塞)
  • 当计数器的值变为0时,因await()方法阻塞的线程会被唤醒,继续执行

循环栅栏CycliBarrier

CycliBarrier构造方法的第一个参数是目标障碍数,每次执行CycliBarrier障碍数会加一,如果达到了目标障碍数,才会执行cycliBarrier.await()之后的语句

让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,所有被阻塞的线程才能继续工作,线程通过cycliBarrier.await()方法

信号量Semaphore

信号量主要有两个目的:一个是用于多个共享资源的互斥使用,另一个是用于并发线程数的控制

Semaphore构造方法的参数是目标信号灯个数,每次执行acquire()会申请一个信号灯,release()会释放一个信号灯,申请不到灯就一直阻塞线程

同步

管程(锁):一种同步机制,保证同一时间,只有一个线程访问被保护的资源

jvm同步基于进入和退出,使用管程对象实现

互斥同步(阻塞同步)

除非需要使用 ReentrantLock 的高级功能,否则优先使用 synchronized。这是因为 synchronized 是 JVM 实现的一种锁机制,JVM 原生地支持它,而 ReentrantLock 不是所有的 JDK 版本都支持。并且使用 synchronized 不用担心没有释放锁而导致死锁问题,因为 JVM 会确保锁的释放。

synchronized(JVM 实现)

1. 同步代码块

    public void func() {
        synchronized (this) {                
		// ...        
        }
    }

指定加锁对象,对给定对象/类加锁。synchronized(this|object)表示进入同步代码库前要获得给定对象的锁。synchronized(类.class)表示进入同步代码前要获得当前class的锁

2. 同步一个方法

    public synchronized void func() {
        // ...
    }

作用于同一个对象

3. 同步一个类

    public void func() {
        synchronized (SynchronizedExample.class) {
            // ...
        }
    }

作用于整个类,也就是说两个线程调用同一个类的不同对象上的这种同步语句,也会进行同步。

4. 同步一个静态方法

    public synchronized static void fun() {
        // ...
    }

作用于整个类。但是对于其他线程调用非静态方法不会产生影响,因为访问静态 synchronized方法占用的锁是当前类的锁,而访问非静态synchronized方法占用的锁是当前实例对象锁

Lock接口(JDK实现)

Lock锁比使用synchronized可以获得更多的功能,可以允许更灵活的结构

ReentrantLock 是 java.util.concurrent(J.U.C)包中的锁。

public class LockExample {
    private Lock lock = new ReentrantLock();

    public void func() {
        lock.lock();
        try {
            for (int i = 0; i < 10; i++) {
                System.out.print(i + " ");
            }
        } finally {
            lock.unlock(); // 确保释放锁,从而避免发生死锁。                
        }
    }
}

对比

  1. 两者都是可重入锁(递归锁)
    可重入锁 指的是同一线程外层函数获得锁之后,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没释放而阻塞。也就是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。同一个线程每次获取锁,锁的计数器都自增 1,所以要等到锁的计数器下降为 0 时才能释放锁。如果加锁次数和释放次数不一样,第二个线程始终无法获取到锁,导致一直再等待

    可重入锁最大的最用就是避免死锁

  2. 等待可中断
    当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。
    使用synchronized 时,等待的线程会一直等待下去,不能够响应中断,除非抛出异常或正常运行完成

    ReentrantLock 可中断:

    1. 设置超时方法
    2. 调用interrupt()方法可中断
  3. 公平锁
    公平锁是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁。
    synchronized 中的锁是非公平的,ReentrantLock 默认情况下也是非公平的,但是也可以是公平的

    非公平锁的优点在于吞吐量比公平锁大

  4. 原始构成
    Lock是一个接口,而synchronized是关键字。Lock在发生异常时,如果没主动unlock()去释放锁,则很可能造成死锁现象;synchronized是内置实现,jvm原生支持,由jvm保证不会发生死锁现象,只有在同步代码块中才能调用wait/notify方法。

  5. 通过Lock可以知道有没有成功获取到锁,而synchronized无法办到.

  6. 锁绑定多个条件Condition

    synchronized不能精确唤醒,与wait()和notify()/notifyAll()方法相结合可以实现等待/通知机制

    ReentrantLock用来实现分组唤醒不同线程,可以精确唤醒。用ReentrantLock类结合Condition实例可以实现“选择性通知” ,Condition实例的signalAll()方法 只会唤醒注册在该Condition实例中的所有等待线程。

非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁

乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步

ReadWriteLock

读锁:共享锁,会发生死锁

写锁:独占锁,会发生死锁

读写锁:一个资源可以被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享

读写锁缺点:

  1. 造成锁饥饿,一直读操作,没有写操作
  2. 读操作时不能写,只有读完成后才能写
        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        readWriteLock.writeLock().lock();
        readWriteLock.readLock().unlock();

锁降级:将写入锁降级为读锁

获取写锁->获取读锁->释放写锁->释放读锁

阻塞队列

通过一个共享的队列,可以使得数据由队列的一段输入,从另一端输出

阻塞队列

当队列是空的,从队列中获取元素的操作将会被阻塞

当队列是满的,从队列中添加元素的操作将会被阻塞

试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空队列中插入新的元素

试图向已满的队列中添加元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素,使队列变得空闲起来并后续新增

分类

  1. ArrayBlockingQueue
    基于数组的阻塞队列,除了维护一个定长数组外,ArrayBlockingQueue内部还保存两个整形变量,分别表示队列的头部和尾部

  2. LinkedBlockingQueue
    基于链表的阻塞队列,由链表结构组成的有界(大小默认值为integer.MAX_VALUE)阻塞队列,慎用

  3. DelayQueue
    使用优先级队列实现的延迟无界阻塞队列

  4. PriorityBlockingQueue
    支持优先级排序的无界阻塞队列

  5. SynchronousQueue
    不存储元素的阻塞队列,即单个元素的队列。每个put操作必须等待一个take操作,否则不能继续添加元素,反之亦然

  6. LinkedTransferQueue
    由链表组从的无界阻塞队列

  7. LinkedBlockingDeque
    由链表组成的双向阻塞队列

常用方法

方法类型抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()不可用不可用

线程池

线程池架构

优点:

  1. 降低资源消耗
  2. 提高响应速度
  3. 提高线程的可管理性

创建线程池

《阿里巴巴Java开发手册》中强制线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式可以更加明确线程池的运行规则,规避资源耗尽的风险

1. 通过构造方法实现

ThreadPoolExecutor构造方法

2. 通过工具类Executors来实现

工具类中的方法内部实际调用了ThreadPoolExecutor的构造方法

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

ThreadPoolExecutor类分析

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,
        long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory
        threadFactory,RejectedExecutionHandler handler){
        if(corePoolSize< 0||maximumPoolSize<=0||maximumPoolSize<corePoolSize ||keepAliveTime< 0)
        throw new IllegalArgumentException();
        if(workQueue==null||threadFactory==null||handler==null)throw new NullPointerException();
        this.corePoolSize=corePoolSize;
        this.maximumPoolSize=maximumPoolSize;
        this.workQueue=workQueue;
        this.keepAliveTime=unit.toNanos(keepAliveTime);
        this.threadFactory=threadFactory;
        this.handler=handler;
        }

构造函数参数分析

  • corePoolSize: 常驻线程数量(核心线程数)
  • maximumPoolSize: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数量
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
  • keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可。
  • handler:拒绝策略

拒绝策略

当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务,则会按照策略执行

  • ThreadPoolExecutor.AbortPolicy(默认): 抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy: “调用者运行”调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务回退到调用者,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃等待最久的任务请求,然后把当前任务加入队列。

如果在代码中模拟了10个任务,配置的核心线程数为5、等待队列容量为100,> > 所以每次只可能存在5个任务同时执行,剩下的5个任务会被放到等待队列中去。> 当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

execute方法和submit方法

//execute方法,无返回值
void execute(Runnable command);
//submit方法
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

execute() 提交没有返回值,也就不能判断是否执行成功。

submit()会返回一个Future对象,通过future的get方法来获取返回值,不过get方法会阻塞住直到任务完成。

Fork/Join

Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出

内存模型

JMM

JAVA内存模型(JMM)本身是一种抽象概念并不真实存在,它描述的是一组规范,通过这组规范定义了程序中各个变量的访问方式。

JMM关于同步的规定:

  1. 线程解锁前,必须把共享变量的值刷新回主内存
  2. 线程加锁前,必须读取主内存的最新值到自己的工作内存
  3. 加锁解锁是同一把锁

JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存,工作内存是每个线程私有的数据区域,而JMM规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都能访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作内存,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存中存储着主内存中的变量拷贝副本,因此不同的线程间无法访问对方的工作内存,线程间的通信必须通过主内存来完成。

JMM定义了一套在多线程读写共享数据时(成员变量,数组)时,对数据的可见性、有序性和原子性的规则和保障

  • 原子性 - 保证指令不会受到线程上下文切换的影响

  • 可见性 - 保证指令不会受 cpu 缓存的影响

  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

原子性

两个线程对初始值为 0 的静态变量一个做自增,一个做自减,各做 5000 次,以上的结果可能是正数、负数、零。

对于 i++ 而言(i 为静态变量),实际会产生如下的 JVM 字节码指令:

//自增
getstatic i
// 获取静态变量i的值
iconst_1
// 准备常量1
iadd
// 加法
putstatic i
// 将修改后的值存入静态变量i

多线程下自增自减可能交错运行,导致读取到错误的值

解决方法

  • 使用synchronized关键字
  • 使用AtomicInteger(原子数据类型,基于CAS实现)

可见性

main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止

static boolean run = true;
public static void main (String[]args) throws InterruptedException {
        Thread t = new Thread(() -> {
        while (run) {
        // ....
        }
        });
        t.start();
        Thread.sleep(1000);
        run = false;
        // 线程t不会如预想的停下来
        }

工作内存与主内存同步延迟现象造成了可见性问题

解决方法

  • 使用volatile关键字
  • 它可以用来修饰成员变量静态成员变量(放在主存中的变量),可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存

可见性保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一 个线程可见, 不能保证原子性,仅用在一个写线程,多个读线程的情况

注意

synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized是属于重量级操作,性能相对更低

有序性

指令重排:JVM 会在不影响正确性的前提下,可以调整语句的执行顺序多线程下『指令重排』会影响正确性

解决办法volatile 修饰的变量,可以禁用指令重排

  • 禁止的是加volatile关键字变量之前的代码被重排序

CAS与volatile

CAS 即 Compare and Swap ,它体现的一种乐观锁的思想。它的功能是判断内存某个位置的值是否为预期值,如果是则更新,这个过程是原子的。Java中调用Unsafe类中的CAS方法,JVM会实现出原子操作,达到数据一致

比如多个线程要对一个共享的整型变量执 行 +1 操作:

// 需要不断尝试
while(true) {
	int 旧值 = 共享变量 ; // 比如拿到了当前值 0
	int 结果 = 旧值 + 1; // 在旧值 0 的基础上增加 1 ,正确结果是 1
    /*
    这时候如果别的线程把共享变量改成了 5,本线程的正确结果 1 就作废了,这时候
    compareAndSwap 返回 false,重新尝试,直到:
    compareAndSwap 返回 true,表示我本线程做修改的同时,别的线程没有干扰
    */
	if( compareAndSwap ( 旧值, 结果 )) {
	// 成功,退出循环
	}
}

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • synchronized 是基于悲观锁的思想
  • CAS 是基于乐观锁的思想,体现的是无锁并发、无阻塞并发,缺点:
    • 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一,但如果竞争激烈,重试必然频繁发生,可能给cpu带来很大的开销
    • 只能保证一个共享变量的原子操作
    • 引出ABA问题

Unsafe

  1. unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地方法来访问,基于该类可以直接操作特定内存的数据。
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    private volatile int value;
    
    public final int getAndIncrement() {
        //this指的是对象本身
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }
}
  1. 变量valueOffset表示该变量值在内存中的偏移地址,因为unsafe就是根据内存偏移地址获取数据的。
  2. 变量value用volatile修饰,保证了多线程间的内存可见性。

ABA问题

CAS会导致ABA问题。CAS算法实现的一个前提是需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差会发生数据的变化。

如线程1从内存位置V取出数据A,此时线程2也从内存取出数据A,由于线程2运行速度快,它将A变成B又变成A,这时候线程1进行CAS操作发现内存中仍然是A,然后线程1操作成功。虽然线程1CAS操作成功,但是这个过程是存在问题的。

ABA解决:AtomicStampedReference(原子引用+时间戳)

class user {
    String name;
    int age;

    public user(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
//普通原子引用类
public class AtomicDemo {
    public static void main(String[] args) {
        user user1 = new user("小米", 11);
        user user2 = new user("华为", 22);
        AtomicReference<user> userAtomicReference = new AtomicReference<>();
        userAtomicReference.set(user1);
        System.out.println(userAtomicReference.compareAndSet(user1, user2));
    }
}

AtomicStampedReference使用详解

public class AtomicDemo {
    static AtomicReference<String> atomicReference = new AtomicReference<>("A");
    static AtomicStampedReference<String> atomicStampedReference = new AtomicStampedReference("A", 1);

    public static void main(String[] args) {
//        System.out.println("===============ABA问题产生============");
//        new Thread(() -> {
//            atomicReference.compareAndSet("A", "B");
//            atomicReference.compareAndSet("B", "A");
//        }, "线程1").start();
//
//        new Thread(() -> {
//            try {
//                TimeUnit.SECONDS.sleep(1);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//            System.out.println(atomicReference.compareAndSet("A", "B") + "\t" + atomicReference.get());
//        }, "线程2").start();

        System.out.println("===============ABA问题解决============");
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName()+"\t第1次版本号"+atomicStampedReference.getStamp());
            atomicStampedReference.compareAndSet("A", "B",atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"\t第2次版本号"+atomicStampedReference.getStamp());
            atomicStampedReference.compareAndSet("B", "A",atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"\t第3次版本号"+atomicStampedReference.getStamp());
        }, "线程3").start();

        new Thread(() -> {
            int stamp = atomicStampedReference.getStamp();
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"\t第1次版本号"+stamp);
            System.out.println(atomicStampedReference.compareAndSet("A", "B", stamp, stamp + 1));
            System.out.println(Thread.currentThread().getName()+"\t第2次版本号"+atomicStampedReference.getStamp());
        }, "线程4").start();
    }
}

原子操作类

juc(java.util.concurrent)中提供了原子操作类,可以提供线程安全的操作,例如:AtomicInteger、 AtomicBoolean等,它们底层就是采用 CAS 技术 + volatile 来实现的。

 AtomicInteger i = new AtomicInteger(0);
 
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ 
System.out.println(i.getAndIncrement());
 
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i 
System.out.println(i.incrementAndGet());
 
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i 
System.out.println(i.decrementAndGet());
 
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
 
// 获取并加值(i = 0, 结果 i = 5, 返回 0) 
System.out.println(i.getAndAdd(5));
 
// 加值并获取(i = 5, 结果 i = 0, 返回 0) 
System.out.println(i.addAndGet(-5));
 
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0) 
// 其中函数中的操作能保证原子,但函数需要无副作用 
System.out.println(i.getAndUpdate(p -> p - 2));
 
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用 
System.out.println(i.updateAndGet(p -> p + 2));
 
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0) 
// 其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的 
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 
final System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
 
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0) 
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

synchronized优化

synchronized关键字解决的是多个线程之间访问资源的同步性,它可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。

在Java早期版本中,synchronized属于重量级锁,依赖于底层的操作系统,而操作系统实现线程之间的切换时需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,效率低下。

JDK1.6对锁的实现引入了大量的优化,如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。

构造方法不能使用 synchronized 关键字修饰,因为构造方法就是线程安全的

锁可以升级, 但不能降级. 即: 无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

偏向锁

偏向锁是针对于一个线程而言的, 线程获得锁之后就不会再有解锁等操作了, 这样可以省略很多开销. 当出现有两个线程来竞争锁的话, 那么偏向锁就失效了, 此时锁就会膨胀, 升级为轻量级锁

大多数时候是不存在锁竞争的,常常是一个线程多次获得同一个锁,因此如果每次都要竞争锁会增大很多没有必要付出的代价,为了降低获取锁的代价,才引入的偏向锁。如果在重入到一定阈值之后仍然没有任何线程抢占行为发生,JVM就会省略CAS这个操作,以后只要不发生竞争,这个对象就归该线程所有。

以下几种情况会使对象的偏向锁失效:

  • 调用对象的hashCode方法
  • 多个线程使用该对象
  • 调用了wait/notify方法(调用wait方法会导致锁膨胀而使用重量级锁

偏向锁的加锁

  1. 偏向锁标志是未偏向状态,使用 CAS 将 MarkWord 中的线程ID设置为自己的线程ID,如果成功,则获取偏向锁成功;如果失败,则进行锁升级。
  2. 偏向锁标志是已偏向状态,MarkWord 中的线程 ID 是自己的线程 ID,则成功获取锁;MarkWord 中的线程 ID 不是自己的线程 ID,需要进行锁升级

轻量级锁

轻量级锁使用场景:当一个对象被多个线程所访问,但访问的时间是错开的(不存在竞争),此时就可以使用轻量级锁来优化。之所以是轻量级,是因为它仅仅使用 CAS 进行操作,实现获取锁。

  • 创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录对象,内部可以存储锁定对象的mark word(不再一开始就使用Monitor)
    轻量级锁1
  • 让锁记录中的Object reference指向锁对象(Object),并尝试用cas去替换Object中的mark word,将此mark word放入lock record中保存
    轻量级锁2
  • 如果cas替换成功,则将Object的对象头替换为锁记录的地址状态 00(轻量级锁状态),并由该线程给对象加锁
    轻量级锁3

有线程A和线程B来竞争对象c的锁(如: synchronized(c){}), 这时线程A和线程B同时将对象c的MarkWord复制到自己的锁记录中, 两者竞争去获取锁, 假设线程A成功获取锁, 并将对象c的对象头中的线程ID(MarkWord中)修改为指向自己的锁记录的指针, 这时线程B仍旧通过CAS去获取对象c的锁, 因为对象c的MarkWord中的内容已经被线程A改了, 所以获取失败. 此时为了提高获取锁的效率, 线程B便尝试使用自旋来获取锁, 这个循环是有次数限制的, 如果在循环结束之前CAS操作成功, 那么线程B就获取到锁, 如果循环结束依然获取不到锁, 则获取锁失败, 对象c的MarkWord中的记录会被修改为重量级锁, 然后线程B就会被挂起, 之后有线程C来获取锁时, 看到对象c的MarkWord中的是重量级锁的指针, 说明竞争激烈, 直接挂起

在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋。

如果自旋次数到了线程1还没有释放锁,或者线程1还在执行,线程2还在自旋等待,这时又有一个线程3过来竞争这个锁对象,那么这个时候轻量级锁就会膨胀为重量级锁。重量级锁把除了拥有锁的线程都阻塞,防止CPU空转

重量级锁

重量级锁是使用操作系统互斥量(mutex)来实现的传统锁。当所有对锁的优化都失效时,将退回到重量级锁。它与轻量级锁不同,竞争的线程不再通过自旋来竞争线程,而是直接进入堵塞状态,此时不消耗CPU,然后等拥有锁的线程释放锁后,唤醒堵塞的线程,然后线程再次竞争锁。

优点缺点适用场景
偏向锁加锁和解锁不需要额外的消耗, 和执行非同步代码方法的性能相差无几如果线程间存在锁竞争, 会带来额外的锁撤销的消耗.适用于只有一个线程访问的同步场景
轻量级锁竞争的线程不会阻塞, 提高了程序的响应速度如果始终得不到锁竞争的线程, 使用自旋会消耗CPU追求响应时间, 同步快执行速度非常快
重量级锁线程竞争不适用自旋, 不会消耗CPU线程堵塞, 响应时间缓慢追求吞吐量, 同步快执行时间速度较长

其他优化

  1. 减少上锁时间
    同步代码块中尽量短

  2. 减少锁的粒度
    将一个锁拆分为多个锁提高并发度,例如:ConcurrentHashMap

  3. 锁粗化
    多次循环进入同步块不如同步块内多次循环,另外 JVM 可能会做如下优化,把多次 append 的加锁操作粗化为一次(因为都是对同一个对象加锁, 没必要重入多次)

    new StringBuffer().append("a").append("b").append("c");
    
  4. 锁消除
    JVM 会进行代码的逃逸分析,例如某个加锁对象是方法内局部变量,不会被其它线程所访问到,这时候 就会被即时编译器忽略掉所有同步操作。

  5. 自旋锁
    是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是会循环消耗CPU

    //手写自旋锁
    public class SpinLockDemo {
       AtomicReference<Thread> atomicReference = new AtomicReference<>();
    
       public void myLock() {
           Thread thread = Thread.currentThread();
           System.out.println(thread.getName() + "myLock");
           while (!atomicReference.compareAndSet(null, thread)) {
    
           }
       }
    
       public void myUnLock() {
           Thread thread = Thread.currentThread();
           System.out.println(thread.getName() + "myUnLock");
           while (!atomicReference.compareAndSet(thread, null)) {
    
           }
       }
    
       public static void main(String[] args) throws Exception {
           SpinLockDemo spinLockDemo = new SpinLockDemo();
           new Thread(() -> {
               spinLockDemo.myLock();
               try {
                   TimeUnit.SECONDS.sleep(3);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               spinLockDemo.myUnLock();
           }, "AA").start();
    
           TimeUnit.SECONDS.sleep(1);
           new Thread(() -> {
               spinLockDemo.myLock();
               try {
                   TimeUnit.SECONDS.sleep(3);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               spinLockDemo.myUnLock();
           }, "BB").start();
       }
    }
    
  6. 读写分离

    1. CopyOnWriteArrayList
    2. ConyOnWriteSet

读写锁

独占锁:指该锁一次只能被一个线程所持有。synchronizedReentrantLock都是独占锁

共享锁:值该锁可被多个线程锁持有

ReentrantReadWriteLock其读锁是共享锁,写锁是独占锁

读锁的共享锁保证并发读是高效的,读写、写读、写写的过程是互斥的

synchronized和volatile的区别

  1. volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比synchronized关键字要好。但是volatile关键字只能用于变量而 synchronized关键字可以修饰方法以及代码块。
  2. volatile关键字能保证数据的可见性,但不能保证数据的原子性。synchronized关键字两者都能保证。
  3. volatile关键字主要用于解决变量在多个线程之间的可见性,而 synchronized关键字解决的是多个线程之间访问资源的同步性。

AQS

前置:LockSupport

用于创建锁和其他同步类的基本线程阻塞原语,LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程,每个线程都有一个Permit,Permit只有1和0,默认是0

可以把Permit看成是一种(0,1)信号量,但与信号量不同的是,Permit的累加上限是1.

唤醒线程:

  1. 使用Object中的wait()方法让线程等待,使用Object中的notify()方法唤醒线程
    1. 必须在同步代码块中使用wait()方法和notify()方法,否则会出现异常
    2. 将notify()方法放在wait()前面,程序无法执行,无法唤醒
  2. 使用JUC包中Condition的await()方法让现场等待,使用signal()方法唤醒线程
    1. 必须在同步代码块中使用,否则会出现异常
    2. signal()方法放在await()方法前面,程序无法执行,无法唤醒
  3. LockSupport的park()和unpark()作用分别是阻塞线程和解除阻塞线程

常用方法

  1. park()/park(Object blocker)
    permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park()方法会被唤醒,然后会将permit再次设置为0并返回
  2. unpark(Thread thread)
    调用unpark(thread)方法后,就会将thread线程的permit设置成1(多次调用不会累加,最大为1)会自动唤醒thread线程,即之前阻塞中的park()方法会立即返回
public static void park() {
    UNSAFE.park(false, 0L);
}

public static void unpark(Thread thread) {
    if (thread != null)
    UNSAFE.unpark(thread);
}

LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞后也有对应的唤醒方法.归根到底,LockSupport调用的Unsafe中的native代码.

AQS入门

AbstractQueuedSynchronizer简称AQS,是一个抽象类(模板设计模式),作为顶层设计供其余类继承.是用来构建锁或者其他同步器组件的重量级基础框架以及整个JUC体系的基石.

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装呈一个Node节点来实现锁的分配,通过CAS完成对State值的修改
AQS

AQS具有头尾指针,前后指针.Node内部类的等待状态变量waitStatus

//AbstractQueuedSynchronizer的内部Node类

static final class Node {
	//共享
        static final Node SHARED = new Node();

	//独占
        static final Node EXCLUSIVE = null;

	//线程被取消了
        static final int CANCELLED =  1;

	//后继线程需要唤醒
        static final int SIGNAL    = -1;

	//等待condition唤醒
        static final int CONDITION = -2;

	//共享式同步状态获取将会无条件传播下去
        static final int PROPAGATE = -3;

	//初始0,状态是以上介绍
        volatile int waitStatus;

	//前置节点
        volatile Node prev;

	//后置节点
        volatile Node next;


        volatile Thread thread;
}

AQS解读(Lock)

Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的

public class ReentrantLock implements Lock, java.io.Serializable {

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class FairSync extends Sync {}

    static final class NonfairSync extends Sync {}

}

非公平锁与公平锁对比

存储线程的双向链表中,第一个节点为虚节点(哨兵节点),并不存储任何信息,只是占位.真正的第一个有数据的节点,是从第二个节点开始的

总结代码

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class share {
    private volatile boolean flag = true;
    private AtomicInteger atomicInteger = new AtomicInteger();
    BlockingQueue<String> blockingQueue = null;

    public share(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void pro() throws Exception {
        String data = null;
        boolean res;
        while (flag) {
            data = atomicInteger.incrementAndGet() + "";
            res = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
            if (res) {
                System.out.println(Thread.currentThread().getName() + data + "插入队列成功");
            } else {
                System.out.println(Thread.currentThread().getName() + "插入队列失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "生产结束");
    }

    public void cus() throws Exception {
        String res = null;
        while (flag) {
            res = blockingQueue.poll(2, TimeUnit.SECONDS);

            if (res == null || res.equalsIgnoreCase("")) {
                flag = false;
                System.out.println(Thread.currentThread().getName() + "超过两秒没有取到,退出");
                return;
            }
            System.out.println(Thread.currentThread().getName() + res + "消费队列成功");
        }
    }

    public void stop() {
        this.flag = false;
    }
}

//volatile/CAS/AtomicInteger/BlockingQueue/线程交互/原子引用
public class NewDemo {
    public static void main(String[] args) throws InterruptedException {
        share share = new share(new ArrayBlockingQueue<>(10));
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "生产启动");
            try {
                share.pro();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "生产者").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "消费启动");
            try {
                share.cus();
                System.out.println();
                System.out.println();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "消费者").start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println();
        System.out.println("结束");
        share.stop();
    }
}

Q.E.D.