一、当多线程遇到资源共享

想象一个地铁售票厅里有十个窗口同时售票,所有售票员都在查改同一个座位数据库。如果没有管理机制,可能多个售票员会把同一个座位卖给不同乘客。C++的多线程程序正面临这样的挑战——多个执行流共享资源的协调管理问题。

让我们先看这段典型的问题代码:

// 非线程安全的计数器示例
#include <thread>

int counter = 0;

void increment() {
    for(int i = 0; i < 100000; ++i) 
        ++counter;  // 危险的非原子操作
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    t1.join();
    t2.join();
    std::cout << "Final value: " << counter << std::endl;
}

理论上结果应该是200000,但实际运行会得到随机数值。这就是典型的数据竞争场景,验证了同步机制的必要性。

二、互斥锁:资源访问的卫兵

2.1 基础锁应用

使用std::mutex改进计数器:

#include <mutex>

std::mutex mtx;
int safe_counter = 0;

void safe_increment() {
    for(int i = 0; i < 100000; ++i) {
        mtx.lock();      // 获取锁
        ++safe_counter;  // 原子操作区
        mtx.unlock();    // 释放锁
    }
}

// 创建线程部分与之前相同

现在运行结果稳定为200000,但频繁的锁操作会导致性能损耗。当循环次数增加到百万级时,执行时间可能增长20倍以上。

2.2 锁的智能化管理

C++17提供了更安全的封装方式:

void smart_increment() {
    for(int i = 0; i < 100000; ++i) {
        std::lock_guard<std::mutex> lock(mtx);  // 自动锁管理
        ++safe_counter;
    }  // 自动解锁
}

这种RAII(Resource Acquisition Is Initialization)风格避免了忘记解锁的风险。通过作用域控制锁的生命周期,特别适合存在多个返回路径的复杂函数。

三、条件变量:线程间的信息枢纽

3.1 生产者-消费者模型

实现一个任务队列:

#include <queue>
#include <condition_variable>

std::mutex queue_mtx;
std::condition_variable cv;
std::queue<int> task_queue;

void producer() {
    for(int i = 0; i < 10; ++i) {
        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            task_queue.push(i);
            std::cout << "Produced: " << i << std::endl;
        }
        cv.notify_one();  // 唤醒一个消费者
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer() {
    while(true) {
        std::unique_lock<std::mutex> lock(queue_mtx);
        cv.wait(lock, []{ return !task_queue.empty(); });  // 条件等待
        
        int task = task_queue.front();
        task_queue.pop();
        lock.unlock();  // 提前释放锁
        
        std::cout << "Processed: " << task << std::endl;
        if(task == 9) break;
    }
}

这里展示了条件变量使用的经典范式:

  1. 获取唯一锁(unique_lock支持多次锁定)
  2. 在wait调用中传入谓词判断
  3. 业务处理阶段可提前释放锁

3.2 通知机制深入

notify_all()的使用场景演示:

// 创建3个消费者线程
std::thread consumers[3];

// 修改生产者代码中的通知方式
if(task_queue.size() >= 5)
    cv.notify_all();  // 当队列堆积时唤醒所有消费者
else
    cv.notify_one();

这种方式适用于需要批量处理任务的场景,当资源充足时提高消费能力。

四、线程池:系统资源的战略调度

4.1 基础线程池实现

class ThreadPool {
public:
    ThreadPool(size_t threads) : stop(false) {
        for(size_t i = 0; i < threads; ++i)
            workers.emplace_back([this] {
                while(true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queue_mtx);
                        cv.wait(lock, [this]{ 
                            return stop || !tasks.empty(); 
                        });
                        
                        if(stop && tasks.empty()) 
                            return;
                            
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();  // 执行任务
                }
            });
    }

    template<class F>
    void enqueue(F&& f) {
        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            tasks.emplace(std::forward<F>(f));
        }
        cv.notify_one();
    }

    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(queue_mtx);
            stop = true;
        }
        cv.notify_all();
        for(std::thread &worker : workers)
            worker.join();
    }

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mtx;
    std::condition_variable cv;
    bool stop;
};

这个实现包含了:

  • 任务队列的动态管理
  • 优雅停机机制
  • 异常安全设计
  • 资源自动回收

4.2 智能任务调度

基于优先级的扩展实现:

// 优先队列定义
struct Task {
    std::function<void()> func;
    int priority;
    
    bool operator<(const Task& rhs) const {
        return priority < rhs.priority;  // 数值越大优先级越高
    }
};

// 修改任务队列类型
std::priority_queue<Task> tasks;

// 修改入队方法
template<class F>
void enqueue(F&& f, int priority = 0) {
    std::lock_guard<std::mutex> lock(queue_mtx);
    tasks.push({std::forward<F>(f), priority});
    cv.notify_one();
}

通过优先级队列实现紧急任务优先处理,适用于需要分级响应的重要系统。

五、死锁防御

5.1 资源排序法则

固定获取锁的顺序:

// 定义全局锁顺序规则
std::mutex mtx1, mtx2;

void safe_procedure() {
    std::lock_guard<std::mutex> lock1(mtx1);  // 先锁mtx1
    std::lock_guard<std::mutex> lock2(mtx2);  // 再锁mtx2
    // 关键区操作
}

void reverse_procedure() {
    std::lock_guard<std::mutex> lock2(mtx2);  // 同样先锁mtx1
    std::lock_guard<std::mutex> lock1(mtx1);  // 这里会产生死锁!
}

通过项目规范强制要求加锁顺序,可以使用排序算法为所有锁定义全局顺序。

5.2 超时锁定技巧

使用try_lock_for避免无限等待:

std::timed_mutex timed_mtx;

void timed_operation() {
    std::unique_lock<std::timed_mutex> lock(timed_mtx);
    if(lock.try_lock_for(std::chrono::milliseconds(100))) {
        // 成功获取锁
    } else {
        // 执行备用方案
    }
}

结合日志记录超时事件,可以构建系统的自我保护机制。

六、场景与策略分析

6.1 高并发服务器的选择

某电商平台的秒杀系统经过压力测试发现:

  • 原生线程模型在QPS达到5000时CPU占用超过90%
  • 启用线程池(核心线程=CPU数*2)后,QPS稳定在8000时占用75%
  • 加入工作窃取机制后,吞吐量提升15%

6.2 性能优化实验数据

对比测试显示:

场景 耗时(ms) 内存峰值(MB)
无锁队列 102 15.2
互斥锁方案 145 16.8
原子操作 118 15.5

尽管原子操作更快,但仅适用于简单数据类型,开发复杂度更高。

七、重要注意事项

  1. 条件变量虚假唤醒:始终使用谓词判断,即使文档声称没有spurious wakeup
  2. 递归锁陷阱:std::recursive_mutex慎用,优先重构代码结构
  3. 锁粒度控制:某金融系统曾因锁范围过大导致吞吐量下降40%
  4. 平台差异:Windows CRITICAL_SECTION与pthread_mutex的性能特征不同

八、总结与展望

现代C++提供了从内存序到协程的完整并发工具链。理解底层原理的同时,更要掌握:

  • 正确性验证方法(TSAN工具)
  • 性能剖析技巧(火焰图分析)
  • 设计模式运用(半同步/半异步架构)
  • 行业最佳实践(Facebook的folly库设计)

随着硬件架构的发展,理解NUMA特性、CPU缓存亲和性等底层知识将成为进阶必经之路。