Java進階:並行程式設計基礎
1. 引言
隨著多核處理器的普及和大數據處理需求的增加,有效利用系統資源以提高程式效能變得越來越重要,Java提供了豐富的並行程式設計工具和 API,使開發者能夠輕鬆地實現高效能的並行應用程式。
2. 並行程式設計的基本概念
在深入探討Java並行程式設計之前,我們需要先了解一些基本概念。
2.1 並行vs並發
雖然「並行」(Parallelism)和「並發」(Concurrency)這兩個詞經常被混用,但實際上有著不同的含義:
- 並行: 指同時執行多個任務,通常在多核處理器上實現。
- 並發: 指在同一時間段內處理多個任務,但這些任務可能是交替執行的。
2.2 執行緒(Thread)和處理程序(Process)
- 處理程序: 是一個獨立的執行環境,擁有自己的記憶體空間。
- 執行緒: 是處理程序內的執行單元,共享同一處理程序的記憶體空間。
2.3 同步vs非同步
- 同步(Synchronous): 操作按順序執行,一個操作完成後才能開始下一個。
- 非同步(Asynchronous): 操作可以重疊執行,不需要等待前一個操作完成。
2.4 競爭條件(Race Condition)
當多個執行緒同時訪問共享資源,並且至少有一個執行緒試圖修改該資源時,可能會發生競爭條件。這可能導致不可預測的結果。
2.5 死鎖(Deadlock)
當兩個或多個執行緒互相等待對方釋放資源時,就會發生死鎖。這會導致程式無法繼續執行。
2.6 執行緒安全(Thread Safety)
一個類或方法如果在多執行緒環境下能夠正確工作,我們就說是執行緒安全的。實現執行緒安全通常需要使用同步機制。
2.7 可見性(Visibility)
在多執行緒環境中,一個執行緒對共享變數的修改可能對其他執行緒不可見。
Java提供了volatile關鍵字和同步機制來解決這個問題。
2.8 原子性(Atomicity)
一個操作是原子的,意味著是不可分割的。
在多執行緒環境中,非原子操作可能導致不一致的結果。
2.9 可擴展性(Scalability)
指系統處理增加的工作負載的能力。
良好的並行設計應該能夠隨著處理器核心數的增加而提高性能。
2.10 Amdahl定律
Amdahl定律描述程式的整體速度提升受限於其串行部分的比例。
Amdahl定律可以用以下公式表示:
速度提升 = 1 / (串行部分 + (並行部分 / 處理器數量))
串行部分:程式中無法並行化的部分所佔的比例 並行部分:程式中可以並行化的部分所佔的比例 處理器數量:用於並行計算的處理器數量
3. Java中的執行緒(Thread)和Runnable介面
在Java中,執行緒是並行程式設計的基本單位。
Java提供兩種主要的方式來創建和使用執行緒:通過繼承Thread類和實現Runnable介面。
3.1 使用Thread類
Java的Thread類是執行緒的核心類,要創建一個新的執行緒,我們可以繼承Thread類並重寫其run()方法。
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("執行緒正在運行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}
3.2 實現Runnable介面
另一種創建執行緒的方法是實現Runnable介面,由於 Java 不支援多重繼承,但允許實現多個介面,使用 Runnable 介面可以讓您的類別同時實現其他介面,增加了設計的靈活性。
將執行緒的任務(what to do)與執行緒的控制(how to run)分離,符合單一職責原則,並且多個執行緒可以共享同一個 Runnable 物件,這在某些情況下可以提高效率。
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable正在運行: " + Thread.currentThread().getName());
}
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
3.3 使用Lambda表達式
從Java 8開始,我們可以使用Lambda表達式來更簡潔地創建Runnable對象:
public class LambdaThread {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("Lambda執行緒正在運行: " + Thread.currentThread().getName());
});
thread.start();
}
}
3.4 執行緒的生命週期
Java執行緒有以下幾種狀態:
- NEW: 執行緒被創建,但還沒有啟動。
- RUNNABLE: 執行緒正在運行或準備運行。
- BLOCKED: 執行緒被阻塞,等待監視器鎖。
- WAITING: 執行緒無限期等待另一個執行緒執行特定操作。
- TIMED_WAITING: 執行緒等待另一個執行緒執行操作,最多等待指定的時間。
- TERMINATED: 執行緒已經結束執行。
我們可以使用Thread.getState()方法來獲取執行緒的當前狀態。
3.5 執行緒的基本操作
Java提供了一些方法來控制執行緒的行為:
- start(): 啟動執行緒。
- join(): 等待執行緒結束。
- sleep(long millis): 使執行緒休眠指定的毫秒數。
- interrupt(): 中斷執行緒。
- isAlive(): 檢查執行緒是否仍在運行。
public class ThreadOperations {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("執行緒被中斷");
}
});
thread.start();
System.out.println("執行緒已啟動");
thread.join(1000);
System.out.println("等待1秒後");
if (thread.isAlive()) {
System.out.println("執行緒仍在運行");
thread.interrupt();
}
}
}
3.6 執行緒優先級
Java允許我們設置執行緒的優先級,範圍從1(最低)到10(最高)。
但是,執行緒調度器的實際行為取決於底層操作系統,因此不應過度依賴優先級。
Thread thread = new Thread(() -> {
// 執行緒程式碼
});
thread.setPriority(Thread.MAX_PRIORITY); // 設置最高優先級
thread.start();
3.7 守護執行緒
守護執行緒是在背景運行的低優先級執行緒。
當所有非守護執行緒結束時,JVM會自動終止守護執行緒。
Thread daemonThread = new Thread(() -> {
while (true) {
System.out.println("守護執行緒正在運行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
});
daemonThread.setDaemon(true);
daemonThread.start();
4. 同步化機制和鎖(Lock)
在多執行緒環境中,當多個執行緒同時訪問共享資源時,可能會導致數據不一致或其他並發問題。
4.1 synchronized關鍵字
synchronized是Java中最基本的同步化機制,可以用於方法或程式碼塊。
4.1.1 同步方法
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
4.1.2 同步程式碼塊
public class Counter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized(lock) {
count++;
}
}
public int getCount() {
synchronized(lock) {
return count;
}
}
}
4.2 volatile關鍵字
volatile關鍵字用於確保變數的可見性,但不保證原子性。
public class SharedFlag {
private volatile boolean flag = false;
public void setFlag(boolean value) {
flag = value;
}
public boolean isFlag() {
return flag;
}
}
4.3 java.util.concurrent.locks包
Java 5引入了java.util.concurrent.locks包,提供了更靈活的鎖機制。
4.3.1 ReentrantLock
ReentrantLock提供了與synchronized相似的功能,但具有更多的靈活性。
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
4.3.2 ReadWriteLock
ReadWriteLock允許多個讀操作同時進行,但寫操作需要獨佔訪問。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Cache {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Map<String, String> cache = new HashMap<>();
public String get(String key) {
rwLock.readLock().lock();
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock();
}
}
public void put(String key, String value) {
rwLock.writeLock().lock();
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
}
4.4 Condition
Condition提供了比Object的wait()和notify()更強大的等待/通知機制。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer<T> {
private final T[] items;
private int putIndex, takeIndex, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int capacity) {
items = (T[]) new Object[capacity];
}
public void put(T item) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putIndex] = item;
if (++putIndex == items.length) putIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T item = items[takeIndex];
if (++takeIndex == items.length) takeIndex = 0;
--count;
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
4.5 原子變數
java.util.concurrent.atomic包提供一系列原子變數類,可以在不使用同步的情況下實現執行緒安全。
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
4.6 同步化的最佳實踐
- 盡量減少同步範圍,只在必要的程式碼塊上使用同步。
- 避免在同步塊中進行耗時操作,如I/O操作。
- 優先使用java.util.concurrent包中的並發工具,而不是低級的synchronized和volatile。
- 使用不可變對象可以避免許多並發問題。
- 注意避免死鎖,例如始終以固定的順序獲取多個鎖。
- 考慮使用執行緒本地變數(ThreadLocal)來避免共享狀態。
5. 執行緒池(Thread Pool)和Executor框架
在並行程式設計中,創建和管理大量執行緒可能會導致效能問題和資源浪費。
執行緒池和Executor框架提供了一種更有效的方式來管理和重用執行緒,從而提高應用程式的效能和可擴展性。
5.1 執行緒池的概念
執行緒池是一種執行緒管理模式,預先創建一定數量的執行緒,並將這些執行緒保存在池中。
當有任務需要執行時,從池中取出一個執行緒來執行任務,任務執行完畢後,執行緒被返回到池中等待下一個任務。
執行緒池的主要優點包括: 1. 減少執行緒創建和銷毀的開銷 2. 控制並發執行緒的數量 3. 提高系統的響應速度 4. 方便進行執行緒管理
5.2 Executor框架
Java的java.util.concurrent包提供了Executor框架,用於異步執行任務的框架。
Executor框架的核心接口包括:
- Executor: 最基本的接口,定義了execute方法用於提交任務。
- ExecutorService: 擴展了Executor接口,提供了管理執行緒生命週期的方法。
- ScheduledExecutorService: 擴展了ExecutorService,增加了定時執行任務的功能。
5.3 常用的ExecutorService實現
5.3.1 固定大小的執行緒池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
ExecutorService executor = Executors.newFixedThreadPool(5);
5.3.2 單執行緒的執行器
5.3.3 可快取的執行緒池
5.3.4 定時任務執行器
5.4 使用ExecutorService
5.4.1 提交任務
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交Runnable任務
executor.execute(() -> System.out.println("執行Runnable任務"));
// 提交Callable任務
Future<String> future = executor.submit(() -> "執行Callable任務");
// 獲取Callable任務的結果
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
5.4.2 關閉ExecutorService
5.5 使用ScheduledExecutorService
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 延遲5秒後執行
scheduler.schedule(() -> System.out.println("延遲任務"), 5, TimeUnit.SECONDS);
// 每3秒執行一次
scheduler.scheduleAtFixedRate(() -> System.out.println("固定頻率任務"), 0, 3, TimeUnit.SECONDS);
// 每次執行完畢後等待2秒再執行
scheduler.scheduleWithFixedDelay(() -> System.out.println("固定延遲任務"), 0, 2, TimeUnit.SECONDS);
5.6 自定義ThreadPoolExecutor
對於更複雜的場景,我們可以自定義ThreadPoolExecutor:
import java.util.concurrent.*;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心執行緒數
10, // 最大執行緒數
60L, // 空閒執行緒存活時間
TimeUnit.SECONDS, // 時間單位
new ArrayBlockingQueue<>(100), // 任務隊列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
5.7 使用CompletableFuture
Java 8引入的CompletableFuture提供異步寫程式碼能力:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);
future.thenAccept(System.out::println);
5.8 實踐建議
- 選擇適當的執行緒池大小,考慮CPU核心數和任務類型(I/O密集型或CPU密集型)。
- 使用合適的任務隊列大小,避免OOM錯誤。
- 正確處理異常,避免任務靜默失敗。
- 適時關閉執行器,避免資源洩漏。
- 考慮使用自定義的執行緒工廠來命名執行緒,便於調試。
- 對於長時間運行的應用,監控執行緒池的狀態。
6. 並行集合(Concurrent Collections)
在多執行緒環境中,普通的Java集合類可能會出現執行緒安全問題。
為了解決這個問題,Java提供專門用於並行環境的集合類,這些類位於java.util.concurrent包中。
6.1 ConcurrentHashMap
ConcurrentHashMap是HashMap的執行緒安全版本。
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.putIfAbsent("key", 2); // 如果key不存在才插入
map.computeIfAbsent("newKey", k -> k.length()); // 如果key不存在,計算並插入新值
ConcurrentHashMap的主要特點: - 支援高並發讀取和更新 - 不允許null鍵或值 - 迭代器是弱一致性的,不會拋出ConcurrentModificationException
6.2 CopyOnWriteArrayList
CopyOnWriteArrayList是ArrayList的執行緒安全變體,適用於讀多寫少的場景。
import java.util.concurrent.CopyOnWriteArrayList;
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.addIfAbsent("item2");
CopyOnWriteArrayList的特點: - 讀操作不需要加鎖,性能很高 - 寫操作會複製整個底層數組,適合讀多寫少的場景 - 迭代器支援遍歷,但不支援修改操作
6.3 ConcurrentLinkedQueue
ConcurrentLinkedQueue是基於鏈接節點的無界執行緒安全隊列。
import java.util.concurrent.ConcurrentLinkedQueue;
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("item1");
String item = queue.poll();
ConcurrentLinkedQueue的特點: - 適用於高並發環境 - 非阻塞算法,offer和poll操作都是O(1)複雜度
6.4 BlockingQueue接口
BlockingQueue接口定義了阻塞隊列的行為,Java提供了多種實現:
6.4.1 ArrayBlockingQueue
固定大小的阻塞隊列,基於數組實現。
import java.util.concurrent.ArrayBlockingQueue;
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
queue.put("item"); // 如果隊列滿了,會阻塞
String item = queue.take(); // 如果隊列空了,會阻塞
6.4.2 LinkedBlockingQueue
基於鏈表的可選有界阻塞隊列。
import java.util.concurrent.LinkedBlockingQueue;
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100); // 有界
LinkedBlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>(); // 無界
6.4.3 PriorityBlockingQueue
帶優先級的無界阻塞隊列。
import java.util.concurrent.PriorityBlockingQueue;
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(3);
queue.put(1);
queue.put(2);
System.out.println(queue.take()); // 輸出1
6.5 ConcurrentSkipListMap和ConcurrentSkipListSet
這兩個類分別是TreeMap和TreeSet的並發版本,基於跳表數據結構實現。
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
map.put("key", 1);
ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
set.add(1);
這些類的特點: - 保持元素的排序 - 支援高效的並發訪問 - 適用於需要排序的並發場景
6.6 使用並行集合的實踐
- 選擇合適的集合類: 根據你的使用場景(讀多寫少、需要排序等)選擇合適的並行集合。
- 避免過度同步: 並行集合已經是執行緒安全的,不需要額外的同步。
- 注意迭代器的一致性: 並行集合的迭代器通常是弱一致性的,要注意可能看不到最新的修改。
- 合理設置初始容量: 對於有界集合,合理設置初始容量可以提高性能。
- 使用批量操作: 許多並行集合提供了批量添加或刪除操作,這些操作通常更高效。
- 注意內存使用: 某些並行集合(如CopyOnWriteArrayList)在修改時可能會消耗大量內存。
7. Fork/Join框架
Fork/Join框架是Java 7引入的一個用於並行執行任務的強大工具,特別適合用於"分而治之"的問題,即可以將大任務分解為多個小任務並行執行,然後將結果合併的問題。
7.1 Fork/Join框架的基本概念
Fork/Join框架的核心思想是: - Fork: 將大任務分割成更小的子任務 - Join: 等待所有子任務完成,並合併其結果
使用了工作竊取(work-stealing)算法,空閒的執行緒可以竊取其他執行緒隊列中的任務來執行,從而提高效率。
7.2 RecursiveTask和RecursiveAction
Fork/Join框架主要通過兩個抽象類來實現: - RecursiveTask: 用於有返回值的任務 - RecursiveAction: 用於沒有返回值的任務
7.2.1 使用RecursiveTask
以下是一個使用RecursiveTask計算斐波那契數列(Successione di Fibonacci)的例子:
import java.util.concurrent.RecursiveTask;
public class FibonacciTask extends RecursiveTask<Integer> {
private final int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
FibonacciTask f1 = new FibonacciTask(n - 1);
f1.fork();
FibonacciTask f2 = new FibonacciTask(n - 2);
return f2.compute() + f1.join();
}
}
7.2.2 使用RecursiveAction
以下是一個使用RecursiveAction進行數組排序的例子:
import java.util.Arrays;
import java.util.concurrent.RecursiveAction;
public class MergeSortTask extends RecursiveAction {
private final int[] array;
private final int start;
private final int end;
public MergeSortTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= 1000) {
Arrays.sort(array, start, end);
} else {
int mid = (start + end) / 2;
MergeSortTask left = new MergeSortTask(array, start, mid);
MergeSortTask right = new MergeSortTask(array, mid, end);
invokeAll(left, right);
merge(start, mid, end);
}
}
private void merge(int start, int mid, int end) {
// 合併兩個已排序的子數組
// 實現省略...
}
}
7.3 ForkJoinPool
ForkJoinPool是執行Fork/Join任務的執行器服務。
從Java 8開始,我們可以使用ForkJoinPool.commonPool()來獲取公共的ForkJoinPool實例。
import java.util.concurrent.ForkJoinPool;
public class Main {
public static void main(String[] args) {
ForkJoinPool pool = ForkJoinPool.commonPool();
int result = pool.invoke(new FibonacciTask(20));
System.out.println("Fibonacci(20) = " + result);
int[] array = new int[10000];
// 初始化數組...
pool.invoke(new MergeSortTask(array, 0, array.length));
}
}
7.4 使用Fork/Join框架的最佳實踐
- 適當的任務粒度: 任務不應該太小(避免過多的開銷)或太大(影響負載平衡)。
- 避免同步: Fork/Join框架設計用於獨立任務,避免在任務間使用同步。
- 最小化任務間的依賴: 任務應該盡可能獨立,以充分利用並行性。
- 利用Java 8的並行流: 對於簡單的並行操作,可以考慮使用並行流。
import java.util.Arrays;
public class ParallelStreamExample {
public static void main(String[] args) {
int[] array = new int[1000000];
// 初始化數組...
long sum = Arrays.stream(array).parallel().sum();
System.out.println("Sum: " + sum);
}
}
- 注意異常處理: Fork/Join任務中的未捕獲異常可能導致整個計算失敗。
- 監控和調優: 使用ForkJoinPool的方法來監控執行狀況,必要時進行調優。
7.5 Fork/Join框架的適用場景
Fork/Join框架特別適合以下類型的問題: - 可以遞歸分解的問題(如快速排序、歸併排序) - 大規模數據處理(如並行搜索、並行聚合) - 圖像處理和計算機圖形學中的某些問題