Boost 库是一个由C/C++语言的开发者创建并更新维护的开源类库,其提供了许多功能强大的程序库和工具,用于开发高质量、可移植、高效的C应用程序。Boost库可以作为标准C库的后备,通常被称为准标准库,是C标准化进程的重要开发引擎之一。使用Boost库可以加速C应用程序的开发过程,提高代码质量和性能,并且可以适用于多种不同的系统平台和编译器。Boost库已被广泛应用于许多不同领域的C++应用程序开发中,如网络应用程序、图像处理、数值计算、多线程应用程序和文件系统处理等。
C++语言并没有对多线程与网络的良好支持,虽然新的C++标准加入了基本的thread
库,但是对于并发编程的支持仍然很基础,Boost库提供了数个用于实现高并发与网络相关的开发库这让我们在开发跨平台并发网络应用时能够像Java等语言一样高效开发。
thread库为C++增加了多线程处理能力,其主要提供了清晰的,互斥量,线程,条件变量等,可以很容易的实现多线程应用开发,而且该库是可跨平台的,并且支持POSIX
和Windows
线程。
7.1 互斥锁 互斥锁通过在访问共享资源的线程之间进行通信来避免并发问题。互斥锁仅允许一个线程在任何给定时间点上访问共享资源。如果已经有一个线程锁定了互斥锁,则任何其他线程都必须等待锁被释放。一旦锁被释放,等待队列中的一个线程将被允许继续其工作。
Boost库中的 boost::mutex 类型表示一个互斥锁。它提供了两个主要函数来控制互斥锁:lock() 和 unlock()。当一个线程想要访问一个共享资源时,它会调用互斥锁的 lock() 函数来获取锁,如果无法获得,线程将最多等待直到锁被释放。在线程访问完共享资源后,它需要调用 unlock() 函数来释放锁,以便其他线程可以获得锁并访问共享资源。
互斥体是用于线程同步的一种手段,其主要用于在多线程环境下,防止多个线程同时操作共享资源,当某线程被锁,其他线程则需要等待它解锁后才能继续访问共享资源。
thread提供了6种互斥类型,但常用的只有3种:
mutex 独占互斥锁
recursive_mutex 递归互斥锁
shared_mutex 读写锁
通常我们会使用Mutex来保护共享资源,防止在多线程环境中数据的不一致性,当一个资源被锁定,其他线程只能阻塞等待释放后才可继续操作。
#define BOOST_THREAD_VERSION 5 #include <iostream> #include <boost/thread/thread_guard.hpp> using namespace std ; using namespace boost; void MutexA () { boost::mutex mutex; try { mutex.lock(); mutex.unlock(); } catch (...) { mutex.unlock(); } } void MutexB () { boost::mutex mutex; boost::lock_guard<boost::mutex> global_mutex (mutex) ; }
在Boost中创建多线程非常简单,只需要定义一个MyThread
线程函数,并在主函数中开启线程即可实现。
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/bind.hpp> using namespace std ; boost::mutex io_mutex; void MyThread (int id) { for (int i = 1 ; i < 10 ; ++i) { boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << id << ": " << i << std ::endl ; } } int main (int argc, char *argv[]) { boost::thread thrd1 (boost::bind(&MyThread, 1 )) ; boost::thread thrd2 (boost::bind(&MyThread, 2 )) ; thrd1.interrupt(); cout << "线程ID:" << thrd1.get_id() << endl ; thrd1.join(); thrd2.join(); thrd1.timed_join(boost::posix_time::seconds(3 )); thrd2.timed_join(boost::posix_time::seconds(3 )); std ::system("pause" ); return 0 ; }
7.2 线程局部存储 Boost库中提供了线程局部存储(Thread Local Storage,简称TLS)的支持,可以让程序中的每个线程都拥有独立的数据空间,互相之间不会受到干扰。这对于一些线程之间需要共享数据,但需要保证数据安全的场景非常有用,例如线程池等。
有时候函数使用了局部静态变量或全局变量,导致无法用于多线程环境,因为无法保证变量在多线程环境下重入的正确操作。
#include <iostream> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/tss.hpp> using namespace std ; boost::mutex io_mutex; boost::thread_specific_ptr<int > ptr; struct MyThread { MyThread(int id) :id(id){} void operator () () { if (ptr.get() == 0 ) ptr.reset(new int (0 )); for (int x = 0 ; x <10 ; ++x) { (*ptr)++; boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << "当前ID: " << id << " 本地存储数值: " << *ptr << std ::endl ; } } public: int id; }; int main (int argc, char *argv[]) { boost::thread thrd1 (MyThread(1 )) ; boost::thread thrd2 (MyThread(2 )) ; thrd1.join(); thrd2.join(); std ::system("pause" ); return 0 ; }
如果本地存储的类型是一个结构体,如下定义了MyStruct
本地结构体,来实现本地数据累加。
#include <iostream> #include <string> #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/tss.hpp> using namespace std ; boost::mutex io_mutex; typedef struct MyStruct { int uid; std ::string uname; MyStruct(int x, std ::string y) { uid = x; uname = y; } }MyStruct; boost::thread_specific_ptr<MyStruct> ptr; struct MyThread { MyThread(int id) :id(id){} void operator () () { if (ptr.get() == 0 ) ptr.reset(new MyStruct(0 ,"lyshark" )); for (int x = 0 ; x <10 ; ++x) { (*ptr).uid = (*ptr).uid + 1 ; (*ptr).uname = "lyshark" ; boost::mutex::scoped_lock lock (io_mutex) ; std ::cout << "当前ID: " << id << " 本地存储数值: " << (*ptr).uid << "本地存储名字: " << (*ptr).uname << std ::endl ; } } public: int id; }; int main (int argc, char *argv[]) { boost::thread thrd1 (MyThread(1 )) ; boost::thread thrd2 (MyThread(2 )) ; thrd1.join(); thrd2.join(); std ::system("pause" ); return 0 ; }
7.3 使用线程组 线程组thread_group
用于管理一组线程,就像线程池一样,其内部使用了std::list<thread*>
来容纳每个线程对象。
当需要创建新线程时,使用create_thread()
工厂函数,并通过bind
绑定传递参数即可实现创建,如下是最简单的线程组创建。
#include <iostream> #include <boost/thread.hpp> #include <boost/function.hpp> #include <boost/bind.hpp> using namespace std ; boost::mutex io_mutex; void MyThread (int x, string str) { try { boost::this_thread::sleep(boost::posix_time::seconds(2 )); for (int i = 0 ; i < x; i++) { boost::mutex::scoped_lock lock (io_mutex) ; cout << "输出字符串: " << str << " 计次: " << i << endl ; } } catch (boost::thread_interrupted&) { cout << "thread is interrupt" << endl ; } } int main (int argc,char *argv[]) { boost::thread_group group; for (int x = 0 ; x < 10 ; x++) { group.create_thread(boost::bind(MyThread, x, "hello lyshark" )); } cout << "当前线程数量: " << group.size() << endl ; group.join_all(); std ::system("pause" ); return 0 ; }
我们还可以通过add_thread
和remove_thread
将特定的线程对象放入到不同的线程组中,来实现对线程的批量操作。
#include <iostream> #include <string> #include <boost/thread.hpp> #include <boost/function.hpp> #include <boost/bind.hpp> using namespace std ; typedef struct MyStruct { int uuid; std ::string uname; }MyStruct; boost::mutex io_mutex; void MyThread (MyStruct ptr) { try { for (int i = 0 ; i < 5 ; i++) { boost::mutex::scoped_lock lock (io_mutex) ; cout << "UUID: " << ptr.uuid << " UName: " << ptr.uname << endl ; } } catch (boost::thread_interrupted&) { cout << "thread is interrupt" << endl ; } } int main (int argc,char *argv[]) { MyStruct my_struct; boost::thread_group group; my_struct.uuid = 1001 ; my_struct.uname = "lyshark" ; boost::thread thrd1 (&MyThread,my_struct) ; my_struct.uuid = 1002 ; my_struct.uname = "admin" ; boost::thread thrd2 (&MyThread, my_struct) ; group.add_thread(&thrd1); group.add_thread(&thrd2); bool is_in = group.is_thread_in(&thrd1); std ::cout << "是否在组内: " << is_in << std ::endl ; group.remove_thread(&thrd1); group.remove_thread(&thrd2); group.join_all(); boost::this_thread::sleep(boost::posix_time::seconds(2 )); std ::system("pause" ); return 0 ; }
7.4 获取线程返回值 获取线程返回值,需要使用异步的方式得到,Boost中提供了ASIO
库来实现异步操作,该库采用了前摄器设计模式,实现了可移植的异步IO操作。
首先来简单的看一下,如何使用异步的方式实现创建线程的。
#define BOOST_THREAD_VERSION 5 #include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/function.hpp> using namespace std ; void MyThread (int x) { for (int i = 0 ; i < x; i++) std ::cout << i << std ::endl ; } int main (int argc, char *argv[]) { auto x = async(&MyThread, 10 ); x.wait(); async(boost::bind(MyThread, 20 )); auto y = boost::async([] { cout << "hello lyshark" << endl ; }); y.wait(); std ::system("pause" ); return 0 ; }
当我们需要获取单个线程的返回值时,可以使用valid()
方法或使用get()
将返回值从线程里拉取出来。
#define BOOST_THREAD_VERSION 5 #include <iostream> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/function.hpp> using namespace std ; int MyThread (int x, int y) { Sleep(3000 ); return x + y; } typedef struct { int x; int y; }MyStruct; MyStruct MyThreadStruct (int x, int y) { MyStruct ref; ref.x = x + 100 ; ref.y = y + 100 ; return ref; } int main (int argc, char *argv[]) { auto f = boost::async(boost::bind(MyThread, 10 , 20 )); if (f.valid()) { cout << "获取计算结果: " << f.get() << endl ; } f.wait(); auto t = boost::async(boost::bind(MyThreadStruct, 100 , 200 )); MyStruct tmp = t.get(); cout << "获取结构体参数A: " << tmp.x << " 参数B: " << tmp.y << endl ; t.wait(); std ::system("pause" ); return 0 ; }
有时候我们会一次性创建
多个线程共同执行,此时想要获取到每个线程
中的返回值
,那么就需要使用多个future
对象,代码如下。
#define BOOST_THREAD_VERSION 5 #include <iostream> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/function.hpp> using namespace std ; int MyThread (int x, int y) { Sleep(3000 ); return x + y; } int main (int argc, char *argv[]) { std ::vector <boost::future <int >> vect; for (int i = 0 ; i < 100 ; ++i) { vect.push_back(boost::async(bind(MyThread, i, i * 10 ))); } boost::wait_for_all(vect.begin(), vect.end()); for (auto &x : vect) { if (x.valid()) { cout << "线程计算结果: " << x.get() << endl ; } } std ::system("pause" ); return 0 ; }
返回数值类型如果不够存储的话,那么我们可以定义一个MyStruct
结构体,通过结构体传递参数,并将计算结果返回为结构
体类型。
#define BOOST_THREAD_VERSION 5 #include <iostream> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/function.hpp> using namespace std ; typedef struct { int x; int y; }MyStruct; MyStruct MyThreadStruct (int x, int y) { MyStruct ref; ref.x = x + 100 ; ref.y = y + 100 ; return ref; } int main (int argc, char *argv[]) { std ::vector <boost::future <MyStruct>> vec; for (int x = 0 ; x < 100 ; x++) { vec.push_back(boost::async(bind(MyThreadStruct, x, x + 10 ))); } boost::wait_for_all(vec.begin(), vec.end()); for (auto &x : vec) { MyStruct tmp = x.get(); cout << "获取计算结果A: " << tmp.x << " 获取结果B: " << tmp.y << endl ; } std ::system("pause" ); return 0 ; }
由于future
只能get
获取一次数据,使得它不能被多线程并发访问,所以就出现了shared_future
,它是future
的增强,可以线程安全的多次调用get()
获取到计算结果,修改很简单只需要将声明改一下,其他的不用动。
void Async () { std ::vector <boost::shared_future<MyStruct>> vec; for (int x = 0 ; x < 100 ; x++) { vec.push_back(boost::async(bind(MyThreadStruct, x, x + 10 )).share()); } boost::wait_for_all(vec.begin(), vec.end()); for (auto &x : vec) { MyStruct tmp = x.get(); cout << "获取计算结果: " << tmp.x << endl ; } }
7.5 共享锁 shared_mutex(共享互斥锁)是 C++11 标准库中引入的一种线程同步机制,可以实现同时有多个线程同时读取共享资源,但只能有一个线程写入共享资源的机制。与常见的互斥锁不同,shared_mutex 具有更加细致的控制对共享资源的访问权限。
该锁允许线程获取多个共享所有权和一个专享所有权,实现了读写锁机制,即多个读线程一个写线程。
#define BOOST_THREAD_VERSION 5 #include <iostream> #include <boost/bind.hpp> #include <boost/thread.hpp> #include <boost/function.hpp> using namespace std ; using namespace boost; class MyClass { private: int m_x; boost::shared_mutex rw_mutex; public: MyClass() { m_x = 0 ; } void write () { boost::unique_lock<boost::shared_mutex> g (rw_mutex) ; ++m_x; } void read (int *x) { boost::shared_lock<boost::shared_mutex> g (rw_mutex) ; *x = m_x; } }; void writer (MyClass &ptr) { for (int x = 0 ; x < 10 ; x++) { ptr.write(); } } void reader (MyClass &ptr) { int item; for (int x = 0 ; x < 10 ; x++) { ptr.read(&item); std ::cout << "读取数据: " << item << std ::endl ; } } int main (int argc, char *argv[]) { MyClass ptr; thread_group pool; pool.create_thread(boost::bind(reader, boost::ref(ptr))); pool.create_thread(boost::bind(reader, boost::ref(ptr))); pool.create_thread(boost::bind(writer, boost::ref(ptr))); pool.join_all(); std ::system("pause" ); return 0 ; }
7.6 获取线程ID号 实现线程池,每次将一个线程service_io
存入到栈中,需要时从栈中弹出并调用内部相应的函数。
#include <stack> #include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/thread.hpp> using namespace std ; using namespace boost; class ThreadPool { static int count; int NoOfThread; int counter; thread_group group; boost::mutex mutex_; asio::io_service io_service; stack <boost::thread*> thread_stack; public: ThreadPool(int num) { NoOfThread = num; counter = 0 ; boost::mutex::scoped_lock lock (mutex_) ; if (count == 0 ) count++; else return ; for (int i = 0 ; i<num; ++i) { thread_stack.push(group.create_thread(boost::bind(&asio::io_service::run, &io_service))); } } ~ThreadPool() { io_service.stop(); group.join_all(); } boost::thread* get_thread () { if (counter > NoOfThread) return NULL ; counter++; boost::thread* ptr = thread_stack.top(); thread_stack.pop(); return ptr; } int get_number () { return counter; } }; int ThreadPool::count = 0 ;int main (int argc, char * argv[]) { ThreadPool pool (10 ) ; for (int x = 0 ; x < 10 ; x++) { boost::thread* ptr = pool.get_thread(); cout << "线程ID: " << ptr->get_id() << endl ; int num = pool.get_number(); std ::cout << "计数器: " << num << std ::endl ; } std ::system("pause" ); return 0 ; }