2.线程同步与异步-灵析社区

菜鸟码转

线程同步操作:

C++ 标准库提供了以下几种线程同步的方式:

  • 互斥量(支持超时加锁、递归加锁);
  • 读写锁(共享互斥量,也支持超时加锁);
  • 互斥量包装器(基于 RAII 的思想);
  • 条件变量;
  • 信号量(二元信号量、计数信号量);
  • barrier;
  • call_once;

不同的同步方式具有不同的使用场景和性能,实际使用时根据不同的场景选择不同的同步方式,分别就几种以上几种方式进行简要介绍:

1.互斥量(mutex):

互斥量(mutex)是防止同时访问共享资源的程序对象。为避免线程更新共享变量时所出现问题,必须使用互斥量(mutex 是 mutual exclusion 的缩写)来确保同时仅有一个线程可以访问某项共享资源。 即使用互斥量来实现原子访问操作,防止多个线程对临界区同时操作而产生不一致的问题。mutex 只有锁定(locked)和未锁定(unlocked)两种状态。任何时候,至多只有一个线程可以锁定互斥量。试图对已经锁定的互斥量再次加锁,将可能阻塞线程或者报错失败,mutex 的底层可能封装的是操作系统 spinlock,不同的操作系统下可能有不同的实现。C++ 中关于 mutex 的头文件为 #include <mutex>。

C++

#include <iostream>       
#include <thread>        
#include <mutex>          

std::mutex mtx;     

void print_block (int n, char c) {
  mtx.lock();
  for (int i=0; i<n; ++i) { std::cout << c; }
  std::cout << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread th1 (print_block,50,'*');
  std::thread th2 (print_block,50,'$');

  th1.join();
  th2.join();

  return 0;
}
/*
****************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
*/

C++ 中还定义了 timed_mutex:在 mutex 的基础上增加了超时加锁的功能。

recursive_mutex:在 mutex 的基础上增加了递归加锁的功能(此时,lock() 函数可以被同一线程在不释放锁的情况下多次调用)。

C++

std::recursive_mutex mtx;

void fun1() {
    mtx.lock();
    mtx.unlock();
}

void fun2() {
    mtx.lock();
    fun1(); // recursive lock 
    mtx.unlock();
};

2.共享互斥量:

std::shared_mutex 是 C++ 17 标准中引入的,由 unique_lock 和 shared_lock 两个类模板配合 shared_mutex 使用,主要用于读写共享锁。unique_lock 用于写入时加锁,shared_lock 用于读取时加锁。对象在构造时自动对 std::shared_mutex 加锁,析构时自动对其解锁。头文件主要包含在 #include <shared_mutex>。shared_mutex 可用于保护共享数据不被多个线程同时访问。与其他便于独占访问的互斥锁类型相比,shared_mutex 具有两个访问级别:

  • shared:多个线程可以共享同一个互斥锁的所有权。
  • exclusive :只有一个线程可以拥有互斥锁。

共享互斥锁通常用于多个读取操作可以同时访问同一资源而不会导致数据竞争,但只有一个写入操作的场景。

C++

#include <iostream>
#include <mutex>  // For std::unique_lock
#include <shared_mutex>
#include <thread>
 
class ThreadSafeCounter {
 public:
  ThreadSafeCounter() = default;
 
  // 多个线程可以同时读取 countter 计数
  unsigned int get() const {
    std::shared_lock lock(mutex_);
    return value_;
  }
 
  // 只有1个线程可以修改 countter 计数
  void increment() {
    std::unique_lock lock(mutex_);
    value_++;
  }
 
  // 只有1个线程可以修改 countter 计数
  void reset() {
    std::unique_lock lock(mutex_);
    value_ = 0;
  }
 
 private:
  mutable std::shared_mutex mutex_;
  unsigned int value_ = 0;
};
 
int main() {
  ThreadSafeCounter counter;
 
  auto increment_and_print = [&counter]() {
    for (int i = 0; i < 3; i++) {
      counter.increment();
      std::cout << std::this_thread::get_id() << ' ' << counter.get() << '\n';
 
      // Note: Writing to std::cout actually needs to be synchronized as well
      // by another std::mutex. This has been omitted to keep the example small.
    }
  };
 
  std::thread thread1(increment_and_print);
  std::thread thread2(increment_and_print);
 
  thread1.join();
  thread2.join();
}
/*
139677317637888 2
139677317637888 3
139677309245184 4
139677309245184 5
139677309245184 6
139677317637888 6
*/

我们可以看到 increment 同时只能有一个线程对计数进行增加,但可能同时存在多个线程读取同一个计数。

shared_timed_mutex 是在 shared_mutex 的基础上增加了超时加锁的功能。

3.互斥量包装器

lock_guard:使用了 RAII 的机制对互斥量进行类模板封装,构造时加锁,析构时解锁。

C++

#include <mutex>
std::mutex mtx;
void f()
{
  const std::lock_guard<std::mutex> lock(mtx);
  // ...
  // mtx is automatically released when lock goes out of scope
}

互斥量包装器对比原生的 mutex 来说,创建即加锁,作用域结束自动析构并解锁,无需手动解锁。缺点是不能中途解锁,不支持复制和移动。在需要加锁的地方,只需要任意实例化一个 lock_guard,调用构造函数成功上锁,出作用域时则 lock_guard 对象会被销毁,调用析构函数自动解锁可以有效避免死锁问题,但是提供的功能单一且不够灵活。

unique_lock:unique_lock 类模板也是采用 RAII 的方式对锁进行了封装,并且也是以独占所有权的方式管理 mutex 对象的上锁和解锁操作,即其对象之间不能发生拷贝。在构造(或移动 move 赋值)时,unique_lock 对象需要传递一个 mutex 对象作为它的参数,新创建的 unique_lock 对象负责传入的 mutex 对象的上锁和解锁操作。使用以上类型互斥量实例化 unique_lock 的对象时,自动调用构造函数上锁,unique_lock 对象销毁时自动调用析构函数解锁,可以很方便的防止死锁问题。与 lock_guard 不同的是,unique_lock 更加的灵活,提供了更多的成员函数:

  • 上锁/解锁操作:lock、try_lock、try_lock_for、try_lock_until 和 unlock;
  • 修改操作:支持移动赋值、交换(swap:与另一个 unique_lock 对象互换所管理的互斥量所有权)、释放(release:返回它所管理的互斥量对象的指针,并释放所有权)。
  • 获取属性:owns_lock (返回当前对象是否上了锁)、operator bool() (与 owns_lock() 的功能相同)、mutex(返回当前 unique_lock 所管理的互斥量的指针)。

4.条件变量(condition variable):

  • 在 C++ 11 以后,我们可以使用条件变量(condition_variable)实现多个线程间的同步操作;当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。C++ 中包含的头文件在 #include <condition_variable> 中。
  • 条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:
  • 一个线程因等待 条件变量的条件成立 而挂起;
  • 另外一个线程使 条件成立 从而给出唤醒线程的信号,从而唤醒被等待的线程;
  • 为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起;通常情况下这个锁是 std::mutex,并且管理这个锁只能是 std::unique_lock std::mutex 等 RAII 模板类。分别是使用以下两个方法实现:
  • 等待条件成立使用的是 condition_variable 类成员 wait 、wait_for 或 wait_until。
  • 唤醒信号使用的是 condition_variable 类成员 notify_one 或者 notify_all 函数。

我们可以看到 wait 函数如下:

C++

template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate stop_waiting );

线程会一直挂起,直到 stop_waiting 为 true 为止。程序示例如下:

C++

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    std::unique_lock<std::mutex> lk(m);
    // worker 线程等待 ready
    cv.wait(lk, []{return ready;});
 
    // 唤醒执行
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // processed 设置为 true, 唤醒 main 线程
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // 释放锁,防止再次被唤醒。
    lk.unlock();
    // 唤醒 main 线程
    cv.notify_one();
}
 
int main()
{
    std::thread worker(worker_thread);
    // 让 worker 线程先执行,再进行唤醒,否则可能出现 ready = true 先于 worker 线程的执行
    worker.detach();
    
    data = "Example data";
    // 设置 ready 为 true, 唤醒 worker 线程
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    // 唤醒 worker 线程
    cv.notify_one();
    // 等待 worker 线程
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
    return 0;
}

5.信号量:

C++ 20 中添加了 C++ 中的信号量为二元信号量与计数信号量,二元信号量实际为计数信号量模板的特化。

binary_semaphore:二元信号量类似于互斥量,信号量只有 0 与 1 。

counting_semaphore:计数信号量

所有关于信号量的定义参考头文件 #include <semaphore>,计数信号量是一种轻量级同步原语,可以控制对共享资源的访问。与 std::mutex 不同的是,acounting_semaphore 至少允许 LeastMaxValue 并发访问者对同一资源进行多个并发访问。Acounting_semaphore 包含一个由构造函数初始化的内部计数器。该计数器可以通过 acquire() 获取资源访问权限,并通过调用 release() 来释放资源从而递增计数器。当计数器为零时,调用 acquire() 时就会阻塞直到计数器增加,但是调用 try_acquire( ) 不阻塞;try_acquire_for() 和 try_acquire_until() 阻塞直到计数器增加或达到超时。

C++

#include <iostream>
#include <thread>
#include <chrono>
#include <semaphore>
 
std::binary_semaphore
	smphSignalMainToThread{0},
	smphSignalThreadToMain{0};
 
void ThreadProc()
{	
    // 第一次进入阻塞
	smphSignalMainToThread.acquire();
	std::cout << "[thread] Got the signal\n"; // response message
	using namespace std::literals;
	std::this_thread::sleep_for(3s);
	std::cout << "[thread] Send the signal\n"; // message
    // 唤醒 main 线程
	smphSignalThreadToMain.release();
}
 
int main()
{
	std::thread thrWorker(ThreadProc);
	std::cout << "[main] Send the signal\n"; // message
    // 唤醒 ThreadProc 
	smphSignalMainToThread.release();
    // main 线程阻塞
	smphSignalThreadToMain.acquire();
	std::cout << "[main] Got the signal\n"; // response message
	thrWorker.join();
}
/*
[main] Send the signal
[thread] Got the signal
[thread] Send the signal
[main] Got the signal
*/

6.barrier:

C++ 20 以后支持 latch 与 barrier,他们同样可以用来线程同步。

  • latch:类 latch 是 std::ptrdiff_t 类型的向下计数器,可用于同步线程。计数器的值在创建时初始化。线程可能会阻塞在锁存器上,直到计数器减为零。不能增加或重置计数器,这使得锁存器创建后不可重用。其内部维护着一个计数器,当计数不为 0 时,所有参与者(线程)都将阻塞在等待操作处,计数为 0 时,解除阻塞。计数器不可重置或增加,故它是一次性的,不可重用。与 std::barrier 不同,std::latch 参与线程可以多次递减。

C++

#include <latch>

std::latch work_done(4);

work_done.count_down();			 // decrements the counter in a non-blocking manner
work_done.wait();				   // blocks until the counter reaches zero
bool ok = work_done.try_wait();	 // tests if the internal counter equals zero
work_done.arrive_and_wait();	    // decrements the counter and blocks until it reaches zero
  • barrier:类似于 latch,它会阻塞线程直到所有参与者线程都到达一个同步点,直到预期数量的线程到达设定的值则会接触阻塞。与 latch 不同的是,它是可重用的。

一个 barrier 的生命周期包含多个阶段,每个阶段都定义了一个同步点。一个 barrier 阶段包含:

  • 期望计数(设创建时指定的计数为 n),当期望计数不为 0 时,参与者将阻塞于等待操作处;
  • 当期望计数为 0 时,会执行创建 barrier 时指定的阶段完成步骤,然后解除阻塞所有阻塞于同步点的参与者线程。
  • 当阶段完成步骤执行完成后,会重置期望计数为 n - 调用arrive_and_drop() 的次数,然后开始下一个阶段。

C++

#include <barrier>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
 
int main() {
  const auto workers = { "anil", "busara", "carl" };
 
  auto on_completion = []() noexcept { 
    // locking not needed here
    static auto phase = "... done\n" "Cleaning up...\n";
    std::cout << phase;
    phase = "... done\n";
  };

  std::barrier sync_point(std::ssize(workers), on_completion);
  auto work = [&](std::string name) {
    std::string product = "  " + name + " worked\n";
    std::cout << product;  // ok, op<< call is atomic
    sync_point.arrive_and_wait();
    // 全部到达后,进行下一阶段
    product = "  " + name + " cleaned\n";
    std::cout << product;
    sync_point.arrive_and_wait();
  };
 
  std::cout << "Starting...\n";
  std::vector<std::thread> threads;
  for (auto const& worker : workers) {
    threads.emplace_back(work, worker);
  }
  for (auto& thread : threads) {
    thread.join();
  }
}
/*
Starting...
  anil worked
  carl worked
  busara worked
... done
Cleaning up...
  busara cleaned
  anil cleaned
  carl cleaned
... done
*/

7.call_once:

C++ 11 以后支持 call_once。确保某个操作只被执行一次(成功执行才算),即使是多线程环境下也确保只执行一次。

C++

template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );

如果在 call_once 被调用时,flag 表明 f 已经被调用,则 call_once 立即返回(这种调用 call_once 称为被动)。

C++

#include <iostream>
#include <thread>
#include <mutex>
 
std::once_flag flag1, flag2;
 
void simple_do_once()
{
    std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
 
int main()
{
    std::thread st1(simple_do_once);
    std::thread st2(simple_do_once);
    std::thread st3(simple_do_once);
    std::thread st4(simple_do_once);
    st1.join();
    st2.join();
    st3.join();
    st4.join();
}
/*
Simple example: called once
*/


阅读量:2046

点赞量:0

收藏量:0