利用多线程和智能指针手写数据库连接池

项目背景

为了提高MySQL数据库(基于客户端服务端模型)的访问瓶颈

策略一:减小磁盘的IO。

  • 在服务器端增加缓存服务器缓存常用的数据(例如redis)

  • 可以增加连接池,来提高MySQL Server的访问效率。

    在高并发情况下,大量的TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗。

本项目就是为了在C/C++项目中,提高MySQL Server的访问效率,实现基于C++代码的数据库连接池模块。

什么是连接池

数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。并且应用程序可以根据池中连接的使用率,动态增加或减少池中的连接数。

连接池一般包含了数据库连接所用的ip地址、port端口号、用户名和密码以及其它的性能参数,例如初始连接量,最大连接量,最大空闲时间、连接超时时间等。

初始连接量(initSize):表示连接池事先会和MySQL Server创建initSize个数的connection连接,当应用发起MySQL访问时,不用再创建和MySQL Server新的连接,直接从连接池中获取一个可用的连接就可以,使用完成后,并不去释放connection,而是把当前connection再归还到连接池当中。

最大连接量(maxSize):当并发访问MySQL Server的请求增多时,初始连接量已经不够使用了,此时会根据新的请求数量去创建更多的连接给应用去使用,但是新创建的连接数量上限是maxSize,不能无限制的创建连接,因为每一个连接都会占用一个socket资源,一般连接池和服务器程序是部署在一台主机上的,如果连接池占用过多的socket资源,那么服务器就不能接收太多的客户端请求了。当这些连接使用完成后,再次归还到连接池当中来维护。

最大空闲时间(maxIdleTime):当访问MySQL的并发请求多了以后,连接池里面的连接数量会动态增加,上限是maxSize个,当这些连接用完再次归还到连接池当中。如果在指定的maxIdleTime里面,这些新增加的连接都没有被再次使用过,那么新增加的这些连接资源就要被回收掉,只需要保持初始连接量initSize个连接就可以了。

连接超时时间(connectionTimeout):当MySQL的并发请求量过大,连接池中的连接数量已经到达maxSize了,而此时没有空闲的连接可供使用,那么此时应用从连接池获取连接无法成功,它通过阻塞的方式获取连接的时间如果超过connectionTimeout时间,那么获取连接失败,无法访问数据库。该项目主要实现上述的连接池四大功能。

MYSQL部分的连接实现

MySQL数据库C++代码封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include <mysql.h>
#include <string>
using namespace std;
#include "public.h"
// 数据库操作类
class MySQL
{
public:
// 初始化数据库连接
MySQL()
{
_conn = mysql_init(nullptr);
}
// 释放数据库连接资源
~MySQL()
{
if (_conn != nullptr)
mysql_close(_conn);
}
// 连接数据库
bool connect(string ip, unsigned short port, string user, string password,
string dbname)
{
MYSQL *p = mysql_real_connect(_conn, ip.c_str(), user.c_str(),
password.c_str(), dbname.c_str(), port, nullptr, 0);
return p != nullptr;
}
// 更新操作 insert、delete、update
bool update(string sql)
{
if (mysql_query(_conn, sql.c_str()))
{
LOG("更新失败:" + sql);
return false;
}
return true;
}
// 查询操作 select
MYSQL_RES* query(string sql)
{
if (mysql_query(_conn, sql.c_str()))
{
LOG("查询失败:" + sql);
return nullptr;
}
return mysql_use_result(_conn);
}
private:
MYSQL *_conn; // 表示和MySQL Server的一条连接
};

在官方提供的基础上进行功能增加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Connection
{
public:
// 初始化数据库连接
Connection();
// 释放数据库连接资源
~Connection();
// 连接数据库
bool connect(string ip,
unsigned short port,
string user,
string password,
string dbname);
// 更新操作 insert、delete、update
bool update(string sql);
// 查询操作 select
MYSQL_RES* query(string sql);

// 刷新一下连接的起始的空闲时间点
void refreshAliveTime() { _alivetime = clock(); }
// 返回存活的时间
clock_t getAliveeTime()const { return clock() - _alivetime; }
private:
MYSQL *_conn; // 表示和MySQL Server的一条连接
clock_t _alivetime; // 记录进入空闲状态后的起始存活时间
};

连接池实现

原理
  1. 连接池只需要一个实例,所以ConnectionPool以单例模式进行设计

  2. 从ConnectionPool中可以获取和MySQL的连接Connection

  3. 空闲连接Connection全部维护在一个线程安全的Connection队列中,使用线程互斥锁保证队列的线程安全

  4. 如果Connection队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是maxSize.因此这里是一个生产者线程,生产连接。

  5. 队列中空闲连接时间超过maxIdleTime的就要被释放掉,只保留初始的initSize个连接就可以了。也就是说需要一个定时线程清理队列多余的空闲连接

  6. 如果Connection队列为空,而此时连接的数量已达上限maxSize,那么等待connectionTimeout时间如果还获取不到空闲的连接,那么获取连接失败,此处从Connection队列获取空闲连接,可以使用带超时时间的mutex互斥锁来实现连接超时时间

  7. 用户获取的连接用shared_ptr智能指针来管理,用lambda表达式定制连接释放的功能(不真正释放连接,而是把连接归还到连接池中)

  8. 连接的生产和连接的消费采用生产者-消费者线程模型来设计,使用了线程间的同步通信机制条件变量和互斥锁

线程安全的单例

这里采用的是线程安全的懒汉式单例模式。

什么是懒汉模式:在程序运行到需要用到该类的实例化时,instance()方法才去判断单例指针p,进而实例化单例指针p,让人它有一种懒惰,不到最后关头不实例化的感觉。

首先,将类的构造函数进行私有化

1
2
3
private:
// 单例#1 构造函数私有化
ConnectionPool();

为了实现线程安全的懒汉式单例模式,可以采用锁,也可以利用静态区。这里采用的是后者。使用在静态数据区实现类的实现的方法,因为分配在静态区中所以无论获取实例的函数被调用多少次,所得的都是同一份实例

1
2
3
public:
// 获取连接池对象实例
static ConnectionPool* getConnectionPool();

根据配置文件配置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 从配置文件中加载配置项
bool ConnectionPool::loadConfigFile()
{
//打开文件
FILE *pf = fopen("mysql.ini", "r");
if (pf == nullptr)
{
//处理文件没打开的情况
...
}
//逐行读文件
while (!feof(pf))
{
char line[1024] = { 0 };
fgets(line, 1024, pf);
string str = line;
int idx = str.find('=', 0);
if (idx == -1) // 无效的配置项
{
continue;
}
int endidx = str.find('\n', idx);
string key = str.substr(0, idx);
string value = str.substr(idx + 1, endidx - idx - 1);
if (key == "ip")
{
_ip = value;
}
...//依次匹配设置其它参数
}
return true;
}

构造函数

构造函数只执行一次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <thread>
ConnectionPool::ConnectionPool()
{
// 加载配置项
if (!loadConfigFile())
{
return;
}
// 创建初始数量的连接,此时是系统启动过程中的所以不用考虑线程安全
for (int i = 0; i < _initSize; ++i)
{
Connection *p = new Connection();//创建连接
p->connect(_ip, _port, _username, _password, _dbname);
p->refreshAliveTime(); // 刷新一下开始空闲的起始时间
_connectionQue.push(p); //包含到队列中去
_connectionCnt++;
}

// 启动一个新的线程,作为连接的生产者 linux thread => pthread_create
thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
produce.detach();

// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));
scanner.detach();
}

部分解释:

1
2
3
std::thread produce(std::bind(&ConnectionPool::produceConnectionTask, this));
//需要使用bind绑定对象this不然没法访问成员变量
produce.detach();

这行代码创建了一个名为 produce 的线程对象。它使用 std::bind 函数将 ConnectionPool::produceConnectionTask 成员函数绑定到当前对象实例 (this) 上,然后传递给 std::thread 构造函数。这样,线程 produce 将执行 ConnectionPoolproduceConnectionTask 成员函数。

调用 detach 函数,将线程 produce 从主线程中分离。这意味着一旦主线程结束,不再等待 produce 线程的完成。线程的生命周期将独立于主线程,允许它在后台继续执行。通常,detach 被用于在主线程退出时,确保所有线程都能完成它们的任务。

生产者线程:连接生产者

首先这个线程一直在查看并创建连接,因此应该在一个死循环里面

1
2
3
4
void ConnectionPool::produceConnectionTask()
{
for (;;)
}

设置条件变量作为成员变量,用于连接生产线程和连接消费线程的通信

1
condition_variable cv; 

核心逻辑:如果队列不空,则生产者线程等待暂不生产,否则,当连接数量没有达到上限的时候生产新的连接,生产完后通知消费者

使用互斥锁维护队列之间的线程安全:

1
2
3
mutex _queueMutex; // 维护连接队列的线程安全互斥锁

unique_lock<mutex> lock(_queueMutex);

std::mutex 是 C++11 标准引入的互斥量类,用于实现线程同步。互斥量用于保护共享资源,确保同一时间只有一个线程可以访问它。

这行代码的含义是创建一个 std::unique_lock 对象 lock,该对象对 _queueMutex 进行独占性的锁定。在 lock 对象的生命周期内,_queueMutex 将一直被锁定,直到 lock 被销毁(通常是离开作用域时)或显式调用 unlock 方法来释放锁。这样做的目的是确保在互斥量保护的临界区内,只有一个线程能够执行,防止多个线程同时访问共享资源导致竞态条件。

这里生产者加锁后,消费者就拿不到这把锁了

线程安全队列

std::queue 是 C++ 标准模板库(STL)中的队列容器,但它本身并不提供线程安全性。在多线程环境中,多个线程可能会同时访问和修改队列,这可能导致竞态条件(race condition)和数据不一致性。

以下是一些导致 std::queue 不适用于多线程环境的主要原因:

  1. 缺乏同步机制: std::queue 不包含内建的同步机制,比如互斥量或锁。在多线程环境下,当多个线程同时尝试访问或修改队列时,没有适当的同步措施可能导致数据竞争。

  2. 不提供原子操作: std::queue 不提供原子操作,即使是简单的 pushpop 操作,也不能在多线程环境中保证原子性。多个线程可能同时执行这些操作,而没有合适的同步机制,可能导致不一致的队列状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <condition_variable>
void ConnectionPool::produceConnectionTask()
{
for (;;)
{
unique_lock<mutex> lock(_queueMutex);//通过互斥锁实现线程安全队列
// lock 在此离开作用域时,会自动释放互斥量
while (!_connectionQue.empty())
{
cv.wait(lock); // 队列不空,此处生产线程进入等待状态
}

// 连接数量没有到达上限,继续创建新的连接
if (_connectionCnt < _maxSize)
{
Connection *p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
p->refreshAliveTime(); // 刷新一下开始空闲的起始时间
_connectionQue.push(p);
_connectionCnt++;
}

// 通知消费者线程,可以消费连接了
cv.notify_all();
}
}

消费者线程

  • cv.wait_for(lock, std::chrono::milliseconds(_connectionTimeout)) 是等待条件变为真或超时的操作。它会在等待的过程中释放锁,允许其他线程在这段时间内访问共享资源。等待的时间由 _connectionTimeout 指定,单位是毫秒。

    整个 if 语句检查条件变量的等待状态是否是超时状态。如果等待超时,表示在指定的时间内条件变为真的情况未发生。

    1
    if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))

    这里如果写成下面的语句:

    1
    if (cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))

    又可能出现并不是超时唤醒,而是被其它的进程唤醒了的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
unique_lock<mutex> lock(_queueMutex);//涉及到操作队列,需要保证线程安全,所以需要上锁
while (_connectionQue.empty())//如果连接池空了,需要
{
// sleep
//如果在超时时间内仍没有获取到,获取失败
if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))
{
if (_connectionQue.empty())
{
LOG("获取空闲连接超时了...获取连接失败!");
return nullptr;
}
}
}

/*
shared_ptr智能指针析构时,会把connection资源直接delete掉,相当于
调用connection的析构函数,connection就被close掉了。
这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到queue当中
*/
shared_ptr<Connection> sp(_connectionQue.front(),
[&](Connection *pcon) {
// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
unique_lock<mutex> lock(_queueMutex);
pcon->refreshAliveTime(); // 刷新一下开始空闲的起始时间
_connectionQue.push(pcon);
});

_connectionQue.pop();
cv.notify_all(); // 消费完连接以后,通知生产者线程检查一下,如果队列为空了,赶紧生产连接

return sp;
}

利用智能指针实现给外部提供接口

如果是给外界返回一个指针,还得实现一个把连接归还给连接池的过程比较复杂。

1
Conection* getConnection();

可以返回一个智能指针,智能指针出作用域后会自动析构,我们可以重定义一些删除器让其在析构的时候把连接归还给连接池

1
2
#include <memory>
shared_ptr<Connection> getConnection();
1
2
3
4
5
6
7
shared_ptr<Connection> sp(_connectionQue.front(), 
[&](Connection *pcon) {
// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作
unique_lock<mutex> lock(_queueMutex);
pcon->refreshAliveTime(); // 刷新一下开始空闲的起始时间
_connectionQue.push(pcon);
});
  • std::shared_ptr<Connection> 是一个智能指针,用于管理 Connection 类型的对象。这里使用 _connectionQue.front() 获取队列头部的 Connection 指针作为被 shared_ptr 所管理的对象。

  • 第二个参数是一个自定义的删除器,使用了 lambda 表达式 ([&](Connection *pcon) {...})。这个删除器会在 shared_ptr 管理的对象引用计数变为零时被调用,用于自定义对象的销毁行为。

    • 在这个例子中,当 shared_ptr 的引用计数变为零时(没有任何 shared_ptr 持有这个对象时),lambda 表达式会被调用。
    • 在 lambda 表达式内部,首先通过互斥量 _queueMutex 对队列进行锁定(std::unique_lock<std::mutex>lock(_queueMutex);)。这是因为涉及到队列的操作,需要考虑线程安全。
    • 然后调用 pcon->refreshAliveTime() 刷新 Connection 对象的空闲时间。
    • 最后,将 pcon 重新放入队列中 _connectionQue.push(pcon)

这样,通过自定义删除器,你可以在 Connection 对象被释放时执行一些额外的操作。在这个例子中,是刷新空闲时间并将连接重新放入队列。这通常用于对象的资源管理和回收。

  • lambda表达式部分解释

    • [&]: 这是 lambda 表达式的捕获列表(capture list),用于指定在 lambda 函数体内可访问的外部变量。[&] 表示以引用方式捕获所有外部变量,即在 lambda 函数体内,可以访问包含这个 lambda 的函数作用域内的所有变量,并且可以修改这些变量的值。
    • (Connection *pcon): 这是 lambda 表达式的参数列表,表示 lambda 函数接受一个名为 pcon,类型为 Connection* 的参数。
    • { /* ... */ }: 这是 lambda 表达式的函数体,包含了 lambda 函数要执行的代码块。

    综合起来,[&](Connection *pcon) { /* ... */ } 这个 lambda 表达式表示一个接受一个 Connection* 类型的参数 pcon 的函数,它以引用方式捕获了包含它的作用域内的所有变量,并在函数体内执行一些操作。

最大空闲时间回收连接扫描线程

时间的记录

1
2
3
4
5
6
7
8
	// 刷新一下连接的起始的空闲时间点
void refreshAliveTime() { _alivetime = clock(); } //在每次的队列push的操作时,要刷新一下空闲的起始时间
// 返回存活的时间
//const: 这里的 const 表示这是一个常量成员函数。在常量成员函数内部,不能修改该对象的任何非 mutable 成员变量。
clock_t getAliveeTime()const { return clock() - _alivetime; }
private:
MYSQL *_conn; // 表示和MySQL Server的一条连接
clock_t _alivetime; // 记录进入空闲状态后的起始存活时间

定时线程

在构造函数里面进行线程的初始化:

1
2
3
// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));
scanner.detach();

scannerConnectionTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void ConnectionPool::scannerConnectionTask()
{
for (;;)
{
// 通过sleep模拟定时效果
this_thread::sleep_for(chrono::seconds(_maxIdleTime));

// 扫描整个队列,释放多余的连接
unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
//当当前的连接数量大于初始化最大队列大小的时候才进行检测,有点像缓冲区的设计思想。
{
Connection *p = _connectionQue.front();
if (p->getAliveeTime() >= (_maxIdleTime * 1000))
{
_connectionQue.pop();
_connectionCnt--;
delete p; // 调用~Connection()释放连接
}
else
{
break; // 队头的连接没有超过_maxIdleTime,其它连接肯定没有
}
}
}
}