C++ ASIO 实现异步套接字管理

Boost ASIO(Asynchronous I/O)是一个用于异步I/O操作的C++库,该框架提供了一种方便的方式来处理网络通信、多线程编程和异步操作。特别适用于网络应用程序的开发,从基本的网络通信到复杂的异步操作,如远程控制程序、高并发服务器等都可以使用该框架。该框架的优势在于其允许处理多个并发连接,而不必创建一个线程来管理每个连接。最重要的是ASIO是一个跨平台库,可以运行在任何支持C++的平台下。

本章笔者将介绍如何通过ASIO框架实现一个简单的异步网络套接字应用程序,该程序支持对Socket套接字的存储,默认将套接字放入到一个Map容器内,当需要使用时只需要将套接字在容器内取出并实现通信,客户端下线时则自动从Map容器内移除,通过对本章知识的学习读者可以很容易的构建一个跨平台的简单远控功能。

AsyncTcpClient 异步客户端

如下这段代码实现了一个基本的带有自动心跳检测的客户端,它可以通过异步连接与服务器进行通信,并根据不同的命令返回不同的数据。代码逻辑较为简单,但为了保证可靠性和稳定性,实际应用中需要进一步优化、处理错误和异常情况,以及增加更多的功能和安全性措施。

首先我们封装实现AsyncConnect类,该类内主要实现两个功能,其中aysnc_connect()方法用于实现异步连接到服务端,而port_is_open()方法则用于验证服务器特定端口是否开放,如果开放则说明服务端还在线,不开放则说明服务端离线此处尝试等待一段时间后再次验证,在调用boost::bind()函数绑定套接字时通过&AsyncConnect::timer_handle()函数来设置一个超时等待时间。

进入到主函数中,首先程序通过while循环让程序保持持续运行,并通过hander.aysnc_connect(ep, 5000) 每隔5秒验证是否与服务端连接成功,如果连接了则进入内循环,在内循环中通过hander.port_is_open("127.0.0.1", 10000, 5000)验证特定端口是否开放,这主要是为了保证服务端断开后客户端依然能够跳转到外部循环继续等待服务端上线。而当客户端与服务端建立连接后则会持续在内循环中socket.read_some()接收服务端传来的特定命令,以此来执行不同的操作。

#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/noncopyable.hpp>

using namespace std;
using boost::asio::ip::tcp;

// 异步连接地址与端口
class AsyncConnect
{
public:
AsyncConnect(boost::asio::io_service& ios, tcp::socket &s)
:io_service_(ios), timer_(ios), socket_(s) {}

// 异步连接
bool aysnc_connect(const tcp::endpoint &ep, int million_seconds)
{
bool connect_success = false;

// 异步连接,当连接成功后将触发 connect_handle 函数
socket_.async_connect(ep, boost::bind(&AsyncConnect::connect_handle, this, _1, boost::ref(connect_success)));

// 设置一个定时器 million_seconds
timer_.expires_from_now(boost::posix_time::milliseconds(million_seconds));
bool timeout = false;

// 异步等待 如果超时则执行 timer_handle
timer_.async_wait(boost::bind(&AsyncConnect::timer_handle, this, _1, boost::ref(timeout)));
do
{
// 等待异步操作完成
io_service_.run_one();
// 判断如果timeout没超时,或者是连接建立了,则不再等待
} while (!timeout && !connect_success);
timer_.cancel();
return connect_success;
}

// 验证服务器端口是否开放
bool port_is_open(std::string address, int port, int timeout)
{
try
{
boost::asio::io_service io;
tcp::socket socket(io);
AsyncConnect hander(io, socket);
tcp::endpoint ep(boost::asio::ip::address::from_string(address), port);
if (hander.aysnc_connect(ep, timeout))
{
io.run();
io.reset();
return true;
}
else
{
return false;
}
}
catch (...)
{
return false;
}
}

private:
// 如果连接成功了,则 connect_success = true
void connect_handle(boost::system::error_code ec, bool &connect_success)
{
if (!ec)
{
connect_success = true;
}
}

// 定时器超时timeout = true
void timer_handle(boost::system::error_code ec, bool &timeout)
{
if (!ec)
{
socket_.close();
timeout = true;
}
}
boost::asio::io_service &io_service_;
boost::asio::deadline_timer timer_;
tcp::socket &socket_;
};

int main(int argc, char * argv[])
{
try
{
boost::asio::io_service io;
tcp::socket socket(io);
AsyncConnect hander(io, socket);
boost::system::error_code error;
tcp::endpoint ep(boost::asio::ip::address::from_string("127.0.0.1"), 10000);

// 循环验证是否在线
go_: while (1)
{
// 验证是否连接成功,并定义超时时间为5秒
if (hander.aysnc_connect(ep, 5000))
{
io.run();
std::cout << "已连接到服务端." << std::endl;

// 循环接收命令
while (1)
{
// 验证地址端口是否开放,默认等待5秒
bool is_open = hander.port_is_open("127.0.0.1", 10000, 5000);

// 客户端接收数据包
boost::array<char, 4096> buffer = { 0 };

// 如果在线则继续执行
if (is_open == true)
{
socket.read_some(boost::asio::buffer(buffer), error);

// 判断收到的命令是否为GetCPU
if (strncmp(buffer.data(), "GetCPU", strlen("GetCPU")) == 0)
{
std::cout << "获取CPU参数并返回给服务端." << std::endl;
socket.write_some(boost::asio::buffer("CPU: 15 %"));
}

// 判断收到的命令是否为GetMEM
if (strncmp(buffer.data(), "GetMEM", strlen("GetMEM")) == 0)
{
std::cout << "获取MEM参数并返回给服务端." << std::endl;
socket.write_some(boost::asio::buffer("MEM: 78 %"));
}

// 判断收到的命令是否为终止程序
if (strncmp(buffer.data(), "Exit", strlen("Exit")) == 0)
{
std::cout << "终止客户端." << std::endl;
return 0;
}
}
else
{
// 如果连接失败,则跳转到等待环节
goto go_;
}
}
}
else
{
std::cout << "连接失败,正在重新连接." << std::endl;
}
}
}
catch (...)
{
return false;
}

std::system("pause");
return 0;
}

AsyncTcpServer 异步服务端

接着我们来实现异步TCP服务器,首先我们需要封装实现CAsyncTcpServer类,该类使用了多线程来支持异步通信,每个客户端连接都会创建一个CTcpConnection类的实例来处理具体的通信操作,该服务器类在连接建立、数据传输和连接断开时,都会通过事件处理器来通知相关操作,以支持服务器端的业务逻辑。其头文件声明如下所示;

#ifdef _MSC_VER
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#define _WIN32_WINNT 0x0601
#define _CRT_SECURE_NO_WARNINGS
#endif

#pragma once
#include <thread>
#include <array>
#include <boost\bind.hpp>
#include <boost\noncopyable.hpp>
#include <boost\asio.hpp>
#include <boost\asio\placeholders.hpp>

using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost::placeholders;
using namespace std;

// 每一个套接字连接,都自动对应一个Tcp客户端连接
class CTcpConnection
{
public:
CTcpConnection(io_service& ios, int clientId) : m_socket(ios), m_clientId(clientId){}
~CTcpConnection(){}

int m_clientId;
tcp::socket m_socket;
array<BYTE, 16 * 1024> m_buffer;
};

typedef shared_ptr<CTcpConnection> TcpConnectionPtr;

class CAsyncTcpServer
{
public:
class IEventHandler
{
public:
IEventHandler(){}
virtual ~IEventHandler(){}
virtual void ClientConnected(int clientId) = 0;
virtual void ClientDisconnect(int clientId) = 0;
virtual void ReceiveData(int clientId, const BYTE* data, size_t length) = 0;
};
public:
CAsyncTcpServer(int maxClientNumber, int port);
~CAsyncTcpServer();
void AddEventHandler(IEventHandler* pHandler){ m_EventHandlers.push_back(pHandler); }

void Send(int clientId, const BYTE* data, size_t length);
string GetRemoteAddress(int clientId);
string GetRemotePort(int clientId);

private:
void bind_hand_read(CTcpConnection* client);
void handle_accept(const boost::system::error_code& error);
void handle_read(CTcpConnection* client, const boost::system::error_code& error, size_t bytes_transferred);

private:
thread m_thread;
io_service m_ioservice;
io_service::work m_work;
tcp::acceptor m_acceptor;
int m_maxClientNumber;
int m_clientId;
TcpConnectionPtr m_nextClient;
map<int, TcpConnectionPtr> m_clients;
vector<IEventHandler*> m_EventHandlers;
};

接着来实现AsyncTcpServer头文件中的功能函数,此功能函数的实现如果读者不明白原理可自行将其提交给ChatGPT解析,这里就不再解释功能了。

// 朱迎春 (基础改进版)
#include "AsyncTcpServer.h"

// CAsyncTcpServer的实现
CAsyncTcpServer::CAsyncTcpServer(int maxClientNumber, int port)
: m_ioservice()
, m_work(m_ioservice)
, m_acceptor(m_ioservice)
, m_maxClientNumber(maxClientNumber)
, m_clientId(0)
{
m_thread = thread((size_t(io_service::*)())&io_service::run, &m_ioservice);
m_nextClient = make_shared<CTcpConnection>(m_ioservice, m_clientId);
m_clientId++;

tcp::endpoint endpoint(tcp::v4(), port);
m_acceptor.open(endpoint.protocol());
m_acceptor.set_option(tcp::acceptor::reuse_address(true));
m_acceptor.bind(endpoint);
m_acceptor.listen();

// 异步等待客户端连接
m_acceptor.async_accept(m_nextClient->m_socket, boost::bind(&CAsyncTcpServer::handle_accept, this, boost::asio::placeholders::error));
}

CAsyncTcpServer::~CAsyncTcpServer()
{
for (map<int, TcpConnectionPtr>::iterator it = m_clients.begin(); it != m_clients.end(); ++it)
{
it->second->m_socket.close();
}
m_ioservice.stop();
m_thread.join();
}

// 根据ID号同步给特定客户端发送数据包
void CAsyncTcpServer::Send(int clientId, const BYTE* data, size_t length)
{
map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId);
if (it == m_clients.end())
{
return;
}
it->second->m_socket.write_some(boost::asio::buffer(data, length));
}

// 根据ID号返回客户端IP地址
string CAsyncTcpServer::GetRemoteAddress(int clientId)
{
map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId);
if (it == m_clients.end())
{
return "0.0.0.0";
}
std::string remote_address = it->second->m_socket.remote_endpoint().address().to_string();
return remote_address;
}

// 根据ID号返回端口号
string CAsyncTcpServer::GetRemotePort(int clientId)
{
map<int, TcpConnectionPtr>::iterator it = m_clients.find(clientId);
char ref[32] = { 0 };
if (it == m_clients.end())
{
return "*";
}
unsigned short remote_port = it->second->m_socket.remote_endpoint().port();
std::string str = _itoa(remote_port, ref, 10);
return str;
}

void CAsyncTcpServer::handle_accept(const boost::system::error_code& error)
{
if (!error)
{
// 判断连接数目是否达到最大限度
if (m_maxClientNumber > 0 && m_clients.size() >= m_maxClientNumber)
{
m_nextClient->m_socket.close();
}
else
{
// 发送客户端连接的消息
for (int i = 0; i < m_EventHandlers.size(); ++i)
{
m_EventHandlers[i]->ClientConnected(m_nextClient->m_clientId);
}

// 设置异步接收数据
bind_hand_read(m_nextClient.get());

// 将客户端连接放到客户表中
m_clients.insert(make_pair(m_nextClient->m_clientId, m_nextClient));

// 重置下一个客户端连接
m_nextClient = make_shared<CTcpConnection>(m_ioservice, m_clientId);
m_clientId++;
}
}

// 异步等待下一个客户端连接
m_acceptor.async_accept(m_nextClient->m_socket, boost::bind(&CAsyncTcpServer::handle_accept, this, boost::asio::placeholders::error));
}

void CAsyncTcpServer::bind_hand_read(CTcpConnection* client)
{
client->m_socket.async_read_some(boost::asio::buffer(client->m_buffer),
boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
return;

client->m_socket.async_receive(boost::asio::buffer(client->m_buffer),
boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

boost::asio::async_read(client->m_socket, boost::asio::buffer(client->m_buffer),
boost::bind(&CAsyncTcpServer::handle_read, this, client, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void CAsyncTcpServer::handle_read(CTcpConnection* client, const boost::system::error_code& error, size_t bytes_transferred)
{
if (!error)
{
// 发送收到数据的信息
for (int i = 0; i < m_EventHandlers.size(); ++i)
{
m_EventHandlers[i]->ReceiveData(client->m_clientId, client->m_buffer.data(), bytes_transferred);
}
bind_hand_read(client);
}
else
{
// 发送客户端离线的消息
for (int i = 0; i < m_EventHandlers.size(); ++i)
{
m_EventHandlers[i]->ClientDisconnect(client->m_clientId);
}
m_clients.erase(client->m_clientId);
}
}

AsyncTcpServer 类调用

服务端首先定义CEventHandler类并继承自CAsyncTcpServer::IEventHandler接口,该类内需要我们实现三个方法,方法ClientConnected用于在客户端连接时触发,方法ClientDisconnect则是在登录客户端离开时触发,而当客户端有数据发送过来时则ReceiveData方法则会被触发。

方法ClientConnected当被触发时自动将clientId客户端Socket套接字放入到tcp_client_id全局容器内存储起来,而当ClientDisconnect客户端退出时,则直接遍历这个迭代容器,找到序列号并通过tcp_client_id.erase将其剔除;

// 客户端连接时触发
virtual void ClientConnected(int clientId)
{
// 将登录客户端加入到容器中
tcp_client_id.push_back(clientId);
}

// 客户端退出时触发
virtual void ClientDisconnect(int clientId)
{
// 将登出的客户端从容器中移除
vector<int>::iterator item = find(tcp_client_id.begin(), tcp_client_id.end(), clientId);
if (item != tcp_client_id.cend())
tcp_client_id.erase(item);
}

ReceiveData一旦收到数据,则直接将其打印输出到屏幕,即可实现客户端参数接收的目的;

// 客户端获取数据
virtual void ReceiveData(int clientId, const BYTE* data, size_t length)
{
std::cout << std::endl;
PrintLine(80);
std::cout << data << std::endl;
PrintLine(80);
std::cout << "[Shell] # ";
}

相对于接收数据而言,发送数据则是通过同步的方式进行,当我们需要发送数据时,只需要将数据字符串放入到一个BYTE*字节数组中,并在调用tcpServer.Send时将所需参数,套接字ID,缓冲区Buf数据,以及长度传递即可实现将数据发送给指定的客户端;

// 同步发送数据到指定的线程中
void send_message(CAsyncTcpServer& tcpServer, int clientId, std::string message, int message_size)
{
// 获取长度
BYTE* buf = new BYTE(message_size + 1);
memset(buf, 0, message_size + 1);

for (int i = 0; i < message_size; i++)
{
buf[i] = message.at(i);
}
tcpServer.Send(clientId, buf, message_size);
}

客户端完整代码如下所示,运行客户端后读者可自行使用不同的命令来接收参数返回值;

#include "AsyncTcpServer.h"
#include <string>
#include <vector>
#include <iostream>
#include <boost/tokenizer.hpp>

using namespace std;

// 存储当前客户端的ID号
std::vector<int> tcp_client_id;

// 输出特定长度的行
void PrintLine(int line)
{
for (int x = 0; x < line; x++)
{
printf("-");
}
printf("\n");
}

class CEventHandler : public CAsyncTcpServer::IEventHandler
{
public:
// 客户端连接时触发
virtual void ClientConnected(int clientId)
{
// 将登录客户端加入到容器中
tcp_client_id.push_back(clientId);
}

// 客户端退出时触发
virtual void ClientDisconnect(int clientId)
{
// 将登出的客户端从容器中移除
vector<int>::iterator item = find(tcp_client_id.begin(), tcp_client_id.end(), clientId);
if (item != tcp_client_id.cend())
tcp_client_id.erase(item);
}

// 客户端获取数据
virtual void ReceiveData(int clientId, const BYTE* data, size_t length)
{
std::cout << std::endl;
PrintLine(80);
std::cout << data << std::endl;
PrintLine(80);
std::cout << "[Shell] # ";
}
};

// 同步发送数据到指定的线程中
void send_message(CAsyncTcpServer& tcpServer, int clientId, std::string message, int message_size)
{
// 获取长度
BYTE* buf = new BYTE(message_size + 1);
memset(buf, 0, message_size + 1);

for (int i = 0; i < message_size; i++)
{
buf[i] = message.at(i);
}
tcpServer.Send(clientId, buf, message_size);
}

int main(int argc, char* argv[])
{
CAsyncTcpServer tcpServer(10, 10000);
CEventHandler eventHandler;
tcpServer.AddEventHandler(&eventHandler);
std::string command;

while (1)
{
std::cout << "[Shell] # ";
std::getline(std::cin, command);

if (command.length() == 0)
{
continue;
}
else if (command == "help")
{
printf(" _ ____ _ _ \n");
printf("| | _ _ / ___| ___ ___| | _____| |_ \n");
printf("| | | | | | \\___ \\ / _ \\ / __| |/ / _ \\ __| \n");
printf("| |__| |_| | ___) | (_) | (__| < __/ |_ \n");
printf("|_____\\__, | |____/ \\___/ \\___|_|\\_\\___|\\__| \n");
printf(" |___/ \n\n");
printf("Usage: LySocket \t PowerBy: LyShark.com \n");
printf("Optional: \n\n");
printf("\t ShowSocket 输出所有Socket容器 \n");
printf("\t GetCPU 获取CPU数据 \n");
printf("\t GetMemory 获取内存数据 \n");
printf("\t Exit 退出客户端 \n\n");
}
else
{
// 定义分词器: 定义分割符号为[逗号,空格]
boost::char_separator<char> sep(", --");
typedef boost::tokenizer<boost::char_separator<char>> CustonTokenizer;
CustonTokenizer tok(command, sep);

// 将分词结果放入vector链表
std::vector<std::string> vecSegTag;
for (CustonTokenizer::iterator beg = tok.begin(); beg != tok.end(); ++beg)
{
vecSegTag.push_back(*beg);
}
// 解析 [shell] # ShowSocket
if (vecSegTag.size() == 1 && vecSegTag[0] == "ShowSocket")
{
PrintLine(80);
printf("客户ID \t 客户IP地址 \t 客户端口 \n");
PrintLine(80);
for (int x = 0; x < tcp_client_id.size(); x++)
{
std::cout << tcp_client_id[x] << " \t "
<< tcpServer.GetRemoteAddress(tcp_client_id[x]) << " \t "
<< tcpServer.GetRemotePort(tcp_client_id[x]) << std::endl;
}
PrintLine(80);
}

// 解析 [shell] # GetCPU --id 100
if (vecSegTag.size() == 3 && vecSegTag[0] == "GetCPU")
{
char *id = (char *)vecSegTag[2].c_str();
send_message(tcpServer, atoi(id), "GetCPU", strlen("GetCPU"));
}

// 解析 [shell] # GetMemory --id 100
if (vecSegTag.size() == 3 && vecSegTag[0] == "GetMemory")
{
char* id = (char*)vecSegTag[2].c_str();
send_message(tcpServer, atoi(id), "GetMEM", strlen("GetMEM"));
}

// 解析 [shell] # Exit --id 100
if (vecSegTag.size() == 3 && vecSegTag[0] == "Exit")
{
char* id = (char*)vecSegTag[2].c_str();
send_message(tcpServer, atoi(id), "Exit", strlen("Exit"));
}
}
}
return 0;
}

案例演示

首先运行服务端程序,接着运行多个客户端,即可实现自动上线;

当用户需要通信时,只需要指定id序号到指定的Socket套接字编号即可;