C++项目 | 集群聊天服务器 | 点对点聊天业务 && 离线消息存储业务
1.点对点聊天业务
业务逻辑:用户给用户发消息,如果目的用户在线,那就找到对应的连接,然后发送消息即可
如果目的用户不在线,那就存到离线消息表,等到下次上线的时候服务器再发过去
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void ChatService::oneChat(const TcpConnectionPtr& conn,json &js,Timestamp time) { int to=js["to"].get<int>(); { lock_guard<mutex> lock_(_connMutex); auto it=_userConnMap.find(to); if(it!=_userConnMap.end()) { it->second->send(js.dump()); return; } } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12
| //注册消息 {"msgid":3,"name":"li si","password":"12345678"} //登录消息 {"msgid":1,"id":1,"password":"123456"}
{"msgid":1,"id":2,"password":"12345678"}
//张三发给李四 {"msgid":5,"id":1,"from":"zhang san","to":2,"msg":"hello123456"}
//李四发给张三 {"msgid":5,"id":1,"from":"li si","to":1,"msg":"还行"}
|
先让两者都在线

互相发送消息

2.离线消息存储业务
我们专门在offlineMessage表中存储离线消息业务的
1.离线表的相关操作的封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #ifndef OFFLINEMESSAGEMODEL_H #define OFFLINEMESSAGEMODEL_H
#include<string> #include<vector> using namespace std;
class OfflineMsgModel { public: void insert(int userid,string msg);
void remove(int userid);
vector<string> query(int userid); }; #endif
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| #include"offlineMessagemodel.hpp" #include"db.h"
void OfflineMsgModel::insert(int userid,string msg) { char sql[1024]={0}; sprintf(sql,"insert into offlineMessage values('%d','%s')",userid,msg.c_str());
MySQL mysql; if(mysql.connect()) { mysql.update(sql); } }
void OfflineMsgModel::remove(int userid) { char sql[1024]={0}; sprintf(sql,"delete from offlineMessage where userid = %d",userid);
MySQL mysql; if(mysql.connect()) { mysql.update(sql); } }
vector<string> OfflineMsgModel::query(int userid) { char sql[1024]={0}; sprintf(sql,"select message from offlineMessage where userid = %d",userid);
vector<string> vec; MySQL mysql; if(mysql.connect()) { MYSQL_RES *res=mysql.query(sql); if(res!=nullptr) { MYSQL_ROW row; while((row=mysql_fetch_row(res))!=nullptr) { vec.push_back(row[0]); } mysql_free_result(res); return vec; } } return vec; }
|
2.登录业务中,登陆成功后要检查是不是有离线消息
1 2 3 4 5 6 7
| ChatService::ChatService() { _msgHandlerMap.insert({LOGIN_MSG,std::bind(&ChatService::login,this,_1,_2,_3)}); _msgHandlerMap.insert({REG_MSG,std::bind(&ChatService::reg,this,_1,_2,_3)}); _msgHandlerMap.insert({ONE_CHAT_MSG,std::bind(&ChatService::oneChat,this,_1,_2,_3)}); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| user.setState("online"); _userModel.updateState(user);
json response; response["msgid"]=LOGIN_MSG_ACK; response["errno"]=0; response["id"]=user.getId(); response["name"]=user.getName(); vector<string> vec=_offlineMsgModel.query(id); if(!vec.empty()) { response["offlinemsg"]=vec; _offlineMsgModel.remove(id); }
conn->send(response.dump());
|
3.补充点对点聊天业务代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void ChatService::oneChat(const TcpConnectionPtr& conn,json &js,Timestamp time) { int to=js["to"].get<int>(); { lock_guard<mutex> lock_(_connMutex); auto it=_userConnMap.find(to); if(it!=_userConnMap.end()) { it->second->send(js.dump()); return; } } _offlineMsgModel.insert(to,js.dump()); }
|
4.测试
1.张三登录,给未登录的李四发送消息

2.查看离线消息表,确实有记录

3.李四登录,有离线消息发过来了

4.数据库中的记录被删除

至此测试完成,但是这只是点对点的聊天业务,扩展到集群时还需要再添加相关代码的。因为不止可以在一台主机上登录,用户可以在别的主机上登录。