客户端连接池

TcpSocket.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#pragma once
#include "TcpSocket.h"
#include <string>
using namespace std;

// 初始化连接池的结构体
struct PoolParam
{
int bounds; //池容量
int connecttime;
int sendtime;
int revtime;
string serverip;
unsigned short serverport;
};

class PoolSocket
{
public:
enum ErrorType {
ParamErr = 3000 + 1,
TimeOut,
PeerClose,
MallocErr,
CreateConnErr, // 创建连接池 (没有达到最大连接数)
terminated, // 已终止
ValidIsZero, // 有效连接数是零
HaveExist, // 连接已经在池中
ValidBounds // 有效连接数目超过了最大连接数
};
PoolSocket();
~PoolSocket();

int poolInit(PoolParam *param);
// 从连接池中获取一条连接
TcpSocket* getConnect();
// 将连接放回到连接池
int putConnect(TcpSocket* sock, bool isValid);
// 释放连接池资源
void poolDestory();
int curConnSize();

private:
void connectServer(bool recursion = true);

private:
void* m_handle;
};

TcpSocket.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#include "PoolSocket.h"
#include <string.h>
#include <pthread.h>
#include <queue>
#include <iostream>
#include <unistd.h>
#include <string.h>
using namespace std;

// Socket连接池结构PoolHandle
struct PoolHandle
{
queue<TcpSocket*> sockList; // 存储可以通信的套接字对象
int bounds; // Socket连接池的容量

string serverip;
unsigned short serverport;

int connecttime;
int sTimeout; // 没有连接时,等待之间
pthread_mutex_t foo_mutex;
};

PoolSocket::PoolSocket()
{
}

PoolSocket::~PoolSocket()
{
}

int PoolSocket::poolInit(PoolParam * param)
{
int ret = 0;
PoolHandle *hdl = new PoolHandle;
m_handle = hdl;
//初始化 句柄
if (hdl == NULL)
{
ret = MallocErr;
return ret;
}

// 数据初始化
hdl->serverip = param->serverip;
hdl->serverport = param->serverport;
hdl->connecttime = param->connecttime;
//处理连接数
hdl->bounds = param->bounds;
hdl->sTimeout = 100;

pthread_mutex_init(&(hdl->foo_mutex), NULL);

pthread_mutex_lock(&(hdl->foo_mutex)); //流程加锁
// 创建用于通信的套接字对象
connectServer();
pthread_mutex_unlock(&(hdl->foo_mutex)); //解锁

return ret;
}

TcpSocket* PoolSocket::getConnect()
{
PoolHandle *hdl = static_cast<PoolHandle*>(m_handle);
// 流程加锁 pthread_mutex_unlock(& (hdl->foo_mutex) ); //解锁
pthread_mutex_lock(&(hdl->foo_mutex));

// 若 有效连数 = 0
if (hdl->sockList.size() == 0)
{
usleep(hdl->sTimeout); //等上几微妙
// 还是没有可用的连接
if (hdl->sockList.size() == 0)
{
return NULL;
}
}
// 从对头取出一条连接, 并将该节点弹出
TcpSocket* sock = hdl->sockList.front();
hdl->sockList.pop();
cout << "取出一条连接, 剩余连接数: " << curConnSize() << endl;

pthread_mutex_unlock(&(hdl->foo_mutex)); //解锁

return sock;
}

int PoolSocket::putConnect(TcpSocket* sock, bool isValid)
{
int ret = 0;
PoolHandle *hdl = static_cast<PoolHandle*>(m_handle);
pthread_mutex_lock(&(hdl->foo_mutex)); //流程加锁

// 判断连接是否已经被 放进来
// 判断该连接是否已经被释放
if (isValid)
{
// 连接可用, 放入队列
hdl->sockList.push(sock);
cout << "放回一条连接, 剩余连接数: " << curConnSize() << endl;
}
else
{
// 套接字不可用, 析构对象, 在创建一个新的连接
sock->disConnect();
delete sock;
connectServer(false);
cout << "修复一条连接, 剩余连接数: " << curConnSize() << endl;
}
pthread_mutex_unlock(&(hdl->foo_mutex)); //解锁

return ret;
}

void PoolSocket::poolDestory()
{
PoolHandle *hdl = static_cast<PoolHandle*>(m_handle);
// 遍历队列
while (hdl->sockList.size() != 0)
{
// 取出对头元素
TcpSocket* sock = hdl->sockList.front();
// 弹出对头原始
hdl->sockList.pop();
// 释放内存
delete sock;
}
delete hdl;
}

int PoolSocket::curConnSize()
{
PoolHandle *hdl = static_cast<PoolHandle*>(m_handle);
return hdl->sockList.size();
}

void PoolSocket::connectServer(bool recursion)
{
PoolHandle *hdl = static_cast<PoolHandle*>(m_handle);
if ((int)hdl->sockList.size() == hdl->bounds)
{
cout << "连接池对象初始化完毕, ^_^ ..." << endl;
cout << "Poll Size: " << hdl->sockList.size() << endl;
cout << "Poll bounds: " << hdl->bounds << endl;
return;
}
TcpSocket* socket = new TcpSocket;
char* ip = const_cast<char*>(hdl->serverip.data());
int ret = socket->connectToHost(ip, hdl->serverport, hdl->connecttime);
if (ret == 0)
{
// 成功连接服务器
hdl->sockList.push(socket);
cout << "Connect count: " << hdl->sockList.size() << endl;
}
else
{
// 失败
cout << "连接服务器失败 - index: " << hdl->sockList.size()+1 << endl;
// 释放对象
delete socket;
}
if (recursion)
{
// 递归调用
connectServer();
}
}

main.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include "PoolSocket.h"
#include <queue>
using namespace std;

int main()
{
PoolSocket pool;
// 连接池的结构体
PoolParam param;
param.bounds = 10;
param.connecttime = 100;
param.revtime = 100;
param.sendtime = 100;
param.serverip = "127.0.0.1";
param.serverport = 9999;
pool.poolInit(&param);
queue<TcpSocket*> list;
while (pool.curConnSize())
{
static int i = 0;
TcpSocket* sock = pool.getConnect();
string str = "hello, server ... " + to_string(i++);
sock->sendMsg((char*)str.c_str(), str.size());
list.push(sock);
}
while (!list.empty())
{
TcpSocket* t = list.front();
pool.putConnect(t, false);
list.pop();
}
cout << "max value: " << pool.curConnSize() << endl;
while (1);
return 0;
}