1、前言

由于JVM的Synchronized重量级锁涉及操作系统(如Linux)内核态下互斥锁的使用,因此其线程阻塞和唤醒都涉及进程在用户态到内核态的频繁切换,导致重量级锁开销大、性能低。而JVM的Synchronized轻量级锁使用CAS(Compare And Swap,比较并交换)进行自旋抢锁。

2、CAS深入理解

2.1 什么是 CAS

CAS(Compare And Swap,比较并交换),通常指的是这样一种原子操作:针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同,就给它赋一个新值。JDK 5所增加的JUC(java.util.concurrent)并发包对操作系统的底层CAS原子操作进行了封装,为上层Java程序提供了CAS操作的API。

CAS 的逻辑用伪代码描述如下:

if (value == expectedValue) {
    value = newValue;

代码解释:

以上伪代码描述了一个由比较和赋值两阶段组成的复合操作,CAS 可以看作是它们合并后的整体——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。

CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java原子类中的递增操作就通过CAS自旋实现的。

CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java原子类中的递增操作就通过CAS自旋实现的。
CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。

2.2 Unsafe类中的CAS方法

2.2.1 概念

Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全的底层操作,如直接访问系统内存资源、自主管理内存资源等。Unsafe大量的方法都是native方法,基于C++语言实现,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。

2.2.2 CAS操作主要涉及Unsafe方法的调用流程

(1)获取Unsafe实例
Unsafe类是一个final修饰的不允许继承的最终类,而且其构造函数是private类型的方法

public final class Unsafe {
         private static final Unsafe theUnsafe;
         public static final int INVALID_FIELD_OFFSET = -1;
     
         private static native void registerNatives();
         // 构造函数是private的,不允许外部实例化
         private Unsafe() {
         }
         ...
}

获取Unsafe实例:

     // 省略import
     public class JvmUtil
     { 
         //自定义地获取Unsafe实例的辅助方法
         public static Unsafe getUnsafe()
         {
             try
             {
                 Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
                 theUnsafe.setAccessible(true);
                 return (Unsafe) theUnsafe.get(null);
             } catch (Exception e)
             {
                 throw new AssertionError(e);
             }
         }
         // 省略不相干代码
     }

(2)调用Unsafe提供的CAS方法,这些方法主要封装了底层CPU的CAS原子操作。
Unsafe提供的CAS方法:
Unsafe提供的CAS方法包含4个操作数——字段所在的对象、字段内存位置、预期原值及新值。在执行Unsafe的CAS方法时,这些方法首先将内存位置的值与预期值(旧的值)比较,如果相匹配,那么CPU会自动将该内存位置的值更新为新值,并返回true;如果不匹配,CPU不做任何操作,并返回false。

/**
       *  定义在Unsafe类中的三个“比较并交换”原子方法
       * @param o          需要操作的字段所在的对象
       * @param offset    需要操作的字段的偏移量(相对的,相对于对象头)
       * @param expected  期望值(旧的值)
       * @param update    更新值(新的值)
       * @return            true  更新成功   | false  更新失败
       */
     public final native boolean compareAndSwapObject(
               Object o, long offset,  Object expected, Object update);
     
     public final native boolean compareAndSwapInt(
               Object o, long offset, int expected,int update);
     
     public final native boolean compareAndSwapLong(
               Object o, long offset, long expected, long update);

(3)调用Unsafe提供的字段偏移量方法,这些方法用于获取对象中的字段(属性)偏移量,此偏移量值需要作为参数提供给CAS操作。
Unsafe提供的获取字段(属性)偏移量的相关操作

/**
       *  定义在Unsafe类中的几个获取字段偏移量的方法
       * @param o         需要操作字段的反射
       * @return          字段的偏移量
       */
     
     public native long staticFieldOffset(Field field);
     
     public native long objectFieldOffset(Field field);

staticFieldOffset()方法用于获取静态属性Field在Class对象中的偏移量,在CAS中操作静态属性时会用到这个偏移量。objectFieldOffset()方法用于获取非静态Field(非静态属性)在Object实例中的偏移量,在CAS中操作对象的非静态属性时会用到这个偏移量。
一个获取非静态Field(非静态属性)在Object实例中的偏移量

static
	{
		try
			{
				//获取反射的Field对象
				OptimisticLockingPlus.class.getDeclaredField("value");
				//取得内存偏移
				valueOffset = unsafe.objectFieldOffset();
			} catch (Exception ex)
			{
				throw new Error(ex);
			}
	}
2.2.3 应用案例

以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。

代码案列:

public class CASTest {

    public static void main(String[] args) {
        Entity entity = new Entity();

        Unsafe unsafe = UnsafeFactory.getUnsafe();

        long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");

        boolean successful;

        // 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段新值
        successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
        System.out.println(successful + "\t" + entity.x);

        successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
        System.out.println(successful + "\t" + entity.x);

        successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
        System.out.println(successful + "\t" + entity.x);
    }
}

public class UnsafeFactory {

    /**
     * 获取 Unsafe 对象
     * @return
     */
    public static Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 获取字段的内存偏移量
     * @param unsafe
     * @param clazz
     * @param fieldName
     * @return
     */
    public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
        try {
            return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }

针对 entity.x 的 3 次 CAS 操作,分别试图将它从 0 改成 3、从 3 改成 5、从 3 改成 8。执行结果如下:

2.3 CAS进行无锁编程

2.3.1 概念

CAS是一种无锁算法,该算法关键依赖两个值——期望值(旧值)和新值,底层CPU利用原子操作判断内存原值与期望值是否相等,如果相等就给内存地址赋新值,否则不做任何操作。

2.3.2 使用CAS进行无锁编程的步骤
  1. 获得字段的期望值(oldValue)
  2. 计算出需要替换的新值(newValue)
  3. 通过CAS将新值(newValue)放在字段的内存地址上,如果CAS失败就重复第(1)步到第(2)步,一直到CAS成功,这种重复俗称CAS自旋
2.3.3 CAS进行无锁编程的伪代码
do
          {
               获得字段的期望值(oldValue);
               计算出需要替换的新值(newValue);
          } while (!CAS(内存地址,oldValue,newValue))
2.3.4 举例图解CAS 无所编程的流程

图解:

假如某个内存地址(某对象的属性)的值为100,现在有两个线程(线程A和线程B)使用CAS无锁编程对该内存地址进行更新,线程A欲将其值更新为200,线程B欲将其值更新为300
线程是并发执行的,谁都有可能先执行。但是CAS是原子操作,对同一个内存地址的CAS操作在同一时刻只能执行一个。因此,在这个例子中,要么线程A先执行,要么线程B先执行。假设线程A的CAS(100,200)执行在前,由于内存地址的旧值100与该CAS的期望值100相等,因此线程A会操作成功,内存地址的值被更新为200。

线程A执行CAS(100,200)成功之后

接下来执行线程B的CAS(100,300)操作,此时内存地址的值为200,不等于CAS的期望值100,线程B操作失败。线程B只能自旋,开始新的循环,这一轮循环首先获取到内存地址的值200,然后进行CAS(200,300)操作,这一次内存地址的值与CAS的预期值(oldValue)相等,线程B操作成功。

2.3.5 使用无锁编程实现轻量级安全自增

基于CAS无锁编程的安全自增实现版本的具体代码

public class TestCompareAndSwap
  {
    // 基于CAS无锁实现的安全自增
    static class OptimisticLockingPlus
      {
        //并发数量
        private static final int THREAD_COUNT = 10;

        //内部值,使用volatile保证线程可见性
        private  volatile int value;//值

        //不安全类
        private static final Unsafe unsafe = getUnsafe();;

        //value 的内存偏移(相对于对象头部的偏移,不是绝对偏移)
        private static final long valueOffset;

        //统计失败的次数
        private static final AtomicLong failure = new AtomicLong(0);

        static
        {
          try
            {
              //取得value属性的内存偏移
              valueOffset = unsafe.objectFieldOffset(
                OptimisticLockingPlus.class.getDeclaredField("value"));

              Print.tco("valueOffset:=" + valueOffset);
            } catch (Exception ex)
            {
              throw new Error(ex);
            }
        }
        //通过CAS原子操作,进行“比较并交换”
        public final boolean unSafeCompareAndSet(int oldValue, int newValue)
        { 
          //原子操作:使用unsafe的“比较并交换”方法进行value属性的交换
          return unsafe.compareAndSwapInt(
            this, valueOffset,oldValue ,newValue );
        }

        //使用无锁编程实现安全的自增方法
        public void selfPlus()
        {
          int oldValue = value;
          //通过CAS原子操作,如果操作失败就自旋,一直到操作成功
          do
            {
              // 获取旧值
              oldValue = value;
              //统计无效的自旋次数
              if (i++ > 1)
              {
                //记录失败的次数
                failure.incrementAndGet();
              }

            } while (!unSafeCompareAndSet(oldValue, oldValue + 1));
        }

        //测试用例入口方法
        public static void main(String[] args) throws InterruptedException
        {
          final OptimisticLockingPlus cas = new OptimisticLockingPlus();
          //倒数闩,需要倒数THREAD_COUNT次
          CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
          for (int i = 0; i < THREAD_COUNT; i++)
            {
              // 提交10个任务 
              ThreadUtil.getMixedTargetThreadPool().submit(() ->
                                                           {
                                                             //每个任务累加1000次
                                                             for (int j = 0; j < 1000; j++)
                                                               {
                                                                 cas.selfPlus();
                                                               }
                                                             latch.countDown();  // 执行完一个任务,倒数闩减少一次
                                                           });
            }
          latch.await(); //主线程等待倒数闩倒数完毕
          Print.tco("累加之和:" + cas.value);
          Print.tco("失败次数:" + cas.failure.get());
        }
      }
  }
2.3.6 字段偏移量的计算

调用Unsafe.objectFieldOffset(…)方法获取到的Object字段(也叫Object成员属性)的偏移量值是字段相对于Object头部的偏移量,是一个相对的内存地址值,不是绝对的内存地址值。
模拟代码:

// 模拟CAS 算法
         static class OptimisticLockingPlus
         {   //静态常量:线程数
             private static final int THREAD_COUNT = 10;
     
             //成员属性:包装的值
             volatile private int value;
     
             //静态常量:JDK不安全类的实例
             private static final Unsafe unsafe = JvmUtil.getUnsafe();
     
             //静态常量:value 成员的相对偏移(相对于对象头)
             private static final long valueOffset;
     
            //静态常量:CAS的失败次数
             private static final AtomicLong failure = new AtomicLong(0);
            // 省略其他不相干的代码
     }

虽然OptimisticLockingPlus类有5个字段,但是其中有4个是静态字段,属于类的成员而不是对象的成员,真正属于对象的字段只有其中的value字段

图解对象结构:

在64位的JVM堆区中一个OptimisticLockingPlus对象的Object Header(头部)占用了12字节,其中Mark Word占用了8字节(64位),压缩过的Class Pointer占用了4字节。接在Object Header之后的就是成员属性value的内存区域,所以value属性相对于Object Header的偏移量为12。

3、JUC原子类

在多线程并发执行时,诸如“++”或“–”类的运算不具备原子性,不是线程安全的操作。通常情况下,大家会使用synchronized将这些线程不安全的操作变成同步操作,但是这样会降低并发程序的性能。所以,JDK为这些类型不安全的操作提供了一些原子类,与synchronized同步机制相比,JDK原子类是基于CAS轻量级原子操作的实现,使得程序运行效率变得更高。

3.1 JUC中的Atomic原子操作包位置

JUC并发包中的原子类都存放在java.util.concurrent.atomic类路径下

3.2 JUC包中的原子类分类

3.2.1 基本原子类

基本原子类的功能是通过原子方式更新Java基础类型变量的值。基本原子类主要包括以下三个:

  • AtomicInteger:整型原子类。
  • AtomicLong:长整型原子类。
  • AtomicBoolean:布尔型原子类。

基础原子类AtomicInteger常用的方法

public final int get() //获取当前的值
public final int getAndSet(int newValue) //获取当前的值,然后设置新的值
public final int getAndIncrement()       //获取当前的值,然后自增
public final int getAndDecrement()       //获取当前的值,然后自减
public final int getAndAdd(int delta)    //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update)  //通过CAS方式设置整数值

基础原子类AtomicInteger的使用示例

public class AtomicTest
    {
        @Test
        public  void atomicIntegerTest()
        {
            int tempvalue = 0;
            //定义一个整数原子类实例,赋值到变量 i 
            AtomicInteger i = new AtomicInteger(0);

            //取值,然后设置一个新值
            tempvalue = i.getAndSet(3);
            //输出tempvalue:0;  i:3
            Print.fo("tempvalue:" + tempvalue + ";  i:" + i.get());

            //取值,然后自增
            tempvalue = i.getAndIncrement();
            //输出tempvalue:3;  i:4
            Print.fo("tempvalue:" + tempvalue + ";  i:" + i.get());

            //取值,然后增加5
            tempvalue = i.getAndAdd(5);
            //输出tempvalue:4;  i:9
            Print.fo("tempvalue:" + tempvalue + ";  i:" + i.get());

            //CAS交换
            boolean flag = i.compareAndSet(9, 100);
            //输出flag:true;  i:100
            Print.fo("flag:" + flag + ";  i:" + i.get());
        }
    }

基础原子类的综合示例

public class AtomicTest
    {
        @Test
        public static void main(String[] args) throws InterruptedException
        {
            CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
            //定义一个整数原子类实例,赋值到变量 i
            AtomicInteger atomicInteger = new AtomicInteger(0);

            for (int i = 0; i < THREAD_COUNT; i++)
                {
                    // 创建10个线程,模拟多线程环境
                    ThreadUtil.getMixedTargetThreadPool().submit(() ->
                                                                 {

                                                                     for (int j = 0; j < 1000; j++)
                                                                         {
                                                                             atomicInteger.getAndIncrement();
                                                                         }
                                                                     latch.countDown();

                                                                 });
                }
            latch.await();
            Print.tco("累加之和:" + atomicInteger.get());
        }
        // 省略不相关代码
    }

AtomicInteger线程安全原理
基础原子类(以AtomicInteger为例)主要通过CAS自旋+volatile的方案实现,既保障了变量操作的线程安全性,又避免了synchronized重量级锁的高开销,使得Java程序的执行效率大为提升。

原子类的CAS自旋+volatile的实现方案。AtomicInteger源码的具体代码:
AtomicInteger源码中的主要方法都是通过CAS自旋实现的。CAS自旋的主要操作为:如果一次CAS操作失败,获取最新的value值后,再次进行CAS操作,直到成功。

public class AtomicInteger extends Number
    implements java.io.Serializable {

        //Unsafe类实例
        private static final Unsafe unsafe = Unsafe.getUnsafe();

        //内部value值,使用volatile保证线程可见性
        private volatile int value;

        //value属性值的地址偏移量
        private static final long valueOffset;

        static {
            try {
                //计算value 属性值的地址偏移量
                valueOffset = unsafe.objectFieldOffset(
                    AtomicInteger.class.getDeclaredField("value"));
            } catch (Exception ex) { throw new Error(ex); }
        }

        //初始化
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }

        //获取当前value值
        public final int get() {
            return value;
        }

        //方法:返回旧值并赋新值
        public final int getAndSet(int newValue) {
            for (;;) {//自旋
                int current = get();//获取旧值

                //以CAS方式赋值,直到成功返回
                if (compareAndSet(current, newValue)) return current;
            }
        }

        //方法:封装底层的CAS操作,对比expect(期望值)与value,若不同则返回false
        //若expect与value相同,则将新值赋给value,并返回true
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(
                this, valueOffset, expect, update);
        }

        //方法:安全自增 i++
        public final int getAndIncrement() {
            for (;;) { //自旋
                int current = get();
                int next = current + 1;
                if (compareAndSet(current, next))
                    return current;
            }
        }

        //方法:自定义增量数
        public final int getAndAdd(int delta) {
            for (;;) { //自旋
                int current = get();
                int next = current + delta;
                if (compareAndSet(current, next))
                    return current;
            }
        }

        //方法:类似++i,返回自增后的值
        public final int incrementAndGet() {
            for (;;) { //自旋
                int current = get();
                int next = current + 1;
                if (compareAndSet(current, next))
                    return next;
            }
        }

        //方法:返回加上delta后的值 
        public final int addAndGet(int delta) {
            for (;;) {  //自旋
                int current = get();
                int next = current + delta;
                if (compareAndSet(current, next))
                    return next;
            }
        }
        // 省略其他源码
    }
3.2.2 数组原子类

数组原子类的功能是通过原子方式更数组中的某个元素的值。数组原子类主要包括以下三个:

  • AtomicIntegerArray:整型数组原子类。
  • AtomicLongArray:长整型数组原子类。
  • AtomicReferenceArray:引用类型数组原子类。

AtomicIntegerArray类的常用方法

//获取 index=i 位置元素的值
     public final int get(int i) 
     
     //返回 index=i 位置当前的值,并将其设置为新值:newValue
     public final int getAndSet(int i, int newValue)
     
     //获取 index=i 位置元素的值,并让该位置的元素自增
     public final int getAndIncrement(int i)
     
     //获取 index=i 位置元素的值,并让该位置的元素自减
     public final int getAndDecrement(int i) 
     
     //获取 index=i 位置元素的值,并加上预期的值
     public final int getAndAdd(int delta) 
     
     //如果输入的数值等于预期值,就以原子方式将位置i的元素值设置为输入值(update)
     boolean compareAndSet(int expect, int update) 
     
     //最终将位置i的元素设置为newValue
     //lazySet()方法可能导致其他线程在之后的一小段时间内还是可以读到旧的值
     public final void lazySet(int i, int newValue)

数组原子类AtomicIntegerArray的使用示例

public class AtomicTest
    {

        @Test
        public  void testAtomicIntegerArray () {
            int tempvalue = 0;
            //原始的数组
            int[] array = { 1, 2, 3, 4, 5, 6 };

            //包装为原子数组
            AtomicIntegerArray i = new AtomicIntegerArray(array);
            //获取第0个元素,然后设置为2
            tempvalue = i.getAndSet(0, 2);
            //输出tempvalue:1;  i:[2, 2, 3, 4, 5, 6]
            Print.fo("tempvalue:" + tempvalue + ";  i:" + i);

            //获取第0个元素,然后自增
            tempvalue = i.getAndIncrement(0);
            //输出tempvalue:2;  i:[3, 2, 3, 4, 5, 6]
            Print.fo("tempvalue:" + tempvalue + ";  i:" + i);

            //获取第0个元素,然后增加一个delta 5
            tempvalue = i.getAndAdd(0, 5);
            //输出tempvalue:3;  i:[8, 2, 3, 4, 5, 6]
            Print.fo("tempvalue:" + tempvalue + ";  i:" + i);
        }
    }
3.2.3 引用原子类

引用原子类主要包括以下三个:

  • AtomicReference:引用类型原子类。
  • AtomicMarkableReference:带有更新标记位的原子引用类型。
  • AtomicStampedReference:带有更新版本号的原子引用类型。

AtomicMarkableReference类将boolean标记与引用关联起来,可以解决使用AtomicBoolean进行原子更新时可能出现的ABA问题。AtomicStampedReference类将整数值与引用关联起来,可以解决使用AtomicInteger进行原子更新时可能出现的ABA问题。

对象操作的原子性:
基础的原子类型只能保证一个变量的原子操作,当需要对多个变量进行操作时,CAS无法保证原子性操作,这时可以用AtomicReference(原子引用类型)保证对象引用的原子性。
AtomicReference类的使用示例:

public class User implements Serializable
    {

        String uid;                                        //用户ID
        String nickName;                           //昵称
        public  volatile  int age;                 //年龄

        public User(String uid, String nickName)
        {
            this.uid = uid;
            this.nickName = nickName;
        }

        @Override
        public String toString()
        {
            return "User{" +
                "uid='" + getUid() + '\'' +
                ", nickName='" + getNickName() + '\'' +
                ", platform=" + getPlatform() +
                '}';
        }

使用AtomicReference对User的引用进行原子性修改

public class AtomicTest
    {
        @Test
        public void testAtomicReference() 
        {
            //包装的原子对象
            AtomicReference<User> userRef = new AtomicReference<User>();
            //待包装的User对象
            User user = new User("1", "张三");
            //为原子对象设置值
            userRef.set(user);
            Print.tco("userRef is:" +  userRef.get());

            //要使用CAS替换的User对象
            User updateUser = new User("2", "李四");
            //使用CAS替换
            boolean success = userRef.compareAndSet(user, updateUser);
            Print.tco(" cas result is:" + success);
            Print.tco(" after cas,userRef is:" +  userRef.get());
        }
        // 省略其他
    }
3.2.4 字段更新原子类

字段更新原子类主要包括以下三个:

  • AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
  • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
  • AtomicReferenceFieldUpdater:原子更新引用类型中的字段。

使用属性更新原子类保障属性安全更新的流程大致需要两步

  • 第一步,更新的对象属性必须使用public volatile修饰符。
  • 第二步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须调用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

AtomicIntegerFieldUpdater类的使用示例:

@Test
    public void testAtomicIntegerFieldUpdater() 
        {
        //调用静态方法newUpdater()创建一个更新器updater
        AtomicIntegerFieldUpdater<User> updater=
            AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
        User user = new User("1", "张三");

        //使用属性更新器的getAndIncrement、getAndAdd增加user的age值

        Print.tco(updater.getAndIncrement(user));// 1
        Print.tco(updater.getAndAdd(user, 100));// 101

        //使用属性更新器的get获取user的age值
        Print.tco(updater.get(user));// 101
    }

4、CAS 缺陷

CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方面:

  • 自旋 CAS 长时间地不成功,则会给 CPU 带来非常大的开销
  • 只能保证一个共享变量原子操作
  • ABA 问题

4.1 什么是ABA问题

一个线程A从内存位置M中取出V1,另一个线程B也取出V1。现在假设线程B进行了一些操作之后将M位置的数据V1变成了V2,然后又在一些操作之后将V2变成了V1。之后,线程A进行CAS操作,但是线程A发现M位置的数据仍然是V1,然后线程A操作成功。尽管线程A的CAS操作成功,但是不代表这个过程是没有问题的,线程A操作的数据V1可能已经不是之前的V1,而是被线程B替换过的V1

4.2 测试ABA 问题

代码:

@Slf4j
  public class ABATest {

    public static void main(String[] args) {
      AtomicInteger atomicInteger = new AtomicInteger(1);

      new Thread(()->{
        int value = atomicInteger.get();
        log.debug("Thread1 read value: " + value);

        // 阻塞1s
        LockSupport.parkNanos(1000000000L);

        // Thread1通过CAS修改value值为3
        if (atomicInteger.compareAndSet(value, 3)) {
          log.debug("Thread1 update from " + value + " to 3");
        } else {
          log.debug("Thread1 update fail!");
        }
      },"Thread1").start();

      new Thread(()->{
        int value = atomicInteger.get();
        log.debug("Thread2 read value: " + value);
        // Thread2通过CAS修改value值为2
        if (atomicInteger.compareAndSet(value, 2)) {
          log.debug("Thread2 update from " + value + " to 2");

          // do something
          value = atomicInteger.get();
          log.debug("Thread2 read value: " + value);
          // Thread2通过CAS修改value值为1
          if (atomicInteger.compareAndSet(value, 1)) {
            log.debug("Thread2 update from " + value + " to 1");
          }
        }
      },"Thread2").start();
    }

运行结果:

4.3 ABA问题解决方案

很多乐观锁的实现版本都是使用版本号(Version)方式来解决ABA问题。乐观锁每次在执行数据的修改操作时都会带上一个版本号,版本号和数据的版本号一致就可以执行修改操作并对版本号执行加1操作,否则执行失败。因为每次操作的版本号都会随之增加,所以不会出现ABA问题,因为版本号只会增加,不会减少。

4.3.1 使用AtomicStampedReference解决ABA问题

原理:

AtomicStampReference的compareAndSet()方法首先检查当前的对象引用值是否等于预期引用,并且当前印戳(Stamp)标志是否等于预期标志,如果全部相等,就以原子方式将引用值和印戳(Stamp)标志的值更新为给定的更新值。

AtomicStampReference的构造器有两个参数:

//构造器,V表示要引用的原始数据,initialStamp表示最初的版本印戳(版本号)
AtomicStampedReference(V initialRef, int initialStamp)

AtomicStampReference常用的几个方法:

//获取被封装的数据
public V getRerference();

//获取被封装的数据的版本印戳
public int getStamp();

AtomicStampedReference的CAS操作的定义:

public boolean compareAndSet(
    V  expectedReference,           //预期引用值
    V   newReference,                       //更新后的引用值
    int  expectedStamp,              //预期印戳(Stamp)标志值
    int  newStamp)                           //更新后的印戳(Stamp)标志值

实现原理:

compareAndSet()方法的第一个参数是原来的CAS中的参数,第二个参数是替换后的新参数,第三个参数是原来CAS数据旧的版本号,第四个参数表示替换后的新参数版本号。进行CAS操作时,若当前引用值等于预期引用值,并且当前印戳值等于预期印戳值,则以原子方式将引用值和印戳值更新为给定的更新值。

简单的AtomicStampedReference使用示例:

public class AtomicTest
    {

        @Test
        public void testAtomicStampedReference() 
        {

            CountDownLatch latch = new CountDownLatch(2);

            AtomicStampedReference<Integer> atomicStampedRef =
                new AtomicStampedReference<Integer>(1, 0);

            ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
                                                         {
                                                             @Override
                                                             public void run()
                                                             {
                                                                 boolean success = false;
                                                                 int stamp = atomicStampedRef.getStamp();
                                                                 Print.tco("before sleep 500: value=" 
                                                                           + atomicStampedRef.getReference()
                                                                           + " stamp=" + atomicStampedRef.getStamp());

                                                                 //等待500毫秒
                                                                 sleepMilliSeconds(500);
                                                                 success = atomicStampedRef.compareAndSet(1, 10,
                                                                                                          stamp, stamp + 1);

                                                                 Print.tco("after sleep 500 cas 1: success=" + success
                                                                           + " value=" + atomicStampedRef.getReference()
                                                                           + " stamp=" + atomicStampedRef.getStamp());

                                                                 //增加印戳值,然后更新,如果stamp被其他线程改了,就会更新失败
                                                                 stamp++;
                                                                 success = atomicStampedRef.compareAndSet(10, 1,
                                                                                                          stamp, stamp+1);
                                                                 Print.tco("after  sleep 500 cas 2: success=" + success
                                                                           + " value=" + atomicStampedRef.getReference()
                                                                           + " stamp=" + atomicStampedRef.getStamp());

                                                                 latch.countDown();
                                                             }
                                                         });

            ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
                                                         {
                                                             @Override
                                                             public void run()
                                                             {
                                                                 boolean success = false;
                                                                 int stamp = atomicStampedRef.getStamp();
                                                                 // stamp = 0
                                                                 Print.tco("before sleep 1000: value=" 
                                                                           + atomicStampedRef.getReference()
                                                                           + " stamp=" + atomicStampedRef.getStamp());

                                                                 //等待1000毫秒
                                                                 sleepMilliSeconds(1000);
                                                                 Print.tco("after sleep 1000: stamp = " 
                                                                           + atomicStampedRef.getStamp());

                                                                 //stamp = 1,这个值实际已经被修改了
                                                                 success = atomicStampedRef.compareAndSet(
                                                                     1, 20, stamp, stamp++);
                                                                 Print.tco("after cas 3 1000: success=" + success
                                                                           + " value=" + atomicStampedRef.getReference()
                                                                           + " stamp=" + atomicStampedRef.getStamp());
                                                                 latch.countDown();
                                                             }
                                                         });
            latch.await();

        }
        // 省略其他
    }
4.3.2 使用AtomicMarkableReference解决ABA问题

原理:
AtomicMarkableReference是AtomicStampedReference的简化版,不关心修改过几次,只关心是否修改过。因此,其标记属性mark是boolean类型,而不是数字类型,标记属性mark仅记录值是否修改过。

应用场景:
知道对象是否被修改过,而不适用于对象被反复修改的场景。

代码案列:

public class AtomicTest
    {
        @Test
        public void testAtomicMarkableReference() throws InterruptedException
        {

            CountDownLatch latch = new CountDownLatch(2);

            AtomicMarkableReference<Integer> atomicRef =
                new AtomicMarkableReference<Integer>(1, false);

            ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
                                                         {
                                                             @Override
                                                             public void run()
                                                             {
                                                                 boolean success = false;
                                                                 int value = atomicRef.getReference();
                                                                 boolean mark = getMark(atomicRef);
                                                                 Print.tco("before sleep 500: value=" + value
                                                                           + " mark=" + mark);

                                                                 //等待500毫秒
                                                                 sleepMilliSeconds(500);
                                                                 success = atomicRef.compareAndSet(1, 10, mark, !mark);

                                                                 Print.tco("after sleep 500 cas 1: success=" + success
                                                                           + " value=" + atomicRef.getReference()
                                                                           + " mark=" + getMark(atomicRef));

                                                                 latch.countDown();
                                                             }
                                                         });

            ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
                                                         {
                                                             @Override
                                                             public void run()
                                                             {
                                                                 boolean success = false;
                                                                 int value = atomicRef.getReference();
                                                                 boolean mark = getMark(atomicRef);
                                                                 Print.tco("before sleep 1000: value=" 
                                                                           + atomicRef.getReference()
                                                                           + " mark=" + mark);

                                                                 //等待1000毫秒
                                                                 sleepMilliSeconds(1000);
                                                                 Print.tco("after sleep 1000: mark = " + getMark(atomicRef));
                                                                 success = atomicRef.compareAndSet(1, 20, mark,!mark);
                                                                 Print.tco("after cas 3 1000: success=" + success
                                                                           + " value=" + atomicRef.getReference()
                                                                           + " mark=" + getMark(atomicRef));

                                                                 latch.countDown();
                                                             }
                                                         });
            latch.await();

        }

        //取得修改标志值
        private boolean getMark(AtomicMarkableReference<Integer> atomicRef)
        {
            boolean[] markHolder = {false};
            int value = atomicRef.get(markHolder);
            return markHolder[0];
        }
        // 省略其他
    }
4.3.3 提升高并发场景下CAS操作的性能

在争用激烈的场景下,会导致大量的CAS空自旋。比如,在大量线程同时并发修改一个AtomicInteger时,可能有很多线程会不停地自旋,甚至有的线程会进入一个无限重复的循环中。

5、以空间换时间:LongAdder

5.1 核心思想

LongAdder的核心思想是热点分离,与ConcurrentHashMap的设计思想类似:将value值分离成一个数组,当多线程访问时,通过Hash算法将线程映射到数组的一个元素进行操作;而获取最终的value结果时,则将数组的元素求和。

测试代码:

public class LongAdderVSAtomicLongTest
    {
        //每个线程的执行轮数
        final int TURNS = 100000000;

        //对比测试用例一:调用AtomicLong完成10个线程累加1000次
        @org.junit.Test
        public void testAtomicLong()
        {
            //并发任务数
            final int TASK_AMOUNT = 10;

            //线程池,获取CPU密集型任务线程池
            ExecutorService pool = ThreadUtil.getCpuIntenseTargetThreadPool();

            //定义一个原子对象
            AtomicLong atomicLong = new AtomicLong(0);

            //线程同步倒数闩
            CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
            long start = System.currentTimeMillis();
            for (int i = 0; i < TASK_AMOUNT; i++)
                {
                    pool.submit(() ->
                                {
                                    try
                                        {
                                            for (int j = 0; j < TURNS; j++)
                                                {
                                                    atomicLong.incrementAndGet();
                                                }
                                            // Print.tcfo("本线程累加完成");
                                        } catch (Exception e)
                                        {
                                            e.printStackTrace();
                                        }
                                    //倒数闩,倒数一次
                                    countDownLatch.countDown();
                                });
                }

            try
                {
                    //等待倒数闩完成所有的倒数操作
                    countDownLatch.await();
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            float time = (System.currentTimeMillis() - start) / 1000F;
            //输出统计结果
            Print.tcfo("运行的时长为:" + time);
            Print.tcfo("累加结果为:" + atomicLong.get());
        }

        //对比测试用例二:调用LongAdder完成10个线程累加1000次
        @org.junit.Test
        public void testLongAdder()
        {
            //并发任务数
            final int TASK_AMOUNT = 10;

            //线程池,获取CPU密集型任务线程池
            ExecutorService pool = ThreadUtil.getCpuIntenseTargetThreadPool();

            //定义一个LongAdder 对象
            LongAdder longAdder = new LongAdder();

            //线程同步倒数闩
            CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
            long start = System.currentTimeMillis();
            for (int i = 0; i < TASK_AMOUNT; i++)
                {
                    pool.submit(() ->
                                {
                                    try
                                        {
                                            for (int j = 0; j < TURNS; j++)
                                                {
                                                    longAdder.add(1);
                                                }
                                        } catch (Exception e)
                                        {
                                            e.printStackTrace();
                                        }
                                    //倒数闩,倒数一次
                                    countDownLatch.countDown();
                                });
                }
            try
                {
                    //等待倒数闩完成所有的倒数操作
                    countDownLatch.await();
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            float time = (System.currentTimeMillis() - start) / 1000F;
            //输出统计结果
            Print.tcfo("运行的时长为:" + time);
            Print.tcfo("累加结果为:" + longAdder.longValue());
        }
    }

5.2 LongAdder的原理

AtomicLong使用内部变量value保存着实际的long值,所有的操作都是针对该value变量进行的。也就是说,在高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。重试线程越多,就意味着CAS的失败概率更高,从而进入恶性CAS空自旋状态。

5.2.1 基本思路是分散热点

将value值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行CAS操作。这样热点就被分散了,冲突的概率就小很多。

5.2.2 .LongAdder实例的内部结构

LongAdder的内部成员包含一个base值和一个cells数组。在最初无竞争时,只操作base的值;当线程执行CAS失败后,才初始化cells数组,并为线程分配所对应的元素。

LongAdder中没有类似于AtomicLong中的getAndIncrement()或者incrementAndGet()这样的原子操作,所以只能通过increment()方法和longValue()方法的组合来实现更新和获取的操作。

5.2.3 基类Striped64内部三个重要的成员

LongAdder继承于Striped64类,base值和cells数组都在Striped64类中定义。基类Striped64内部三个重要的成员

/**
* 成员一:存放Cell的哈希表,大小为2的幂
*/
transient volatile Cell[] cells;
/**
* 成员二:基础值
* 1. 在没有竞争时会更新这个值
* 2. 在cells初始化时,cells不可用,也会尝试通过CAS操作值累加到base
*/
transient volatile long base;


/**
* 自旋锁,通过CAS操作加锁,为0表示cells数组没有处于创建、扩容阶段
* 为1表示正在创建或者扩展cells数组,不能进行新Cell元素的设置操作
*/
transient volatile int cellsBusy;
5.2.4 LongAdder类的add()方法
/**
* 自增1
*/
public void increment() {
    add(1L);
}

/**
* 自减1
*/
public void decrement() {
    add(-1L);
}

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null ||                                                            //CASE 1
    !casBase(b = base, b + x)) {                           //CASE 2
    if (as == null || (m = as.length - 1) < 0 ||           //CASE 3
        (a = as[getProbe() & m]) == null ||                    //CASE 4
        !(uncontended = a.cas(v = a.value, v + x))) //CASE 5
        longAccumulate(x, null, uncontended);
}
}
5.2.5 LongAdder类中的longAccumulate()方法

longAccumulate()是Striped64中重要的方法,实现不同的线程更新各自Cell中的值,其实现逻辑类似于分段锁

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
  int h;
  if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    wasUncontended = true;
  }

  //扩容意向,collide=true可以扩容,collide=false不可扩容
  boolean collide = false;  
  //自旋,一直到操作成功
  for (;;) {
    //as 表示cells引用
    //a 表示当前线程命中的Cell
    //n 表示cells数组长度
    //v 表示期望值
    Cell[] as; Cell a; int n; long v;

    //CASE1: 表示cells已经初始化了,当前线程应该将数据写入对应的Cell中
    //这个大的if分支有三个小分支
    if ((as = cells) != null && (n = as.length) > 0) {

      //CASE1.1:true表示下标位置的Cell为null,需要创建new Cell
      if ((a = as[(n - 1) & h]) == null) {
        if (cellsBusy == 0) {       // cells数组没有处于创建、扩容阶段
          Cell r = new Cell(x);   // Optimistically create
          if (cellsBusy == 0 && casCellsBusy()) {
            boolean created = false;
            try {               // Recheck under lock
              Cell[] rs; int m, j;
              if ((rs = cells) != null &&
                  (m = rs.length) > 0 &&
                  rs[j = (m - 1) & h] == null) {
                rs[j] = r;
                created = true;
              }
            } finally {
              cellsBusy = 0;
            }
            if (created)
              break;
            continue;   // Slot is now non-empty
          }
        }
        collide = false;
      }

        // CASE1.2:当前线程竞争修改失败,wasUncontended为false
      else if (!wasUncontended)              // CAS already known to fail
        wasUncontended = true;             // Continue after rehash

        //CASE 1.3:当前线程rehash过哈希值,CAS更新Cell
      else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                   fn.applyAsLong(v, x))))
        break;

        //CASE 1.4:调整扩容意向,然后进入下一轮循环
      else if (n >= NCPU || cells != as)
        collide = false;   // 达到最大值,或者as值过期

      //CASE 1.5:设置扩容意向为true,但是不一定真的发生扩容
      if (!collide)
        collide = true;

        //CASE 1.6:真正扩容的逻辑
      else if (cellsBusy == 0 && casCellsBusy()) {
        try {
          if (cells == as) {      // Expand table unless stale
            Cell[] rs = new Cell[n << 1];
            for (int i = 0; i < n; ++i)
              rs[i] = as[i];
            cells = rs;
          }
        } finally {
          cellsBusy = 0;          //释放锁
        }
        collide = false;
        continue;                  // Retry with expanded table
      }
      h = advanceProbe(h);           //重置(rehash)当前线程Hash值
    }

      //CASE2:cells还未初始化(as 为null),并且cellsBusy加锁成功 
    else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
      boolean init = false;
      try {                           // Initialize table
        if (cells == as) {
          Cell[] rs = new Cell[2];
          rs[h & 1] = new Cell(x);
          cells = rs;
          init = true;
        }
      } finally {
        cellsBusy = 0;
      }
      if (init)
        break;
    }

      //CASE3:当前线程cellsBusy加锁失败,表示其他线程正在初始化cells
      //所以当前线程将值累加到base,注意add(…)方法调用此方法时fn为null
    else if (casBase(v = base, ((fn == null) ? v + x :
                                fn.applyAsLong(v, x))))
      break;   // 在 base操作成功时跳出自旋
  }
}

longAccumulate的自旋过程中,有三个大的if分支

  • CASE1:表示cells已经初始化了,当前线程应该将数据写入对应的Cell中。
  • CASE2:cells还未初始化(as为null),本分支计划初始化cells,在此之前开始执行cellsBusy加锁,并且要求cellsBusy加锁成功。
  • CASE3:如果cellsBusy加锁失败,表示其他线程正在初始化cells,所以当前线程将值累加到base上。

CASE1表示当前线程应该将数据写入对应的Cell中,又分为以下几种细分情况:

  • CASE1.1:表示当前线程对应的下标位置的Cell为null,需要创建新Cell。
  • CASE1.2:wasUncontended是add(…)方法传递进来的参数如果为false,就表示cells已经被初始化,并且线程对应位置的Cell元素也已经被初始化,但是当前线程对Cell元素的竞争修改失败。如果add方法中的条件语句CASE 5通过CAS尝试把cells[m%cells.length]位置的Cell对象的value值设置为v+x而失败了,就说明已经发生竞争,就将wasUncontended设置为false。如果wasUncontended为false,就需要重新计算prob的值,那么自旋操作进入下一轮循环。
  • CASE 1.3:无论执行CASE1分支的哪个子条件,都会在末尾执行h=advanceProb()语句rehash出一个新哈希值,然后命中新的Cell,如果新命中的Cell不为空,在此分支进行CAS更新,将Cell的值更新为a.value+x,如果更新成功,就跳出自旋操作;否则还得继续自旋。
  • CASE 1.4:调整cells数组的扩容意向,然后进入下一轮循环。如果n≥NCPU条件成立,就表示cells数组大小已经大于等于CPU核数,扩容意向改为false,表示不扩容了;如果该条件不成立,就说明cells数组还可以扩容,尽管如此,如果cells !=as为true,就表示其他线程已经扩容过了,也会将扩容意向改为false,表示当前循环不扩容了。当前线程调到CASE1分支的末尾执行rehash操作重新计算prob的值,然后进入下一轮循环。
  • CASE 1.5:如果!collide=true满足,就表示扩容意向不满足,设置扩容意向为true,但是不一定真的发生扩容,然后进入CASE1分支末尾重新计算prob的值,接着进入下一轮循环。
  • CASE 1.6:执行真正扩容的逻辑。其条件一cellsBusy==0为true表示当前cellsBusy的值为0(无锁状态),当前线程可以去竞争这把锁;其条件二casCellsBusy()表示当前线程获取锁成功,CAS操作cellsBusy改为0成功,可以执行扩容逻辑。
5.2.6 LongAdder类的casCellsBusy()方法

当cellsBusy成员值为1时,表示cells数组正在被某个线程执行初始化或扩容操作,其他线程不能进行以下操作:(1)对cells数组执行初始化。(2)对cells数组执行扩容。(3)如果cells数组中某个元素为null,就为该元素创建新的Cell对象。因为数组的结构正在修改,所以其他线程不能创建新的Cell对象。

6、参考文档

Java高并发编程学习笔记
并发编程之CAS&Atomic原子操作学习笔记