天天看點

處理大規模資料計算任務,Fork/Join架構是您的得力助手!

作者:你的老師父

1. JUC包概述

JUC是Java Util Concurrency的縮寫,即Java并發工具包。JUC包提供了一些常用的線程和并發程式設計工具類,幫助開發者更友善地開發多線程應用程式,提高程式的并發性能。JUC包的主要特點包括:

  • 安全性:JUC包提供了一些線程安全的資料結構和工具類,如原子類、同步隊列等,可以保證多線程通路時資料的正确性和一緻性。
  • 性能:JUC包中的一些并發工具類采用了高效的算法和資料結構,如CAS算法、樂觀鎖等,可以提高程式的并發性能。
  • 可擴充性:JUC包中的一些工具類支援可擴充性,如線程池、ForkJoin架構等,可以根據實際情況進行動态調整。

2. 原子操作類

在多線程環境下,由于多個線程同時通路同一個變量可能會導緻資料不一緻的問題。原子操作類可以保證在多線程環境下對變量的操作是原子性的,即不會出現線程安全問題。

JJUC包中提供了以下幾個原子操作類:

  • AtomicInteger:用于對int類型的變量進行原子操作。
  • AtomicLong:用于對long類型的變量進行原子操作。
  • AtomicBoolean:用于對boolean類型的變量進行原子操作。
  • AtomicIntegerArray:用于對int數組中的元素進行原子操作。
  • AtomicLongArray:用于對long數組中的元素進行原子操作。
  • AtomicReference:用于對引用類型的變量進行原子操作。
  • AtomicStampedReference:用于對引用類型的變量進行原子操作,并能夠檢測變量是否被修改過。
  • AtomicIntegerFieldUpdater:用于對某個對象中的int類型字段進行原子操作。
  • AtomicLongFieldUpdater:用于對某個對象中的long類型字段進行原子操作。
  • AtomicReferenceFieldUpdater:用于對某個對象中的引用類型字段進行原子操作。

這些原子操作類都提供了一系列的方法,如get、set、addAndGet、compareAndSet等,可以實作對變量的原子操作。值得注意的是,使用原子操作類并不能解決所有的線程安全問題,需要根據具體情況進行判斷和選擇。

2.1 AtomicInteger

AtomicInteger用于對int類型的變量進行原子操作。

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDemo {
    private static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    count.getAndIncrement();
                }
            }).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Count: " + count.get());
    }
}
           

2.2 AtomicLong

AtomicLong用于對long類型的變量進行原子操作。

import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongDemo {
    private static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    count.getAndIncrement();
                }
            }).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Count: " + count.get());
    }
}
           

2.3 AtomicBoolean

AtomicBoolean用于對boolean類型的變量進行原子操作。

import java.util.concurrent.atomic.AtomicBoolean;

public class AtomicBooleanDemo {
    private static AtomicBoolean flag = new AtomicBoolean(true);

    public static void main(String[] args) {
        new Thread(() -> {
            while (flag.get()) {
                System.out.println("Running...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag.set(false);
        System.out.println("Stopped.");
    }
}
           

2.4 AtomicIntegerArray

AtomicIntegerArray用于對int數組中的元素進行原子操作。

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayDemo {
    private static AtomicIntegerArray arr = new AtomicIntegerArray(new int[]{0, 0});

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    arr.getAndIncrement(j % 2);
                }
            }).start();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedExceptione) {
       			e.printStackTrace();
        }
        System.out.println("Array: " + arr);
    }
}

           

2.5 AtomicReference

AtomicReference用于對引用類型的變量進行原子操作。

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceDemo {
    static class Person {
        String name;
        int age;

        public Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "Person{name='" + name + "', age=" + age + "}";
        }
    }

    private static AtomicReference<Person> personRef = new AtomicReference<>(new Person("Tom", 18));

    public static void main(String[] args) {
        Person oldPerson = personRef.get();
        Person newPerson = new Person("Jerry", 20);
        if (personRef.compareAndSet(oldPerson, newPerson)) {
            System.out.println("Update success, old value: " + oldPerson + ", new value: " + newPerson);
        } else {
            System.out.println("Update failed.");
        }
        System.out.println("Person: " + personRef.get());
    }
}           

3. 同步隊列類

同步隊列類是一種特殊的隊列,它可以在多線程環境下實作資料的生産和消費過程的同步。JUC包中提供了以下幾個同步隊列類:

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue:一個由連結清單結構組成的有界(但大小預設值為Integer.MAX_VALUE)阻塞隊列。
  • PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等待另一個線程的移除操作,否則插入操作會一直阻塞。

這些同步隊列類提供了一系列的方法,如put、take、offer、poll等,可以實作對隊列的操作。同步隊列類還提供了一些擴充方法,如drainTo、peek等。

同步隊列類的特點在于它們可以實作生産者-消費者模式。多個線程可以同時往隊列中添加元素或者同時從隊列中取出元素,當隊列為空或者已滿時,線程會被阻塞,直到有其他線程進行相應的操作。這種機制可以有效地控制線程間的同步和協作,避免了線程間的競争和死鎖問題。

使用同步隊列類時需要注意以下幾點:

  • 隊列大小:由于同步隊列類是有界的,是以需要根據實際情況來設定隊列的大小。
  • 隊列類型:不同的同步隊列類适用于不同的場景,需要根據具體情況進行選擇。

3.1 ArrayBlockingQueue

ArrayBlockingQueue是一個有界隊列,它的容量是固定的。當隊列已滿時,添加元素的線程會被阻塞,直到有其他線程取出元素後才能繼續添加。

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueDemo {
    private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i);
                    System.out.println("Producer: " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            while (true) {
                try {
                    Integer value = queue.take();
                    System.out.println("Consumer: " + value);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
           

3.2 LinkedBlockingQueue

LinkedBlockingQueue是一個無界隊列,它的容量是不限制的。當隊列為空時,取出元素的線程會被阻塞,直到有其他線程添加元素後才能繼續取出。

import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueDemo {
    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    queue.put(i);
                    System.out.println("Producer: " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            while (true) {
                try {
                    Integer value = queue.take();
                    System.out.println("Consumer: " + value);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
           

3.3 SynchronousQueue

SynchronousQueue是一個沒有緩沖的隊列,它的每個插入操作必須等待另一個線程執行相應的删除操作,反之亦然。當隊列中有一個元素時,插入操作會被阻塞,直到有其他線程取出元素後才能繼續插入。

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    private static SynchronousQueue queue = new SynchronousQueue<>();
    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    System.out.println("Producer: " + i);
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            while (true) {
                try {
                    Integer value = queue.take();
                    System.out.println("Consumer: " + value);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}           

4. CountDownLatch類

CountDownLatch是一種同步工具類,它可以使一個或多個線程等待另一組線程完成操作後再繼續執行。CountDownLatch的作用類似于“計數器”,在初始化時設定一個計數值,每當一個線程完成任務後就将計數值減1,當計數值變為0時,等待線程就會被喚醒。

CountDownLatch類提供了兩個主要方法:

  • countDown:将計數值減1。
  • await:等待計數值變為0。

使用CountDownLatch可以很友善地實作線程間的協作和同步,尤其适用于某些場景下需要等待多個線程都完成某項任務後才能進行下一步操作的情況。

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    private static CountDownLatch latch = new CountDownLatch(3);

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("Thread A finished.");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Thread B finished.");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("Thread C finished.");
                latch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            latch.await();
            System.out.println("All threads finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}           

5. CyclicBarrier類

CyclicBarrier也是一種同步工具類,它可以讓一組線程在到達某個屏障點之前互相等待,然後同時執行某個操作。CyclicBarrier的作用類似于“栅欄”,在初始化時設定一個屏障點,每當一個線程到達屏障點時就會被阻塞,直到所有線程都到達屏障點後才會繼續執行。

CyclicBarrier類提供了兩個主要方法:

  • await:讓目前線程到達屏障點,并等待其他線程到達。
  • reset:重置屏障點的計數器。

使用CyclicBarrier可以很友善地實作一組線程的同步和協作,尤其适用于某些場景下需要多個線程同時開始執行某項任務的情況。

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    private static CyclicBarrier barrier = new CyclicBarrier(3, () -> {
        System.out.println("All threads arrived at the barrier.");
    });

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("Thread A arrived at the barrier.");
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Thread B arrived at the barrier.");
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("Thread C arrived at the barrier.");
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
           

6. Semaphore類

信号量是一種經典的并發程式設計工具,它可以用來限制同時通路某個資源的線程數量。JUC包中提供了以下幾個信号量類:

  • Semaphore:用于控制通路某個共享資源的線程數量。
  • CountingSemaphore:是Semaphore的一個變體,可以限制通路某個共享資源的線程數量,并且支援語義上的“計數”。
  • ReentrantLock:是一個可重入的互斥鎖,它可以對共享資源進行通路控制,進而保證多線程間對共享資源的安全通路。

這些信号量類提供了一系列的方法,如acquire、release、tryAcquire等,可以實作對信号量的操作。使用信号量類可以有效地控制線程的并發通路,進而避免競争和死鎖問題。

Semaphore是一個同步工具類,用于控制對公共資源的通路。它通過計數器來實作對資源的通路控制,可以控制同時通路某個資源的線程數量。

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    private static Semaphore semaphore = new Semaphore(2);

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " acquired the semaphore.");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " released the semaphore.");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
           

7. Exchanger類

Exchanger是一種同步工具類,它可以使兩個線程之間交換資料。Exchanger的作用類似于“交換機”,兩個線程分别調用Exchanger對象的exchange方法,将各自持有的資料傳遞給對方,然後繼續執行。

Exchanger類提供了一個exchange方法,可以實作兩個線程之間的資料交換。使用Exchanger可以很友善地實作資料在不同線程之間的傳遞和同步,尤其适用于某些場景下需要進行線程間資料互動的情況。

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    private static Exchanger<String> exchanger = new Exchanger<>();
    public static void main(String[] args) {
        new Thread(() -> {
            try {
                String data = "Hello World";
                System.out.println("Thread A: before exchange, data = " + data);
                data = exchanger.exchange(data);
                System.out.println("Thread A: after exchange, data = " + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                String data = "123456789";
                System.out.println("Thread B: before exchange, data = " + data);
                data = exchanger.exchange(data);
                System.out.println("Thread B: after exchange, data = " + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}           

8. CompletableFuture類

CompletableFuture是Java8中新增的一個并發工具類,它可以以異步的方式執行任務,并支援任務之間的組合和串聯操作。CompletableFuture類的主要特點包括:

  • 異步執行:可以在新的線程中異步執行任務。
  • 鍊式調用:支援任務之間的鍊式調用,進而實作多個任務的組合和串聯操作。
  • 回調機制:可以通過回調機制來處理任務執行的結果。

CompletableFuture類提供了一系列的方法,如supplyAsync、thenApply、thenAccept、thenCompose等,可以實作對任務的異步執行、組合和串聯操作。使用CompletableFuture可以很友善地實作高效、簡潔的異步程式設計方式。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 1 is running.");
            return "Result 1";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task 2 is running.");
            return "Result 2";
        });

        CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            System.out.println("Task 3 is running.");
            System.out.println("result1: " + result1);
            System.out.println("result2: " + result2);
            return result1.length() + result2.length();
        });

        System.out.println("Combined result: " + combinedFuture.get());
    }
}
           

9. Fork/Join架構

ForkJoin架構是JDK7中引入的一個并行計算架構,它可以将一個大型任務劃分為若幹個小任務并行執行,然後将各個小任務的結果彙總得到最終結果。ForkJoin架構的主要特點包括:

  • 任務分解:可以将一個大型任務劃分為若幹個小任務并行執行。
  • 工作竊取:每個線程都有自己的任務隊列,當空閑時會“竊取”其他線程任務隊列中的任務進行執行,進而提高計算效率。
  • 可擴充性:可以根據實際情況動态增加或減少線程數。

ForkJoin架構通過ForkJoinPool類來管理線程池和任務排程。使用ForkJoin架構可以很友善地實作高效、簡潔的并行計算代碼。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo {
    static class Fibonacci extends RecursiveTask<Integer> {
        final int n;

        Fibonacci(int n) {
            this.n = n;
        }

        protected Integer compute() {
            if (n <= 1)
                return n;
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        Fibonacci task = new Fibonacci(10);
        int result = pool.invoke(task);
        System.out.println(result);
    }
}
           

10. 總結

Java并發程式設計是一門非常重要的技術,在面對大規模并發處理、高性能計算、分布式系統和雲計算等領域時,它扮演着至關重要的角色。本文介紹了Java并發程式設計中常用的幾種并發工具類和架構,包括線程池、鎖、原子類、同步隊列、同步工具類、CompletableFuture和Fork/Join架構等,并提供了簡單的示例代碼,希望可以為讀者在實踐中應用并發程式設計提供一些參考和啟示。

繼續閱讀