天天看點

Jdk1.6 JUC源碼解析(2)-atomic-AtomicXXXArray

Jdk1.6 JUC源碼解析(2)-atomic-AtomicXXXArray

作者:大飛

功能簡介:

  • 數組原子量。

源碼分析:

  • 和原子量一樣,數組原子量内部有一個Unsafe的靜态引用。
private static final Unsafe unsafe = Unsafe.getUnsafe();
           
  • 首先先看下AtomicIntegerArray。

        AtomicIntegerArray的構造方法如下:

/**
     * Creates a new AtomicIntegerArray of given length.
     *
     * @param length the length of the array
     */
    public AtomicIntegerArray(int length) {
        array = new int[length];
        // must perform at least one volatile write to conform to JMM
        if (length > 0)
            unsafe.putIntVolatile(array, rawIndex(0), 0);
    }
    /**
     * Creates a new AtomicIntegerArray with the same length as, and
     * all elements copied from, the given array.
     *
     * @param array the array to copy elements from
     * @throws NullPointerException if array is null
     */
    public AtomicIntegerArray(int[] array) {
        if (array == null)
            throw new NullPointerException();
        int length = array.length;
        this.array = new int[length];
        if (length > 0) {
            int last = length-1;
            for (int i = 0; i < last; ++i)
                this.array[i] = array[i];
            // Do the last write as volatile
            unsafe.putIntVolatile(this.array, rawIndex(last), array[last]);
        }
    }
           

       注:目前源碼來之jdk1.6,在裡面會看到第一個構造方法最後就會添加一個volatile write,但在jdk1.8(從jdk1.7某個小版本開始)裡面就看不到這個volatile write了,與群友讨論得知這應該是一個遺留代碼,因為final完全可以保證這個語義(其他線程可以看到完全構造的内部array);第二個構造方法首先對内部final修飾的array進行指派,然後進行數組元素copy,為了保證其他線程可以看到完全構造(内部元素copy完成)的array,是以要在copy最後加一個volatile write。          相對于AtomicInteger來說,AtomicIntegerArray裡面的方法都帶有下标:

/**
     * Atomically increments by one the element at index {@code i}.
     *
     * @param i the index
     * @return the updated value
     */
    public final int incrementAndGet(int i) {
        while (true) {
            int current = get(i);
            int next = current + 1;
            if (compareAndSet(i, current, next))
                return next;
        }
    }
           

        接下來看一下AtomicIntegerArray根據下标取值的get方法:

/**
     * Gets the current value at position {@code i}.
     *
     * @param i the index
     * @return the current value
     */
    public final int get(int i) {
        return unsafe.getIntVolatile(array, rawIndex(i));
    }
           

       方法裡調用了unsafe的getIntVolatile方法。在hotspot/src/share/vm/classfile/vmSymbols.hpp中找到:

do_intrinsic(_getIntVolatile,           sun_misc_Unsafe,        getIntVolatile_name, getInt_signature,         F_RN)  \
           

        接着在hotspot/src/share/vm/opto/library_call.cpp中找到實作:

case vmIntrinsics::_getIntVolatile:
    return inline_unsafe_access(!is_native_ptr, !is_store, T_INT, true);
  ...
bool LibraryCallKit::inline_unsafe_access(bool is_native_ptr, bool is_store, BasicType type, bool is_volatile) {
  if (callee()->is_static())  return false;  // caller must have the capability!
...//省略不重要部分
  if (is_volatile) {
    if (!is_store)
      insert_mem_bar(Op_MemBarAcquire);
    else
      insert_mem_bar(Op_MemBarVolatile);
  }
  if (need_mem_bar) insert_mem_bar(Op_MemBarCPUOrder);
  return true;
}
           

       在方法的最後可以看到,如果是volatile且不是寫操作的話,會加入一個Op_MemBarAcquire的記憶體屏障,再看下hotspot/src/cpu/x86/vm/x86_64.ad:

instruct membar_acquire()
%{
  match(MemBarAcquire);
  ins_cost(0);
  size(0);
  format %{ "MEMBAR-acquire ! (empty encoding)" %}
  ins_encode();
  ins_pipe(empty);
%}
           

       可見在x86_64下也相當于是對一個普通域的讀取。          最後看一下AtomicIntegerArray根據下标設定值的set方法:

/**
     * Sets the element at position {@code i} to the given value.
     *
     * @param i the index
     * @param newValue the new value
     */
    public final void set(int i, int newValue) {
        unsafe.putIntVolatile(array, rawIndex(i), newValue);
    }
           

       方法裡調用了unsafe的putIntVolatile方法。在hotspot/src/share/vm/classfile/vmSymbols.hpp中找到:

do_intrinsic(_putIntVolatile,           sun_misc_Unsafe,        putIntVolatile_name, putInt_signature,         F_RN)  \
           

       接着在hotspot/src/share/vm/opto/library_call.cpp中找到實作:

case vmIntrinsics::_putIntVolatile:
    return inline_unsafe_access(!is_native_ptr, is_store, T_INT, true);
           

       和上面getIntVolatile的實作一樣:

if (is_volatile) {
    if (!is_store)
      insert_mem_bar(Op_MemBarAcquire);
    else
      insert_mem_bar(Op_MemBarVolatile);
  }
           

       可見,如果是寫操作做的話,加入Op_MemBarVolatile記憶體屏障,繼續看hotspot/src/cpu/x86/vm/x86_64.ad:

instruct membar_volatile(rFlagsReg cr) %{
  match(MemBarVolatile);
  effect(KILL cr);
  ins_cost(400);
  format %{
    $$template
    if (os::is_MP()) {
      $$emit$$"lock addl [rsp + #0], 0\t! membar_volatile"
    } else {
      $$emit$$"MEMBAR-volatile ! (empty encoding)"
    }
  %}
  ins_encode %{
    __ membar(Assembler::StoreLoad);
  %}
  ins_pipe(pipe_slow);
%}
           

       如果是多核CPU,就會加入lock addl... 這個指令。其實就相當于是對一個volatile修飾的域的寫操作喽。    

  • 再看下AtomicLongArray。

       AtomicLongArray的内部結構和AtomicIntegerArray類似,這裡不做分析,隻看一下get和set方法中内部調用的unsafe的getLongVolatile和putLongVolatile方法。        首先看下get方法中的getLongVolatile方法:

/**
     * Gets the current value at position {@code i}.
     *
     * @param i the index
     * @return the current value
     */
    public final long get(int i) {
        return unsafe.getLongVolatile(array, rawIndex(i));
    }
           

       看下内聯實作,vmSymbols.hpp,裡面有如下代碼:

do_intrinsic(_getLongVolatile,          sun_misc_Unsafe,        getLongVolatile_name, getLong_signature,       F_RN)  \
           

       然後找到library_call.cpp中找到相應實作:

case vmIntrinsics::_getLongVolatile:
    return inline_unsafe_access(!is_native_ptr, !is_store, T_LONG, true);
  ...
bool LibraryCallKit::inline_unsafe_access(bool is_native_ptr, bool is_store, BasicType type, bool is_volatile) {
  if (callee()->is_static())  return false;  // caller must have the capability!
...
  if (!is_store) {
    Node* p = make_load(control(), adr, value_type, type, adr_type, is_volatile);
...
  if (is_volatile) {
    if (!is_store)
      insert_mem_bar(Op_MemBarAcquire);
    else
      insert_mem_bar(Op_MemBarVolatile);
  }
  if (need_mem_bar) insert_mem_bar(Op_MemBarCPUOrder);
  return true;
}
           

       首先,後面那個MemBarAcquire之前看過,在x86-64下沒什麼用,看下前面的make_load,找到hotspot/src/share/vm/opto/graphKit.cpp:

Node* GraphKit::make_load(Node* ctl, Node* adr, const Type* t, BasicType bt,
                          int adr_idx,
                          bool require_atomic_access) {
  assert(adr_idx != Compile::AliasIdxTop, "use other make_load factory" );
  const TypePtr* adr_type = NULL; // debug-mode-only argument
  debug_only(adr_type = C->get_adr_type(adr_idx));
  Node* mem = memory(adr_idx);
  Node* ld;
  if (require_atomic_access && bt == T_LONG) {
    ld = LoadLNode::make_atomic(C, ctl, mem, adr, adr_type, t);
  } else {
    ld = LoadNode::make(_gvn, ctl, mem, adr, adr_type, t, bt);
  }
  return _gvn.transform(ld);
}
           

       可見,至少對這個long值得加載是原子的(這裡的原子操作應該指的是将long的高4位元組和低4位元組的操作合并成一個原子操作,比如某些平台不支援非volatile的long/double域的原子操作)。          然後看下set方法中的setLongVolatile方法:

/**
     * Sets the element at position {@code i} to the given value.
     *
     * @param i the index
     * @param newValue the new value
     */
    public final void set(int i, long newValue) {
        unsafe.putLongVolatile(array, rawIndex(i), newValue);
    }
           

       直接到library_call.cpp中看實作:

case vmIntrinsics::_putLongVolatile:
    return inline_unsafe_access(!is_native_ptr, is_store, T_LONG, true);
  ...
bool LibraryCallKit::inline_unsafe_access(bool is_native_ptr, bool is_store, BasicType type, bool is_volatile) {
  if (callee()->is_static())  return false;  // caller must have the capability!
...
  if (is_volatile) {
    if (!is_store)
      insert_mem_bar(Op_MemBarAcquire);
    else
      insert_mem_bar(Op_MemBarVolatile);
  }
  if (need_mem_bar) insert_mem_bar(Op_MemBarCPUOrder);
  return true;
}
           

       之前說過,這裡加入了一個lock addl ... 的記憶體屏障。      

  • 最後看下AtomicReferenceArray。

      AtomicReferenceArray的内部結構和AtomicIntegerArray類似,有一點點細節上的差別:

public class AtomicReferenceArray<E> implements java.io.Serializable {
    private static final long serialVersionUID = -6209656149925076980L;
    private static final Unsafe unsafe;
    private static final int base;
    private static final int shift;
    private static final long arrayFieldOffset;
    private final Object[] array; // must have exact type Object[]
    static {
        int scale;
        try {
            unsafe = Unsafe.getUnsafe();
            arrayFieldOffset = unsafe.objectFieldOffset
                (AtomicReferenceArray.class.getDeclaredField("array"));
            base = unsafe.arrayBaseOffset(Object[].class);
            scale = unsafe.arrayIndexScale(Object[].class);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }
    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);
        return byteOffset(i);
    }
    private static long byteOffset(int i) {
        return ((long) i << shift) + base;
    }
           

       首先,AtomicReferenceArray裡面多了一個arrayFieldOffset,這個域用來支援反序列化的。其次,與AtomicIntegerArray不同,AtomicReferenceArray并沒有scale域,取而代之的是shift域。閱讀代碼可知,其實目的都是計算rawIndex = base + index * scale,隻不過AtomicReferenceArray裡面把一部分運算轉換為等價的位操作(當然前提是scala為2的幂)。(了解更多位操作技巧,可參考Hacker's Delight)。        其他涉及到unsafe的操作前面都分析過,這裡不做分析了。            好了,源碼就分析到這裡!