C++项目 | 集群聊天服务器 | 点对点聊天业务 && 离线消息存储业务

1.点对点聊天业务

业务逻辑:用户给用户发消息,如果目的用户在线,那就找到对应的连接,然后发送消息即可

如果目的用户不在线,那就存到离线消息表,等到下次上线的时候服务器再发过去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr& conn,json &js,Timestamp time)
{
//目的用户
int to=js["to"].get<int>();
{
lock_guard<mutex> lock_(_connMutex);
auto it=_userConnMap.find(to);
if(it!=_userConnMap.end())
{
//to在线,转发消息 服务器主动推送消息给toid用户
it->second->send(js.dump());
return;
}
}
//to不在线,存储离线消息

}

测试

1
2
3
4
5
6
7
8
9
10
11
12
//注册消息
{"msgid":3,"name":"li si","password":"12345678"}
//登录消息
{"msgid":1,"id":1,"password":"123456"}

{"msgid":1,"id":2,"password":"12345678"}

//张三发给李四
{"msgid":5,"id":1,"from":"zhang san","to":2,"msg":"hello123456"}

//李四发给张三
{"msgid":5,"id":1,"from":"li si","to":1,"msg":"还行"}

先让两者都在线

image-20250122162044087

互相发送消息

image-20250122163724927

2.离线消息存储业务

我们专门在offlineMessage表中存储离线消息业务的

1.离线表的相关操作的封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//offlinemessagemodel.hpp
#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H

#include<string>
#include<vector>
using namespace std;

//提供离线消息表的操作接口方法
class OfflineMsgModel
{
public:
//存储用户的离线消息
void insert(int userid,string msg);

//删除用户的离线消息
void remove(int userid);

//查询用户的离线消息
vector<string> query(int userid);
};
#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//offlineMessagemodel.cpp
#include"offlineMessagemodel.hpp"
#include"db.h"

//存储用户的离线消息
void OfflineMsgModel::insert(int userid,string msg)
{
//1.组装sql语句
char sql[1024]={0};
sprintf(sql,"insert into offlineMessage values('%d','%s')",userid,msg.c_str());

MySQL mysql;
if(mysql.connect())
{
mysql.update(sql);
}
}

//删除用户的离线消息
void OfflineMsgModel::remove(int userid)
{
//1.组装sql语句
char sql[1024]={0};
sprintf(sql,"delete from offlineMessage where userid = %d",userid);

MySQL mysql;
if(mysql.connect())
{
mysql.update(sql);
}
}

//查询用户的离线消息
vector<string> OfflineMsgModel::query(int userid)
{
//根据用户号码查询用户信息
//1.组装sql语句
char sql[1024]={0};
sprintf(sql,"select message from offlineMessage where userid = %d",userid);

vector<string> vec;
MySQL mysql;
//建立连接
if(mysql.connect())
{
//获取结果
MYSQL_RES *res=mysql.query(sql);
if(res!=nullptr)
{
//把所有的离线消息放入vec中返回
MYSQL_ROW row;
while((row=mysql_fetch_row(res))!=nullptr)
{
vec.push_back(row[0]);
}
//释放资源
mysql_free_result(res);
return vec;
}
}
//没查询到结果
return vec;
}

2.登录业务中,登陆成功后要检查是不是有离线消息

1
2
3
4
5
6
7
//注册消息以及对应的Handler回调操作
ChatService::ChatService()
{
_msgHandlerMap.insert({LOGIN_MSG,std::bind(&ChatService::login,this,_1,_2,_3)});
_msgHandlerMap.insert({REG_MSG,std::bind(&ChatService::reg,this,_1,_2,_3)});
_msgHandlerMap.insert({ONE_CHAT_MSG,std::bind(&ChatService::oneChat,this,_1,_2,_3)});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//登录成功,更新用户状态信息
user.setState("online");
_userModel.updateState(user);

json response;
response["msgid"]=LOGIN_MSG_ACK;
response["errno"]=0;
response["id"]=user.getId();
response["name"]=user.getName();
//查询该用户是否有离线消息 有的话也要封装到response中
vector<string> vec=_offlineMsgModel.query(id);
if(!vec.empty())
{
response["offlinemsg"]=vec;
//读取用户的离线消息后,把该用户的离线消息删除掉
_offlineMsgModel.remove(id);
}

conn->send(response.dump());

3.补充点对点聊天业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr& conn,json &js,Timestamp time)
{
//目的用户
int to=js["to"].get<int>();
{
lock_guard<mutex> lock_(_connMutex);
auto it=_userConnMap.find(to);
if(it!=_userConnMap.end())
{
//to在线,转发消息 服务器主动推送消息给toid用户
it->second->send(js.dump());
return;
}
}
//to不在线,存储离线消息
_offlineMsgModel.insert(to,js.dump());
}

4.测试

1.张三登录,给未登录的李四发送消息

image-20250122173000983

2.查看离线消息表,确实有记录

image-20250122173052898

3.李四登录,有离线消息发过来了

image-20250122173153550

4.数据库中的记录被删除

image-20250122173224389

至此测试完成,但是这只是点对点的聊天业务,扩展到集群时还需要再添加相关代码的。因为不止可以在一台主机上登录,用户可以在别的主机上登录。