Boost库为C++提供了强大的支持,尤其在多线程和网络编程方面。其中,Boost.Asio库是一个基于前摄器设计模式的库,用于实现高并发和网络相关的开发。Boost.Asio核心类是io_service
,它相当于前摄模式下的Proactor
角色。所有的IO操作都需要通过io_service
来实现。
在异步模式下,程序除了发起IO操作外,还需要定义一个用于回调的完成处理函数。io_service
将IO操作交给操作系统执行,但它不同步等待,而是立即返回。调用io_service
的run
成员函数可以等待异步操作完成。当异步操作完成时,io_service
会从操作系统获取结果,再调用相应的处理函数(handler)来处理后续逻辑。
这种异步模型的优势在于它能够更有效地利用系统资源,避免线程阻塞,提高程序的并发性能。Boost.Asio的设计让开发者能够以高效的方式开发跨平台的并发网络应用,使C++在这方面能够与类似Java等语言相媲美。
ASIO异步定时器
boost::asio::deadline_timer
是 Boost.Asio 库中用于处理定时器的类。它允许你在一段时间后或在指定的时间点触发回调函数。deadline_timer
通常与 io_service
配合使用,以实现异步定时器功能。
以下是 boost::asio::deadline_timer
的一些重要概念和方法:
构造函数: deadline_timer
的构造函数通常需要一个 io_service
对象和一个时间参数。时间参数可以是相对时间(相对于当前时间的一段时间间隔)或绝对时间(具体的时刻)。
cppCopy codeboost::asio::io_service io_service; boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(5));
|
expires_from_now
方法: 通过调用 expires_from_now
方法,可以设置相对于当前时间的时间间隔,来定义定时器的到期时间。
cppCopy code timer.expires_from_now(boost::posix_time::seconds(10));
|
expires_at
方法: 通过调用 expires_at
方法,可以设置定时器的到期时间为一个具体的时刻。
cppCopy codeboost::posix_time::ptime expiryTime = boost::posix_time::second_clock::local_time() + boost::posix_time::seconds(10); timer.expires_at(expiryTime);
|
async_wait
方法: async_wait
方法用于启动异步等待定时器的到期。它接受一个回调函数作为参数,该回调函数将在定时器到期时被调用。
cppCopy codevoid timerCallback(const boost::system::error_code& ) { std::cout << "Timer expired!" << std::endl; }
timer.async_wait(boost::bind(timerCallback, boost::asio::placeholders::error));
|
取消定时器: 你可以通过调用 cancel
方法来取消定时器,以停止它在到期时触发回调函数。
cppCopy code timer.cancel();
|
boost::asio::deadline_timer
提供了一种灵活和强大的方式来处理异步定时器操作,使得你可以方便地执行定时任务、调度操作或执行周期性的工作。
#include <iostream> #include <boost/asio.hpp>
using namespace std; using namespace boost::asio;
void handler(const boost::system::error_code &ec) { cout << "hello lyshark A" << endl; }
void handler2(const boost::system::error_code &ec) { cout << "hello lyshark B" << endl; }
int main(int argc,char *argv) { boost::asio::io_service service;
boost::asio::deadline_timer timer(service, boost::posix_time::seconds(5)); timer.async_wait(handler);
boost::asio::deadline_timer timer2(service, boost::posix_time::seconds(10)); timer2.async_wait(handler2);
service.run();
std::system("pause"); return 0; }
|
上述代码运行后,会分别间隔5秒及10秒,用来触发特定的handler
函数,效果如下图所示;
在 Boost.Asio 中,io_service::run()
是一个关键的方法,它用于运行 I/O 服务的事件循环。通常,run()
方法会一直运行,直到没有更多的工作需要完成,即直到没有未完成的异步操作。
如果多个异步函数同时调用同一个 io_service
的 run()
方法,可以考虑将 run()
方法单独摘出来,以便在线程函数中多次调用。
#include <iostream> #include <boost/asio.hpp> #include <boost/thread.hpp>
using namespace std; using namespace boost::asio;
void handler(const boost::system::error_code &ec) { cout << "hello lyshark A" << endl; }
void handler2(const boost::system::error_code &ec) { cout << "hello lyshark B" << endl; }
boost::asio::io_service service;
void run() { service.run(); }
int main(int argc,char *argv) { boost::asio::deadline_timer timer(service, boost::posix_time::seconds(5)); timer.async_wait(handler);
boost::asio::deadline_timer timer2(service, boost::posix_time::seconds(10)); timer2.async_wait(handler2);
boost::thread thread1(run); boost::thread thread2(run);
thread1.join(); thread2.join();
std::system("pause"); return 0; }
|
上述代码的运行效果与第一个案例一致,唯一的不同在于,该案例中我们通过boost::thread
分别启动了两个线程,并通过join()
分别等待这两个线程的执行结束,让异步与线程分离。
通过多次触发计时器,实现重复计时器功能,如下代码使用 Boost.Asio 实现了一个异步定时器的例子。
该程序定义了一个计数器 count
,并创建了一个 steady_timer
对象 io_timer
,设置其到期时间为 1 秒。然后,通过 io_timer.async_wait
启动了一个异步等待操作,该操作在计时器到期时调用 print
函数。
在 print
函数中,首先判断计数器是否小于 5,如果是,则输出计数器的值,并将计时器的到期时间延迟 1 秒。然后,再次启动新的异步等待操作,递归调用 print
函数。当计数器达到 5 时,停止了 io
对象,这会导致 io.run()
返回,程序退出。
#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp>
void print(const boost::system::error_code &,boost::asio::steady_timer * io_timer, int * count) { if (*count < 5) { std::cout << "Print函数计数器: " << *count << std::endl; ++(*count); io_timer->expires_at(io_timer->expiry() + boost::asio::chrono::seconds(1)); io_timer->async_wait(boost::bind(print,boost::asio::placeholders::error, io_timer, count)); } }
int main(int argc, char *argv) { boost::asio::io_context io; int count = 0; boost::asio::steady_timer io_timer(io, boost::asio::chrono::seconds(1));
io_timer.async_wait(boost::bind(print, boost::asio::placeholders::error, &io_timer, &count));
io.run(); std::cout << "循环已跳出,总循环次数: " << count << std::endl;
std::system("pause"); return 0; }
|
运行上述代码,输出效果如下图所示,通过计数器循环执行特定次数并输出,每次间隔为1秒。
与之前的代码相比,如下所示的版本使用了一个类 print
来封装定时器操作。
与之前版本相比的主要不同点:
- 类的引入: 引入了
print
类,将定时器和计数器等相关的操作封装到了一个类中,提高了代码的封装性和可读性。
- 构造函数和析构函数: 在
print
类中使用构造函数初始化 timer_
定时器,而在析构函数中打印最终循环次数。这样的设计使得对象的创建和销毁分别与初始化和清理相关的操作关联起来。
- 成员函数
run_print
: 使用了成员函数 run_print
作为定时器回调函数,无需再使用 boost::bind
绑定 this
指针,直接使用类的成员变量,提高了代码的简洁性。
- 对象的创建和运行: 在
main
函数中,直接创建了 print
对象 ptr
,并通过 io.run()
来运行异步操作,无需手动调用 async_wait
。这种方式更加面向对象,将异步操作和对象的生命周期绑定在一起。
#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp>
class print { private: boost::asio::steady_timer timer_; int count_;
public: print(boost::asio::io_context& io) : timer_(io, boost::asio::chrono::seconds(1)), count_(0) { timer_.async_wait(boost::bind(&print::run_print, this)); }
~print() { std::cout << "循环已跳出,总循环次数: " << count_ << std::endl; }
void run_print() { if (count_ < 5) { std::cout << "Print函数计时器: " << count_ << std::endl; ++count_;
timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1)); timer_.async_wait(boost::bind(&print::run_print, this)); } } };
int main(int argc, char *argv) { boost::asio::io_context io; print ptr(io); io.run();
std::system("pause"); return 0; }
|
这个输出效果与之前基于过程的保持一致,其他的并无差异;
如下版本的代码相对于之前的版本引入了 io_context::strand
来保证定时器回调函数的串行执行,避免了多个线程同时执行 print1
和 print2
导致的竞态条件。
与之前版本相比的主要不同点:
io_context::strand
的引入: 引入了 io_context::strand
对象 strand_
,用于确保 print1
和 print2
的回调函数在同一线程内按序执行。io_context::strand
在多线程环境中提供了同步操作,确保绑定到 strand_
上的操作不会同时执行。
bind_executor
的使用: 在 async_wait
中使用了 boost::asio::bind_executor
函数,将定时器的回调函数与 strand_
绑定,保证了异步操作的执行在 strand_
内。这样可以确保 print1
和 print2
不会在不同线程中同时执行。
- 多线程运行
io_context
: 引入了两个子线程 t
和 t1
,分别调用 io_context::run
来运行 io_context
。这样可以使 io_context
在两个独立的线程中运行,增加了并发性。
- 线程的 Join: 在
main
函数中,通过 t.join()
和 t1.join()
等待两个子线程执行完成后再退出程序。这样确保了 main
函数在所有线程都完成后再结束。
总体而言,这个版本通过引入 io_context::strand
以及多线程运行 io_context
,解决了异步操作可能导致的竞态条件,增强了程序的并发性。
#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp>
class print { public: print(boost::asio::io_context& io) : strand_(io), timer1_(io, boost::asio::chrono::seconds(1)), timer2_(io, boost::asio::chrono::seconds(1)), count_(0) { timer1_.async_wait(boost::asio::bind_executor(strand_,boost::bind(&print::print1, this))); timer2_.async_wait(boost::asio::bind_executor(strand_,boost::bind(&print::print2, this))); }
void print1() { if (count_ < 10) { std::cout << "Print 1: " << count_ << std::endl; ++count_;
timer1_.expires_at(timer1_.expiry() + boost::asio::chrono::seconds(1)); timer1_.async_wait(boost::asio::bind_executor(strand_,boost::bind(&print::print1, this))); } }
void print2() { if (count_ < 10) { std::cout << "Print 2: " << count_ << std::endl; ++count_;
timer2_.expires_at(timer2_.expiry() + boost::asio::chrono::seconds(1)); timer2_.async_wait(boost::asio::bind_executor(strand_,boost::bind(&print::print2, this))); } }
private: boost::asio::io_context::strand strand_; boost::asio::steady_timer timer1_; boost::asio::steady_timer timer2_; int count_; };
int main(int argc,char *argv[]) { boost::asio::io_context io; print ptr(io); boost::thread t(boost::bind(&boost::asio::io_context::run, &io)); boost::thread t1(boost::bind(&boost::asio::io_context::run, &io));
io.run(); t.join(); t1.join();
std::system("pause"); return 0; }
|
输出效果如下图所示;
ASIO异步网络通信
异步通信的原理与同步通信不同,主要体现在程序对IO请求的处理上。在异步状态下,程序发起IO请求后会立即返回,无需等待IO操作完成。无论IO操作成功还是失败,程序都可以继续执行其他任务,不会被阻塞。当IO请求被执行完成后,系统会通过回调函数的方式通知调用者,使其能够获取操作的状态或结果。
这种异步通信的机制带来了一些优势:
- 提高并发性: 在异步模式下,程序在等待IO操作完成的过程中不会阻塞,可以继续执行其他任务,充分利用了宝贵的CPU时间。这使得程序更容易实现高并发,同时处理多个IO操作。
- 节省时间: 由于程序不需要等待IO操作完成,可以更加高效地利用时间。在同步模式下,程序必须等待每个IO操作的完成,而在异步模式下,可以在等待的时间内执行其他任务,提高了整体效率。
- 提高系统响应性: 异步通信使得程序能够更灵活地响应IO事件,及时处理完成的IO操作。这对于需要快速响应用户请求的系统非常重要,如网络通信、图形用户界面等。
- 减少资源浪费: 在异步模式下,程序可以通过回调函数获取IO操作的结果,而无需通过轮询或其他方式一直等待。这减少了对系统资源的浪费,提高了系统的效率。
异步通信的原理在于通过非阻塞的方式发起IO请求,充分利用等待IO完成的时间,通过回调函数的方式获取IO操作的结果,以提高程序的并发性、响应性和效率。
使用Boost.Asio库实现简单的异步TCP服务器。
对代码的主要分析:
- IOService 结构体:
- 该结构体负责管理
io_service
和 acceptor
。
- 构造函数初始化
io_service
和 acceptor
对象。acceptor
用于监听连接请求。
start()
函数启动异步等待连接操作,当有客户端连接请求时,触发 accept_handler
。
- start() 函数:
- 在
start()
函数中,通过 async_accept
异步等待连接请求,当有客户端连接请求时,会触发 accept_handler
函数。
- 创建了一个新的
tcp::socket
对象,并使用 async_accept
异步等待连接请求。
accept_handler
函数被绑定,负责处理连接成功后的操作。
- accept_handler 函数:
- 当有客户端连接成功时,该函数会被调用。
- 递归调用
start()
,以便继续等待新的连接请求。
- 输出远程客户端的IP地址。
- 创建一个字符串指针
pstr
,并发送 “hello lyshark” 给客户端。
- write_handler 函数:
- 当异步写操作完成时,该函数被调用。
- 输出已发送的信息。
- main 函数:
- 创建了一个
io_service
对象和 IOService
对象 server
。
- 调用
server.start()
启动服务器。
- 调用
io.run()
启动 IO 服务,使其保持运行状态,直到所有异步操作完成。
整体而言,这个程序通过异步的方式接受客户端连接,并在连接建立后异步发送消息给客户端。使用 Boost.Asio 提供的异步操作可以实现高效的并发网络编程。
#include <iostream> #include <string> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/smart_ptr.hpp>
using namespace boost::asio; using boost::system::error_code; using ip::tcp;
struct IOService { IOService(io_service &io) :m_iosev(io), m_acceptor(io, tcp::endpoint(tcp::v4(), 80)) { std::cout << "执行构造函数" << std::endl; } void start() { boost::shared_ptr<tcp::socket> psocket(new tcp::socket(m_iosev)); m_acceptor.async_accept(*psocket,boost::bind(&IOService::accept_handler, this, psocket, _1)); }
void accept_handler(boost::shared_ptr<tcp::socket> psocket, error_code ec) { if (ec) return;
start(); std::cout << "远端IP: " << psocket->remote_endpoint().address() << std::endl; boost::shared_ptr<std::string> pstr(new std::string("hello lyshark"));
psocket->async_write_some(buffer(*pstr),boost::bind(&IOService::write_handler, this, pstr, _1, _2)); }
void write_handler(boost::shared_ptr<std::string> pstr,error_code ec, size_t bytes_transferred) { if (!ec) std::cout << *pstr << " 已发送" << std::endl; }
private: io_service &m_iosev; ip::tcp::acceptor m_acceptor; };
int main(int argc, char* argv[]) { io_service io; IOService server(io); server.start(); io.run(); return 0; }
|
客户端代码
#include <iostream> #include <string> #include <boost/asio.hpp>
using namespace boost::asio;
int main(int argc, char *argv[]) { io_service io_service; ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), 1000); ip::tcp::socket socket(io_service); socket.connect(ep);
char buffer[1024] = { 0 }; socket.read_some(boost::asio::buffer(buffer)); std::cout << buffer << std::endl;
std::system("pause"); return 0; }
|