MYSQL数据库连接池 | C++ | 项目实战 笔记通过施磊老师C++数据库连接池项目写出,供自己复习与大家参考
1.项目可以实现什么 为了提高MySQL数据库(基于C/S设计)的访问瓶颈,除了在服务器端增加缓存服务器缓存常用的数据 之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手 所耗费的 性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗。
2.项目技术点有哪些 MySQL数据库编程、单例模式、queue队列容器、C++11多线程编程、线程互斥、线程同步通信和 unique_lock、基于CAS的原子整形、智能指针shared_ptr、lambda表达式、生产者-消费者线程模型
3.连接池功能点介绍 连接池科普 如果有人不太清楚数据库连接池的话可以简单科普一下:
连接池一般是在客户端而不是服务器端,在客户端复用已经创建的数据库连接,避免性能损耗。
有人可能疑惑:我就一个用户也就用一条连接也就够了吧?还用什么连接池
例子:QQ 客户端的并发场景
假设你在使用 QQ 时,虽然你是唯一的用户,但客户端会发起多个不同的操作:
线程 1 :在后台更新聊天消息记录,向数据库读取历史消息。
线程 2 :实时显示你在某个群中的聊天内容,同时写入数据库。
线程 3 :正在处理你发送的一条语音消息,将其上传并存储到数据库中。
线程 4 :后台任务正在从好友服务器同步最新的好友列表,并把数据写入数据库。
每个线程都可能需要独立的数据库连接来完成操作,所以连接池在这种场景下就会显得十分重要,它能有效管理多个并发连接,而不需要频繁创建和关闭连接。
具体功能点 连接池一般包含了数据库连接所用的ip地址、port端口号、用户名和密码以及其它的性能参数,例如初 始连接量,最大连接量,最大空闲时间、连接超时时间等,该项目是基于C++语言实现的连接池,主要 也是实现以上几个所有连接池都支持的通用基础功能。
初始连接量(initSize) :表示连接池事先会和MySQL Server创建initSize个数的connection连接,当 应用发起MySQL访问时,不用再创建和MySQL Server新的连接,直接从连接池中获取一个可用的连接 就可以,使用完成后,并不去释放connection,而是把当前connection再归还到连接池当中。
最大连接量(maxSize) :当并发访问MySQL Server的请求增多时,初始连接量已经不够使用了,此 时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是maxSize,不能 无限制的创建连接,因为每一个连接都会占用一个socket资源,一般连接池和服务器程序是部署在一台 主机上的,如果连接池占用过多的socket资源,那么服务器就不能接收太多的客户端请求了。当这些连 接使用完成后,再次归还到连接池当中来维护。
最大空闲时间(maxIdleTime) :当访问MySQL的并发请求多了以后,连接池里面的连接数量会动态 增加,上限是maxSize个,当这些连接用完再次归还到连接池当中。如果在指定的maxIdleTime里面, 这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连 接量initSize个连接就可以了。
连接超时时间(connectionTimeout) :当MySQL的并发请求量过大,连接池中的连接数量已经到达 maxSize了,而此时没有空闲的连接可供使用,那么此时应用从连接池获取连接无法成功,它通过阻塞 的方式获取连接的时间如果超过connectionTimeout时间,那么获取连接失败,无法访问数据库。
该项目主要实现上述的连接池四大功能,其余连接池更多的扩展功能,可以自行实现。
4.MySQL Server参数介绍 mysql> show variables like ‘max_connections’; 该命令可以查看MySQL Server所支持的最大连接个数,超过max_connections数量的连接,MySQL Server会直接拒绝,所以在使用连接池增加连接数量的时候,MySQL Server的max_connections参数 也要适当的进行调整,以适配连接池的连接上限。
5.功能实现设计 ConnectionPool.cpp和ConnectionPool.h:连接池代码实现 Connection.cpp和Connection.h:数据库操作代码、增删改查代码实现
连接池主要包含了以下功能点 : 1.连接池只需要一个实例,所以ConnectionPool以单例模式进行设计 2.从ConnectionPool中可以获取和MySQL的连接Connection 3.空闲连接Connection全部维护在一个线程安全的Connection队列中,使用线程互斥锁保证队列的线 程安全 4.如果Connection队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是maxSize 5.队列中空闲连接时间超过maxIdleTime的就要被释放掉,只保留初始的initSize个连接就可以了,这个 功能点肯定需要放在独立的线程中去做 6.如果Connection队列为空,而此时连接的数量已达上限maxSize,那么等待connectionTimeout时间 如果还获取不到空闲的连接,那么获取连接失败,此处从Connection队列获取空闲连接,可以使用带 超时时间的mutex互斥锁来实现连接超时时间(并且在这段时间内是一直在监听有没有连接,有能用的就用了) 7.用户获取的连接用shared_ptr智能指针来管理,用lambda表达式定制连接释放的功能(不真正释放 连接,而是把连接归还到连接池中) 8.连接的生产和连接的消费采用生产者-消费者 线程模型来设计,使用了线程间的同步通信机制条件变量 和互斥锁
6.环境(准备工作) mysql的安装和配置由读者自行完成
1.平台选择 有关MySQL数据库编程、多线程编程、线程互斥和同步通信操作、智能指针、设计模式、容器等等这些 技术在C++语言层面都可以直接实现,因此该项目选择直接在windows平台上进行开发,当然放在 Linux平台下用g++也可以直接编译运行。
2.数据库准备工作 启动mysql后,输入登录命令,输入密码登录数据库
使用命令创建数据库chat和表user
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #查看数据库 SHOW databases;#创建数据库 CREATE DATABASE chat;USE chat; SHOW TABLES;#创建表 CREATE TABLE USER (id INT AUTO_INCREMENT PRIMARY KEY , NAME VARCHAR (50 ) , age INT , sex ENUM('male' ,'female' ) ); SHOW TABLES;DESC USER ;
3.VS准备工作 这里的MySQL数据库编程直接采用oracle公司提供的MySQL C/C++客户端开发包,在VS上需要进行相 应的头文件和库文件的配置,如下:
1.右键项目 - C/C++ - 常规 - 附加包含目录,填写mysql.h头文件的路径 2.右键项目 - 链接器 - 常规 - 附加库目录,填写libmysql.lib的路径 3.右键项目 - 链接器 - 输入 - 附加依赖项,填写libmysql.lib库的名字(记得写分号) 4.把libmysql.dll动态链接库(Linux下后缀名是.so库)放在工程目录下
注意VS上方debug后面那个要换成64位的而不是32位的
7.MYSQL API使用 转至这篇文章进行学习
MySQL API 使用详解-CSDN博客
8.连接类Connection的封装与实现 该模块主要作用是创建用户与数据库的联系,调用MYSQL的API接口实现数据库sql操作
本项目中只实现增删改查,感兴趣的读者可以参考这篇文章自行添加事务回滚等其他的操作
MySQL API 使用详解-CSDN博客
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 #pragma once #include <mysql.h> #include <string> #include <ctime> using namespace std;#include "public.h" class Connection { public : Connection (); ~Connection (); bool connect (string ip, unsigned short port, string username, string password, string dbname) ; bool update (string sql) ; MYSQL_RES* query (string sql) ; void refreshAlivetime () { _alivetime = clock (); } clock_t getAlivetime () const { return clock () - _alivetime; } private : MYSQL* _conn; clock_t _alivetime; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #include "pch.h" #include "public.h" #include "Connection.h" #include <iostream> using namespace std;Connection::Connection () { _conn = mysql_init (nullptr ); } Connection::~Connection () { if (_conn != nullptr ) mysql_close (_conn); } bool Connection::connect (string ip, unsigned short port, string username, string password, string dbname) { MYSQL* p = mysql_real_connect (_conn, ip.c_str (), username.c_str (), password.c_str (), dbname.c_str (), port, nullptr , 0 ); return p != nullptr ; } bool Connection::update (string sql) { if (mysql_query (_conn, sql.c_str ())) { LOG ("更新失败:" + sql); return false ; } return true ; } MYSQL_RES* Connection::query (string sql) { if (mysql_query (_conn, sql.c_str ())) { LOG ("查询失败:" + sql); return nullptr ; } return mysql_use_result (_conn); }
这个模块与数据库连接池几乎没啥关系,写完以后可以调用main编写相应实例检查代码这模块功能是否实现了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #include <iostream> #include "pch.h" #include "Connection.h" using namespace std;int main () { Connection conn; char sql[1024 ] = { 0 }; sprintf (sql, "insert into user(name,age,sex) values('%s','%d','%s')" , "zhang san" , 20 , "male" ); conn.connect ("127.0.0.1" , 3306 , "root" , "040725ge" , "chat" ); conn.update (sql); return 0 ; }
在数据库中查找,发现有插入的该条记录,则成功
辨析:连接与会话
在MySQL中,连接(Connection)和会话(Session)是两个相关但不同的概念。
连接 是一个物理的概念,它指的是一个通过网络建立的客户端和MySQL数据库服务器之间的网络连接。当客户端(如应用程序或数据库管理工具)与MySQL数据库服务器建立连接时,它们之间就建立了一条通信通道,用于传输数据和控制信息。
会话 则是一个逻辑的概念,它存在于MySQL数据库实例中。每当一个连接被建立时,MySQL会为该连接创建一个会话。会话是用户与MySQL数据库之间互动的单位,它管理着与数据库的连接和交互。会话用于执行SQL语句、管理事务、处理错误等。会话在用户断开连接或会话因超时而关闭时结束。
因此,连接并不等同于会话 ,但它们之间存在紧密的关联。一个连接可以拥有一个或多个会话(尽管在大多数情况下,一个连接只对应一个会话),而会话则依赖于连接来与数据库进行通信。
在具体表现形式上,连接通常表现为一个网络连接对象,它包含了连接所需的所有信息,如IP地址、端口号、用户名、密码等。而会话则表现为MySQL数据库实例中的一个内部对象,它管理着与该连接相关的所有信息和状态,如当前数据库、用户权限、时区设置、事务状态等。
总的来说,连接是物理层面的通信通道,而会话是逻辑层面的交互单位。它们共同协作,使得客户端能够与MySQL数据库进行有效的通信和交互。
9.连接池类CommonConnectionPool实现 先看下头文件,看看我们要实现哪些功能
CommonConnectionPool.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #pragma once #include <string> #include <queue> #include <mutex> #include <atomic> #include <thread> #include <memory> #include <functional> #include <condition_variable> #include "Connection.h" using namespace std;class ConnectionPool { public : static ConnectionPool* getConnectionPool () ; shared_ptr<Connection> getConntection () ; private : ConnectionPool (); bool loadConfigFile () ; void produceConnectionTask () ; void scannerConnectionTask () ; string _ip; unsigned _port; string _username; string _password; string _dbname; int _initSize; int _maxIdleTime; int _connectionTimeout; int _maxSize; queue<Connection*> _connectionQue; mutex _queueMutex; atomic_int _connectionCnt; condition_variable cv; };
具体有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static ConnectionPool* getConnectionPool () ;shared_ptr<Connection> getConntection () ;ConnectionPool ();bool loadConfigFile () ;void produceConnectionTask () ;void scannerConnectionTask () ;
1.getConnectionPool() 1 2 3 4 5 6 ConnectionPool* ConnectionPool::getConnectionPool () { static ConnectionPool pool; return &pool; }
这个就是单例模式的获得单例的函数接口
我们返回连接池的单例就行
2.loadConfigFile()和mysql.ini 这个是配置文件mysql.ini 进行相关的配置,一般都是手动配置的
1 2 3 4 5 6 7 8 9 10 11 12 #数据库连接池的配置文件 mysql.ini ip=127.0.0.1 port=3306 username=root password=040725ge dbname=chat initSize=10 maxSize=1024 #最大空闲时间默认单位是秒 maxIdleTime=60 #连接超时时间单位是毫秒 connectionTimeout=100
这个是从配置文件中加载配置项配置给数据库连接池那个单例的成员变量进行赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 bool ConnectionPool::loadConfigFile () { FILE* pf = fopen ("mysql.ini" , "r" ); if (pf == nullptr ) { LOG ("mysql.ini file is not exist!" ); return false ; } while (!feof (pf)) { char line[1024 ] = { 0 }; fgets (line, 1024 , pf); string str = line; int idx = str.find ('=' , 0 ); if (idx == -1 ) continue ; int endidx = str.find ('\n' , idx); string key = str.substr (0 , idx); string value = str.substr (idx + 1 , endidx - idx - 1 ); if (key == "ip" ) _ip = value; else if (key == "port" ) _port = atoi (value.c_str ()); else if (key == "username" ) _username = value; else if (key == "password" ) _password = value; else if (key == "dbname" ) _dbname = value; else if (key == "initSize" ) _initSize = atoi (value.c_str ()); else if (key == "maxSize" ) _maxSize = atoi (value.c_str ()); else if (key == "maxIdleTime" ) _maxIdleTime = atoi (value.c_str ()); else if (key == "connectionTimeout" ) _connectionTimeout = atoi (value.c_str ()); } return true ; }
3.ConnectionPool() 这个是连接池的构造函数,单例模式很多功能都会在这里实现
1.检查配置是否成功
2.初始化连接池,一开始有initsize个连接已经在池子里面了
3.启动生产者线程
4.启动定时器扫描超过maxIdleTime时间的空闲连接,进行多余的空闲连接的回收
这里要注意使用detach分离主线程和子线程,不然主线程结束而子线程还没有结束的话是一个很危险的行为。
关于为什么不用join请看具体的线程函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 ConnectionPool::ConnectionPool () { if (!loadConfigFile ()) return ; for (int i = 0 ; i < _initSize; i++) { Connection* p = new Connection (); p->connect (_ip, _port, _username, _password, _dbname); p->refreshAlivetime (); _connectionQue.push (p); _connectionCnt++; } thread produce (std::bind(&ConnectionPool::produceConnectionTask,this )) ; produce.detach (); thread scanner (std::bind(&ConnectionPool::scannerConnectionTask, this )) ; scanner.detach (); }
4.produceConnectionTask() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void ConnectionPool::produceConnectionTask () { for (;;) { unique_lock<mutex> lock (_queueMutex) ; while (!_connectionQue.empty ()) { cv.wait (lock); } if (_connectionCnt < _maxSize) { Connection* p = new Connection (); p->connect (_ip, _port, _username, _password, _dbname); p->refreshAlivetime (); _connectionQue.push (p); _connectionCnt++; } cv.notify_all (); } }
如果主线程调用join阻塞等待生产者线程结束,但是我们这里的生产者线程是一个死循环,也就是说主线程在这里等死了生产者线程也不会结束,并且主线程很有可能还有其他功能要调用。
为什么用detach呢?因为这里主线程和子线程分离之后就各管各的,主线程会自动继续执行下去,去干它的事情,生产者线程也就去生产了,在主线程事情办完了以后,那就结束了,之后整个进程结束后会回收仍然处在死循环的生产者线程
大家肯定会觉得这样不太好,这里提供一种优化措施。
1.增加一个成员变量,_stop来控制是否要结束子线程线程
2.构造函数初始化为false
3.增加一个成员函数stop专门关闭生产者线程
1 2 3 4 5 6 7 void stop () { { std::lock_guard<std::mutex> lock (_queueMutex) ; _stop = true ; } cv.notify_all (); }
4.在生产者代码段添加
1 2 3 4 5 if (_stop) { std::cout << "生产者线程退出" << std::endl; return ; }
5.在析构函数中调用stop成员函数即可
1 2 3 4 5 6 ~ConnectionPool () { stop (); if (_producerThread.joinable ()) { _producerThread.join (); } }
5.getConntection() 实现消费者线程+回收机制
1.如果超过了最长等待时间还没有获得连接,说明此时连接的人太多了,获取连接失败了
2.使用智能指针+自定义删除器来完成回收机制
智能指针第二个参数是删除器,用lambda表达式,我们不希望它断开连接,而是重新回到连接池队列中,供下一次使用,而不用重新建立连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 shared_ptr<Connection> ConnectionPool::getConntection () { unique_lock<mutex> lock (_queueMutex) ; while (_connectionQue.empty ()) { if (cv_status::timeout == cv.wait_for (lock, chrono::milliseconds (_connectionTimeout))) { if (_connectionQue.empty ()) { LOG ("获取空闲连接超时了...获取连接失败" ); return nullptr ; } } } shared_ptr<Connection> sp (_connectionQue.front(), [&](Connection* pcon) { unique_lock<mutex> lock(_queueMutex); pcon->refreshAlivetime(); _connectionQue.push(pcon); }) ; _connectionQue.pop (); cv.notify_all (); return sp; }
大家也不用担心内存泄漏的问题,在连接池最后析构的时候会对每一个对象调用析构函数进行内存释放
6.scannerConnectionTask() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 void ConnectionPool::scannerConnectionTask () { for (;;) { this_thread::sleep_for (chrono::seconds (_maxIdleTime)); unique_lock<mutex> lock (_queueMutex) ; while (_connectionCnt > _initSize) { Connection* p = _connectionQue.front (); if (p->getAlivetime () > (_maxIdleTime * 1000 )) { _connectionQue.pop (); _connectionCnt--; delete p; } else { break ; } } } }
这部分相关的变量和成员函数都在connection类中封装着,大家可能找不到
1 2 3 4 5 6 7 8 pubic: void refreshAlivetime () { _alivetime = clock (); } clock_t getAlivetime () const { return clock () - _alivetime; } private : MYSQL* _conn; clock_t _alivetime;
在每次生产者生产完,和每次使用完连接析构的时候调用只能指针的删除器的时候会调用refreshAlivetime进行开始空闲的连接时间的刷新,就是说从这个点开始你是空闲的
然后返回存活时间就是从生产出来或者重新入队,到目前请求getAlivetime函数这一段时间
10.各个模块实现后的完整版 pch.h 1 2 3 4 5 #pragma once #ifndef PCH_H #define PCH_H #endif
public.h 1 2 3 4 5 #pragma once #define LOG(str) \ cout << __FILE__ << ":" << __LINE__ <<" " << \ __TIMESTAMP__ << " : " << str << endl;
Connection.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #pragma once #include <mysql.h> #include <string> #include <ctime> using namespace std;#include "public.h" class Connection { public : Connection (); ~Connection (); bool connect (string ip, unsigned short port, string username, string password, string dbname) ; bool update (string sql) ; MYSQL_RES* query (string sql) ; void refreshAlivetime () { _alivetime = clock (); } clock_t getAlivetime () const { return clock () - _alivetime; } private : MYSQL* _conn; clock_t _alivetime; };
CommonConnectionPool.h 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 #pragma once #include <string> #include <queue> #include <mutex> #include <atomic> #include <thread> #include <memory> #include <functional> #include <condition_variable> #include "Connection.h" using namespace std;class ConnectionPool { public : static ConnectionPool* getConnectionPool () ; shared_ptr<Connection> getConntection () ; private : ConnectionPool (); bool loadConfigFile () ; void produceConnectionTask () ; void scannerConnectionTask () ; string _ip; unsigned _port; string _username; string _password; string _dbname; int _initSize; int _maxIdleTime; int _connectionTimeout; int _maxSize; queue<Connection*> _connectionQue; mutex _queueMutex; atomic_int _connectionCnt; condition_variable cv; };
Connection.cpp 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #include "pch.h" #include "public.h" #include "Connection.h" #include <iostream> using namespace std;Connection::Connection () { _conn = mysql_init (nullptr ); } Connection::~Connection () { if (_conn != nullptr ) mysql_close (_conn); } bool Connection::connect (string ip, unsigned short port, string username, string password, string dbname) { MYSQL* p = mysql_real_connect (_conn, ip.c_str (), username.c_str (), password.c_str (), dbname.c_str (), port, nullptr , 0 ); return p != nullptr ; } bool Connection::update (string sql) { if (mysql_query (_conn, sql.c_str ())) { LOG ("更新失败:" + sql); return false ; } return true ; } MYSQL_RES* Connection::query (string sql) { if (mysql_query (_conn, sql.c_str ())) { LOG ("查询失败:" + sql); return nullptr ; } return mysql_use_result (_conn); }
CommonConnectionPool.cpp 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 #include "pch.h" #include "public.h" #include "CommonConnectionPool.h" #include <iostream> ConnectionPool* ConnectionPool::getConnectionPool () { static ConnectionPool pool; return &pool; } bool ConnectionPool::loadConfigFile () { FILE* pf = fopen ("mysql.ini" , "r" ); if (pf == nullptr ) { LOG ("mysql.ini file is not exist!" ); return false ; } while (!feof (pf)) { char line[1024 ] = { 0 }; fgets (line, 1024 , pf); string str = line; int idx = str.find ('=' , 0 ); if (idx == -1 ) continue ; int endidx = str.find ('\n' , idx); string key = str.substr (0 , idx); string value = str.substr (idx + 1 , endidx - idx - 1 ); if (key == "ip" ) _ip = value; else if (key == "port" ) _port = atoi (value.c_str ()); else if (key == "username" ) _username = value; else if (key == "password" ) _password = value; else if (key == "dbname" ) _dbname = value; else if (key == "initSize" ) _initSize = atoi (value.c_str ()); else if (key == "maxSize" ) _maxSize = atoi (value.c_str ()); else if (key == "maxIdleTime" ) _maxIdleTime = atoi (value.c_str ()); else if (key == "connectionTimeout" ) _connectionTimeout = atoi (value.c_str ()); } return true ; } ConnectionPool::ConnectionPool () { if (!loadConfigFile ()) return ; for (int i = 0 ; i < _initSize; i++) { Connection* p = new Connection (); p->connect (_ip, _port, _username, _password, _dbname); p->refreshAlivetime (); _connectionQue.push (p); _connectionCnt++; } thread produce (std::bind(&ConnectionPool::produceConnectionTask,this )) ; produce.detach (); thread scanner (std::bind(&ConnectionPool::scannerConnectionTask, this )) ; scanner.detach (); } void ConnectionPool::produceConnectionTask () { for (;;) { unique_lock<mutex> lock (_queueMutex) ; while (!_connectionQue.empty ()) { cv.wait (lock); } if (_connectionCnt < _maxSize) { Connection* p = new Connection (); p->connect (_ip, _port, _username, _password, _dbname); p->refreshAlivetime (); _connectionQue.push (p); _connectionCnt++; } cv.notify_all (); } } shared_ptr<Connection> ConnectionPool::getConntection () { unique_lock<mutex> lock (_queueMutex) ; while (_connectionQue.empty ()) { if (cv_status::timeout == cv.wait_for (lock, chrono::milliseconds (_connectionTimeout))) { if (_connectionQue.empty ()) { LOG ("获取空闲连接超时了...获取连接失败" ); return nullptr ; } } } shared_ptr<Connection> sp (_connectionQue.front(), [&](Connection* pcon) { unique_lock<mutex> lock(_queueMutex); pcon->refreshAlivetime(); _connectionQue.push(pcon); }) ; _connectionQue.pop (); cv.notify_all (); return sp; } void ConnectionPool::scannerConnectionTask () { for (;;) { this_thread::sleep_for (chrono::seconds (_maxIdleTime)); unique_lock<mutex> lock (_queueMutex) ; while (_connectionCnt > _initSize) { Connection* p = _connectionQue.front (); if (p->getAlivetime () > (_maxIdleTime * 1000 )) { _connectionQue.pop (); _connectionCnt--; delete p; } else { break ; } } } }
mysql.ini 1 2 3 4 5 6 7 8 9 10 11 12 #数据库连接池的配置文件 ip=127.0.0.1 port=3306 username=root password=040725ge dbname=chat initSize=10 maxSize=1024 #最大空闲时间默认单位是秒 maxIdleTime=60 #连接超时时间单位是毫秒 connectionTimeout=100
pch.cpp
11.个人觉得应该改进的地方 改进:生产者线程和检查超过最大空闲时间的线程应该加入一个自己结束的标志,在线程池对象析构的时候回收掉而不是detach后进入死循环等着整个应用程序进程结束之后才回收
在produceConnectionTask()部分有实现的代码,供大家参考
12.技术点问题的复习 1.锁的作用: 第一个锁 情景 假设有两个线程 T1 和 T2 同时在运行,并且它们都调用 produceConnectionTask()
方法。具体执行流程如下:
T1 获取了 unique_lock
,并检查 _connectionQue
是否为空。在这里,T1 发现队列为空,于是进入到 if (_connectionCnt < _maxSize)
的条件判断。
此时,T2 也几乎同时调用了 produceConnectionTask()
,但由于没有获取锁,它可以在 T1 的操作未完成时同时访问 _connectionQue
和 _connectionCnt
。
T2 也发现 _connectionQue
为空,于是它也进入到 if (_connectionCnt < _maxSize)
条件判断,且在此刻它的 _connectionCnt
可能还没有增加,因为 T1 还未完成对连接的创建和队列的更新。
问题 由于没有使用锁来保护对 _connectionQue
和 _connectionCnt
的访问,T1 和 T2 都在认为可以创建连接的情况下独立执行,可能导致:
连接重复创建 :假设 _maxSize
的值为 10,那么 T1 和 T2 各自都创建了一个新的连接,_connectionCnt
的值可能变成了 12(假设 T1 和 T2 同时完成创建),这超出了最大连接数的限制。
资源浪费和潜在崩溃 :创建过多的连接会导致资源浪费,并可能导致数据库连接过载,最终可能导致程序崩溃或出现不稳定状态。
第2个锁 (unique_lock<mutex> lock(_queueMutex);
):
作用 :保护对连接队列 _connectionQue
的访问,确保只有一个线程能够在任意时刻对队列进行操作(检查空闲连接、弹出连接等)。
场景 :当线程调用 getConntection()
时,它会首先尝试获取这个锁。只有在成功获得锁后,线程才能访问和修改 _connectionQue
。
第3个锁 (在 lambda 中的 unique_lock<mutex> lock(_queueMutex);
):
作用 :在自定义删除器的上下文中再次保护对 _queueMutex
的访问,确保在归还连接时,线程能够安全地刷新连接的空闲时间并将连接推回到队列中。
场景 :当 shared_ptr
被销毁时,删除器会被调用,这时需要再一次确保对 _queueMutex
的安全访问,以避免数据竞争和不一致的状态。
如果缺少第一个锁 缺少第一个锁的后果:
情景 :假设两个线程 T1 和 T2 同时调用 getConntection()
方法。由于缺少第一个锁,T1 可能在检查 _connectionQue.empty()
时,T2 也进入了 getConntection()
方法并尝试检查同一队列。
问题 :如果 T1 和 T2 同时读取 _connectionQue
的状态,且在 T1 认为队列不为空的情况下,T2 也可能认为队列不为空。然后两者都试图从队列中获取连接并将其返回,这可能导致同一个连接被多个 shared_ptr
指向,造成连接资源的重复释放,最终引发未定义行为。
如果缺少第二个锁 缺少第二个锁的后果:
情景 :假设 T1 调用了 getConntection()
方法,并成功获取到连接,同时正在返回 shared_ptr
,在自定义删除器中缺少锁保护。
问题 :在 T1 调用删除器时,T2 可能也同时调用 getConntection()
。如果 T2 也在操作 _connectionQue
,它可以同时修改连接队列,甚至可能在 T1 还未完全完成对连接的返回时,试图再次修改同一连接。
后果 :如果 T1 的删除器正在调用 refreshAlivetime()
,而 T2 也在尝试将连接返回到队列中,两个线程之间可能会发生数据竞争,导致连接的状态不一致或引发程序崩溃。
2.生产者消费者模型 本项目中是生产者和消费者模型的运用,只不过是生产和消费的东西是数据库的连接罢了
3.原子整型 _connectionCnt;//记录连接所创建的connection连接的总数量,不能超过maxsize
对它的++和–操作必须是线程安全的,所以用原子整型
4.线程互斥 通过上面提到的三个锁完成了互斥,主要是对队列的操作需要线程互斥
5.线程通信和信号量 通过cv这个信号量实现了线程通信,即生产者生产完通知消费者来消费,消费者消费完通知生产者生产
6.智能指针和lambda表达式 lambda自定义智能指针删除器,使得用完的连接不用释放而是重新进入连接池队列