優先隊列
集合性質的資料類型離不開插入删除這兩操作,主要差別就在于删除的時候删哪個,像棧删最晚插入的,隊列删最早插入的,随機隊列就随便删,而優先隊列删除目前集合裡最大(或最小)的元素。優先隊列有很多應用,舉幾個見過的像:資料壓縮的哈夫曼編碼、圖搜尋中的 Dijkstra 算法和 Prim 算法、人工智能裡的 A* 算法等,優先隊列是這些算法的重要組成部分。
API and elementary implementations

先來個簡單的 API,應用例子是從 N 個輸入裡找到前 M 個大的元素,其中 N 的數目很大。因為 N 很大,沒有足夠的空間來存儲,是以不能全部排序後輸出前 M 個。得用優先隊列,保留 M 個,插入超過 M 就删除最小的。
像棧和隊列那樣的初級優先隊列實作,内部存儲又可以分為有序和無序兩種,有序的插入操作是 N 級别,删除是常數級别,而無序相反,是以在時間上也不盡人意。下節介紹的二叉堆實作都是 NlgM 級别,這裡先蠻貼一個初級實作。
public class UnorderedMaxPQ<Key extends Comparable<Key>> {
private Key[] pq; // pq[i] = ith element on pq
private int N; // number of elements on pq
public UnorderedMaxPQ(int capacity) {
pq = (Key[]) new Comparable[capacity];
}
public boolean isEmpty() {
return N == 0;
}
public void insert(Key x) {
pq[N++] = x;
}
public Key delMax() {
int max = 0;
for (int i = 0; i < N; i++)
if (less(max, i)) max = i;
exch(max, N-1);
return pq[--N];
}
}
binary heaps
二叉樹就是節點最多有兩個孩子的樹,其中完全二叉樹除了底層可能右邊空外都是滿的,二叉堆是用數組表示的符合堆順序(heap-ordered)的完全二叉樹,例:
堆順序大概就是指父母節點的值不比孩子小,然後一層一層從左到右放數組裡。另外,數組從下标 1 開始,這樣就可以很友善的在堆裡上下移動:下标為 k 的節點的父母下标為 k/2,孩子是 2k 和 2k+1,完全不需要其它顯式的連接配接。
二叉堆實作的優先隊列,内部存儲用的是數組,插入和删除都和數組最後一個元素打交道,比較友善:插入先直接插入末尾,删除也是把 a[1] 先和末尾交換再直接删除末尾。于是乎,我們需要上浮(swim)和下沉(sink)操作,來恢複插入删除中被破壞的堆順序。
swim
private void swim(int k) {
while (k > 1 && less(k/2, k)) {
exch(k, k/2);
k = k/2;
}
}
當孩子節點的值比父母大時,為了維護堆順序,這個值該上浮,于是和父母的值交換,繼續這一過程浮到合适的位置。是以,把元素插入到末尾之後,就給它來個上浮,安排到合适位置。
public void insert(Key x) {
pq[++N] = x;
swim(N);
}
sink
private void sink(int k) {
while (2*k <= N) {
int j = 2*k;
if (j < N && less(j, j+1)) j++;
if (!less(k, j)) break;
exch(k, j);
k = j;
}
}
當父母節點的值比孩子小時,這個節點該下沉來維護堆順序,選兩個孩子(如果有兩)中較大的交換值,繼續這一過程直到沉到合适的位置。于是,當删除元素時,把 a[1] 和末尾交換後删除末尾,現在的 a[1] 用下沉來找到該有的位置。
public Key delMax() {
Key max = pq[1];
exch(1, N--);
sink(1);
pq[N+1] = null;
return max;
}
因為有 N 個點的完全二叉樹的高度為 lgN 取下整(高度隻有在節點數為 2 的幂時才會加一),是以上浮和下沉的複雜度都是 lgN 級别。
圖例:
代碼:
public class MaxPQ<Key extends Comparable<Key>> {
private Key[] pq;
private int N;
public MaxPQ(int capacity) {
pq = (key[]) new Comparable[capacity+1];
}
public boolean isEmpty() {
return N == 0;
}
public void insert(Key key)
public Key delMax() {
/* see previous code */
}
private void swim(int k)
private void sink(int k) {
/* see previous code */
}
private boolean less(int i, int j) {
return pq[i].compareTo(pq[j]) < 0;
}
private void exch(int i, int j) {
Key t = pq[i];
pq[i] = pq[j];
pq[j] = t;
}
}
此外,還有一些可以考慮的,如把鍵設為不可變的(immutable),當它們在優先隊列裡的時候,用戶端程式不能改變它們;内部使用變長數組;改成删除最小元素;支援随機删除和改變優先級(這個後面好像會有)等。
關于不可變多說了點,Java 裡用關鍵字 final 就好,像 String、Integer、Double、Vector 等都是不可變的,而 StringBUilder、Stack、Java array 等是可變的。不可變資料建立之後就不能改變,這有很多好處:友善 debug,利于防範惡意代碼,可以放心地作為優先隊列或符号表的鍵等,雖然每個值都要建立,但還是利大于弊。反正,了解下啦。
heapsort
如果把數組塞進優先隊列,再一個個删掉,那實際上按删掉的順序就對數組排了個序,是以又有種全新的排序算法,叫做堆排序。這個算法自然地分成兩步:先把數組調整成符合堆順序,即構造堆,然後每次把 a[1] 和 a[N--] 交換,當 N 減到 1 時也就排好了序。另外,數組下标從 0 開始,而堆是從 1 開始,需要一定轉換,下面先假設待排數組也是從 1 開始,友善說明。
heap construction
對有孩子的節點用下沉操作,自底向上地構造堆。完全二叉樹有孩子的節點,從 N/2 開始到 1,隻要周遊一半數組。
for (int k = N/2; k >= 1; k--)
sink(a, k, N);
sortdown
有了堆,那排序比優先隊列出隊還簡單:
while (N > 1) {
exch(a, 1, N--);
sink(a, 1, N);
}
構造和排序的圖例:
public class Heap {
public static void sort(Comparable[] pq) {
int n = pq.length;
for (int k = n/2; k >= 1; k--)
sink(pq, k, n);
while (n > 1) {
exch(pq, 1, n--);
sink(pq, 1, n);
}
}
public static void sink(Comparable[] pq, int k, int n) {
while (2*k <= n) {
int j = 2*k;
if (j < n && less(pq, j, j+1)) j++;
if (!less(pq, k, j)) break;
exch(pq, k, j);
k = j;
}
}
private static boolean less(Comparable[] pq, int i, int j) {
return pq[i-1].compareTo(pq[j-1]) < 0;
}
private static void exch(Object[] pq, int i, int j) {
Object swap = pq[i-1];
pq[i-1] = pq[j-1];
pq[j-1] = swap;
}
}
可以看到,數組下标的轉換也就是方法 less() 和 exch() 裡索引減一。
再來張排序軌迹示例:
一開始調整堆順序看不出什麼,後面就有點像選擇排序了,每次找出最大的放尾巴,但是需要的比較次數少得多。
性能分析
構造堆的時候,複雜度甚至是 N 級别的,需要的交換次數少于 N,于是需要的比較的次數少于 2N(兩個孩子的話多比較一次孩子大小),可以這樣了解:
首先,n 個點的二叉堆有 n-1 條連結,因為除了根節點每個點都有條連結指向其父節點。然後,我們從點出發,按左-右-右-右-... 的順序配置設定這些連結,這樣連結隻會屬于一個點,甚至根左邊那條還沒點管,看上圖感受下。最後,點下沉需要最多的交換次數等于屬于它的連結數,是以構造時總共需要的交換次數不會超過 n。
來自 booksite 練習的最後一題。
20.Prove that sink-based heap construction uses at most 2n compares and at most n exchanges.
至于後面的排序,要周遊數組,下沉最多是樹高為 lgN,顯然是 NlgN 級别。于是乎,好像有了個很厲害的算法,不像歸并排序需要額外的空間,不像快排最壞情況不能保證 NlgN 的性能,但現代系統的很多應用并不使用堆排序,因為它無法有效地利用緩存(cache)。數組元素很少和相鄰的其它元素進行比較,是以緩存未命中的次數要遠遠高于大多數比較都在相鄰元素間進行的算法,如快排、歸并排序,甚至是希爾排序。另外,堆排序也不是穩定的。
sorting algorithms summary
event-driven simulation
介紹了一個借助優先隊列實作的剛性球體碰撞的模拟系統,剛性球體模型有以下特點:
- N 個運動的粒子,限制在單元格裡。
- 涉及的碰撞是彈性的,沒有能量損失。
- 每個粒子是已知位置、速度、品質和半徑的球體。
- 不存在其他外力,是以粒子碰撞前做勻速直線運動。
是個理想模型,在既與宏觀現象(溫度、壓力)有關又和微觀現象(單個原子和分子的運動)有關的統計力學中十分重要。
用計算機模拟符合這個模型的碰撞系統,其實也就是要知道每個時刻所有粒子的位置和速度,自然地想到時間驅動的解決政策:給定時間 t 時所有粒子的位置和速度,借此算出 經過時間 dt 後粒子的位置,檢查是否有發生碰撞,有就回退到碰撞的時間并考慮碰撞的影響。但這樣的花費太大,每個粒子檢查是否有碰撞就達到了 \(N^{2}\) 級别。另外,dt 也不好把握,太大會錯過很多碰撞,太小計算成本太高。于是乎,我們來考慮另外一種事件驅動的政策。
事件是未來某個時間的一次潛在碰撞,關聯的優先級就是發生的時間,可以用一個優先隊列來記錄所有事件,快速擷取下一次潛在的碰撞。碰撞預測需要些高中實體知識(繼重造線代後居然是實體嗎),首先,粒子和牆的碰撞相對簡單:
這裡的單元格是邊長為 1 的正方形,和其它牆的碰撞類似,不提。
粒子和粒子間的碰撞事件的發生時間比較複雜(這裡不考慮多個粒子同時碰撞的情況):
粒子 i 和 j 經過 \(\Delta t\) 時間發生碰撞,記 \(\sigma\) = \(\sigma_{i}\) + \(\sigma_{j}\),則有:
\(\sigma^{2}\) = \((rx_{i}'-rx_{j}')^{2}\) + \((ry_{i}'-ry_{j}')^{2}\)
又因為:
\(rx_{i}'\) = \(rx_{i}\) + \(\Delta t vx_{i}\), \(ry_{i}'\) = \(ry_{i}\) + \(\Delta t vy_{i}\)
\(rx_{j}'\) = \(rx_{j}\) + \(\Delta t vx_{j}\), \(ry_{j}'\) = \(ry_{j}\) + \(\Delta t vy_{j}\)
帶入解 \(\Delta t\) 的二進制方程得:
其中:
\(d = (\Delta v \cdot \Delta r)^{2} - (\Delta v \cdot \Delta v)(\Delta r \cdot \Delta r - \sigma^{2})\)
\(\Delta r = (\Delta rx, \Delta ry) = (rx_{j} - rx_{i}, ry_{i} - ry_{i})\)
\(\Delta v = (\Delta vx, \Delta vy) = (vx_{j} - vx_{i}, vy_{i} - vy_{i})\)
\(\Delta r \cdot \Delta r = (\Delta rx)^{2} + (\Delta ry)^{2}\)
\(\Delta v \cdot \Delta v = (\Delta vx)^{2} + (\Delta vy)^{2}\)
\(\Delta v \cdot \Delta r = (\Delta vx)(\Delta rx) + (\Delta vy)(\Delta ry)\)
然後,還有一件事,碰撞之後的速度問題,愣是沒怎麼懂。一般不是動量守恒加機械能守恒(碰撞前後總動能不變那個)聯立算碰撞之後的速度,課程大概是把原始式子推來推去?最後用沖量來算也:
There are three equations governing the elastic collision between a pair of hard discs: (i) conservation of linear momentum, (ii) conservation of kinetic energy, and (iii) upon collision, the normal force acts perpendicular to the surface at the collision point. Physics-ly inclined students are encouraged to derive the equations from first principles; the rest of you may keep reading.
Between two particles. When two hard discs collide, the normal force acts along the line connecting their centers (assuming no friction or spin). The impulse (Jx, Jy) due to the normal force in the x and y directions of a perfectly elastic collision at the moment of contact is:
\(J_{x} = \frac{J\Delta rx}{\sigma}\),\(J_{y} = \frac{J\Delta ry}{\sigma}\) where \(J = \frac{2m_{i}m_{j}(\Delta v\cdot\Delta r)}{\sigma (m_{i} + m_{j})}\)
and where mi and mj are the masses of particles i and j, and σ, Δx, Δy and Δv ⋅ Δr are defined as above. Once we know the impulse, we can apply Newton's second law (in momentum form) to compute the velocities immediately after the collision.
\(vx_{i}' = vx_{i} + J_{x}/m_{i}\), \(vx_{j}' = vx_{j} - J_{x}/m_{j}\)
\(vy_{i}' = vy_{i} + J_{y}/m_{i}\), \(vy_{i}' = vy_{i} - J_{y}/m_{j}\)
關于沖量怎麼算的,這個連結(點我)好像有點靠譜,第一眼看上去挺像的,感覺不是重點,不管啦。另外,以上來自:booksite-61event。
現在,我們把上面讨論的預計碰撞時間和碰撞處理封裝到 Particle 類裡:
public class Particle {
private double rx, ry; // position
private double vx, vy; // velocity
private final double radius; // radius;
private final double mass; // mass
private int count; // number of collisions
public Particle(...) { }
public void move(double dt) { }
public void draw() { }
// predict collision with particle or wall
public double timeToHit(Particle that) { }
public double timeToHitVerticalWall() { }
public double timeToHitHorizontalWall() { }
// resolve collision with particle or wall
public void bounceOff(particle that) { }
public void bounceOffVeerticalWall() { }
public void bounceOffHorizontalWall() { }
}
貼下例子,完整的參見:Particle.java。
public double timeToHit(Particle that) {
if (this == that) return INFINITY;
double dx = that.rx - this.rx, dy = that.ry - this.ry;
double dvx = that.vx - this.vx, dvy = that.vy - this.vy;
double dvdr = dx*dvx + dy*dvy;
if (dvdr > 0) return INFINITY;
double dvdv = dvx*dvx + dvy*dvy;
double drdr = dx*dx + dy*dy;
double sigma = this.radius + that.radius;
double d = (dvdr*dvdr) - dvdv * (drdr - sigma*sigma);
if (d < 0) return INFINITY;
return -(dvdr + Math.sqrt(d)) / dvdv;
}
public void bounceOff(Particle that) {
double dx = that.rx - this.rx, dy = that.ry - this.ry;
double dvx = that.vx - this.vx, dvy = that.vy - this.vy;
double dvdr = dx*dvx + dy*dvy;
double dist = this.radius + that.radius;
double J = 2 * this.mass * that.mass * dvdr / ((this.mass + that.mass) * dist);
double Jx = J * dx / dist;
double Jy = J * dy / dist;
this.vx += Jx / this.mass;
this.vy += Jy / this.mass;
that.vx -= Jx / that.mass;
that.vy -= Jy / that.mass;
this.count++;
that.count++;
}
變量 count 記錄粒子可能的碰撞次數,會被用于判斷一個事件是否有效,像球 A 和球 B 本來會碰撞,但先和球 C 碰了的話,前者就無效了。具體怎麼用,再來看看事件,我們把應該放入優先隊列中的所有對象資訊封裝在一個私有類中(各種事件):
private class Event implements Comparable<Event> {
private double time; // time of event
private Particle a, b; // particles involved in event
private int countA, countB; // collision counts for a and b
public Event(double t, Particle a, Particle b) { }
public int compareTo(Event that) {
return this.time - that.time;
}
public boolean isValid() {
if (a != null && a.count() != countA) return false;
if (b != null && b.count() != countB) return false;
return true;
}
}
如果事件從優先隊列裡取出來時變量 countA 和 countB 沒有變化,說明 A B 粒子此前沒有發生碰撞,事件仍是有效的;反之事件就是無效的,該丢棄取下一個。
有了上面這些鋪墊,模拟的邏輯就很簡單:從優先隊列裡取出最近的有效碰撞事件,按預計時間更新所有粒子的位置,更新參與碰撞的粒子的速度,預計參與碰撞粒子接下來可能發生的碰撞事件并插入優先隊列,繼續取有效碰撞,循環下去:
public class CollisionSystem {
private MinPQ<Event> pq; // the priority queue
private double t = 0.0; // simulation clock time
private Particle[] particles; // the array of particles
public CollisionSystem(Pariticle[] particles) { }
private void predict(Particle a) {
if (a == null) return;
for (int i = 0; i < particles.length; i++) {
double dt = a.timeToHit(particles[i]);
pq.insert(new Event(t + dt, a, particles[i]));
}
pq.insert(new Event(t + a.timeToHitVerticalWall(), a, null));
pq.insert(new Event(t + a.timeToHitHorizontalWall(), null, a));
}
public void simulate() {
pq = new MinPQ<Event>();
for (int i = 0; i < particle.lenght; i++)
predict(particles[i]);
pq.insert(new Event(0, null, null));
}
while (!pq.isEmpty()) {
Event event = pq.delMin();
if (!event.isValid()) continue;
Particle a = event.a;
Particle b = event.b;
for (int i = 0; i < particles.length; i++)
particles[i].move(event.time - t);
t = event.time;
if (a != null && b != null) a.bounceOff(b);
else if (a != null && b == null) a.bounceOffVerticalWall();
else if (a == null && b != null) b.bounceOffHorizontalWall();
else if (a == null && b == null) redraw();
predict(a);
predict(b);
}
}
完整的參見:CollisionSystem.java。
最後,貼些效果圖,也是很有趣啊,輸入檔案在 booksite-61event 上都有。
模拟花粉的布朗運動:java CollisionSystem < brownian2.txt
模拟擴散:java CollisionSystem < diffusion.txt