【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析
【1】LongAdder 性能优势与适用场景
AtomicLong VS LongAdder
LongAdder适合的场景是统计求和计数的场景
在竞争激烈的情况下,LongAdder的效率比AtomicLong高,但要消耗更多的空间
LongAdder 性能优势
LongAdder 的基本思路是分散热点,
AtomicLong 中存在成员变量 value 保存 long 值,在高并发的情况下该 value 值是一个热点,由 N 个线程竞争;
LongAdder 将 value 值分散到一个数组中,不同的线程命中到数组的不同的槽中,
各线程仅仅对自己槽中的值进行 CAS 操作,从而分散热点减少冲突
【2】AtomicLong 源码分析
// 使用 Unsafe 对 long 类型变量提供 CAS 操作
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
// setup to use Unsafe.compareAndSwapLong for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
private static native boolean VMSupportsCS8();
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile long value;
public AtomicLong(long initialValue) {
value = initialValue;
}
public AtomicLong() {
}
public final long get() {
return value;
}
public final void set(long newValue) {
value = newValue;
}
public final void lazySet(long newValue) {
unsafe.putOrderedLong(this, valueOffset, newValue);
}
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
public final boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
// // Unsafe.java
// public final long getAndAddLong(Object o, long offset, long delta) {
// long v;
// // 如果其它线程正在修改AtomicLong, 本线程只能是不断的重试, 直到成功
// do {
// v = getLongVolatile(o, offset);
// } while (!compareAndSwapLong(o, offset, v, v + delta));
// return v;
// }
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
public final long getAndUpdate(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return prev;
}
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}
public final long getAndAccumulate(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
public final long accumulateAndGet(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return next;
}
public String toString() {
return Long.toString(get());
}
public int intValue() {
return (int)get();
}
public long longValue() {
return get();
}
public float floatValue() {
return (float)get();
}
public double doubleValue() {
return (double)get();
}
}
【3】LongAdder 源码分析
【3.1】LongAdder -- public void add(long x)
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
// 进入该分支场景
// 两种可能场景:
// 1. as != null, 说明已经使用了分散存储
// 2. as == null, 但casBase失败了,说明另一线程正在修改base
boolean uncontended = true; // true表处于非竞争状态, false表示处于多线程竞争状态
if (as == null // 场景 2
|| (m = as.length - 1) < 0 // 场景 1 但 as 的长度为 0
|| (a = as[getProbe() & m]) == null || // 场景1 as长度>0,以线程hash值取出一个cell,但它还未初始化
// 场景1, 取出了一个cell, 并且已经初始化了. 尝试直接将x加进去, 但另外一线程正在占用它
!(uncontended = a.cas(v = a.value, v + x)))
// 以上的判断语句尝试加x加到cell中去, 但失败了. 此时LongAdder与AtomicLong差不多, 都得不断的重试
longAccumulate(x, null, uncontended);
}
}
public void add(long x) 执行流程图

【3.2】Striped64 -- final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
// 获取当前线程的探针值,若为0
// 则创建当前线程的 ThreadLocalRandom 变量
// 线程的探针,可以理解为线程的hash值,首次使用需要初始化
ThreadLocalRandom.current(); // force initialization
// 获取当前线程的探针值
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // cells数组已经new过了
if ((a = as[(n - 1) & h]) == null) { // 按线程hash取出一个cell,但为null, 需要初始化这个cell
// 以下代码目的为了实现: as[(n - 1) & h] = new Cell(x)
// [加锁] cellsBusy充当了自旋锁的作用,阻止其它的线程同时执行以下代码
// 当前没有线程在执行以下代码,此处判断并不安全,因此下面的代码还要再判断一次
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
// 使用cas再次确认cellsBusy的值,并将cellsBusy置为1,表明当前线程正在执行以下代码
// casCellsBusy失败,说明其它线程正在执行以下代码
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 加锁之前的那些if语句判断都是不安全的,于是,加锁之后,再判断一次
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
// 当前处于竞争状态,尝试跳过一个回合,再下回合再试
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//尝试直接将x加到cell上
break;
else if (n >= NCPU || cells != as)
// 另个一个线程正在扩容, 或者可能已经扩容到上限了
// 跳过一个回合
collide = false; // At max size or stale
else if (!collide)
// 跳过一个回合
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
// 扩容操作,cells容量翻倍(只有当n小于CPU数量时)
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 修改线程探针(hash值),尝试其它的cell
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 首次初始化
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) 执行流程图
【3.3】LongAdder -- public long sum()
// 取值操作
// 将base及所有的cell中的值加起来
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。
【1】多线程编程:并发加/减操作优化, LongAdder原理,与AtomicLong比较
【2】Java多线程进阶(十七)—— J.U.C之atomic框架:LongAdder