C++项目 | 集群聊天服务器 | 服务器集群
1.负载均衡存在的意义
为什么要有负载均衡器?
一台机器的文件描述符是有限的,之前的linux系统最大也就1024,那说明也就同时最多可以提供1024个人的服务聊天服务还是长连接,需要一直保持和服务器的连接才行,那么随着用户规模的扩大,就需要更多的服务器,而负载均衡器就是负责把这些请求分发到不同的服务器上面去。不可能说是给用户三台服务器,让用户自己选要用哪一台提供服务

第一点是基本功能
第二点,保持心跳是以防有的主机发生了故障,不能够继续工作,那么负载均衡器不可以往这个服务器发送请求服务的请求
第三点,一般服务器都是24小时不关的,因为可能24小时都有人请求服务,不可能说是为了添加新的机器就把服务给停掉,这样的话用户体验会很差,所以要有第三点
2.服务器集群
负载均衡器 - 一致性哈希算法
单台服务器受限于硬件资源,其性能是有上限的,当单台服务器不能满足应用场景的并发需求量时,就需要考虑部署多个服务器共同处理客户端的并发请求,但是客户端怎么知道去连接具体哪台服务器呢?此时就需要一台负载均衡器,通过预设的负载算法,指导客户端连接服务器。
负载均衡器有基于客户端的负载均衡和服务器的负载均衡。
普通的基于哈希的负载算法,并不能满足负载均衡所要求的单调性和平衡性,但一致性哈希算法非常好的保持了这两种特性,所以经常用在需要设计负载算法的应用场景当中。
nginx配置tcp负载均衡
在服务器快速集群环境搭建中,都迫切需要一个能拿来即用的负载均衡器,nginx在1.9版本之前,只支持http协议web服务器的负载均衡,从1.9版本开始以后,nginx开始支持tcp的长连接负载均衡,但是nginx默认并没有编译tcp负载均衡模块,编写它时,需要加入–with-stream参数来激活这个模块。
1
| nginx编译加入--with-stream参数激活tcp负载均衡模块
|
nginx编译安装需要先安装pcre、openssl、zlib等库,也可以直接编译执行下面的configure命令,根
据错误提示信息,安装相应缺少的库。

下面的make命令会向系统路径拷贝文件,需要在root用户下执行
1 2
| ./configure --with-stream make && make install
|
编译完成后,默认安装在了/usr/local/nginx目录。
可执行文件在sbin目录里面,配置文件在conf目录里面。

1 2
| nginx -s reload 重新加载配置文件启动 nginx -s stop 停止nginx服务
|
主要在conf目录里面配置nginx.conf文件,配置如下:

很多东西之前都有,把下面的部分加上就是了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| stream { upstream MyServer { #weight是权重 max_fails表示心跳,只要相应超过30秒的次数超过3次,那这个服务器就算是挂掉了 server 127.0.0.1:6000 weight=1 max_fails=3 fail_timeout=30s; server 127.0.0.1:6002 weight=1 max_fails=3 fail_timeout=30s; } server { proxy_connect_timeout 1s; #proxy_timeout 3s; #ngxin监听的端口号,客户端都连接这个端口号,然后ngxin分发到其他服务器 listen 8000; proxy_pass MyServer; tcp nodelay on; } }
|
配置完成后,./nginx -s reload平滑重启。
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include"chatserver.hpp" #include"chatservice.hpp" #include<iostream> #include<signal.h> using namespace std;
void resetHandler(int) { ChatService::instance()->reset(); exit(0); }
int main(int argc,char **argv) { if(argc<3) { cerr<<"参数过少!example:./ChatServer 127.0.0.1 6000"<<endl; }
char *ip=argv[1]; uint16_t port=atoi(argv[2]);
signal(SIGINT,resetHandler);
EventLoop loop; InetAddress addr(ip,port); ChatServer server(&loop,addr,"ChatServer");
server.start(); loop.loop(); return 0; }
|
1.启动两台服务器
2.启动两个客户端
客户端都是8000端口,就是nginx监听的端口
两个客户端都可以登录,注册之类的操作
但聊天现在还不行,因为_userConnMap并不相通

3.服务器中间件-基于发布-订阅的Redis
集群服务器之间的通信设计
当ChatServer集群部署多台服务器以后,当登录在不同服务器上的用户进行通信时,该怎么设计!如下设计好吗?

上面的设计,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,如下图所示:

在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。限于我们的项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。
redis发布-订阅的客户端编程
redis支持多种不同的客户端编程语言,例如Java对应jedis、php对应phpredis、C++对应的则是
hiredis。下面是安装hiredis的步骤:
1.git clone https://github.com/redis/hiredis 从github上下载hiredis客户端,进行源码
编译安装
1 2 3 4 5
| tony@tony-virtual-machine:~/github$ git clone https://github.com/redis/hiredis 正克隆到 'hiredis'... remote: Enumerating objects: 3261 , done. ^C收对象中: 83 % (2707/3261), 876 .01 KiB | 59 .00 KiB/s
|
2.cd hiredis
3.make
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| tony@tony-virtual-machine:~/github/hiredis$ make cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb net.c cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb hiredis.c cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb sds.c cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb async.c cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb read.c cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb sockcompat.c cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb sslio.c cc -shared -Wl,-soname,libhiredis.so.0.14 -o libhiredis.so net.o hiredis.o sds.o async.o read.o sockcompat.o sslio.o ar rcs libhiredis.a net.o hiredis.o sds.o async.o read.o sockcompat.o sslio.o cc -std=c99 -pedantic -c -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite- strings -Wno-missing-field-initializers -g -ggdb test.c cc -O3 -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings -Wno-missing- field-initializers -g -ggdb -o hiredis-test test.o libhiredis.a Generating hiredis.pc for pkgconfig... tony@tony-virtual-machine:~/github/hiredis$
|
编译成功!
4.sudo make install
1 2 3 4 5 6 7 8 9 10 11
| tony@tony-virtual-machine:~/github/hiredis$ sudo make install [sudo] tony 的密码: mkdir -p /usr/local/include/hiredis /usr/local/include/hiredis/adapters /usr/local/lib cp -pPR hiredis.h async.h read.h sds.h sslio.h /usr/local/include/hiredis cp -pPR adapters/*.h /usr/local/include/hiredis/adapters cp -pPR libhiredis.so /usr/local/lib/libhiredis.so.0. cd /usr/local/lib && ln -sf libhiredis.so.0.14 libhiredis.so cp -pPR libhiredis.a /usr/local/lib mkdir -p /usr/local/lib/pkgconfig cp -pPR hiredis.pc /usr/local/lib/pkgconfig
|
拷贝生成的动态库到/usr/local/lib目录下!
5.sudo ldconfig /usr/local/lib
代码方面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #ifndef REDIS_H #define REDIS_H
#include <hiredis/hiredis.h> #include <thread> #include <functional>
using namespace std; using redis_handler = function<void(int,string)>;
class Redis { public: Redis(); ~Redis();
bool connect();
bool publish(int channel, string message);
bool subscribe(int channel);
bool unsubscribe(int channel);
void observer_channel_message();
void init_notify_handler(redis_handler handler);
private: redisContext *publish_context_;
redisContext *subcribe_context_;
redis_handler notify_message_handler_; };
#endif
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| #include "redis.hpp" #include <iostream>
Redis::Redis() : publish_context_(nullptr), subcribe_context_(nullptr) { }
Redis::~Redis() { if (publish_context_ != nullptr) { redisFree(publish_context_); }
if (subcribe_context_ != nullptr) { redisFree(subcribe_context_); } }
bool Redis::connect() { publish_context_ = redisConnect("127.0.0.1", 6379); if (publish_context_ == nullptr) { cerr << "connect redis failed!" << endl; return false; }
subcribe_context_ = redisConnect("127.0.0.1", 6379); if (subcribe_context_ == nullptr) { cerr << "connect redis failed!" << endl; return false; }
thread t([&]() { observer_channel_message(); }); t.detach();
cout << "connect redis-server success!" << endl; return true; }
bool Redis::publish(int channel, string message) { redisReply *reply = (redisReply *)redisCommand(publish_context_, "PUBLISH %d %s", channel, message.c_str()); if (reply == nullptr) { cerr << "publish command failed!" << endl; return false; }
freeReplyObject(reply); return true; }
bool Redis::subscribe(int channel) { if (REDIS_ERR == redisAppendCommand(subcribe_context_, "SUBSCRIBE %d", channel)) { cerr << "subscibe command failed" << endl; return false; }
int done = 0; while (!done) { if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done)) { cerr << "subscribe command failed" << endl; return false; } }
return true; }
bool Redis::unsubscribe(int channel) { if (REDIS_ERR == redisAppendCommand(subcribe_context_, "UNSUBSCRIBE %d", channel)) { cerr << "subscibe command failed" << endl; return false; }
int done = 0; while (!done) { if (REDIS_ERR == redisBufferWrite(subcribe_context_, &done)) { cerr << "subscribe command failed" << endl; return false; } }
return true; }
void Redis::observer_channel_message() { redisReply *reply = nullptr; while (REDIS_OK == redisGetReply(subcribe_context_, (void **)&reply)) { if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr) { notify_message_handler_(atoi(reply->element[1]->str), reply->element[2]->str); }
freeReplyObject(reply); }
cerr << "----------------------- oberver_channel_message quit--------------------------" << endl; }
void Redis::init_notify_handler(redis_handler handler) { notify_message_handler_ = handler; }
|
cahtService.hpp和cpp的修改见github地址
4.集群后的测试
1.两台服务器进行集群

2.登录两个客户端

3.分别进行一对一聊天和群聊

测试成功!