线程同步操作:
C++ 标准库提供了以下几种线程同步的方式:
不同的同步方式具有不同的使用场景和性能,实际使用时根据不同的场景选择不同的同步方式,分别就几种以上几种方式进行简要介绍:
互斥量(mutex)是防止同时访问共享资源的程序对象。为避免线程更新共享变量时所出现问题,必须使用互斥量(mutex 是 mutual exclusion 的缩写)来确保同时仅有一个线程可以访问某项共享资源。 即使用互斥量来实现原子访问操作,防止多个线程对临界区同时操作而产生不一致的问题。mutex 只有锁定(locked)和未锁定(unlocked)两种状态。任何时候,至多只有一个线程可以锁定互斥量。试图对已经锁定的互斥量再次加锁,将可能阻塞线程或者报错失败,mutex 的底层可能封装的是操作系统 spinlock,不同的操作系统下可能有不同的实现。C++ 中关于 mutex 的头文件为 #include <mutex>。
C++
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void print_block (int n, char c) {
mtx.lock();
for (int i=0; i<n; ++i) { std::cout << c; }
std::cout << '\n';
mtx.unlock();
}
int main ()
{
std::thread th1 (print_block,50,'*');
std::thread th2 (print_block,50,'$');
th1.join();
th2.join();
return 0;
}
/*
****************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
*/
C++ 中还定义了 timed_mutex:在 mutex 的基础上增加了超时加锁的功能。
recursive_mutex:在 mutex 的基础上增加了递归加锁的功能(此时,lock() 函数可以被同一线程在不释放锁的情况下多次调用)。
C++
std::recursive_mutex mtx;
void fun1() {
mtx.lock();
mtx.unlock();
}
void fun2() {
mtx.lock();
fun1(); // recursive lock
mtx.unlock();
};
std::shared_mutex 是 C++ 17 标准中引入的,由 unique_lock 和 shared_lock 两个类模板配合 shared_mutex 使用,主要用于读写共享锁。unique_lock 用于写入时加锁,shared_lock 用于读取时加锁。对象在构造时自动对 std::shared_mutex 加锁,析构时自动对其解锁。头文件主要包含在 #include <shared_mutex>。shared_mutex 可用于保护共享数据不被多个线程同时访问。与其他便于独占访问的互斥锁类型相比,shared_mutex 具有两个访问级别:
共享互斥锁通常用于多个读取操作可以同时访问同一资源而不会导致数据竞争,但只有一个写入操作的场景。
C++
#include <iostream>
#include <mutex> // For std::unique_lock
#include <shared_mutex>
#include <thread>
class ThreadSafeCounter {
public:
ThreadSafeCounter() = default;
// 多个线程可以同时读取 countter 计数
unsigned int get() const {
std::shared_lock lock(mutex_);
return value_;
}
// 只有1个线程可以修改 countter 计数
void increment() {
std::unique_lock lock(mutex_);
value_++;
}
// 只有1个线程可以修改 countter 计数
void reset() {
std::unique_lock lock(mutex_);
value_ = 0;
}
private:
mutable std::shared_mutex mutex_;
unsigned int value_ = 0;
};
int main() {
ThreadSafeCounter counter;
auto increment_and_print = [&counter]() {
for (int i = 0; i < 3; i++) {
counter.increment();
std::cout << std::this_thread::get_id() << ' ' << counter.get() << '\n';
// Note: Writing to std::cout actually needs to be synchronized as well
// by another std::mutex. This has been omitted to keep the example small.
}
};
std::thread thread1(increment_and_print);
std::thread thread2(increment_and_print);
thread1.join();
thread2.join();
}
/*
139677317637888 2
139677317637888 3
139677309245184 4
139677309245184 5
139677309245184 6
139677317637888 6
*/
我们可以看到 increment 同时只能有一个线程对计数进行增加,但可能同时存在多个线程读取同一个计数。
shared_timed_mutex 是在 shared_mutex 的基础上增加了超时加锁的功能。
lock_guard:使用了 RAII 的机制对互斥量进行类模板封装,构造时加锁,析构时解锁。
C++
#include <mutex>
std::mutex mtx;
void f()
{
const std::lock_guard<std::mutex> lock(mtx);
// ...
// mtx is automatically released when lock goes out of scope
}
互斥量包装器对比原生的 mutex 来说,创建即加锁,作用域结束自动析构并解锁,无需手动解锁。缺点是不能中途解锁,不支持复制和移动。在需要加锁的地方,只需要任意实例化一个 lock_guard,调用构造函数成功上锁,出作用域时则 lock_guard 对象会被销毁,调用析构函数自动解锁可以有效避免死锁问题,但是提供的功能单一且不够灵活。
unique_lock:unique_lock 类模板也是采用 RAII 的方式对锁进行了封装,并且也是以独占所有权的方式管理 mutex 对象的上锁和解锁操作,即其对象之间不能发生拷贝。在构造(或移动 move 赋值)时,unique_lock 对象需要传递一个 mutex 对象作为它的参数,新创建的 unique_lock 对象负责传入的 mutex 对象的上锁和解锁操作。使用以上类型互斥量实例化 unique_lock 的对象时,自动调用构造函数上锁,unique_lock 对象销毁时自动调用析构函数解锁,可以很方便的防止死锁问题。与 lock_guard 不同的是,unique_lock 更加的灵活,提供了更多的成员函数:
4.条件变量(condition variable):
我们可以看到 wait 函数如下:
C++
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate stop_waiting );
线程会一直挂起,直到 stop_waiting 为 true 为止。程序示例如下:
C++
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
std::unique_lock<std::mutex> lk(m);
// worker 线程等待 ready
cv.wait(lk, []{return ready;});
// 唤醒执行
std::cout << "Worker thread is processing data\n";
data += " after processing";
// processed 设置为 true, 唤醒 main 线程
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// 释放锁,防止再次被唤醒。
lk.unlock();
// 唤醒 main 线程
cv.notify_one();
}
int main()
{
std::thread worker(worker_thread);
// 让 worker 线程先执行,再进行唤醒,否则可能出现 ready = true 先于 worker 线程的执行
worker.detach();
data = "Example data";
// 设置 ready 为 true, 唤醒 worker 线程
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
// 唤醒 worker 线程
cv.notify_one();
// 等待 worker 线程
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';
return 0;
}
C++ 20 中添加了 C++ 中的信号量为二元信号量与计数信号量,二元信号量实际为计数信号量模板的特化。
binary_semaphore:二元信号量类似于互斥量,信号量只有 0 与 1 。
counting_semaphore:计数信号量
所有关于信号量的定义参考头文件 #include <semaphore>,计数信号量是一种轻量级同步原语,可以控制对共享资源的访问。与 std::mutex 不同的是,acounting_semaphore 至少允许 LeastMaxValue 并发访问者对同一资源进行多个并发访问。Acounting_semaphore 包含一个由构造函数初始化的内部计数器。该计数器可以通过 acquire() 获取资源访问权限,并通过调用 release() 来释放资源从而递增计数器。当计数器为零时,调用 acquire() 时就会阻塞直到计数器增加,但是调用 try_acquire( ) 不阻塞;try_acquire_for() 和 try_acquire_until() 阻塞直到计数器增加或达到超时。
C++
#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
std::binary_semaphore
smphSignalMainToThread{0},
smphSignalThreadToMain{0};
void ThreadProc()
{
// 第一次进入阻塞
smphSignalMainToThread.acquire();
std::cout << "[thread] Got the signal\n"; // response message
using namespace std::literals;
std::this_thread::sleep_for(3s);
std::cout << "[thread] Send the signal\n"; // message
// 唤醒 main 线程
smphSignalThreadToMain.release();
}
int main()
{
std::thread thrWorker(ThreadProc);
std::cout << "[main] Send the signal\n"; // message
// 唤醒 ThreadProc
smphSignalMainToThread.release();
// main 线程阻塞
smphSignalThreadToMain.acquire();
std::cout << "[main] Got the signal\n"; // response message
thrWorker.join();
}
/*
[main] Send the signal
[thread] Got the signal
[thread] Send the signal
[main] Got the signal
*/
C++ 20 以后支持 latch 与 barrier,他们同样可以用来线程同步。
C++
#include <latch>
std::latch work_done(4);
work_done.count_down(); // decrements the counter in a non-blocking manner
work_done.wait(); // blocks until the counter reaches zero
bool ok = work_done.try_wait(); // tests if the internal counter equals zero
work_done.arrive_and_wait(); // decrements the counter and blocks until it reaches zero
一个 barrier 的生命周期包含多个阶段,每个阶段都定义了一个同步点。一个 barrier 阶段包含:
C++
#include <barrier>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
int main() {
const auto workers = { "anil", "busara", "carl" };
auto on_completion = []() noexcept {
// locking not needed here
static auto phase = "... done\n" "Cleaning up...\n";
std::cout << phase;
phase = "... done\n";
};
std::barrier sync_point(std::ssize(workers), on_completion);
auto work = [&](std::string name) {
std::string product = " " + name + " worked\n";
std::cout << product; // ok, op<< call is atomic
sync_point.arrive_and_wait();
// 全部到达后,进行下一阶段
product = " " + name + " cleaned\n";
std::cout << product;
sync_point.arrive_and_wait();
};
std::cout << "Starting...\n";
std::vector<std::thread> threads;
for (auto const& worker : workers) {
threads.emplace_back(work, worker);
}
for (auto& thread : threads) {
thread.join();
}
}
/*
Starting...
anil worked
carl worked
busara worked
... done
Cleaning up...
busara cleaned
anil cleaned
carl cleaned
... done
*/
C++ 11 以后支持 call_once。确保某个操作只被执行一次(成功执行才算),即使是多线程环境下也确保只执行一次。
C++
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
如果在 call_once 被调用时,flag 表明 f 已经被调用,则 call_once 立即返回(这种调用 call_once 称为被动)。
C++
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag flag1, flag2;
void simple_do_once()
{
std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
int main()
{
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
std::thread st3(simple_do_once);
std::thread st4(simple_do_once);
st1.join();
st2.join();
st3.join();
st4.join();
}
/*
Simple example: called once
*/
阅读量:2046
点赞量:0
收藏量:0