Java 并发编程最佳实践
多线程和并发编程是 Java 平台的核心优势,也是构建高性能、响应式应用的基础。然而,并发编程也是最容易出错的领域之一,稍有不慎就会导致死锁、活锁、竞态条件等难以调试的问题。本文将详细介绍 Java 并发编程的核心概念、常见挑战和最佳实践。
1. 并发编程基础
1.1 线程基础
Java 中创建线程的两种基本方式:
方式一:继承 Thread 类
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread is running: " + Thread.currentThread().getName());
}
}
// 使用
MyThread thread = new MyThread();
thread.start(); // 不要直接调用 run() 方法
方式二:实现 Runnable 接口(推荐)
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable is running: " + Thread.currentThread().getName());
}
}
// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();
// Java 8 Lambda 表达式简化
Thread thread = new Thread(() -> {
System.out.println("Lambda Runnable is running");
});
thread.start();
1.2 线程状态与生命周期
Java 线程的生命周期包含六个状态:
- NEW:新创建但尚未启动的线程
- RUNNABLE:可运行状态,等待 CPU 分配时间片
- BLOCKED:线程被阻塞,等待监视器锁
- WAITING:无限期等待另一个线程执行特定操作
- TIMED_WAITING:有限期等待另一个线程执行操作
- TERMINATED:线程已执行完毕
// 获取线程状态
Thread thread = new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("Before starting: " + thread.getState()); // NEW
thread.start();
System.out.println("After starting: " + thread.getState()); // RUNNABLE
Thread.sleep(1000);
System.out.println("While sleeping: " + thread.getState()); // TIMED_WAITING
thread.join();
System.out.println("After completion: " + thread.getState()); // TERMINATED
1.3 线程优先级与调度
Java 线程调度是基于优先级的抢占式调度:
// 设置线程优先级
thread.setPriority(Thread.MIN_PRIORITY); // 1
thread.setPriority(Thread.NORM_PRIORITY); // 5 (默认)
thread.setPriority(Thread.MAX_PRIORITY); // 10
需要注意,线程优先级高并不保证一定先执行,只是获得 CPU 时间片的概率更高。优先级的效果受到操作系统和 JVM 实现的影响。
2. 线程安全与同步机制
2.1 竞态条件与临界区
竞态条件是指多个线程以不可预期的顺序访问共享资源,导致程序出现错误:
// 线程不安全的计数器示例
class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 非原子操作
}
public int getCount() {
return count;
}
}
2.2 同步机制
解决竞态条件的主要方法是使用同步机制:
1. synchronized 关键字
// 同步方法
class SafeCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
// 同步块
class SafeCounter {
private int count = 0;
private final Object lock = new Object();
public void increment() {
synchronized(lock) {
count++;
}
}
public int getCount() {
synchronized(lock) {
return count;
}
}
}
2. volatile 关键字
volatile
保证变量的可见性,但不保证原子性:
class SharedData {
private volatile boolean flag = false;
public void setFlag(boolean flag) {
this.flag = flag;
}
public boolean isFlag() {
return flag;
}
}
3. 原子类
java.util.concurrent.atomic
包提供了原子类,用于无锁编程:
import java.util.concurrent.atomic.AtomicInteger;
class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
2.3 死锁与避免策略
死锁是指两个或多个线程互相等待对方持有的锁,导致永久阻塞:
// 死锁示例
public void deadlockExample() {
final Object resource1 = new Object();
final Object resource2 = new Object();
Thread thread1 = new Thread(() -> {
synchronized(resource1) {
System.out.println("Thread 1: Holding resource 1");
try { Thread.sleep(100); } catch (InterruptedException e) {}
System.out.println("Thread 1: Waiting for resource 2");
synchronized(resource2) {
System.out.println("Thread 1: Holding resource 1 and 2");
}
}
});
Thread thread2 = new Thread(() -> {
synchronized(resource2) {
System.out.println("Thread 2: Holding resource 2");
try { Thread.sleep(100); } catch (InterruptedException e) {}
System.out.println("Thread 2: Waiting for resource 1");
synchronized(resource1) {
System.out.println("Thread 2: Holding resource 2 and 1");
}
}
});
thread1.start();
thread2.start();
}
避免死锁的策略:
- 锁顺序:始终按照相同的顺序获取锁
- 锁超时:使用带超时的锁获取方法
- 死锁检测:使用线程转储和监控工具检测死锁
- 使用 tryLock():尝试获取锁,如果不可用则执行其他操作
3. Java并发工具类
3.1 Lock 接口
java.util.concurrent.locks
包提供了比 synchronized
更灵活的锁机制:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ReentrantLockCounter {
private int count = 0;
private final Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 确保在异常情况下也能释放锁
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
Lock 接口相比 synchronized 的优势:
- 非阻塞获取锁 (
tryLock()
) - 可中断获取锁 (
lockInterruptibly()
) - 超时获取锁 (
tryLock(long timeout, TimeUnit unit)
) - 多条件变量 (
newCondition()
)
3.2 读写锁
读写锁允许多个线程同时读取,但只允许一个线程写入:
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class ReadWriteMap<K, V> {
private final Map<K, V> map = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public V get(K key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public V put(K key, V value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
writeLock.unlock();
}
}
}
3.3 并发集合
java.util.concurrent
包提供了线程安全的集合类:
- ConcurrentHashMap:线程安全的哈希表,比
Hashtable
性能更好 - CopyOnWriteArrayList:适用于读多写少的场景
- ConcurrentLinkedQueue:无界线程安全队列
- BlockingQueue:支持阻塞操作的队列,常用于生产者-消费者模式
// ConcurrentHashMap 示例
Map<String, String> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key", "value");
// CopyOnWriteArrayList 示例
List<String> list = new CopyOnWriteArrayList<>();
list.add("item");
// BlockingQueue 示例
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);
queue.put(new Task()); // 如果队列满,则阻塞
Task task = queue.take(); // 如果队列空,则阻塞
3.4 线程池
使用线程池可以减少线程创建和销毁的开销,提高资源利用率:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);
// 缓存线程池(根据需要创建新线程,空闲线程会被复用)
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 单线程执行器
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 提交任务
fixedPool.submit(() -> {
System.out.println("Task executed by " + Thread.currentThread().getName());
});
// 关闭线程池
fixedPool.shutdown();
自定义线程池:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
60, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(100), // 工作队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
4. 并发编程设计模式
4.1 生产者-消费者模式
生产者和消费者通过共享队列进行通信:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class ProducerConsumer {
private final BlockingQueue<Integer> queue;
public ProducerConsumer(int capacity) {
this.queue = new LinkedBlockingQueue<>(capacity);
}
public void produce() {
try {
for (int i = 0; i < 100; i++) {
queue.put(i);
System.out.println("Produced: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void consume() {
try {
while (true) {
Integer value = queue.take();
System.out.println("Consumed: " + value);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
4.2 异步任务处理模式
通过 Future
和 CompletableFuture
实现异步任务:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 使用 CompletableFuture 进行异步任务处理
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟长时间运行的任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Task completed";
}, executor);
// 添加回调
future.thenAccept(result -> System.out.println("Result: " + result));
// 链式调用
CompletableFuture<Integer> processedFuture = future
.thenApply(String::length)
.thenApply(len -> len * 2);
// 组合多个 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
executor.shutdown();
4.3 线程局部存储模式
ThreadLocal
用于线程隔离的数据存储:
class UserContext {
private static final ThreadLocal<User> userThreadLocal = new ThreadLocal<>();
public static void setUser(User user) {
userThreadLocal.set(user);
}
public static User getUser() {
return userThreadLocal.get();
}
public static void clear() {
userThreadLocal.remove(); // 防止内存泄漏
}
}
// 使用
try {
User user = new User("John");
UserContext.setUser(user);
// 在任何地方访问用户信息,而不需要参数传递
processRequest();
} finally {
UserContext.clear(); // 重要:清理 ThreadLocal 避免内存泄漏
}
5. Java并发编程最佳实践
5.1 避免过度同步
- 最小化同步范围:只同步临界区,不要同步整个方法
// 不好的实践
public synchronized void processAndLog(Data data) {
// 耗时的数据处理
process(data);
// 日志记录
log(data);
}
// 好的实践
public void processAndLog(Data data) {
// 非临界区代码不需要同步
Data processedData = process(data);
// 只同步需要线程安全的操作
synchronized(this) {
log(processedData);
}
}
- 使用并发集合代替同步集合:
- 使用
ConcurrentHashMap
代替Hashtable
或Collections.synchronizedMap()
- 使用
CopyOnWriteArrayList
代替Vector
或Collections.synchronizedList()
- 使用
5.2 避免不必要的对象共享
- 使用不可变对象:不可变对象天生是线程安全的
- 线程封闭:确保对象只被一个线程访问
- 线程局部存储:使用
ThreadLocal
避免共享
5.3 正确处理线程中断
void interruptibleMethod() {
try {
while (!Thread.currentThread().isInterrupted()) {
// 执行任务...
// 阻塞操作
Thread.sleep(1000);
}
} catch (InterruptedException e) {
// 重新设置中断标志
Thread.currentThread().interrupt();
// 清理资源
} finally {
// 清理资源
}
}
5.4 合理使用线程池
选择合适的线程池类型:
- IO密集型任务:线程数 = CPU核心数 * (1 + 等待时间/计算时间)
- CPU密集型任务:线程数 = CPU核心数 + 1
合理设置线程池参数:
- 避免过大的队列和过多的线程
- 使用有界队列防止内存溢出
- 设置合理的拒绝策略
int cpuCores = Runtime.getRuntime().availableProcessors();
// CPU密集型任务
ThreadPoolExecutor cpuPool = new ThreadPoolExecutor(
cpuCores + 1,
cpuCores + 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// IO密集型任务
ThreadPoolExecutor ioPool = new ThreadPoolExecutor(
cpuCores * 2,
cpuCores * 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
5.5 使用 CompletableFuture 进行异步编程
// 避免嵌套回调
CompletableFuture.supplyAsync(() -> fetchUserData(userId))
.thenApply(user -> enrichUserData(user))
.thenApply(user -> formatUserData(user))
.thenAccept(formattedData -> display(formattedData))
.exceptionally(ex -> {
handleError(ex);
return null;
});
// 组合多个异步操作
CompletableFuture<UserData> userFuture = CompletableFuture.supplyAsync(() -> fetchUserData(userId));
CompletableFuture<ProductData> productFuture = CompletableFuture.supplyAsync(() -> fetchProductData(productId));
CompletableFuture<Page> pageFuture = userFuture.thenCombine(productFuture, (user, product) -> {
return createPage(user, product);
});
5.6 使用并发工具而非底层同步
优先使用高级工具:
- 使用
ConcurrentHashMap
代替手动同步的HashMap
- 使用
AtomicInteger
代替synchronized
的计数器 - 使用
BlockingQueue
代替手动实现的生产者-消费者队列
- 使用
使用 CountDownLatch 协调多线程:
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(workerCount);
for (int i = 0; i < workerCount; i++) {
executor.submit(() -> {
try {
startSignal.await(); // 等待开始信号
doWork();
} finally {
doneSignal.countDown(); // 通知任务完成
}
});
}
// 准备工作完成后,发送开始信号
startSignal.countDown();
// 等待所有工作线程完成
doneSignal.await();
- 使用 CyclicBarrier 协调多阶段计算:
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
// 每轮结束时执行
System.out.println("Phase completed");
});
executor.submit(() -> {
for (int i = 0; i < phases; i++) {
doPhaseWork();
barrier.await(); // 等待所有线程完成本阶段
}
});
6. 常见并发问题诊断与处理
6.1 死锁检测与处理
检测死锁:
// 获取Java线程转储
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null) {
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreads, true, true);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo);
}
}
处理策略:
- 按照固定顺序获取锁
- 使用
tryLock()
方法和超时机制 - 使用开放调用:不要在持有锁的情况下调用外部方法
6.2 线程安全问题排查
使用静态分析工具:
- FindBugs/SpotBugs
- SonarQube
- IntelliJ IDEA 内置的线程安全分析
使用并发调试工具:
- Java Flight Recorder
- VisualVM
压力测试暴露并发问题:
- JMeter
- Gatling
6.3 性能问题优化
减少锁争用:
- 使用分段锁(如 ConcurrentHashMap 的实现)
- 使用高效的并发数据结构
- 减小锁粒度
合理使用 volatile:
- 对于简单的标志变量,使用 volatile 比同步更轻量
- 但请记住 volatile 不保证原子性
优化线程池配置:
- 监控线程池使用情况
- 根据实际负载调整线程池大小
7. Java 并发编程实战案例
7.1 高性能缓存实现
public class ConcurrentCache<K, V> {
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, Long> timestamps = new ConcurrentHashMap<>();
private final long expirationTimeMs;
public ConcurrentCache(long expirationTimeMs) {
this.expirationTimeMs = expirationTimeMs;
// 启动清理线程
startCleanupThread();
}
public V get(K key) {
Long timestamp = timestamps.get(key);
if (timestamp == null) {
return null;
}
if (System.currentTimeMillis() - timestamp > expirationTimeMs) {
cache.remove(key);
timestamps.remove(key);
return null;
}
return cache.get(key);
}
public void put(K key, V value) {
cache.put(key, value);
timestamps.put(key, System.currentTimeMillis());
}
private void startCleanupThread() {
Thread cleanupThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(expirationTimeMs / 2);
long currentTime = System.currentTimeMillis();
for (Map.Entry<K, Long> entry : timestamps.entrySet()) {
if (currentTime - entry.getValue() > expirationTimeMs) {
K key = entry.getKey();
cache.remove(key);
timestamps.remove(key);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
cleanupThread.setDaemon(true);
cleanupThread.start();
}
}
7.2 并发请求限流器
public class RateLimiter {
private final AtomicInteger count = new AtomicInteger(0);
private final int limit;
private final long timeWindowMs;
private final AtomicLong lastResetTime;
public RateLimiter(int limit, long timeWindowMs) {
this.limit = limit;
this.timeWindowMs = timeWindowMs;
this.lastResetTime = new AtomicLong(System.currentTimeMillis());
}
public boolean allowRequest() {
long currentTime = System.currentTimeMillis();
long lastReset = lastResetTime.get();
if (currentTime - lastReset > timeWindowMs) {
// 尝试重置计数器,使用 CAS 避免竞态条件
if (lastResetTime.compareAndSet(lastReset, currentTime)) {
count.set(0);
}
}
return count.incrementAndGet() <= limit;
}
}
7.3 并发数据处理流水线
public class DataProcessingPipeline<T, R> {
private final BlockingQueue<T> inputQueue;
private final BlockingQueue<R> outputQueue;
private final ExecutorService executor;
private final Function<T, R> processingFunction;
private final AtomicBoolean running = new AtomicBoolean(false);
public DataProcessingPipeline(int queueSize, int workerCount, Function<T, R> processingFunction) {
this.inputQueue = new LinkedBlockingQueue<>(queueSize);
this.outputQueue = new LinkedBlockingQueue<>(queueSize);
this.processingFunction = processingFunction;
this.executor = Executors.newFixedThreadPool(workerCount);
}
public void start() {
if (running.compareAndSet(false, true)) {
IntStream.range(0, executor.getCorePoolSize()).forEach(i -> {
executor.submit(this::processItems);
});
}
}
public void stop() {
running.set(false);
executor.shutdownNow();
}
public CompletableFuture<Void> submit(T item) {
return CompletableFuture.runAsync(() -> {
try {
inputQueue.put(item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
}
});
}
public CompletableFuture<R> getOutput() {
return CompletableFuture.supplyAsync(() -> {
try {
return outputQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
}
});
}
private void processItems() {
while (running.get() && !Thread.currentThread().isInterrupted()) {
try {
T input = inputQueue.poll(100, TimeUnit.MILLISECONDS);
if (input != null) {
R result = processingFunction.apply(input);
outputQueue.put(result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
// 处理错误,可能记录日志或重试
}
}
}
}
总结
Java 并发编程既是一个强大的工具,也是一个充满挑战的领域。通过掌握核心概念、遵循最佳实践、使用正确的工具和模式,可以开发出高效、可靠的并发应用。
关键要点:
- 理解并发基础概念,如线程状态和同步机制
- 合理使用并发工具类,如 Lock、原子类和并发集合
- 遵循最佳实践,如避免过度同步、正确处理中断
- 使用高级 API,如 CompletableFuture 进行异步编程
- 实施积极的调试和监控策略
随着 Java 语言的发展,并发 API 也在不断演进,保持学习新特性和最佳实践将帮助开发者构建更优秀的并发应用。