本文内容譯自Lock-freedom without garbage collection,中間有少量自己的修改.
人們普遍認為,垃圾收集的一個優點是易于建構高性能的無鎖資料結構。對這些資料結構進行手動記憶體管理并不容易,而 GC 使其變得非常簡單。
這篇文章表明,使用 Rust,可以為并發資料結構建構一個記憶體管理 API:
- 使得實作無鎖資料結構和 有GC的語言(如Java) 一樣容易;
- 靜态保護以防止記憶體管理方案的濫用;
-
具有與 GC 競争的開銷(并且比 GC 更可預測)
在下面展示的基準測試中,Rust 能夠輕松地擊敗 Java 無鎖隊列實作,并且具有易于編寫的實作。
我已經在Crossbeam 這個crate中實作了“基于epoch的記憶體回收” ,這個crate現在就可以用于您自己的資料結構。本文介紹了一些關于無鎖資料結構的背景知識、 epoch 算法以及完整的Rust API。
Benchmarks
在深入研究基于epoch的記憶體回收的API設計和使用之前,讓我們直奔主題: 性能。
為了測試 Crossbeam 實作相對于完整 GC(有完整GC的語言,比如java) 的開銷,我在它上面實作了一個基本的無鎖隊列(Michael Scott queue) ,并在 Scala 中建構了相同的隊列。一般來說,基于 jvm 的語言是通向無鎖資料結構的“良好 GC”路徑的一個很好的測試用例。
除了上述實作,我還比較了:
- 基于crossbeam實作的更高效的分段隊列(segmented queue),它配置設定具有多個slot的節點
- 使用鎖保護的單線程隊列
- 使用java.util.concurrent實作的ConcurrentLinkedQueue,他是Michael Scott queue一個優化版本.
我使用了兩種方式來測試這些隊列:
- MPSC 一個多生産者、單消費者(MPSC)場景,其中兩個線程重複發送消息,一個線程接收消息,兩者都在一個循環中。
- MPMC 多生産者、多消費者(multi-producer,multi-consumer,MPMC)場景,其中兩個線程發送和兩個線程接收在一個循環中。
這樣的基準測試對于測量“競争”(多線程競争同時進行并發更新)下無鎖資料結構的可伸縮性相當典型。在建構生産隊列實作時,應該對許多變體進行基準測試; 這裡的目标僅僅是評估記憶體管理方案的大緻開銷。
對于 MPSC 測試,我還将其與 Rust 内置channel中使用的算法進行了比較,該算法針對此場景進行了優化(是以不支援 MPMC)。
測試的機器是4 Core 2.6 Ghz Intel Core i 7,16gb 記憶體。
以下是以納秒為機關給出的結果(越低越好)
分析
主要的收獲是,Crossbeam 的實作方式(尚未調整)在所有情況下都具有競争力。通過使用更聰明或更專業的隊列,可以在 Rust 和 JVM 兩方面都做得更好,但這些結果至少表明 epoch 的開銷是合理的。
注意,java / scala 版本在 MPMC 測試中的表現要比在 MPSC 測試中好得多。為什麼?
答案很簡單: 垃圾收集。在 MPSC 測試中,随着時間的推移,生産者往往會超過消費者,這意味着隊列中的資料量會緩慢增長。這反過來又增加了每次垃圾收集的成本,這涉及到周遊實時資料集。
相比之下,在 epoch 方案中,管理垃圾的成本是相對固定的: 它與線程的數量成正比,而不是與活動資料的數量成正比。事實證明,這會帶來更好、更一緻 / 更可預測的性能。
最後,我沒有在圖表中包括的一個比較(因為它會使其他比較相形見绌)是在 Rust 中圍繞 deque 使用 Mutex。對于 MPMC 測試,性能約為3040ns/op,比 Crossbeam 實作慢20倍以上。這生動地說明了為什麼無鎖資料結構非常重要——那麼讓我們開始深入了解它們是什麼。
無鎖資料結構
當您希望使用(和變更)來自許多并發線程的資料結構時,需要進行同步。最簡單的解決方案是全局鎖定 Rust,将整個資料結構包裝在一個 Mutex 中。
問題是,這種“粗粒度”同步意味着在通路資料結構時,多個線程總是需要協調,即使它們通路的是不相交的資料結構部分。這還意味着,即使一個線程隻是嘗試讀取,它也必須通過更新鎖狀态進行寫操作——由于鎖是一個全局通信點,這些寫操作将導緻大量緩存失效通信。即使對于更精細的内容使用了大量的鎖,仍然存在其他危險,如死鎖和優先轉置,并且通常仍然将性能提升有限(you often still leave performance on the table.)
一個更加激進的替代方案是無鎖資料結構,它使用原子操作直接更改資料結構,而不需要進一步同步。它們通常比基于鎖的設計更快、更可伸縮、更健壯。
在這篇文章中,我不打算給出一個完整的無鎖程式設計教程,但是關鍵的一點是,如果你沒有全局同步,那麼很難說你什麼時候可以釋放記憶體。許多已釋出的算法基本上假定是垃圾回收器或其他回收記憶體的方法。是以,在Rust中實作無鎖并發之前,我們需要一個關于記憶體回收的工具——這就是這篇部落格文章的目的所在。
特雷伯棧
為了讓事情更具體,讓我們看看無鎖資料結構的“ Hello world” : 特雷伯棧(Treiber’s stack)。棧表示為單連結清單,所有修改都發生在head這個指針上:
#![feature(box_raw)]
use std::ptr::{self, null_mut};
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Relaxed, Release, Acquire};
pub struct Stack<T> {
head: AtomicPtr<Node<T>>,
}
struct Node<T> {
data: T,
next: *mut Node<T>,
}
impl<T> Stack<T> {
pub fn new() -> Stack<T> {
Stack {
head: AtomicPtr::new(null_mut()),
}
}
}
最簡單的方法就是從pop開始。要pop,你隻需要循環儲存棧頂指針head,然後做一個CAS,用next指針替換目前head:
請注意,如果舊值比對,則 compare_and_swap 原子性地将AtomicPtr的值從舊值更改為新值。同時,如果你不熟悉Acquire, Release和Relaxed 這些标簽的含義,你可以放心地忽略它們。
impl<T> Stack<T> {
pub fn pop(&self) -> Option<T> {
loop {
// take a snapshot
let head = self.head.load(Acquire);
// we observed the stack empty
if head == null_mut() {
return None
} else {
let next = unsafe { (*head).next };
// if snapshot is still good, update from `head` to `next`
if self.head.compare_and_swap(head, next, Release) == head {
// extract out the data from the now-unlinked node
// **NOTE**: leaks the node!
return Some(unsafe { ptr::read(&(*head).data) })
}
}
}
}
}
ptr::read
函數是 Rust 在沒有靜态或動态跟蹤的情況下提取資料所有權的方法。在這裡,我們使用
compare_and_swap
的原子性來保證隻有一個線程會調用
ptr::read
-正如我們将看到的,這個實作永遠不會釋放 Nodes,是以資料上的析構函數永遠不會被調用。這兩個事實一起使我們使用
ptr::read
是安全的。
push 和pop差不多:
impl<T> Stack<T> {
pub fn push(&self, t: T) {
// allocate the node, and immediately turn it into a *mut pointer
let n = Box::into_raw(Box::new(Node {
data: t,
next: null_mut(),
}));
loop {
// snapshot current head
let head = self.head.load(Relaxed);
// update `next` pointer with snapshot
unsafe { (*n).next = head; }
// if snapshot is still good, link in new node
if self.head.compare_and_swap(head, n, Release) == head {
break
}
}
}
}
The problem
如果是支援GC的語言編寫了上面的代碼,那麼我們已經實作了一個lock-free stack,但是在rust中不行,因為它發生了記憶體洩漏.特别是,當Node指針從堆棧中移除後,pop實作不會嘗試釋放它。
如果我們加上記憶體釋放會有什麼問題呢:
// extract out the data from the now-unlinked node
let ret = Some(unsafe { ptr::read(&(*head).data) });
// free the node
mem::drop(Box::from_raw(head));
return ret
問題是其他線程也可能同時運作 pop。這些線程可以獲得目前head的快照; 沒有什麼可以阻止它們讀取
(*head).next
。就在我們釋放他們指向的節點之後,快照上的下一個——一個正在形成的
user-after-free
bug!
這就是症結所在。我們希望使用無鎖算法,但是許多算法遵循與上面的棧類似的模式,這使得我們沒有明确的點可以安全地釋放一個節點。現在怎麼辦?
感覺這裡描述的問題并不是很清楚,我加上一個ABA問題
所有沒有gc的系統在lock-free程式設計的是一定要考慮這個問題,具體來說就是假設T1,T2兩個線程, 初始stack是a->b->c. 這是T1要pop,那麼得到a,這時候a.next=b,此時發生了線程排程.然後切換到T2,T2 pop a, pop b,push a 是以stack裡面是a->c.
然後切換到T1,這時候T1看到棧頂位址和自己取到的是一樣的(a的位址),然後就把棧頂設定為a.next,也就是b,但是這時候b已經被釋放了.
這是典型的ABAA問題:
基于epoch的記憶體回收
對于無鎖代碼,有幾種不是基于 gc 的記憶體管理方法,但它們都歸結為相同的核心特點:
- 可達性有兩個來源——資料結構和通路它的線程中的快照(引用)。在删除一個節點之前,我們需要知道它不能以這兩種方式中的任何一種方式到達。
- 一旦一個節點從資料結構中解除連結,就不會建立到達該節點的新快照(引用)。
其中一個最優雅和有希望的回收方案是Keir Fraser’s epoch-based reclamation,這來自于他的博士論文。
其基本思想是将未與資料結構連結的節點(可達性的第一個來源)隐藏起來,直到可以安全地删除它們。在删除隐藏的節點之前,我們需要知道當時通路資料結構的所有線程都已經完成了它們正在執行的操作。通過上面的特點2,這将意味着不再有任何引用留下(因為在此期間不可能建立新的引用)。最困難的部分是在不同步的情況下完成所有這些工作。否則,我們就失去了lock-free應該帶來的好處!
epoch的工作原理是:
- A 全局epoch計數器(取值0、1和2) ;
- A global list of garbage for each epoch;每個opoch都有一個全局垃圾清單;
- 每個線程的“活動”标志;
- 每個線程的epoch計數器
epoch用于發現何時垃圾可以安全地釋放,因為沒有線程可以到達它。與傳統的 GC 不同,這不需要周遊實時資料; 這隻是檢查opoch計數器。
當一個線程想要對資料結構執行一個操作時,它首先設定它的
active
标志,然後更新它的本地epoch以比對全局epoch。如果線程從資料結構中删除一個節點,則将該節點添加到目前全局epoch的垃圾清單中。(注意: 垃圾進入目前的全局epoch,而不是以前的本地快照,這一點非常重要。) 當它完成操作時,它清除
active
标志。
為了嘗試收集垃圾(可以在任何時候執行) ,線程周遊所有參與線程的标志,并檢查所有活動線程是否都在目前epoch中。如果是這樣,它可以嘗試增加全局opoch(模3)。如果增量成功,則可以釋放兩個eopch前的垃圾。
為什麼我們需要三個epoch?因為“垃圾收集”是并發完成的,是以線程在任何時候都可能處于兩個epoch之一(舊的epoch和新的epoch)。但是,因為我們在遞增之前檢查所有活動線程都處于舊eopch中,是以可以保證沒有活動線程處于第三個epoch中。
這個方案是經過精心設計的,以便在大多數時間裡,線程隻通路已經在緩存中或(通常)線程本地的資料。隻有在進行“ GC”時,才會涉及到改變全局epoch或者讀取其他線程的eopch。epoch 方法也與算法無關,易于使用,其性能與其他方法相比具有競争力。
這個方案也證明非常适合Rust的所有權系統.
The Rust API
我們希望 Rust API 反映基于epoch的回收的基本原則:
- 當對共享資料結構進行操作時,線程必須始終處于“活動”狀态。
- 當線程處于活動狀态時,從資料結構中讀出的所有資料将保持配置設定狀态,直到線程變為非活動狀态。
我們将利用 Rust 的所有權系統——特别是基于所有權的資源管理(又名 RAII)——在 epoch API 的類型簽名中直接捕獲這些限制。這反過來将有助于確定我們正确管理epoch。
Guard
要對無鎖資料結構進行操作,首先需要獲得一個
guard
,這是一個所有值,表示線程處于活動狀态:
pub struct Guard { ... }
pub fn pin() -> Guard;
Pin 函數将線程标記為 active,加載全局epoch,并可能嘗試執行 GC (稍後将詳細介紹)。另一方面,Guard 的析構函數通過将線程标記為非活動狀态退出epoch管理。
因為
Guard
表示“處于活動狀态” ,是以借用
&'a Guard
保證線程在整個生存期内處于活動狀态——這正是我們在無鎖算法中綁定快照生存期所需要的。
為了使
Guard
能夠工作,Crossbeam 提供了一組三指針類型,它們可以一起工作:
- Owned,類似于Box,擁有所有權,并且這些資料還尚未放進并發資料結構中.
- Shared<'a,T>,,類似于&'a T,指向并發資料結構中的資料,可能可通路,也可能不可通路,但是確定在生命周期'a期間不會被釋放.
- Atomic,類似于
,它使用 std::sync::atomic::AtomicPtr
和 Owned
類型提供對指針的原子更新,并将它們連接配接到一個 Shared
。Guard
然後我們一起來看看這些指針.
Owned and Shared pointers
Owned
Owned
接口與Box幾乎相同:
pub struct Owned<T> { ... }
impl<T> Owned<T> {
pub fn new(t: T) -> Owned<T>;
}
impl<T> Deref for Owned<T> {
type Target = T;
...
}
impl<T> DerefMut for Owned<T> { ... }
Shared
Shared<'a,T>
類似于
&'a T
,但是他可以
Deref
到
&'a T
. 它所提供的指針的生命周期實際上是 a。
pub struct Shared<'a, T: 'a> { ... }
impl<'a, T> Copy for Shared<'a, T> { ... }
impl<'a, T> Clone for Shared<'a, T> { ... }
impl<'a, T> Deref for Shared<'a, T> {
type Target = &'a T;
...
}
Atomic
crossbeam-epoch這個庫的核心就是Atomic,它提供了對于nullable pointer的原子通路,将庫的其他部分連接配接起來.
pub struct Atomic<T> { ... }
impl<T> Atomic<T> {
/// Create a new, null atomic pointer.
pub fn null() -> Atomic<T>;
}
我們一起一次隻看一個操作,因為這些操作的簽名非常微妙.
Loading
首先從load開始
impl<T> Atomic<T> {
pub fn load<'a>(&self, ord: Ordering, _: &'a Guard) -> Option<Shared<'a, T>>;
}
為了完成這個任務,我們必須借用一個
Guard
。如上所述,這是一種保證線程在整個生命周期中處于活動狀态的方法。作為回報,您将得到一個可選的
Shared
指針(如果 Atomic 目前為 null,則傳回 None) ,其生命期綁定到
Guard
。
将其與标準庫的 AtomicPtr 接口進行比較很有意思,在這個接口中 load 傳回一個 * mut t。
AtomicPtr的load簽名
pub fn load(&self, order: Ordering) -> *mut T
store
Storing is a bit more complicated because of the multiple pointer types in play.
如果我們隻是想寫一個 Owned 指針或一個 null 值,我們甚至不需要線程是活動的。我們隻是将所有權轉移到資料結構中,不需要任何關于指針生命周期的保證:
impl<T> Atomic<T> {
pub fn store(&self, val: Option<Owned<T>>, ord: Ordering);
}
但有時,我們希望将所有權轉移到資料結構中,并立即獲得一個指向所轉移資料的
Shared
指針——例如,因為我們希望添加到資料結構中同一節點的附加連結。在這種情況下,我們需要将其生命周期和一個
Guard
聯系起來:
impl<T> Atomic<T> {
pub fn store_and_ref<'a>(&self,
val: Owned<T>,
ord: Ordering,
_: &'a Guard)
-> Shared<'a, T>;
}
請注意,val的運作時表示和傳回值完全相同——我們傳入一個指針,然後得到相同的指針。但是從Rust的觀點來看,這一步的所有權狀況發生了根本性的變化。
最後,我們可以講一個
Shared
指針存回資料結構中.
impl<T> Atomic<T> {
pub fn store_shared(&self, val: Option<Shared<T>>, ord: Ordering);
}
這個操作不需要guard,因為我們沒有學到任何關于指針生命周期的新資訊.
CAS
compare-and-set,換出一個
Shared
,放入一個
Owned
:
impl<T> Atomic<T> {
pub fn cas(&self,
old: Option<Shared<T>>,
new: Option<Owned<T>>,
ord: Ordering)
-> Result<(), Option<Owned<T>>>;
}
與store一樣,此操作不需要
Guard
; 它不産生新的生命周期資訊。結果傳回CAS是否成功; 如果不成功,新指針的所有權将傳回給調用者。
我們有一個類似的操作
store_and_ref
:
impl<T> Atomic<T> {
pub fn cas_and_ref<'a>(&self,
old: Option<Shared<T>>,
new: Owned<T>,
ord: Ordering,
_: &'a Guard)
-> Result<Shared<'a, T>, Owned<T>>;
差別在于如果成功,會傳回一個
Shared
指針,指向剛剛放入的資料.
最後,我們可以使用一個Shared替換另一個Shared.
impl<T> Atomic<T> {
pub fn cas_shared(&self,
old: Option<Shared<T>>,
new: Option<Shared<T>>,
ord: Ordering)
-> bool;
}
當 CAS 成功時,布爾傳回值為 true。
釋放記憶體
上述所有機制的目标就是: 釋放不再可及(no longer reachable)的記憶體. 當一個節點已經從資料結構中unlink,unlink的線程可以通知他的
Guard
記憶體應被回收:
impl Guard {
pub unsafe fn unlinked<T>(&self, val: Shared<T>);
}
此操作将 Shared 指針添加到适當的垃圾清單中,允許在兩個epoch之後釋放它。
該行動是unsafe,因為它聲稱:
- 從資料結構無法通路指針,
- 沒有其他線程會再針對這個val,調用unlink
但關鍵的是,其他線程可能會繼續引用這個共享指針; epoch 系統将確定在實際釋放指針時沒有線程再繼續引用這個共享指針。
這裡的共享指針的生命周期和
Guard
之間沒有特定的聯系; 如果我們有一個可達的
Shared
指針,我們知道它來自的
Guard
是活動的。
基于epoch的特雷貝爾棧
下面給出使用Crossbeam實作的特雷貝爾棧的完整源碼:
use std::sync::atomic::Ordering::{Acquire, Release, Relaxed};
use std::ptr;
use crossbeam::mem::epoch::{self, Atomic, Owned};
pub struct TreiberStack<T> {
head: Atomic<Node<T>>,
}
struct Node<T> {
data: T,
next: Atomic<Node<T>>,
}
impl<T> TreiberStack<T> {
pub fn new() -> TreiberStack<T> {
TreiberStack {
head: Atomic::new()
}
}
pub fn push(&self, t: T) {
// allocate the node via Owned
let mut n = Owned::new(Node {
data: t,
next: Atomic::new(),
});
// become active
let guard = epoch::pin();
loop {
// snapshot current head
let head = self.head.load(Relaxed, &guard);
// update `next` pointer with snapshot
n.next.store_shared(head, Relaxed);
// if snapshot is still good, link in the new node
match self.head.cas_and_ref(head, n, Release, &guard) {
Ok(_) => return,
Err(owned) => n = owned,
}
}
}
pub fn pop(&self) -> Option<T> {
// become active
let guard = epoch::pin();
loop {
// take a snapshot
match self.head.load(Acquire, &guard) {
// the stack is non-empty
Some(head) => {
// read through the snapshot, *safely*!
let next = head.next.load(Relaxed, &guard);
// if snapshot is still good, update from `head` to `next`
if self.head.cas_shared(Some(head), next, Release) {
unsafe {
// mark the node as unlinked
guard.unlinked(head);
// extract out the data from the now-unlinked node
return Some(ptr::read(&(*head).data))
}
}
}
// we observed the stack empty
None => return None
}
}
}
}
一些現象:
- 該算法的基本邏輯與依賴于 GC 的版本相同,隻是我們顯式地将彈出的節點标記為“ unlinked”。一般來說,可以采用“現成的”無鎖算法(一般的算法實作通常假設GC的存在) ,并以這種方式直接針對 Crossbeam 對其進行編碼。
- 捕獲快照後,我們可以在不使用不安全的情況下解除對快照的引用,因為
保證了快照一定沒被釋放。Guard
- 在這裡使用
是合理的,因為我們使用了比較并交換來確定隻有一個線程調用它,而且 epoch 回收方案不運作析構函數,而隻是釋放記憶體。ptr::read
關于釋放的最後一點需要更多的注釋,是以讓我們通過讨論垃圾來總結 API 描述。
管理垃圾
Crossbeam 中的設計将epoch管理視為所有資料結構共享的服務: 存在一個全局的唯一的的epoch 狀态,對于每個線程狀态有一個本地狀态。這使得 epoch API 的使用非常簡單,因為沒有每個資料結構的設定。這還意味着(相當簡單的)空間使用量與使用 epoch 的線程數量相關,而不是與資料結構的數量相關。
Crossbeam 的實作與現有的 epoch 文獻的一個不同之處在于,每個線程都儲存本地垃圾清單。也就是說,當一個線程将一個節點标記為“ unlinked”時,該節點被添加到一些線程本地資料中,而不是立即添加到全局垃圾清單中(這将需要額外的同步)。
每次調用
epoch::pin()
時,目前線程将檢查其本地垃圾是否超過了收集門檻值,如果超過,則将嘗試進行收集。同樣,每當調用
epoch::pin()
時,如果全局 epoch 已超過前一個快照,則目前線程可以收集它的一些垃圾。除了避免在垃圾清單周圍進行全局同步之外,這個新方案還将實際釋放記憶體的工作分散到通路資料結構的所有線程中。
因為隻有當所有活動線程都在目前epoch上時,GC 才會發生,是以不可能總是收集。但實際上,給定線程上的垃圾很少超過門檻值。
但是有一個問題: 因為 GC 可能會失敗,如果一個線程正在退出,那麼它需要處理它的垃圾。是以 Crossbeam 實作還具有全局垃圾清單,當線程退出時,這些垃圾清單用作最後的抛垃圾的地方。這些全局垃圾清單由成功遞增全局epoch的線程收集。
最後,“收集”垃圾意味着什麼?如上所述,庫僅釋放記憶體; 它不運作析構函數。
從概念上講,架構将對象的破壞分為兩部分: 銷毀 / 移出内部資料,以及釋放包含該資料的對象。前者應該與調用 unlinked 同時發生——也就是說,除了實際釋放對象的能力之外,還有一個唯一的線程在每個意義上擁有對象。後者發生在某個未知的後續點,當已知的對象不再被引用時。這确實對使用者施加了一種義務: 通過快照通路應該隻讀取在釋放之前有效的資料。但是,無鎖資料結構基本上都是這樣,它傾向于在與容器相關的資料(即原子字段)和包含的實際資料(如 Node 中的資料字段)之間有一個明确的分隔。
以這種方式将對象拆分意味着在可預測的時間内以同步方式運作析構函數,減輕了 GC 的痛苦之一,并允許将架構用于非
'static
(和非
Send
)資料。
展望
橫梁仍處于起步階段。這裡的工作為探索 Rust 中大範圍的無鎖資料結構奠定了基礎,我希望 Crossbeam 最終發揮類似于 java.util.concurrent for Rust 的作用——包括無鎖 hashmap、竊取工作的 deques 和輕量級任務引擎。如果你對這項工作感興趣,我很樂意幫忙!
我的一些心得
我在嘗試編寫lock-free stack的初期,并沒有接觸crossbeam,隻是在碰到ABA問題,無法解決以後才尋求crossbeam的支援. 是以我實作的無鎖棧和文中的有一些差別,但是一緻的.下面附上我的實作.
use std::sync::atomic::Ordering;
extern crate crossbeam_epoch as epoch;
use epoch::{Atomic, Owned, Shared};
pub struct Node {
value: i32,
next: *const Node,
}
pub struct LockFreeStack {
next: Atomic<Node>,
}
unsafe impl Sync for LockFreeStack {}
unsafe impl Send for LockFreeStack {}
impl LockFreeStack {
pub fn new() -> LockFreeStack {
LockFreeStack {
next: Atomic::null(),
}
}
pub fn push(&self, v: i32) {
let guard = epoch::pin();
let mut new = Owned::new(Node {
value: v,
next: std::ptr::null(),
});
loop {
let old = self.next.load(Ordering::Relaxed,&guard);
new.next=old.as_raw();
match self.next.compare_and_set(old, new, Ordering::Release,&guard){
Ok(_)=>break,
Err(e)=>{
new=e.new;
// spin_loop_hint();
},
};
}
}
pub fn pop(&self) -> Option<i32> {
let guard = epoch::pin();
loop {
let old = self.next.load(std::sync::atomic::Ordering::Acquire,&guard);
/*
按照as_ref的文檔說明,old的load不能是Relaxed,隻要確定old的寫的另一方是Release,就是安全的
我們這裡無論是Push還是Pop對于old的set通路用得都是Release,是以是安全的.
*/
match unsafe{old.as_ref()}{
None=>return None,
Some(old2)=>{
let next=old2.next;
if self.next.compare_and_set(old, Shared::from(next),Ordering::Release,&guard).is_ok(){
unsafe {
/*
按照defer_destroy文檔,隻要我們保證old不會再其他線程使用就是安全的
而我們非常确信,這個old不會被其他線程使用,
因為這裡是defer_destroy,是以解決了ABA問題 (https://en.wikipedia.org/wiki/ABA_problem)
*/
guard.defer_destroy(old);
}
return Some(old2.value);
}
spin_loop_hint();
}
}
}
}
}
impl Drop for LockFreeStack {
/*
因為&mut self保證了不會有其他人同時操作這個Stack,是以可以放心的一個一個移除即可.
*/
fn drop(&mut self) {
let guard = epoch::pin();
let mut next = self.next.load(Ordering::Relaxed,&guard).as_raw() as *mut Node;
while !next.is_null() {
/*
這裡的next确定是通過Owned配置設定的,是以沒有安全問題
并且drop持有的是mutable stack,隻有一個線程可以通路,是以也沒有并發問題.
*/
let n = unsafe { Owned::from_raw(next) };
// println!("drop {}", n.value);
next = n.next as *mut Node ;
}
}
}