C++实现集群聊天服务器
json学习
1 |
|
使用json,要包含json的头文件 ,方便使用可以重命名nlohmann::json为json。
1 | //json序列化示例1 |
js使用很像键值对。js.dump()是 nlohmann::json库中用于将 JSON 对象序列化(转为字符串)的方法,它会把一个 json类型的变量转换成 JSON 格式的字符串。
1 | std::string recvBuf= func1(); |
这是main函数中执行。json::parse()
是 nlohmann::json
库中用来将 JSON 字符串 → JSON 对象 的函数,也就是实现反序列化。
转化为json对象后使用对应的键可以把对应的值取出来。结果:
1 | {"from":"zhang san","msg":"hello,what are you doing now","msg_type":2,"to":"li si"} |
1 | //json序列化示例2 |
json还可以放数组,还可以嵌套json语句。
1 | std::string recvBuf= func2(); |
“id”存储的是一个数组,取出来还可以作为数组使用。“msg”存储的值还可以是json语句,并且可以进入再取里面的值。
结果:
1 | {"id":[1,2,3,4,5],"msg":{"liu shuo":"hello china","zhang san":"hello world"},"name":"zhang san"} |
1 | //json序列化示例3 |
还可以序列化容器。
1 | std::string recvBuf= func3(); |
结果:
1 | {"list":[1,2,5],"path":[[1,"黄山"],[2,"华山"],[3,"泰山"]]} |
muduo网络库学习
muduo 是一个现代 C++ 的非阻塞 I/O 网络库,采用 Reactor 模式 + 多线程 + epoll + 定时器 + 高效缓冲区 + 智能指针设计,在性能和工程性方面都表现非常优秀。
1 | /* |
先将bind函数讲清楚,
基本格式
1 | std::bind(&类名::成员函数, 对象指针, 占位符参数...) |
所以:
1 | std::bind(&ChatServer::onConnection, this, _1) |
意思是:
把当前对象(this
)的 onConnection
成员函数绑定起来,形成一个可调用对象,并接受一个参数 _1
。
分解解释
组件 | 说明 |
---|---|
&ChatServer::onConnection |
指向 ChatServer 类的成员函数 onConnection 的指针 |
this |
当前对象的指针,告诉 bind 这个函数是哪个对象的方法(即调用 this->onConnection) |
_1 |
占位符,代表回调传入的第一个参数,实际调用时会被替换 |
最终生成一个 std::function<void(const TcpConnectionPtr&)>
类型的函数对象。
CMakeList.txt学习
主CMakeList.txt
1 | cmake_minimum_required(VERSION 3.10) |
src文件夹下的CMakeList.txt
1 | add_subdirectory(server) |
src文件夹下的server文件夹下的CMakeList.txt
1 | #定义了SRC_LIST变量,包含了当前目录下的所有源文件 |
项目常见的cmake指令使用步骤
1 | mkdir build |
解释:
mkdir build
:创建构建目录(推荐 out-of-source build)cd build
:进入构建目录cmake ..
:执行 CMake,读取上一级目录的CMakeLists.txt
,生成构建系统(默认是 Makefile)make
:使用生成的 Makefile 编译项目
集群聊天服务器
项目目录
bin文件夹编译完成后的 .exe
(Windows)或无扩展的可执行文件。
build文件夹通常用于存放 构建过程中产生的所有中间文件,它是整个编译流程的“临时工作区”。
include文件夹存放头文件(Header Files),src文件夹是 C++ 项目中最常见也最核心的目录之一,用于存放 源代码实现文件(source files),主要是 .cpp
文件(也包括 .c
, .cc
, .cxx
等)。
test文件夹在 C++ 项目中用于存放测试代码,主要用于验证项目功能是否正确,确保每个模块在修改后仍然可以正常工作。
thirdparty文件夹在 C++ 项目中用于存放第三方依赖库的源码或接口文件。在这里存储了json.hpp文件。
CMakeLists.txt文件存放和使用
CMakeLists.txt文件放在了主目录,src文件夹里,src/server文件夹里,接下来分别说明对应文件夹中CMakeLists.txt的作用。
主目录文件夹
1 | cmake_minimum_required(VERSION 3.10) |
主目录 CMakeLists.txt
的作用概括:
- 设置项目基本信息:指定 CMake 最低版本和项目名称。
- 配置全局编译选项:例如添加调试信息(
-g
)以便调试。 - 设置可执行文件输出路径:统一将程序输出到
bin/
目录。 - 配置头文件搜索路径:包括项目头文件目录和第三方库目录,便于代码引用。
- 组织子目录构建:通过
add_subdirectory(src)
加载src
子目录,交由其继续组织源码编译。
src文件夹
1 | add_subdirectory(server) |
add_subdirectory(server)
把 server
子目录纳入构建流程,交由其内部的 CMakeLists.txt
具体定义编译规则,是模块化项目管理的关键步骤。
src/server文件夹
1 | #定义了SRC_LIST变量,包含了当前目录下的所有源文件 |
src/server/CMakeLists.txt
的作用概括:
- 收集源文件:自动获取当前目录和
db
子目录下的所有.cpp
文件。 - 生成可执行程序:将源文件编译为
ChatServer
可执行文件。 - 链接依赖库:链接 Muduo 网络库、MySQL 客户端库和 pthread 线程库。
thirdparty文件夹(第三方库)json.hpp
聊天服务器项目中,json.hpp
负责处理客户端与服务端之间的 JSON 数据格式,承担消息的 解析、构建与传输格式化,是通信协议的关键组件。
include/server/chatserver.hpp 聊天服务器的主类
1 |
|
ChatServer.h
作用及结构概括
- 类功能
定义了聊天服务器的主类 ChatServer
,负责网络服务的初始化、启动和事件处理。
- 成员变量
_server
:muduo::net::TcpServer
对象,负责网络连接管理和服务端监听。_loop
:muduo::net::EventLoop
指针,负责事件循环和调度。
- 核心接口
- 构造函数
ChatServer(EventLoop*, const InetAddress&, const string&)
:初始化服务器监听地址、名称和事件循环。 start()
:启动服务器监听,进入事件循环。
- 回调函数(私有)
onConnection()
:处理客户端连接和断开事件。onMessage()
:处理客户端消息接收事件,进行消息读写。
ChatServer
类是整个聊天项目的网络核心,利用 Muduo 库提供的 TCP 服务器功能,封装了网络事件的注册与处理,负责服务器端的网络通信逻辑。
src/server/chatserver.cpp
1 |
|
ChatServer.cpp 作用与关键点概括
- 构造函数初始化
- 初始化 Muduo 的 TCP 服务器对象
_server
和事件循环_loop
。 - 注册回调函数:
onConnection
处理连接建立与断开;onMessage
处理客户端消息。
- 设置线程数为4,支持多线程处理网络事件。
- 启动服务
start()
调用 Muduo 的_server.start()
启动监听和事件循环。
- 连接回调
onConnection
- 判断客户端是否断开连接,断开时调用
conn->shutdown()
关闭连接。
- 消息回调
onMessage
- 将网络缓冲区中的数据读取为字符串。
- 使用
json.hpp
将字符串反序列化成 JSON 对象。 - 通过消息中的
"msgid"
字段,调用业务层ChatService
获取对应的消息处理函数(回调)。 - 执行该消息处理函数,完成具体的业务逻辑处理。
作用总结
这段代码实现了聊天服务器的核心网络处理逻辑,做到:
- 网络层与业务层解耦:网络部分只负责收发和解析数据,业务处理由
ChatService
中注册的处理器完成。 - 灵活消息分发:根据 JSON 中的
msgid
动态调用对应业务处理函数,实现消息驱动机制。 - 高效异步处理:基于 Muduo 多线程事件循环模型,支持高并发网络请求。
include/server/chatservice.hpp 聊天服务器业务类
1 |
|
ChatService.h 作用与结构概括
- 类功能
- 负责处理聊天服务器的核心业务逻辑,如登录、注册等。
- 管理消息 ID 与对应的处理函数的映射,实现消息的动态分发。
- 核心类型
MsgHandler
:消息处理函数类型,参数包括 TCP 连接指针、JSON 消息对象和时间戳。
- 接口函数
static ChatService* instance()
:单例模式,获取业务服务唯一实例。void login(...)
:处理登录业务逻辑。void reg(...)
:处理注册业务逻辑。MsgHandler getHandler(int msgid)
:根据消息 ID 获取对应的消息处理函数。
- 成员变量
_msgHandlerMap
:存储消息 ID 与处理函数的映射,支持根据消息类型快速调用相应业务处理器。
src/server/chatservice.cpp
1 |
|
ChatService.cpp 作用和关键点
- 单例模式实现
instance()
函数内部静态变量实现线程安全的单例模式,保证ChatService
只有一个实例。
- 构造函数注册消息处理函数
- 在构造函数中通过
_msgHandlerMap.insert
将消息 ID(LOGIN_MSG
、REG_MSG
)绑定到成员函数login
和reg
的回调。 - 使用
std::bind
绑定成员函数和this
指针,方便后续调用。
- 动态消息处理函数获取
getHandler(int msgid)
根据传入的消息 ID 返回对应的处理函数。- 如果消息 ID 未注册,返回一个默认空操作的 lambda,同时写错误日志,保证系统健壮性。
- 业务处理函数示例
login()
和reg()
目前仅打印日志,代表登录和注册的业务处理接口,后续可以扩展具体业务逻辑。
include/public.hpp
1 |
|
public.h 作用简述
- 共享消息类型定义
- 定义了枚举类型
EnMsgType
,用于区分不同的消息类型。 - 目前包含两种消息:
LOGIN_MSG = 1
:登录消息REG_MSG
:注册消息
- 定义了枚举类型
- 作用
- 服务端和客户端都包含这个头文件,保证双方对消息类型有统一的理解和对应关系,方便通信协议的设计与实现。
src/server/main.cpp
1 |
|
启动聊天服务器:
创建事件循环和服务器监听地址,初始化 ChatServer
对象,启动服务器后进入事件循环,开始接受和处理客户端连接与消息。
include/server/db/db.h 数据库操作类
1 |
|
MySQL 数据库操作类说明
类功能
封装对 MySQL 数据库的连接、查询和更新操作,方便上层业务调用。
主要成员函数
- **构造函数
MySQL()
**:初始化数据库连接相关资源。 - **析构函数
~MySQL()
**:释放数据库连接资源。 - **
bool connect()
**:连接数据库,返回连接是否成功。 - **
bool update(string sql)
**:执行更新类 SQL 语句(如 INSERT、UPDATE、DELETE)。 - **
MYSQL_RES* query(string sql)
**:执行查询类 SQL 语句,返回结果集指针。 - **
MYSQL* getConnection()
**:获取底层 MySQL 连接对象,便于其他操作。
成员变量
- **
MYSQL *_conn
**:指向 MySQL 连接句柄的指针。
src/server/db/db.cpp
1 |
|
MySQL 类实现功能简述
- 初始化连接
- 构造函数
MySQL()
调用mysql_init
初始化 MySQL 连接对象。
- 构造函数
- 释放连接资源
- 析构函数
~MySQL()
关闭数据库连接,释放资源。
- 析构函数
- 连接数据库
connect()
使用mysql_real_connect
连接到数据库。- 成功连接后设置字符集为
gbk
,防止中文乱码。 - 通过日志打印连接成功或失败信息。
- 执行更新操作
update(string sql)
执行 SQL 更新语句(如 INSERT、UPDATE、DELETE)。- 失败时打印错误日志,返回
false
。
- 执行查询操作
query(string sql)
执行 SQL 查询语句。- 返回查询结果指针,失败时返回
nullptr
。
- 获取底层连接
getConnection()
返回当前的 MySQL 连接指针,方便其他数据库操作调用。
include/server/user.hpp 数据库中user表对应的类
1 |
|
User
类作用概括:
- ORM 映射作用
- 该类是对数据库
User
表的一个对象化表示(Object-Relational Mapping)。 - 将数据库中的一条用户记录封装成一个 C++ 对象,便于在程序中操作。
- 属性封装
类中包含如下用户信息字段:
id
: 用户编号name
: 用户名password
: 用户密码state
: 用户状态(如"online"
/"offline"
)
- 基本接口功能
提供了以下功能函数:
- 构造函数(支持默认值)
setXXX()
设置各字段值getXXX()
获取各字段值
项目中的实际用途
基于 Muduo 的聊天服务器项目中:
- 用户数据从数据库中查询出来后,可以封装成
User
类对象; - 在业务逻辑中传递用户信息时,使用
User
类便于管理; - 与数据库交互模块(如 DAO 类)进行数据传递、封装与解封装;
- 便于将用户信息序列化为 JSON 数据发送到客户端。
include/server/usermodel.hpp user表的数据操作类
1 |
|
#ifndef/#define/#endif
:防止头文件重复包含(include guard);
#include "user.hpp"
:引入用户实体类定义;
class UserModel
:封装了对 User
表的数据库操作;
bool insert(User &user)
:定义了插入用户的方法,返回是否成功。
src/server/usermodel.cpp
1 |
|
sprintf
组装 SQL;
MySQL
是你自定义的数据库操作类,封装了 connect()
和 update()
;
插入成功后,用 mysql_insert_id()
获取刚插入的记录主键 ID。
注册业务实现
实现步骤
public.hpp
1 | enum EnMsgType{ |
再加上REG_MSG_ACK 注册响应消息,告诉客户端已经收到消息。
chatservice.hpp
1 | private: |
在private中加入数据操作类对象UserModel _userModel,这个是自定义的对象在usermodel.hpp
chatservice.cpp
1 | void ChatService::reg(const TcpConnectionPtr &conn,json &js,Timestamp){ |
在ChatService类的reg函数,负责处理注册业务的。js中存储的是我们的消息以json存储的。这里面的字段例如msgid,name,password都是服务端和客户端约定好的。首先将消息中的name,和password拿出来赋值给User类对象user,注册业务,把对应的值存入数据库chat中的User表中。当数据操作类对象_userModel插入成功时,返回消息给客户端,msgid是客户端和服务端规定的消息类型,消息REG_MSG_ACK注册后的回应消息。返回的消息有消息类型和是否注册成功的标志errno(0表示成功,1表示失败),注册失败id也不会产生,所以不用发了。最后使用json的dump()函数转换为字符串通过conn的send函数再发出去。
conn是TcpConnectionPtr类型
TcpConnectionPtr
是 Muduo 网络库 中定义的一个智能指针类型,指向 TcpConnection
对象,通常用于表示一条活跃的 TCP 连接。
1 | typedef std::shared_ptr<TcpConnection> TcpConnectionPtr; |
它的生命周期由 Muduo
网络库自动管理。
常见成员函数(通过 TcpConnectionPtr
调用)
conn->send(data)
:发送数据到客户端conn->shutdown()
:关闭连接(半关闭)conn->connected()
:是否仍保持连接conn->peerAddress()
:对端地址conn->localAddress()
:本地地址conn->setContext()
/conn->getContext()
:绑定上下文(可保存登录信息、用户ID等)
实现注册业务时出现的问题
实现注册业务时,使用vscod远程连接Linux出现了连接数据库不上的问题。
1 | 20250604 13:22:35.282815Z 18893 INFO connect mysql failed!Access denied for user 'root'@'localhost' - db.cpp:29 |
数据库拒绝了 root 用户从 localhost 的访问请求。
解决方案
1.使用Linux root用户登录mysql
2.修改root用户认证方式为密码登录
1 | ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '123456'; |
第一句ALTER USER修改存在的用户 ‘root‘@’localhost’用户名为root,本地连接。IDENTIFIED WITH mysql_native_password 设置认证插件为 mysql_native_password
(密码方式)
第二句MySQL 会将用户和权限信息缓存在内存中,这条命令强制它 重新加载所有权限表。ALTER USER
做的更改立即生效,而不必重启 MySQL 服务
第三句对用户权限和密码的修改完成了,退出命令行界面即可。
auth_socket
是 MySQL 默认的一种用户认证方式,主要用于本地登录,不使用密码,而是通过操作系统身份验证。MySQL 会验证:
- 你是否是当前 Linux 系统的
root
用户; - 如果是,允许登录;
- 否则,拒绝访问,即使你输入了正确的密码也没用。
登录业务实现
实现步骤:
public.hpp
1 | enum EnMsgType{ |
加入LOGIN_MSG_ACK,//登录响应消息
usermodel.hpp
1 | //根据用户号码查询用户信息 |
加入这两个函数,后面会在登录业务用到
usermodel.cpp 对应函数的具体实现
1 | //根据用户号码查询用户信息 |
chatservice.cpp
实现login函数
1 | //处理登录业务 id pwd |
记录用户的连接信息及线程安全性
因为要实现用户与用户之间的消息往来,所以要实现记录上线用户的连接信息和下线的删除。
chatservice.hpp private:
1 | //存储在线用户的通信连接 注意线程安全 |
那么这个值在什么时候更新,首先在登录成功时,添加连接
chatservice.cpp login函数
1 | _userConnMap.insert({id,conn});//unordered_map容器没有线程安全需要添加线程互斥操作 |
当然插入了还不够,多个用户连接服务器,_userConnMap这个是在多线程环境下,要保证线程互斥性。
大多数 C++ STL 容器本身(如 vector
、map
、unordered_map
等)不是线程安全的。
所以我们要在对这个插入数据加锁。
chatservice.hpp private:
1 | //记得包含头文件#include <mutex> |
chatservice.cpp login函数
还是在登录成功那里
1 | { |
有人会问为什么再加一个花括号,这是为了让lock_guard的生命周期结束,这个变量的特性是构造时加锁,析构时自动解锁。
客户端异常退出业务
当客户端异常退出时,我们的数据库还没有修改当前状态时在线还是下线,所以要实现这个业务
chatservice.cpp 定义公共函数clientCloseException
1 | //处理客户端异常退出 |
chatservice.cpp 对应函数实现
1 | //处理客户端异常退出 |
chatserver.cpp onConnection函数
1 | if(!conn->connected()){ |
在确认断开后,使用clientCloseException将state更新为offline。
一对一聊天业务:在线聊天实现
public.hpp 加入聊天消息ONE_CHAT_MSG
1 | /* |
既然注册了业务,所以还要在chatservice.cpp的构造函数,注册业务
1 | _msgHandlerMap.insert({ONE_CHAT_MSG,std::bind(&ChatService::oneChat,this,_1,_2,_3)}); |
chatservice.hpp 声明一对一聊天业务函数声明
1 | //一对一聊天业务 |
chatservice.cpp 声明一对一聊天业务函数声明
1 | void ChatService::oneChat(const TcpConnectionPtr &conn,json &js,Timestamp){ |
这里js里面的构成是
msgid //消息类型
id 发消息的id
from 发消息的name
to 接受方的id
msg 要发送的消息
在线发送消息整体实现就是服务端接收到消息,之后查看接受用户是否在线,在线则调出服务端与接收端的连接,转发原消息。
一对一聊天业务:离线消息
首先离线消息是存储在数据库的一张表里
这张表只有两个字段userid和message 这里就不用ORM映射构造一个类来保留,所以我们直接构造一个关于这张表的操作类OfflineMsgModel
offlinemessagemodel.hpp
1 |
|
offlinemessagemodel.cpp
1 |
|
构造好对应的数据操作类,要在chatservice类实现功能
chatservice.hpp 添加对应类的声明
1 | OfflineMsgModel _offlineMsgModel; |
chatservice.cpp
首先在用户登录成功后,要查看离线表中是否有属于它的离线消息,所以在login函数中登录成功板块还要添加
1 | //查询该用户是否有离线消息 |
首先查询对应id是否有离线消息存在,存到vector数组中,如果不为空的话,response添加offlinemsg字段存储离线消息,并且在服务端删除属于接收端的离线消息,做完这些转发给接收端。
还有存储离线消息在onechat函数中对应的接收端不在线,存储离线消息在服务端。
1 | //toid不在线,存储离线信息 |
这样一对一的聊天业务就算实现了。
服务器异常退出问题解决(ctrl+c退出)
这里遇到的问题是服务器异常退出,User表中用户们的在线状态并没有改变,所以我们要将其改变。
因为这里数据库的操作,所以在usermodel.cpp UserModel类实现resetState函数,记得在hpp文件声明
1 | //重置用户的状态信息 |
这个函数就在chatservice.cpp中ChatService类的reset新函数调用
1 | //服务器异常,业务重置方法 |
最后在main.cpp实现这一功能
1 | //处理服务器ctrl+c结束后,重置user的状态信息 |
在main函数添加对应的信号
1 | int main(){ |
重点:signal(SIGINT,resetHandler);
注册信号处理函数
SIGINT
是一个信号,表示 中断信号,通常在终端按下Ctrl+C
时触发。resetHandler
是你定义的函数,用来在收到这个信号时执行一些清理逻辑。它的参数类型必须是int
,因为它接受信号编号
当然signal(SIGINT,resetHandler);这句话也不能随便放,得放在服务端启动前。
SIGINT 信号是:
Interrupt Signal(中断信号),编号是 2,表示用户希望中断(终止)正在运行的程序。
常见触发方式:
- 当你在 终端/命令行里运行一个程序时,按下 Ctrl + C,操作系统就会向该程序发送一个 SIGINT 信号。
举个例子:
1 | bash复制编辑$ ./chat_server |
这时,系统向 chat_server
进程发送 SIGINT
信号:
- 如果你没处理这个信号,程序会直接终止;
- 如果你写了
signal(SIGINT, resetHandler);
,就会先执行resetHandler()
,再退出。
背后原理:
- Linux/Unix 中,信号(signal)是一种异步通知机制,用于通知进程发生了某种事件。
SIGINT
是一种 软件信号,由终端驱动程序发送给前台进程组。
常见信号对比:
信号名 | 编号 | 含义 | 默认行为 |
---|---|---|---|
SIGINT |
2 | 终端中断(Ctrl+C) | 终止进程 |
SIGTERM |
15 | 程序终止(系统或用户发送) | 终止进程 |
SIGKILL |
9 | 强制终止(不能捕获) | 立即终止进程 |
SIGQUIT |
3 | 退出(Ctrl+\)并生成 core dump | 终止+生成转储文件 |
SIGSEGV |
11 | 段错误(访问非法内存) | 终止进程 |
总结一句话:SIGINT 是终端用户通过 Ctrl+C 发出的“中断信号”,默认会终止程序,但你可以捕获它,在程序退出前做一些清理处理。
添加好友业务代码
public.hpp
1 | enum EnMsgType{ |
添加ADD_FRIEND_MSG,//添加好友消息
添加好友在数据库中对应的一张Friend表,其中字段是userid和friendid,所以我们实现一个操控Friend表的model类
friendmodel.hpp
1 |
|
friendmodel.cpp
1 |
|
insert函数实现的是将好友关系加到数据库中。query返回的是从数据库中返回的好友数据。
当然还有一个业务绑定
首先在chatservice.hpp声明一个FriendModel对象
1 | FriendModel _friendModel; |
在ChatService的构造函数实现绑定
1 | _msgHandlerMap.insert({ADD_FRIEND_MSG,std::bind(&ChatService::addFriend,this,_1,_2,_3)}); |
在chatservice.hpp声明一个addFriend的函数
addFriend函数实现:
1 | //添加好友业务 msgid id friendid |
这里收到的json消息格式为
1 | {"msgid":6,"id":1,"friendid":2} |
还有一个问题,在用户登录时,也要好友信息的显示,所以在用户登陆成功时,服务端也要把好友信息发给用户端。
所以在服务端登录的回复信息这里需要加上好友信息
1 | //查询该用户的好友信息并返回 |
这里添加好友的业务就实现了。
群组业务
首先要实现群组业务,我在数据库有两张表实现群组业务AllGroup和GroupUser
AllGroup的数据结构:
1 | +-----------+--------------+------+-----+---------+----------------+ |
GroupUSer的数据结构
1 | +------------+--------------------------+------+-----+---------+-------+ |
groupuser.hpp
1 |
|
GroupUser类对象是记录关于组成员的详细信息的类,所以继承User。
group.hpp
1 |
|
Group类对象是AllGroup表的ORM类,但也不完全,这个类还添加了存储组成员详细信息的结构vector
因为这两张表的关联系很大,所以我们第一这个群组业务的数据操作接口。
groupmodel.hpp
1 |
|
对应函数实现
创建群组createGroup函数:
1 | // 创建群组 |
将组名和组介绍存到AllGroup表中。
加入群组addGroup函数:
1 | // 加入群组 |
// 查询用户所在群组信息queryGroups
1 | // 查询用户所在群组消息 |
根据指定的groupid查询群组用户id列表,除userid自己。queryGroupUsers函数:
1 | // 根据指定的groupid查询群组用户id列表,除userid自己,主要用户群聊业务给群组其他成员群发消息。 |
完成这些在业务类使用这些功能
首先在public.hpp更新消息类型:
1 | CREATE_GROUP_MSG,//创建群组 |
在业务类ChatService中添加对应的数据操作类对象:
1 | GroupModel _groupModel; |
加入功能实现函数:
1 | //创建群组业务 |
创建群组业务:
1 | //创建群组业务 |
加入群组业务:
1 | //加入群组业务 |
群组聊天业务:
1 | //群组聊天业务 |
还要在构造函数绑定消息对应函数:
1 | //群组业务注册 |
当然在用户登录成功时,还要把群组的信息发给客户端。
1 | //查询用户的群组信息 |
群组业务就完成了。
客户端开发-首页面开发
首先在src/client编写CMakeLists.txt和main.cpp
CMakeLists.txt
1 | #定义了一个SRC_LIST变量,包含了该目录下所有的源文件 |
main.cpp编写我们的客户端程序
首先保存的变量
1 | //记录当前系统登录的用户信息 |
目前的函数
1 | //显示当前登录成功用户的基本信息 |
主线程main函数
int main(int argc,char **argv)
参数说明:
int argc
:Argument Count,命令行参数的数量(包括程序本身的名称)。char **argv
:Argument Vector,命令行参数的数组。argv[0]
是程序的名字,argv[1]
到argv[argc-1]
是你在命令行中输入的其他参数。
在命令行运行./ChatClient 127.0.0.1 6000即可运行
客户端和服务端的连接
首先客户端和服务端的连接代码:
1 | if(argc<3){ |
对其中的函数解释:
1 | int clientfd=socket(AF_INET,SOCK_STREAM,0); |
socket函数功能:创建一个 TCP 套接字(socket)
其中的socket函数
1 |
各个参数含义:
1 | socket(int domain, int type, int protocol) |
AF_INET
(地址族)
- 指定使用 IPv4 地址(Internet Protocol version 4)。
- 如果是
AF_INET6
,那就是 IPv6。
SOCK_STREAM
(套接字类型)
- 表示使用的是 面向连接的、可靠的、基于字节流 的通信方式。
- 通常对应 TCP 协议。
0
(协议编号)
- 填
0
表示让系统根据前两个参数自动选择合适的协议。 - 对于
AF_INET + SOCK_STREAM
,系统会自动选择 TCP 协议。
返回值:
- 成功时返回一个非负整数(文件描述符,也就是
clientfd
),用于后续的读写操作。 - 失败时返回
-1
,常见原因包括:系统资源耗尽、参数错误等。
1 | sockaddr_in server; |
sockaddr_in
是 C/C++ 中专门用来表示 IPv4 网络地址 的结构体。
对应的头文件:
1 |
1 | memset(&server,0,sizeof(sockaddr_in)); |
把 server
结构体从首地址开始,连续 sizeof(server)
个字节都设置为 0
。
memset函数用法
1 | void *memset(void *ptr, int value, size_t num); |
参数说明:
参数 | 说明 |
---|---|
ptr |
指向要被设置的内存的指针 |
value |
要设置的值(会被转换为 unsigned char 类型) |
num |
要设置的字节数 |
1 | server.sin_family=AF_INET; |
1 | server.sin_family = AF_INET; |
- 表示使用的地址类型是 IPv4。
AF_INET
是 Address Family Internet 的缩写。- 这是
sockaddr_in
中的sin_family
字段,必须设置为AF_INET
,否则系统无法识别地址格式。
1 | server.sin_port = htons(port); |
- 设置端口号,例如:6000。
htons()
是 Host to Network Short 的缩写:h- 它把本地主机的字节序(可能是小端)转换为网络字节序(大端)。
- 因为 TCP/IP 协议使用大端字节序进行数据传输。
port
是你从命令行传进来的端口号字符串转成的整数。
htons函数头文件:
1 |
端口必须转换成网络字节序,否则客户端连接服务器时会出错!
1 | server.sin_addr.s_addr = inet_addr(ip); |
- 设置 IP 地址。
inet_addr()
函数把点分十进制的 IP 字符串(如"127.0.0.1"
)转换成 32 位的二进制整数。- 转换后的结果赋值给
sin_addr.s_addr
,用于 socket 连接。
inet_addr函数头文件:
1 |
1 | //client和server进行连接 |
connect()
函数详解:
1 | int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen); |
参数 | 说明 |
---|---|
sockfd |
用 socket() 创建的套接字文件描述符(即 clientfd ) |
addr |
服务器地址结构指针(需要强转成 sockaddr* 类型) |
addrlen |
addr 结构体的大小,通常是 sizeof(sockaddr_in) |
客户端业务实现
首先在一个死循环中实现循环选择:
1 | //显示首页面菜单 登录,注册,退出 |
使用switch语句实现不断选择。
首先登录业务:
1 | case 1://login业务 |
这里解释一些函数
1 | int len=send(clientfd,request.c_str(),strlen(request.c_str())+1,0); |
通过 send()
函数将字符串 request
的内容发送到套接字 clientfd
所代表的对端。
各参数详解:
1 | send(socket, buffer, length, flags) |
clientfd
:套接字描述符,表示要发送数据的目标。request.c_str()
:将std::string
类型的request
转为 C 风格的字符串(返回const char*
指针)。c++strlen(request.c_str()) + 1
:表示发送的数据长度,+1
是为了包括字符串末尾的\0
(空字符终止符),这是为了让接收方知道字符串结束。0
:表示不使用额外的标志(默认行为)。
关于返回值:
返回值 len |
意义说明 |
---|---|
> 0 | 成功发送了 len 字节的数据 |
== 0 | TCP 连接被优雅关闭(几乎不会出现在 send() ,常出现在 recv() ) |
== -1 | 发送失败,需检查 errno 查看错误原因 |
1 | len=recv(clientfd,buffer,1024,0); |
从 clientfd
套接字接收最多 1024 字节的数据,存入 buffer
缓冲区中。
各参数详解:
1 | recv(socket, buffer, length, flags) |
clientfd
:连接的套接字描述符(已连接的对端)。buffer
:用于存放接收数据的内存区域(char buffer[1024];
或char* buffer = new char[1024];
)。1024
:最多接收 1024 字节(防止缓冲区溢出)。0
:标志位,通常设置为 0(表示默认阻塞接收)。
返回值说明(关键点):
返回值 len |
意义说明 |
---|---|
> 0 | 实际接收到的数据字节数(并不一定等于 1024) |
== 0 | 连接已关闭(对方调用了 close() ) |
== -1 | 接收失败,可通过 errno 查看错误原因 |
注意recv() 并不会自动在 buffer 的末尾添加 ‘\0‘
所以我们定义buffer:保证末尾有’\0‘
1 | char buffer[1024]={0}; |
1 | //登录成功,启动接受线程负责接受数据 |
创建了一个新线程,并立刻将它分离(detach),用来异步执行函数 readTaskHandler(clientfd)
。
分析每一行作用:
std::thread readTask(readTaskHandler, clientfd);
- 创建一个线程对象
readTask
,该线程会立即开始运行readTaskHandler(clientfd)
。 - 通常用于处理客户端请求,比如接收数据、解析消息等。
readTask.detach();
- 将线程分离,让它独立运行。
- 主线程不再管理这个子线程,也无法调用
join()
等待它。 - 线程资源会在线程函数执行完毕后由系统自动回收。
detach 的 优点与风险:
优点:
- 简单,不阻塞主线程,不需要显式
join()
。 - 适合执行时间短、无需主线程等待的任务(比如客户端的消息监听)。
风险:
- 无法再追踪或管理线程:你不能再 join 或获取其返回状态。
- 容易引发悬空引用或资源泄漏问题:
- 如果
readTaskHandler()
里访问了已经销毁的变量,会崩。 - 比如使用了传进来的
clientfd
后,主线程先关闭了它。
- 如果
- 调试困难:detach 后的线程出错更难排查。
注册业务:
1 | case 2://register业务 |
退出业务:
1 | case 3://quit 业务 |
聊天主界面业务:
首先在登录业务实现,登录成功又该何去何从,所以聊天主界面业务,首先在登录业务有几个函数,还没有实现,实现一下:
1 | //显示登录用户的基本信息 |
具体实现:
1 | //显示当前登录成功用户的基本信息 |
1 | //登录成功,启动接受线程负责接受数据 |
其中readTaskHandler的实现:
1 | //接受线程 |
这个函数是用来接受服务端发来的消息的。主线程是发消息的。但是当前就实现了关于一对一聊天的接受消息。
1 | //进入聊天主菜单页面 |
进入聊天主菜单页面实现具体的业务:
1 | //主聊天页面程序 |
这个函数实现进入主页面使用对应的指令使用对应的功能。接下来说说其中的变量commandHandlerMap。
1 | //注册系统支持的客户端命令处理 |
通过哈希表绑定对应的字符串和对应函数的实现。
当然在实现这些功能时得要有一个文字说明,这就是help功能。
1 | //"help" command handler |
这有个细节,因为后续用户如果还想要再次使用help功能,所以把他放在commandHandlerMap中,但是help()函数不满足function<void(int,string)>,所以我们在声明的时候赋初始值,这样就可以直接help()实现了。在其中commandMap也是自己定义提示用户如何使用这些功能。
1 | //系统支持的客户端命令列表 |
由commandHandlerMap知道要实现的功能。
1 | //调用相应命令的事件处理回调,mainMenu对修改封闭,添加新功能不需要修改该函数 |
这样就可以把具体实现的内容封装到每一个函数中,所以接下来聚集每个实现的函数。
chat功能实现:
1 | //"chat" command handler |
还要实现getCurrentTime函数:
1 | //获取系统时间(聊天信息需要添加时间信息) |
addfriend功能实现:
1 | //"addfriend" command handler |
creategroup功能实现:
1 | //"creategroup" command handler |
addgroup功能实现:
1 | //"addgroup" command handler |
groupchat功能实现:
1 | //"groupchat" command handler |
实现群聊功能后,需要在接受线程实现解析返回的信息,并且输出。服务端在这里是直接将消息转发。所以如下实现:
1 | if (GROUP_CHAT_MSG == msgtype) |
当然离线消息同时也要输出群组消息.
1 | if (ONE_CHAT_MSG == js["msgid"].get<int>()) |
else部分即为离线群组消息。
loginout功能实现:
1 | //"login out" command handler |
首先这个函数实现需要在public.hpp
1 | LOGINOUT_MSG,//注销消息 |
在服务器端做出对应的函数:(记得在构造函数注册注销业务)
1 | //处理注销业务 |
将_userConnMap对应的id的连接删去,并且更新用户的状态。
isMainMenuRunning是一个全局变量,用来确保loginout后会回到首页。所以默认值为false。当进入主菜单界面时,变为true。这样mainMenu函数就会退出循环,运行结束,进入首页的循环。
当然这里就出现了一个问题就是loginoutr退出但是程序没有退出,很多全局变量还保留着上一次的值,这就会在输出这些变量时会把上一次登录的用户信息都输出。所以在登录成功时,记得将对应变量初始化清空,再进行赋值。
1 | // 记录当前登录用户的好友列表信息 |
这两个变量在登录成功后,会进行赋值,记得在赋值前,初始化,清空。
这里还有一个问题,每次登录成功都会创造一个线程,当我在loginout后再次登录,又创造了一个新线程。所以要保证只有一个线程实现接受。所以在登录成功部分修改:
1 | // 登录成功,启动接受线程负责接受数据 ,该线程只启动一次 |
引入负载均衡器:
以上我们基本完成了客户端和服务端的开发,由于是一个聊天服务器,要考虑一台服务器同时可以多少用户在线。我借用muduo模型创建 1个主线程(accept连接)+ 4个子线程(处理连接和事件)总共 5个线程,用于服务端 I/O。主线程用于 accept 新连接4 个 EventLoop
子线程处理客户端连接的 I/O 事件(读写、消息分发等)
1 | //设置线程数量 |
操作系统的限制
操作系统对单个进程的文件描述符数量有限制,每个客户端连接通常占用一个文件描述符(socket)。
- Linux默认值(可以用
ulimit -n
查看):通常是 1024,也就是说一个进程默认最多只能同时处理约1000个客户端连接。
实际可支持的并发连接数(估算):
类型 | 数量 |
---|---|
Idle连接(不活跃) | 40,000 ~ 100,000(受内核和 FD 限制) |
活跃连接(频繁收发消息) | 10,000 ~ 20,000(实际取决于业务逻辑复杂度) |
这是单进程、多线程、epoll 架构下的典型并发能力,非常高效。
什么是 Idle连接(不活跃连接)?
Idle(空闲)连接 是指:
客户端虽然 已经建立了连接,但 长时间没有发送或接收任何数据,也就是说:
- 连接处于打开状态(TCP连接存在)
- 但没有发生 读写事件
- 占用资源极少(仅占用文件描述符、少量内存)
为什么要区分 Idle 与 活跃连接?
- 资源占用不同
- Idle 连接不产生消息,不触发回调,服务器处理负担极低
- 活跃连接需要处理消息、业务逻辑,负担高
- 并发能力的评估
- 很多高并发服务器说能支持“10万连接”,指的是 Idle连接数量
- 真正活跃(消息频繁)的连接,一般几千~上万就会达到服务器上限
- 是否需要清理 Idle连接
- 有些服务器会设置 心跳/超时机制,长时间不活跃的连接会被断开,防止资源泄漏或恶意占用
我们还要提高并发量,就要引入负载均衡器的概念。
负载均衡器
负载均衡器(Load Balancer) 是一个把大量客户端请求 均匀分发 给多个后端服务器的组件,以提高系统的 吞吐能力、稳定性和可用性。
为什么需要它?
在高并发系统中,如果所有请求都打到一个服务器上,容易导致:
- 响应慢
- 连接超时
- 服务宕机
使用负载均衡器可以:
- 水平扩展服务器(多个 Muduo 实例)
- 分担压力
- 实现高可用(某个服务器挂了不影响整体)
按协议层划分(TCP/HTTP)
类型 | 层级 | 说明 | 常用代表 |
---|---|---|---|
四层负载均衡(L4) | TCP/UDP | 基于 IP、端口、协议 | LVS、HAProxy、Nginx Stream、Keepalived |
七层负载均衡(L7) | HTTP、HTTPS | 基于 URL、Header、Cookie | Nginx、HAProxy、Traefik、Envoy、Kong |
常见软件负载均衡器对比
名称 | 协议支持 | 类型 | 特点 | 场景 |
---|---|---|---|---|
LVS | L4 (TCP/UDP) | 内核模块 | 性能极高、稳定、复杂配置 | 超高并发、基础设施层 |
HAProxy | L4 + L7 | 用户态 | 灵活、高性能、支持健康检查、监控完备 | Web、API 网关、数据库代理 |
Nginx | L7 (也支持 L4) | 用户态 | 配置简单,静态资源支持好 | Web服务器、轻量级反代 |
Traefik | L7 | 云原生 | 自动发现服务(K8s、Docker)、面向微服务 | DevOps、容器化 |
Envoy | L4 + L7 | 云原生 | 微服务架构核心组件、支持 gRPC、动态配置 | Istio、Service Mesh |
Kong | L7 API网关 | 插件丰富 | 基于 Nginx,提供限流、安全等扩展能力 | 企业 API 管理 |
Keepalived | L4 | 高可用 | 常与 LVS 配合,提供主备切换 | 双主结构、高可用架构 |
OpenResty | L7 | 基于 Nginx | 支持 Lua,适合做复杂业务逻辑 | 高定制需求 |
Caddy | L7 | 自动 TLS | HTTPS 自动配置简单 | 个人、低门槛网站部署 |
我们这里要使用的是Nginx 的 TCP 负载均衡模块(即 stream 模块)
这是负载均衡器在本项目发挥的作用图。
Nginx 默认是 HTTP 层负载均衡(七层),但它的 stream
模块 支持四层(TCP/UDP)负载均衡,非常适合。
说白了,通过多台后台服务器提高并发量。但也面临着如何处理跨服务器通信的问题。
引入服务器中间件(redis)
Redis(Remote Dictionary Server) 是一个开源的、基于内存的键值对存储(Key-Value Store)数据库,支持丰富的数据结构,读写极快,非常适合用作缓存、中间件、消息队列等。
引入redis是要解决如果一个用户要和另一个用户通信,但是另一个用户在另一台服务器上,我们能从数据库中看到他在线,但是我们不能获取他的连接,所以引入中间件。这样就可以解决跨服务器通信的问题。
这里是redis在本项目的作用图:
nginx项目配置
nginx配置编译好后,去修改/usr/local/nginx/conf中修改nginx.conf文件。
1 | #nginx tcp loadbalance config |
在events和http领域之间加入上述代码。这一段代码就是加入**stream
模块** 。
说明
- **stream{}**:用于 TCP/UDP 负载均衡。
- upstream MyServer:定义了两个后端服务器,权重相等。
- max_fails=3 fail_timeout=30s:3次失败后,30秒内认为该节点不可用。
- listen 8000:监听本机 8000 端口。
- proxy_connect_timeout 1s:连接超时时间为1秒。
- proxy_pass MyServer:请求转发到 MyServer 负载均衡组。
- tcp_nodelay on:开启 TCP_NODELAY,减少延迟。
这样就实现了多个客户端只需要向一个ip+port就可以。通过nginx对多台服务器管理,这样就增加了聊天服务器的并发量。当然跨服务器通信这一问题还没有解决。
Redis项目配置
首先在include和src建立redis文件夹,在对应的文件夹添加redis.hpp和redis.cpp。记得更新CMakeLists.txt文件。这里调用了hiredis库。hiredis是c++使用redis封装的库。
redis.hpp
1 |
|
redis.cpp
1 |
|
这里需要注意的点就是关于订阅和发布,发布publish直接用redisCommand函数即可,因为这个命令直接执行本身不会堵塞(服务器会很快给出回复)。但是subscribe不一样,他直接执行会卡在那里,一直等待消息进入通道里(服务器不会很快回复)。所以我们这里使用redisAppendCommand+
redisBufferWrite这两个函数一起,不用再等待响应。所以我们单开一个线程来接受通道里的消息。
总结对比
功能 | redisCommand |
redisAppendCommand + redisBufferWrite |
---|---|---|
用法 | 直接发送+等待响应 | 仅发送命令到 Redis,不等待响应 |
是否阻塞 | 会阻塞直到响应 | 不会阻塞 |
适用场景 | 普通命令(GET/SET) | 发布订阅等需要后台线程响应的命令 |
所以我们把监听逻辑放到线程里。
注意:
Redis 的订阅-发布机制设计:
- 订阅连接会被阻塞监听消息,不允许执行其他命令。
- 如果同一个连接既用来订阅又用来发布,订阅时会阻塞导致发布失败或延迟。
- 所以一般建议发布和订阅使用独立的连接,保证各自的流畅性和响应效率。
解决跨服务器通信问题
我们已经配置好redis,并且封装了对应的redis类。接下来在业务类实现加入redis。
在对应的ChatService类声明私有变量。
1 | //redis操作对象 |
在ChatService类的构造函数,实现redis连接并且加入回调函数。
1 | //连接redis服务器 |
handleRedisSubscribeMessage函数是从redis消息队列中获取订阅的消息(在另一个线程执行)
1 | //从redis消息队列中获取订阅的消息 |
从redis返回的userid和message是当前服务器订阅的通道返回的消息,所以当前userid一定在当前服务器的_userConnMap中,当然也存在一种情况,发消息给redis时,还在线,redis传给对应的服务器时,该用户下线了,所以也要存储该用户的离线消息。
接着还要在登录功能和注销功能做修改
在登录成功后:
1 | //id用户登录成功后,向redis订阅channel(id) |
要向redis订阅消息,有向当前用户发送的消息会向在redis通道存储,之后会向当前用户输送。
在注销(退出)后:
1 | //用户注销,相当于下线,在redis中取消订阅通道 |
关闭掉订阅通道。当然还有客户端异常退出时,也要关闭订阅
1 | //客户端关闭,相当于下线,在redis中取消订阅通道 |
最后对一对一聊天和群聊功能做修改。
一对一聊天,当查到目标用户没有在_userConnMap中,说明目标用户不在当前服务器上或者不在线,所以接下来去数据库查看目标用户是否在线,若在线,则向redis对应的通道发布消息。不在线则存储连线消息。
1 | //查询toid是否在线(可能在其他服务器上) |
群聊,当查到目标用户没有在_userConnMap中,说明目标用户不在当前服务器上或者不在线,所以接下来去数据库查看目标用户是否在线,若在线,则向redis对应的通道发布消息。不在线则存储连线消息。
1 | auto it=_userConnMap.find(id); |
至此解决跨服务器通信问题。