Boost ASIO(Asynchronous I/O)是一个用于异步I/O操作的C++库,该框架提供了一种方便的方式来处理网络通信、多线程编程和异步操作。特别适用于网络应用程序的开发,从基本的网络通信到复杂的异步操作,如远程控制程序、高并发服务器等都可以使用该框架。该框架的优势在于其允许处理多个并发连接,而不必创建一个线程来管理每个连接。最重要的是ASIO是一个跨平台库,可以运行在任何支持C++的平台下。

本章笔者将介绍如何通过ASIO框架实现一个简单的异步网络套接字应用程序,该程序支持对Socket套接字的存储,默认将套接字放入到一个Map容器内,当需要使用时只需要将套接字在容器内取出并实现通信,客户端下线时则自动从Map容器内移除,通过对本章知识的学习读者可以很容易的构建一个跨平台的简单远控功能。
AsyncTcpClient 异步客户端
如下这段代码实现了一个基本的带有自动心跳检测的客户端,它可以通过异步连接与服务器进行通信,并根据不同的命令返回不同的数据。代码逻辑较为简单,但为了保证可靠性和稳定性,实际应用中需要进一步优化、处理错误和异常情况,以及增加更多的功能和安全性措施。
首先我们封装实现AsyncConnect
类,该类内主要实现两个功能,其中aysnc_connect()
方法用于实现异步连接到服务端,而port_is_open()
方法则用于验证服务器特定端口是否开放,如果开放则说明服务端还在线,不开放则说明服务端离线此处尝试等待一段时间后再次验证,在调用boost::bind()
函数绑定套接字时通过&AsyncConnect::timer_handle()
函数来设置一个超时等待时间。
进入到主函数中,首先程序通过while
循环让程序保持持续运行,并通过hander.aysnc_connect(ep, 5000)
每隔5秒验证是否与服务端连接成功,如果连接了则进入内循环,在内循环中通过hander.port_is_open("127.0.0.1", 10000, 5000)
验证特定端口是否开放,这主要是为了保证服务端断开后客户端依然能够跳转到外部循环继续等待服务端上线。而当客户端与服务端建立连接后则会持续在内循环中socket.read_some()
接收服务端传来的特定命令,以此来执行不同的操作。
#define BOOST_BIND_GLOBAL_PLACEHOLDERS #include <iostream> #include <string> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/array.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/noncopyable.hpp>
using namespace std; using boost::asio::ip::tcp;
class AsyncConnect { public: AsyncConnect(boost::asio::io_service& ios, tcp::socket &s) :io_service_(ios), timer_(ios), socket_(s) {}
bool aysnc_connect(const tcp::endpoint &ep, int million_seconds) { bool connect_success = false;
socket_.async_connect(ep, boost::bind(&AsyncConnect::connect_handle, this, _1, boost::ref(connect_success)));
timer_.expires_from_now(boost::posix_time::milliseconds(million_seconds)); bool timeout = false;
timer_.async_wait(boost::bind(&AsyncConnect::timer_handle, this, _1, boost::ref(timeout))); do { io_service_.run_one(); } while (!timeout && !connect_success); timer_.cancel(); return connect_success; }
bool port_is_open(std::string address, int port, int timeout) { try { boost::asio::io_service io; tcp::socket socket(io); AsyncConnect hander(io, socket); tcp::endpoint ep(boost::asio::ip::address::from_string(address), port); if (hander.aysnc_connect(ep, timeout)) { io.run(); io.reset(); return true; } else { return false; } } catch (...) { return false; } }
private: void connect_handle(boost::system::error_code ec, bool &connect_success) { if (!ec) { connect_success = true; } }
void timer_handle(boost::system::error_code ec, bool &timeout) { if (!ec) { socket_.close(); timeout = true; } } boost::asio::io_service &io_service_; boost::asio::deadline_timer timer_; tcp::socket &socket_; };
int main(int argc, char * argv[]) { try { boost::asio::io_service io; tcp::socket socket(io); AsyncConnect hander(io, socket); boost::system::error_code error; tcp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 10000);
go_: while (1) { if (hander.aysnc_connect(ep, 5000)) { io.run(); std::cout << "已连接到服务端." << std::endl;
while (1) { bool is_open = hander.port_is_open("127.0.0.1", 10000, 5000);
boost::array<char, 4096> buffer = { 0 };
if (is_open == true) { socket.read_some(boost::asio::buffer(buffer), error);
if (strncmp(buffer.data(), "GetCPU", strlen("GetCPU")) == 0) { std::cout << "获取CPU参数并返回给服务端." << std::endl; socket.write_some(boost::asio::buffer("CPU: 15 %")); }
if (strncmp(buffer.data(), "GetMEM", strlen("GetMEM")) == 0) { std::cout << "获取MEM参数并返回给服务端." << std::endl; socket.write_some(boost::asio::buffer("MEM: 78 %")); }
if (strncmp(buffer.data(), "Exit", strlen("Exit")) == 0) { std::cout << "终止客户端." << std::endl; return 0; } } else { goto go_; } } } else { std::cout << "连接失败,正在重新连接." << std::endl; } } } catch (...) { return false; }
std::system("pause"); return 0; }
|
AsyncTcpServer 异步服务端
接着我们来实现异步TCP服务器,首先我们需要封装实现CAsyncTcpServer
类,该类使用了多线程来支持异步通信,每个客户端连接都会创建一个CTcpConnection
类的实例来处理具体的通信操作,该服务器类在连接建立、数据传输和连接断开时,都会通过事件处理器来通知相关操作,以支持服务器端的业务逻辑。其头文件声明如下所示;
#ifdef _MSC_VER #define BOOST_BIND_GLOBAL_PLACEHOLDERS #define _WIN32_WINNT 0x0601 #define _CRT_SECURE_NO_WARNINGS #endif
#pragma once #include <thread> #include <array> #include <boost\bind.hpp> #include <boost\noncopyable.hpp> #include <boost\asio.hpp> #include <boost\asio\placeholders.hpp>
using namespace boost::asio; using namespace boost::asio::ip; using namespace boost::placeholders; using namespace std;
class CTcpConnection { public: CTcpConnection(io_service& ios, int clientId) : m_socket(ios), m_clientId(clientId){} ~CTcpConnection(){}
int m_clientId; tcp::socket m_socket; array<BYTE, 16 * 1024> m_buffer; };
typedef shared_ptr<CTcpConnection> TcpConnectionPtr;
class CAsyncTcpServer { public: class IEventHandler { public: IEventHandler(){} virtual ~IEventHandler(){} virtual void ClientConnected(int clientId) = 0; virtual void ClientDisconnect(int clientId) = 0; virtual void ReceiveData(int clientId, const BYTE* data, size_t length) = 0; }; public: CAsyncTcpServer(int maxClientNumber, int port); ~CAsyncTcpServer(); void AddEventHandler(IEventHandler* pHandler){ m_EventHandlers.push_back(pHandler); }
void Send(int clientId, const BYTE* data, size_t length); string GetRemoteAddress(int clientId); string GetRemotePort(int clientId);
private: void bind_hand_read(CTcpConnection* client); void handle_accept(const boost::system::error_code& error); void handle_read(CTcpConnection* client, const boost::system::error_code& error, size_t bytes_transferred);
private: thread m_thread; io_service m_ioservice; io_service::work m_work; tcp::acceptor m_acceptor; int m_maxClientNumber; int m_clientId; TcpConnectionPtr m_nextClient; map<int, TcpConnectionPtr> m_clients; vector<IEventHandler*> m_EventHandlers; };
|
接着来实现AsyncTcpServer
头文件中的功能函数,此功能函数的实现如果读者不明白原理可自行将其提交给ChatGPT解析,这里就不再解释功能了。
#include "AsyncTcpServer.h"
CAsyncTcpServer::CAsyncTcpServer(int maxClientNumber, int port) : m_ioservice() , m_work(m_ioservice) , m_acceptor(m_ioservice) , m_maxClientNumber(maxClientNumber) , m_clientId(0) { m_thread = thread((size_t(io_service::*)())&io_service::run, &m_ioservice); m_nextClient = make_shared<CTcpConnection>(m_ioservice, m_clientId); m_clientId++;
tcp::endpoint endpoint(tcp::v4(), port); m_acceptor.open(endpoint.protocol()); m_acceptor.set_option(tcp::acceptor::reuse_address(true)); m_acceptor.bind(endpoint); m_acceptor.listen();
m_acceptor.async_accept(m_nextClient->m_socket, boost::bind(&CAsyncTcpServer::handle_accept, this, boost::asio::placeholders::error)); }
CAsyncTcpServer::~CAsyncTcpServer() { for (map<int, TcpConnectionPtr>::iterator it = m_clients.begin(); it != m_clients.end(); ++it) { it->second->m_socket.close(); } m_ioservice.stop(); m_thread.join(); }
void CAsyncTcpServer::Send(int clientId, const BYTE* data, size_t length) { map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId); if (it == m_clients.end()) { return; } it->second->m_socket.write_some(boost::asio::buffer(data, length)); }
string CAsyncTcpServer::GetRemoteAddress(int clientId) { map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId); if (it == m_clients.end()) { return "0.0.0.0"; } std::string remote_address = it->second->m_socket.remote_endpoint().address().to_string(); return remote_address; }
string CAsyncTcpServer::GetRemotePort(int clientId) { map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId); char ref[32] = { 0 }; if (it == m_clients.end()) { return "*"; } unsigned short remote_port = it->second->m_socket.remote_endpoint().port(); std::string str = _itoa(remote_port, ref, 10); return str; }
void CAsyncTcpServer::handle_accept(const boost::system::error_code& error) { if (!error) { if (m_maxClientNumber > 0 && m_clients.size() >= m_maxClientNumber) { m_nextClient->m_socket.close(); } else { for (int i = 0; i < m_EventHandlers.size(); ++i) { m_EventHandlers[i]->ClientConnected(m_nextClient->m_clientId); }
bind_hand_read(m_nextClient.get());
m_clients.insert(make_pair(m_nextClient->m_clientId, m_nextClient));
m_nextClient = make_shared<CTcpConnection>(m_ioservice, m_clientId); m_clientId++; } }
m_acceptor.async_accept(m_nextClient->m_socket, boost::bind(&CAsyncTcpServer::handle_accept, this, boost::asio::placeholders::error)); }
void CAsyncTcpServer::bind_hand_read(CTcpConnection* client) { client->m_socket.async_read_some(boost::asio::buffer(client->m_buffer), boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); return;
client->m_socket.async_receive(boost::asio::buffer(client->m_buffer), boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
boost::asio::async_read(client->m_socket, boost::asio::buffer(client->m_buffer), boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); }
void CAsyncTcpServer::handle_read(CTcpConnection* client, const boost::system::error_code& error, size_t bytes_transferred) { if (!error) { for (int i = 0; i < m_EventHandlers.size(); ++i) { m_EventHandlers[i]->ReceiveData(client->m_clientId, client->m_buffer.data(), bytes_transferred); } bind_hand_read(client); } else { for (int i = 0; i < m_EventHandlers.size(); ++i) { m_EventHandlers[i]->ClientDisconnect(client->m_clientId); } m_clients.erase(client->m_clientId); } }
|
AsyncTcpServer 类调用
服务端首先定义CEventHandler
类并继承自CAsyncTcpServer::IEventHandler
接口,该类内需要我们实现三个方法,方法ClientConnected
用于在客户端连接时触发,方法ClientDisconnect
则是在登录客户端离开时触发,而当客户端有数据发送过来时则ReceiveData
方法则会被触发。
方法ClientConnected
当被触发时自动将clientId
客户端Socket套接字放入到tcp_client_id
全局容器内存储起来,而当ClientDisconnect
客户端退出时,则直接遍历这个迭代容器,找到序列号并通过tcp_client_id.erase
将其剔除;
virtual void ClientConnected(int clientId) { tcp_client_id.push_back(clientId); }
virtual void ClientDisconnect(int clientId) { vector<int>::iterator item = find(tcp_client_id.begin(), tcp_client_id.end(), clientId); if (item != tcp_client_id.cend()) tcp_client_id.erase(item); }
|
而ReceiveData
一旦收到数据,则直接将其打印输出到屏幕,即可实现客户端参数接收的目的;
virtual void ReceiveData(int clientId, const BYTE* data, size_t length) { std::cout << std::endl; PrintLine(80); std::cout << data << std::endl; PrintLine(80); std::cout << "[Shell] # "; }
|
相对于接收数据而言,发送数据则是通过同步的方式进行,当我们需要发送数据时,只需要将数据字符串放入到一个BYTE*
字节数组中,并在调用tcpServer.Send
时将所需参数,套接字ID,缓冲区Buf数据,以及长度传递即可实现将数据发送给指定的客户端;
void send_message(CAsyncTcpServer& tcpServer, int clientId, std::string message, int message_size) { BYTE* buf = new BYTE(message_size + 1); memset(buf, 0, message_size + 1);
for (int i = 0; i < message_size; i++) { buf[i] = message.at(i); } tcpServer.Send(clientId, buf, message_size); }
|
客户端完整代码如下所示,运行客户端后读者可自行使用不同的命令来接收参数返回值;
#include "AsyncTcpServer.h" #include <string> #include <vector> #include <iostream> #include <boost/tokenizer.hpp>
using namespace std;
std::vector<int> tcp_client_id;
void PrintLine(int line) { for (int x = 0; x < line; x++) { printf("-"); } printf("\n"); }
class CEventHandler : public CAsyncTcpServer::IEventHandler { public: virtual void ClientConnected(int clientId) { tcp_client_id.push_back(clientId); }
virtual void ClientDisconnect(int clientId) { vector<int>::iterator item = find(tcp_client_id.begin(), tcp_client_id.end(), clientId); if (item != tcp_client_id.cend()) tcp_client_id.erase(item); }
virtual void ReceiveData(int clientId, const BYTE* data, size_t length) { std::cout << std::endl; PrintLine(80); std::cout << data << std::endl; PrintLine(80); std::cout << "[Shell] # "; } };
void send_message(CAsyncTcpServer& tcpServer, int clientId, std::string message, int message_size) { BYTE* buf = new BYTE(message_size + 1); memset(buf, 0, message_size + 1);
for (int i = 0; i < message_size; i++) { buf[i] = message.at(i); } tcpServer.Send(clientId, buf, message_size); }
int main(int argc, char* argv[]) { CAsyncTcpServer tcpServer(10, 10000); CEventHandler eventHandler; tcpServer.AddEventHandler(&eventHandler); std::string command;
while (1) { std::cout << "[Shell] # "; std::getline(std::cin, command);
if (command.length() == 0) { continue; } else if (command == "help") { printf(" _ ____ _ _ \n"); printf("| | _ _ / ___| ___ ___| | _____| |_ \n"); printf("| | | | | | \\___ \\ / _ \\ / __| |/ / _ \\ __| \n"); printf("| |__| |_| | ___) | (_) | (__| < __/ |_ \n"); printf("|_____\\__, | |____/ \\___/ \\___|_|\\_\\___|\\__| \n"); printf(" |___/ \n\n"); printf("Usage: LySocket \t PowerBy: LyShark.com \n"); printf("Optional: \n\n"); printf("\t ShowSocket 输出所有Socket容器 \n"); printf("\t GetCPU 获取CPU数据 \n"); printf("\t GetMemory 获取内存数据 \n"); printf("\t Exit 退出客户端 \n\n"); } else { boost::char_separator<char> sep(", --"); typedef boost::tokenizer<boost::char_separator<char>> CustonTokenizer; CustonTokenizer tok(command, sep);
std::vector<std::string> vecSegTag; for (CustonTokenizer::iterator beg = tok.begin(); beg != tok.end(); ++beg) { vecSegTag.push_back(*beg); } if (vecSegTag.size() == 1 && vecSegTag[0] == "ShowSocket") { PrintLine(80); printf("客户ID \t 客户IP地址 \t 客户端口 \n"); PrintLine(80); for (int x = 0; x < tcp_client_id.size(); x++) { std::cout << tcp_client_id[x] << " \t " << tcpServer.GetRemoteAddress(tcp_client_id[x]) << " \t " << tcpServer.GetRemotePort(tcp_client_id[x]) << std::endl; } PrintLine(80); }
if (vecSegTag.size() == 3 && vecSegTag[0] == "GetCPU") { char *id = (char *)vecSegTag[2].c_str(); send_message(tcpServer, atoi(id), "GetCPU", strlen("GetCPU")); }
if (vecSegTag.size() == 3 && vecSegTag[0] == "GetMemory") { char* id = (char*)vecSegTag[2].c_str(); send_message(tcpServer, atoi(id), "GetMEM", strlen("GetMEM")); }
if (vecSegTag.size() == 3 && vecSegTag[0] == "Exit") { char* id = (char*)vecSegTag[2].c_str(); send_message(tcpServer, atoi(id), "Exit", strlen("Exit")); } } } return 0; }
|
案例演示
首先运行服务端程序,接着运行多个客户端,即可实现自动上线;

当用户需要通信时,只需要指定id序号到指定的Socket套接字编号即可;
