【并发编程篇】并发编程之CAS&Atomic原子操作
1、前言
由于JVM的Synchronized重量级锁涉及操作系统(如Linux)内核态下互斥锁的使用,因此其线程阻塞和唤醒都涉及进程在用户态到内核态的频繁切换,导致重量级锁开销大、性能低。而JVM的Synchronized轻量级锁使用CAS(Compare And Swap,比较并交换)进行自旋抢锁。
2、CAS深入理解
2.1 什么是 CAS
CAS(Compare And Swap,比较并交换),通常指的是这样一种原子操作:针对一个变量,首先比较它的内存值与某个期望值是否相同,如果相同,就给它赋一个新值。JDK 5所增加的JUC(java.util.concurrent)并发包对操作系统的底层CAS原子操作进行了封装,为上层Java程序提供了CAS操作的API。
CAS 的逻辑用伪代码描述如下:
if (value == expectedValue) {
value = newValue;
代码解释:
以上伪代码描述了一个由比较和赋值两阶段组成的复合操作,CAS 可以看作是它们合并后的整体——一个不可分割的原子操作,并且其原子性是直接在硬件层面得到保障的。
CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java原子类中的递增操作就通过CAS自旋实现的。
CAS可以看做是乐观锁(对比数据库的悲观、乐观锁)的一种实现方式,Java原子类中的递增操作就通过CAS自旋实现的。
CAS是一种无锁算法,在不使用锁(没有线程被阻塞)的情况下实现多线程之间的变量同步。
2.2 Unsafe类中的CAS方法
2.2.1 概念
Unsafe是位于sun.misc包下的一个类,主要提供一些用于执行低级别、不安全的底层操作,如直接访问系统内存资源、自主管理内存资源等。Unsafe大量的方法都是native方法,基于C++语言实现,这些方法在提升Java运行效率、增强Java语言底层资源操作能力方面起到了很大的作用。
2.2.2 CAS操作主要涉及Unsafe方法的调用流程
(1)获取Unsafe实例
Unsafe类是一个final修饰的不允许继承的最终类,而且其构造函数是private类型的方法
public final class Unsafe {
private static final Unsafe theUnsafe;
public static final int INVALID_FIELD_OFFSET = -1;
private static native void registerNatives();
// 构造函数是private的,不允许外部实例化
private Unsafe() {
}
...
}
获取Unsafe实例:
// 省略import
public class JvmUtil
{
//自定义地获取Unsafe实例的辅助方法
public static Unsafe getUnsafe()
{
try
{
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
return (Unsafe) theUnsafe.get(null);
} catch (Exception e)
{
throw new AssertionError(e);
}
}
// 省略不相干代码
}
(2)调用Unsafe提供的CAS方法,这些方法主要封装了底层CPU的CAS原子操作。
Unsafe提供的CAS方法:
Unsafe提供的CAS方法包含4个操作数——字段所在的对象、字段内存位置、预期原值及新值。在执行Unsafe的CAS方法时,这些方法首先将内存位置的值与预期值(旧的值)比较,如果相匹配,那么CPU会自动将该内存位置的值更新为新值,并返回true;如果不匹配,CPU不做任何操作,并返回false。
/**
* 定义在Unsafe类中的三个“比较并交换”原子方法
* @param o 需要操作的字段所在的对象
* @param offset 需要操作的字段的偏移量(相对的,相对于对象头)
* @param expected 期望值(旧的值)
* @param update 更新值(新的值)
* @return true 更新成功 | false 更新失败
*/
public final native boolean compareAndSwapObject(
Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt(
Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong(
Object o, long offset, long expected, long update);
(3)调用Unsafe提供的字段偏移量方法,这些方法用于获取对象中的字段(属性)偏移量,此偏移量值需要作为参数提供给CAS操作。
Unsafe提供的获取字段(属性)偏移量的相关操作
/**
* 定义在Unsafe类中的几个获取字段偏移量的方法
* @param o 需要操作字段的反射
* @return 字段的偏移量
*/
public native long staticFieldOffset(Field field);
public native long objectFieldOffset(Field field);
staticFieldOffset()方法用于获取静态属性Field在Class对象中的偏移量,在CAS中操作静态属性时会用到这个偏移量。objectFieldOffset()方法用于获取非静态Field(非静态属性)在Object实例中的偏移量,在CAS中操作对象的非静态属性时会用到这个偏移量。
一个获取非静态Field(非静态属性)在Object实例中的偏移量
static
{
try
{
//获取反射的Field对象
OptimisticLockingPlus.class.getDeclaredField("value");
//取得内存偏移
valueOffset = unsafe.objectFieldOffset();
} catch (Exception ex)
{
throw new Error(ex);
}
}
2.2.3 应用案例
以 compareAndSwapInt 为例,Unsafe 的 compareAndSwapInt 方法接收 4 个参数,分别是:对象实例、内存偏移量、字段期望值、字段新值。该方法会针对指定对象实例中的相应偏移量的字段执行 CAS 操作。
代码案列:
public class CASTest {
public static void main(String[] args) {
Entity entity = new Entity();
Unsafe unsafe = UnsafeFactory.getUnsafe();
long offset = UnsafeFactory.getFieldOffset(unsafe, Entity.class, "x");
boolean successful;
// 4个参数分别是:对象实例、字段的内存偏移量、字段期望值、字段新值
successful = unsafe.compareAndSwapInt(entity, offset, 0, 3);
System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, 5);
System.out.println(successful + "\t" + entity.x);
successful = unsafe.compareAndSwapInt(entity, offset, 3, 8);
System.out.println(successful + "\t" + entity.x);
}
}
public class UnsafeFactory {
/**
* 获取 Unsafe 对象
* @return
*/
public static Unsafe getUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 获取字段的内存偏移量
* @param unsafe
* @param clazz
* @param fieldName
* @return
*/
public static long getFieldOffset(Unsafe unsafe, Class clazz, String fieldName) {
try {
return unsafe.objectFieldOffset(clazz.getDeclaredField(fieldName));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
针对 entity.x 的 3 次 CAS 操作,分别试图将它从 0 改成 3、从 3 改成 5、从 3 改成 8。执行结果如下:

2.3 CAS进行无锁编程
2.3.1 概念
CAS是一种无锁算法,该算法关键依赖两个值——期望值(旧值)和新值,底层CPU利用原子操作判断内存原值与期望值是否相等,如果相等就给内存地址赋新值,否则不做任何操作。
2.3.2 使用CAS进行无锁编程的步骤
- 获得字段的期望值(oldValue)
- 计算出需要替换的新值(newValue)
- 通过CAS将新值(newValue)放在字段的内存地址上,如果CAS失败就重复第(1)步到第(2)步,一直到CAS成功,这种重复俗称CAS自旋
2.3.3 CAS进行无锁编程的伪代码
do
{
获得字段的期望值(oldValue);
计算出需要替换的新值(newValue);
} while (!CAS(内存地址,oldValue,newValue))
2.3.4 举例图解CAS 无所编程的流程

图解:
假如某个内存地址(某对象的属性)的值为100,现在有两个线程(线程A和线程B)使用CAS无锁编程对该内存地址进行更新,线程A欲将其值更新为200,线程B欲将其值更新为300
线程是并发执行的,谁都有可能先执行。但是CAS是原子操作,对同一个内存地址的CAS操作在同一时刻只能执行一个。因此,在这个例子中,要么线程A先执行,要么线程B先执行。假设线程A的CAS(100,200)执行在前,由于内存地址的旧值100与该CAS的期望值100相等,因此线程A会操作成功,内存地址的值被更新为200。
线程A执行CAS(100,200)成功之后

接下来执行线程B的CAS(100,300)操作,此时内存地址的值为200,不等于CAS的期望值100,线程B操作失败。线程B只能自旋,开始新的循环,这一轮循环首先获取到内存地址的值200,然后进行CAS(200,300)操作,这一次内存地址的值与CAS的预期值(oldValue)相等,线程B操作成功。
2.3.5 使用无锁编程实现轻量级安全自增
基于CAS无锁编程的安全自增实现版本的具体代码
public class TestCompareAndSwap
{
// 基于CAS无锁实现的安全自增
static class OptimisticLockingPlus
{
//并发数量
private static final int THREAD_COUNT = 10;
//内部值,使用volatile保证线程可见性
private volatile int value;//值
//不安全类
private static final Unsafe unsafe = getUnsafe();;
//value 的内存偏移(相对于对象头部的偏移,不是绝对偏移)
private static final long valueOffset;
//统计失败的次数
private static final AtomicLong failure = new AtomicLong(0);
static
{
try
{
//取得value属性的内存偏移
valueOffset = unsafe.objectFieldOffset(
OptimisticLockingPlus.class.getDeclaredField("value"));
Print.tco("valueOffset:=" + valueOffset);
} catch (Exception ex)
{
throw new Error(ex);
}
}
//通过CAS原子操作,进行“比较并交换”
public final boolean unSafeCompareAndSet(int oldValue, int newValue)
{
//原子操作:使用unsafe的“比较并交换”方法进行value属性的交换
return unsafe.compareAndSwapInt(
this, valueOffset,oldValue ,newValue );
}
//使用无锁编程实现安全的自增方法
public void selfPlus()
{
int oldValue = value;
//通过CAS原子操作,如果操作失败就自旋,一直到操作成功
do
{
// 获取旧值
oldValue = value;
//统计无效的自旋次数
if (i++ > 1)
{
//记录失败的次数
failure.incrementAndGet();
}
} while (!unSafeCompareAndSet(oldValue, oldValue + 1));
}
//测试用例入口方法
public static void main(String[] args) throws InterruptedException
{
final OptimisticLockingPlus cas = new OptimisticLockingPlus();
//倒数闩,需要倒数THREAD_COUNT次
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++)
{
// 提交10个任务
ThreadUtil.getMixedTargetThreadPool().submit(() ->
{
//每个任务累加1000次
for (int j = 0; j < 1000; j++)
{
cas.selfPlus();
}
latch.countDown(); // 执行完一个任务,倒数闩减少一次
});
}
latch.await(); //主线程等待倒数闩倒数完毕
Print.tco("累加之和:" + cas.value);
Print.tco("失败次数:" + cas.failure.get());
}
}
}
2.3.6 字段偏移量的计算
调用Unsafe.objectFieldOffset(…)方法获取到的Object字段(也叫Object成员属性)的偏移量值是字段相对于Object头部的偏移量,是一个相对的内存地址值,不是绝对的内存地址值。
模拟代码:
// 模拟CAS 算法
static class OptimisticLockingPlus
{ //静态常量:线程数
private static final int THREAD_COUNT = 10;
//成员属性:包装的值
volatile private int value;
//静态常量:JDK不安全类的实例
private static final Unsafe unsafe = JvmUtil.getUnsafe();
//静态常量:value 成员的相对偏移(相对于对象头)
private static final long valueOffset;
//静态常量:CAS的失败次数
private static final AtomicLong failure = new AtomicLong(0);
// 省略其他不相干的代码
}
虽然OptimisticLockingPlus类有5个字段,但是其中有4个是静态字段,属于类的成员而不是对象的成员,真正属于对象的字段只有其中的value字段
图解对象结构:

在64位的JVM堆区中一个OptimisticLockingPlus对象的Object Header(头部)占用了12字节,其中Mark Word占用了8字节(64位),压缩过的Class Pointer占用了4字节。接在Object Header之后的就是成员属性value的内存区域,所以value属性相对于Object Header的偏移量为12。
3、JUC原子类
在多线程并发执行时,诸如“++”或“–”类的运算不具备原子性,不是线程安全的操作。通常情况下,大家会使用synchronized将这些线程不安全的操作变成同步操作,但是这样会降低并发程序的性能。所以,JDK为这些类型不安全的操作提供了一些原子类,与synchronized同步机制相比,JDK原子类是基于CAS轻量级原子操作的实现,使得程序运行效率变得更高。
3.1 JUC中的Atomic原子操作包位置
JUC并发包中的原子类都存放在java.util.concurrent.atomic类路径下
3.2 JUC包中的原子类分类
3.2.1 基本原子类
基本原子类的功能是通过原子方式更新Java基础类型变量的值。基本原子类主要包括以下三个:
- AtomicInteger:整型原子类。
- AtomicLong:长整型原子类。
- AtomicBoolean:布尔型原子类。
基础原子类AtomicInteger常用的方法
public final int get() //获取当前的值
public final int getAndSet(int newValue) //获取当前的值,然后设置新的值
public final int getAndIncrement() //获取当前的值,然后自增
public final int getAndDecrement() //获取当前的值,然后自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //通过CAS方式设置整数值
基础原子类AtomicInteger的使用示例
public class AtomicTest
{
@Test
public void atomicIntegerTest()
{
int tempvalue = 0;
//定义一个整数原子类实例,赋值到变量 i
AtomicInteger i = new AtomicInteger(0);
//取值,然后设置一个新值
tempvalue = i.getAndSet(3);
//输出tempvalue:0; i:3
Print.fo("tempvalue:" + tempvalue + "; i:" + i.get());
//取值,然后自增
tempvalue = i.getAndIncrement();
//输出tempvalue:3; i:4
Print.fo("tempvalue:" + tempvalue + "; i:" + i.get());
//取值,然后增加5
tempvalue = i.getAndAdd(5);
//输出tempvalue:4; i:9
Print.fo("tempvalue:" + tempvalue + "; i:" + i.get());
//CAS交换
boolean flag = i.compareAndSet(9, 100);
//输出flag:true; i:100
Print.fo("flag:" + flag + "; i:" + i.get());
}
}
基础原子类的综合示例
public class AtomicTest
{
@Test
public static void main(String[] args) throws InterruptedException
{
CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
//定义一个整数原子类实例,赋值到变量 i
AtomicInteger atomicInteger = new AtomicInteger(0);
for (int i = 0; i < THREAD_COUNT; i++)
{
// 创建10个线程,模拟多线程环境
ThreadUtil.getMixedTargetThreadPool().submit(() ->
{
for (int j = 0; j < 1000; j++)
{
atomicInteger.getAndIncrement();
}
latch.countDown();
});
}
latch.await();
Print.tco("累加之和:" + atomicInteger.get());
}
// 省略不相关代码
}
AtomicInteger线程安全原理
基础原子类(以AtomicInteger为例)主要通过CAS自旋+volatile的方案实现,既保障了变量操作的线程安全性,又避免了synchronized重量级锁的高开销,使得Java程序的执行效率大为提升。
原子类的CAS自旋+volatile的实现方案。AtomicInteger源码的具体代码:
AtomicInteger源码中的主要方法都是通过CAS自旋实现的。CAS自旋的主要操作为:如果一次CAS操作失败,获取最新的value值后,再次进行CAS操作,直到成功。
public class AtomicInteger extends Number
implements java.io.Serializable {
//Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
//内部value值,使用volatile保证线程可见性
private volatile int value;
//value属性值的地址偏移量
private static final long valueOffset;
static {
try {
//计算value 属性值的地址偏移量
valueOffset = unsafe.objectFieldOffset(
AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
//初始化
public AtomicInteger(int initialValue) {
value = initialValue;
}
//获取当前value值
public final int get() {
return value;
}
//方法:返回旧值并赋新值
public final int getAndSet(int newValue) {
for (;;) {//自旋
int current = get();//获取旧值
//以CAS方式赋值,直到成功返回
if (compareAndSet(current, newValue)) return current;
}
}
//方法:封装底层的CAS操作,对比expect(期望值)与value,若不同则返回false
//若expect与value相同,则将新值赋给value,并返回true
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(
this, valueOffset, expect, update);
}
//方法:安全自增 i++
public final int getAndIncrement() {
for (;;) { //自旋
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return current;
}
}
//方法:自定义增量数
public final int getAndAdd(int delta) {
for (;;) { //自旋
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}
//方法:类似++i,返回自增后的值
public final int incrementAndGet() {
for (;;) { //自旋
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
//方法:返回加上delta后的值
public final int addAndGet(int delta) {
for (;;) { //自旋
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return next;
}
}
// 省略其他源码
}
3.2.2 数组原子类
数组原子类的功能是通过原子方式更数组中的某个元素的值。数组原子类主要包括以下三个:
- AtomicIntegerArray:整型数组原子类。
- AtomicLongArray:长整型数组原子类。
- AtomicReferenceArray:引用类型数组原子类。
AtomicIntegerArray类的常用方法
//获取 index=i 位置元素的值
public final int get(int i)
//返回 index=i 位置当前的值,并将其设置为新值:newValue
public final int getAndSet(int i, int newValue)
//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndIncrement(int i)
//获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndDecrement(int i)
//获取 index=i 位置元素的值,并加上预期的值
public final int getAndAdd(int delta)
//如果输入的数值等于预期值,就以原子方式将位置i的元素值设置为输入值(update)
boolean compareAndSet(int expect, int update)
//最终将位置i的元素设置为newValue
//lazySet()方法可能导致其他线程在之后的一小段时间内还是可以读到旧的值
public final void lazySet(int i, int newValue)
数组原子类AtomicIntegerArray的使用示例
public class AtomicTest
{
@Test
public void testAtomicIntegerArray () {
int tempvalue = 0;
//原始的数组
int[] array = { 1, 2, 3, 4, 5, 6 };
//包装为原子数组
AtomicIntegerArray i = new AtomicIntegerArray(array);
//获取第0个元素,然后设置为2
tempvalue = i.getAndSet(0, 2);
//输出tempvalue:1; i:[2, 2, 3, 4, 5, 6]
Print.fo("tempvalue:" + tempvalue + "; i:" + i);
//获取第0个元素,然后自增
tempvalue = i.getAndIncrement(0);
//输出tempvalue:2; i:[3, 2, 3, 4, 5, 6]
Print.fo("tempvalue:" + tempvalue + "; i:" + i);
//获取第0个元素,然后增加一个delta 5
tempvalue = i.getAndAdd(0, 5);
//输出tempvalue:3; i:[8, 2, 3, 4, 5, 6]
Print.fo("tempvalue:" + tempvalue + "; i:" + i);
}
}
3.2.3 引用原子类
引用原子类主要包括以下三个:
- AtomicReference:引用类型原子类。
- AtomicMarkableReference:带有更新标记位的原子引用类型。
- AtomicStampedReference:带有更新版本号的原子引用类型。
AtomicMarkableReference类将boolean标记与引用关联起来,可以解决使用AtomicBoolean进行原子更新时可能出现的ABA问题。AtomicStampedReference类将整数值与引用关联起来,可以解决使用AtomicInteger进行原子更新时可能出现的ABA问题。
对象操作的原子性:
基础的原子类型只能保证一个变量的原子操作,当需要对多个变量进行操作时,CAS无法保证原子性操作,这时可以用AtomicReference(原子引用类型)保证对象引用的原子性。
AtomicReference类的使用示例:
public class User implements Serializable
{
String uid; //用户ID
String nickName; //昵称
public volatile int age; //年龄
public User(String uid, String nickName)
{
this.uid = uid;
this.nickName = nickName;
}
@Override
public String toString()
{
return "User{" +
"uid='" + getUid() + '\'' +
", nickName='" + getNickName() + '\'' +
", platform=" + getPlatform() +
'}';
}
使用AtomicReference对User的引用进行原子性修改
public class AtomicTest
{
@Test
public void testAtomicReference()
{
//包装的原子对象
AtomicReference<User> userRef = new AtomicReference<User>();
//待包装的User对象
User user = new User("1", "张三");
//为原子对象设置值
userRef.set(user);
Print.tco("userRef is:" + userRef.get());
//要使用CAS替换的User对象
User updateUser = new User("2", "李四");
//使用CAS替换
boolean success = userRef.compareAndSet(user, updateUser);
Print.tco(" cas result is:" + success);
Print.tco(" after cas,userRef is:" + userRef.get());
}
// 省略其他
}
3.2.4 字段更新原子类
字段更新原子类主要包括以下三个:
- AtomicIntegerFieldUpdater:原子更新整型字段的更新器。
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicReferenceFieldUpdater:原子更新引用类型中的字段。
使用属性更新原子类保障属性安全更新的流程大致需要两步
- 第一步,更新的对象属性必须使用public volatile修饰符。
- 第二步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须调用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
AtomicIntegerFieldUpdater类的使用示例:
@Test
public void testAtomicIntegerFieldUpdater()
{
//调用静态方法newUpdater()创建一个更新器updater
AtomicIntegerFieldUpdater<User> updater=
AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
User user = new User("1", "张三");
//使用属性更新器的getAndIncrement、getAndAdd增加user的age值
Print.tco(updater.getAndIncrement(user));// 1
Print.tco(updater.getAndAdd(user, 100));// 101
//使用属性更新器的get获取user的age值
Print.tco(updater.get(user));// 101
}
4、CAS 缺陷
CAS 虽然高效地解决了原子操作,但是还是存在一些缺陷的,主要表现在三个方面:
- 自旋 CAS 长时间地不成功,则会给 CPU 带来非常大的开销
- 只能保证一个共享变量原子操作
- ABA 问题
4.1 什么是ABA问题
一个线程A从内存位置M中取出V1,另一个线程B也取出V1。现在假设线程B进行了一些操作之后将M位置的数据V1变成了V2,然后又在一些操作之后将V2变成了V1。之后,线程A进行CAS操作,但是线程A发现M位置的数据仍然是V1,然后线程A操作成功。尽管线程A的CAS操作成功,但是不代表这个过程是没有问题的,线程A操作的数据V1可能已经不是之前的V1,而是被线程B替换过的V1

4.2 测试ABA 问题
代码:
@Slf4j
public class ABATest {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
new Thread(()->{
int value = atomicInteger.get();
log.debug("Thread1 read value: " + value);
// 阻塞1s
LockSupport.parkNanos(1000000000L);
// Thread1通过CAS修改value值为3
if (atomicInteger.compareAndSet(value, 3)) {
log.debug("Thread1 update from " + value + " to 3");
} else {
log.debug("Thread1 update fail!");
}
},"Thread1").start();
new Thread(()->{
int value = atomicInteger.get();
log.debug("Thread2 read value: " + value);
// Thread2通过CAS修改value值为2
if (atomicInteger.compareAndSet(value, 2)) {
log.debug("Thread2 update from " + value + " to 2");
// do something
value = atomicInteger.get();
log.debug("Thread2 read value: " + value);
// Thread2通过CAS修改value值为1
if (atomicInteger.compareAndSet(value, 1)) {
log.debug("Thread2 update from " + value + " to 1");
}
}
},"Thread2").start();
}
运行结果:

4.3 ABA问题解决方案
很多乐观锁的实现版本都是使用版本号(Version)方式来解决ABA问题。乐观锁每次在执行数据的修改操作时都会带上一个版本号,版本号和数据的版本号一致就可以执行修改操作并对版本号执行加1操作,否则执行失败。因为每次操作的版本号都会随之增加,所以不会出现ABA问题,因为版本号只会增加,不会减少。
4.3.1 使用AtomicStampedReference解决ABA问题
原理:
AtomicStampReference的compareAndSet()方法首先检查当前的对象引用值是否等于预期引用,并且当前印戳(Stamp)标志是否等于预期标志,如果全部相等,就以原子方式将引用值和印戳(Stamp)标志的值更新为给定的更新值。
AtomicStampReference的构造器有两个参数:
//构造器,V表示要引用的原始数据,initialStamp表示最初的版本印戳(版本号)
AtomicStampedReference(V initialRef, int initialStamp)
AtomicStampReference常用的几个方法:
//获取被封装的数据
public V getRerference();
//获取被封装的数据的版本印戳
public int getStamp();
AtomicStampedReference的CAS操作的定义:
public boolean compareAndSet(
V expectedReference, //预期引用值
V newReference, //更新后的引用值
int expectedStamp, //预期印戳(Stamp)标志值
int newStamp) //更新后的印戳(Stamp)标志值
实现原理:
compareAndSet()方法的第一个参数是原来的CAS中的参数,第二个参数是替换后的新参数,第三个参数是原来CAS数据旧的版本号,第四个参数表示替换后的新参数版本号。进行CAS操作时,若当前引用值等于预期引用值,并且当前印戳值等于预期印戳值,则以原子方式将引用值和印戳值更新为给定的更新值。
简单的AtomicStampedReference使用示例:
public class AtomicTest
{
@Test
public void testAtomicStampedReference()
{
CountDownLatch latch = new CountDownLatch(2);
AtomicStampedReference<Integer> atomicStampedRef =
new AtomicStampedReference<Integer>(1, 0);
ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
{
@Override
public void run()
{
boolean success = false;
int stamp = atomicStampedRef.getStamp();
Print.tco("before sleep 500: value="
+ atomicStampedRef.getReference()
+ " stamp=" + atomicStampedRef.getStamp());
//等待500毫秒
sleepMilliSeconds(500);
success = atomicStampedRef.compareAndSet(1, 10,
stamp, stamp + 1);
Print.tco("after sleep 500 cas 1: success=" + success
+ " value=" + atomicStampedRef.getReference()
+ " stamp=" + atomicStampedRef.getStamp());
//增加印戳值,然后更新,如果stamp被其他线程改了,就会更新失败
stamp++;
success = atomicStampedRef.compareAndSet(10, 1,
stamp, stamp+1);
Print.tco("after sleep 500 cas 2: success=" + success
+ " value=" + atomicStampedRef.getReference()
+ " stamp=" + atomicStampedRef.getStamp());
latch.countDown();
}
});
ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
{
@Override
public void run()
{
boolean success = false;
int stamp = atomicStampedRef.getStamp();
// stamp = 0
Print.tco("before sleep 1000: value="
+ atomicStampedRef.getReference()
+ " stamp=" + atomicStampedRef.getStamp());
//等待1000毫秒
sleepMilliSeconds(1000);
Print.tco("after sleep 1000: stamp = "
+ atomicStampedRef.getStamp());
//stamp = 1,这个值实际已经被修改了
success = atomicStampedRef.compareAndSet(
1, 20, stamp, stamp++);
Print.tco("after cas 3 1000: success=" + success
+ " value=" + atomicStampedRef.getReference()
+ " stamp=" + atomicStampedRef.getStamp());
latch.countDown();
}
});
latch.await();
}
// 省略其他
}
4.3.2 使用AtomicMarkableReference解决ABA问题
原理:
AtomicMarkableReference是AtomicStampedReference的简化版,不关心修改过几次,只关心是否修改过。因此,其标记属性mark是boolean类型,而不是数字类型,标记属性mark仅记录值是否修改过。
应用场景:
知道对象是否被修改过,而不适用于对象被反复修改的场景。
代码案列:
public class AtomicTest
{
@Test
public void testAtomicMarkableReference() throws InterruptedException
{
CountDownLatch latch = new CountDownLatch(2);
AtomicMarkableReference<Integer> atomicRef =
new AtomicMarkableReference<Integer>(1, false);
ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
{
@Override
public void run()
{
boolean success = false;
int value = atomicRef.getReference();
boolean mark = getMark(atomicRef);
Print.tco("before sleep 500: value=" + value
+ " mark=" + mark);
//等待500毫秒
sleepMilliSeconds(500);
success = atomicRef.compareAndSet(1, 10, mark, !mark);
Print.tco("after sleep 500 cas 1: success=" + success
+ " value=" + atomicRef.getReference()
+ " mark=" + getMark(atomicRef));
latch.countDown();
}
});
ThreadUtil.getMixedTargetThreadPool().submit(new Runnable()
{
@Override
public void run()
{
boolean success = false;
int value = atomicRef.getReference();
boolean mark = getMark(atomicRef);
Print.tco("before sleep 1000: value="
+ atomicRef.getReference()
+ " mark=" + mark);
//等待1000毫秒
sleepMilliSeconds(1000);
Print.tco("after sleep 1000: mark = " + getMark(atomicRef));
success = atomicRef.compareAndSet(1, 20, mark,!mark);
Print.tco("after cas 3 1000: success=" + success
+ " value=" + atomicRef.getReference()
+ " mark=" + getMark(atomicRef));
latch.countDown();
}
});
latch.await();
}
//取得修改标志值
private boolean getMark(AtomicMarkableReference<Integer> atomicRef)
{
boolean[] markHolder = {false};
int value = atomicRef.get(markHolder);
return markHolder[0];
}
// 省略其他
}
4.3.3 提升高并发场景下CAS操作的性能
在争用激烈的场景下,会导致大量的CAS空自旋。比如,在大量线程同时并发修改一个AtomicInteger时,可能有很多线程会不停地自旋,甚至有的线程会进入一个无限重复的循环中。
5、以空间换时间:LongAdder
5.1 核心思想
LongAdder的核心思想是热点分离,与ConcurrentHashMap的设计思想类似:将value值分离成一个数组,当多线程访问时,通过Hash算法将线程映射到数组的一个元素进行操作;而获取最终的value结果时,则将数组的元素求和。

测试代码:
public class LongAdderVSAtomicLongTest
{
//每个线程的执行轮数
final int TURNS = 100000000;
//对比测试用例一:调用AtomicLong完成10个线程累加1000次
@org.junit.Test
public void testAtomicLong()
{
//并发任务数
final int TASK_AMOUNT = 10;
//线程池,获取CPU密集型任务线程池
ExecutorService pool = ThreadUtil.getCpuIntenseTargetThreadPool();
//定义一个原子对象
AtomicLong atomicLong = new AtomicLong(0);
//线程同步倒数闩
CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < TASK_AMOUNT; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < TURNS; j++)
{
atomicLong.incrementAndGet();
}
// Print.tcfo("本线程累加完成");
} catch (Exception e)
{
e.printStackTrace();
}
//倒数闩,倒数一次
countDownLatch.countDown();
});
}
try
{
//等待倒数闩完成所有的倒数操作
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
Print.tcfo("运行的时长为:" + time);
Print.tcfo("累加结果为:" + atomicLong.get());
}
//对比测试用例二:调用LongAdder完成10个线程累加1000次
@org.junit.Test
public void testLongAdder()
{
//并发任务数
final int TASK_AMOUNT = 10;
//线程池,获取CPU密集型任务线程池
ExecutorService pool = ThreadUtil.getCpuIntenseTargetThreadPool();
//定义一个LongAdder 对象
LongAdder longAdder = new LongAdder();
//线程同步倒数闩
CountDownLatch countDownLatch = new CountDownLatch(TASK_AMOUNT);
long start = System.currentTimeMillis();
for (int i = 0; i < TASK_AMOUNT; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < TURNS; j++)
{
longAdder.add(1);
}
} catch (Exception e)
{
e.printStackTrace();
}
//倒数闩,倒数一次
countDownLatch.countDown();
});
}
try
{
//等待倒数闩完成所有的倒数操作
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
Print.tcfo("运行的时长为:" + time);
Print.tcfo("累加结果为:" + longAdder.longValue());
}
}
5.2 LongAdder的原理
AtomicLong使用内部变量value保存着实际的long值,所有的操作都是针对该value变量进行的。也就是说,在高并发环境下,value变量其实是一个热点,也就是N个线程竞争一个热点。重试线程越多,就意味着CAS的失败概率更高,从而进入恶性CAS空自旋状态。
5.2.1 基本思路是分散热点
将value值分散到一个数组中,不同线程会命中到数组的不同槽(元素)中,各个线程只对自己槽中的那个值进行CAS操作。这样热点就被分散了,冲突的概率就小很多。
5.2.2 .LongAdder实例的内部结构
LongAdder的内部成员包含一个base值和一个cells数组。在最初无竞争时,只操作base的值;当线程执行CAS失败后,才初始化cells数组,并为线程分配所对应的元素。
LongAdder中没有类似于AtomicLong中的getAndIncrement()或者incrementAndGet()这样的原子操作,所以只能通过increment()方法和longValue()方法的组合来实现更新和获取的操作。

5.2.3 基类Striped64内部三个重要的成员
LongAdder继承于Striped64类,base值和cells数组都在Striped64类中定义。基类Striped64内部三个重要的成员
/**
* 成员一:存放Cell的哈希表,大小为2的幂
*/
transient volatile Cell[] cells;
/**
* 成员二:基础值
* 1. 在没有竞争时会更新这个值
* 2. 在cells初始化时,cells不可用,也会尝试通过CAS操作值累加到base
*/
transient volatile long base;
/**
* 自旋锁,通过CAS操作加锁,为0表示cells数组没有处于创建、扩容阶段
* 为1表示正在创建或者扩展cells数组,不能进行新Cell元素的设置操作
*/
transient volatile int cellsBusy;
5.2.4 LongAdder类的add()方法
/**
* 自增1
*/
public void increment() {
add(1L);
}
/**
* 自减1
*/
public void decrement() {
add(-1L);
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || //CASE 1
!casBase(b = base, b + x)) { //CASE 2
if (as == null || (m = as.length - 1) < 0 || //CASE 3
(a = as[getProbe() & m]) == null || //CASE 4
!(uncontended = a.cas(v = a.value, v + x))) //CASE 5
longAccumulate(x, null, uncontended);
}
}
5.2.5 LongAdder类中的longAccumulate()方法
longAccumulate()是Striped64中重要的方法,实现不同的线程更新各自Cell中的值,其实现逻辑类似于分段锁
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
//扩容意向,collide=true可以扩容,collide=false不可扩容
boolean collide = false;
//自旋,一直到操作成功
for (;;) {
//as 表示cells引用
//a 表示当前线程命中的Cell
//n 表示cells数组长度
//v 表示期望值
Cell[] as; Cell a; int n; long v;
//CASE1: 表示cells已经初始化了,当前线程应该将数据写入对应的Cell中
//这个大的if分支有三个小分支
if ((as = cells) != null && (n = as.length) > 0) {
//CASE1.1:true表示下标位置的Cell为null,需要创建new Cell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // cells数组没有处于创建、扩容阶段
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// CASE1.2:当前线程竞争修改失败,wasUncontended为false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE 1.3:当前线程rehash过哈希值,CAS更新Cell
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//CASE 1.4:调整扩容意向,然后进入下一轮循环
else if (n >= NCPU || cells != as)
collide = false; // 达到最大值,或者as值过期
//CASE 1.5:设置扩容意向为true,但是不一定真的发生扩容
if (!collide)
collide = true;
//CASE 1.6:真正扩容的逻辑
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0; //释放锁
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); //重置(rehash)当前线程Hash值
}
//CASE2:cells还未初始化(as 为null),并且cellsBusy加锁成功
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:当前线程cellsBusy加锁失败,表示其他线程正在初始化cells
//所以当前线程将值累加到base,注意add(…)方法调用此方法时fn为null
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // 在 base操作成功时跳出自旋
}
}
longAccumulate的自旋过程中,有三个大的if分支
- CASE1:表示cells已经初始化了,当前线程应该将数据写入对应的Cell中。
- CASE2:cells还未初始化(as为null),本分支计划初始化cells,在此之前开始执行cellsBusy加锁,并且要求cellsBusy加锁成功。
- CASE3:如果cellsBusy加锁失败,表示其他线程正在初始化cells,所以当前线程将值累加到base上。
CASE1表示当前线程应该将数据写入对应的Cell中,又分为以下几种细分情况:
- CASE1.1:表示当前线程对应的下标位置的Cell为null,需要创建新Cell。
- CASE1.2:wasUncontended是add(…)方法传递进来的参数如果为false,就表示cells已经被初始化,并且线程对应位置的Cell元素也已经被初始化,但是当前线程对Cell元素的竞争修改失败。如果add方法中的条件语句CASE 5通过CAS尝试把cells[m%cells.length]位置的Cell对象的value值设置为v+x而失败了,就说明已经发生竞争,就将wasUncontended设置为false。如果wasUncontended为false,就需要重新计算prob的值,那么自旋操作进入下一轮循环。
- CASE 1.3:无论执行CASE1分支的哪个子条件,都会在末尾执行h=advanceProb()语句rehash出一个新哈希值,然后命中新的Cell,如果新命中的Cell不为空,在此分支进行CAS更新,将Cell的值更新为a.value+x,如果更新成功,就跳出自旋操作;否则还得继续自旋。
- CASE 1.4:调整cells数组的扩容意向,然后进入下一轮循环。如果n≥NCPU条件成立,就表示cells数组大小已经大于等于CPU核数,扩容意向改为false,表示不扩容了;如果该条件不成立,就说明cells数组还可以扩容,尽管如此,如果cells !=as为true,就表示其他线程已经扩容过了,也会将扩容意向改为false,表示当前循环不扩容了。当前线程调到CASE1分支的末尾执行rehash操作重新计算prob的值,然后进入下一轮循环。
- CASE 1.5:如果!collide=true满足,就表示扩容意向不满足,设置扩容意向为true,但是不一定真的发生扩容,然后进入CASE1分支末尾重新计算prob的值,接着进入下一轮循环。
- CASE 1.6:执行真正扩容的逻辑。其条件一cellsBusy==0为true表示当前cellsBusy的值为0(无锁状态),当前线程可以去竞争这把锁;其条件二casCellsBusy()表示当前线程获取锁成功,CAS操作cellsBusy改为0成功,可以执行扩容逻辑。
5.2.6 LongAdder类的casCellsBusy()方法
当cellsBusy成员值为1时,表示cells数组正在被某个线程执行初始化或扩容操作,其他线程不能进行以下操作:(1)对cells数组执行初始化。(2)对cells数组执行扩容。(3)如果cells数组中某个元素为null,就为该元素创建新的Cell对象。因为数组的结构正在修改,所以其他线程不能创建新的Cell对象。
6、参考文档
- 感谢你赐予我前进的力量

