本人一直在寻找一个跨平台的网络库。boost与ACE比较庞大,不予考虑。对比了libevent、libev、libuv之后,最终选择了libuv。可是libuv文档少,例子也简单,对于tcp只有一个echo-server的例子。网上也找过对其封装的例子,如下:
libsourcey库,封装了许多库。它对libuv的封装与其他代码耦合比较紧,难以剥离 http://sourcey.com/libuv-cpp-wrappers/
C++11封装的,可惜VS10未完全支持C++11 https://github.com/larroy/uvpp
C++封装的 https://github.com/keepallsimple/uvpp 本人想实现一个raw tcp server,支持上万连接数。网上找到的都不适合我,没办法,只能参照各例子自己封装了。头文件:
- /***************************************
- * @file tcpsocket.h
- * @brief 基于libuv封装的tcp服务器与客户端,使用log4z作日志工具
- * @details
- * @author phata, wqvbjhc@gmail.com
- * @date 2014-5-13
- * @mod 2014-5-13 phata 修正服务器与客户端的错误.现服务器支持多客户端连接
- 修改客户端测试代码,支持并发多客户端测试
- ****************************************/
- #ifndef TCPSocket_H
- #define TCPSocket_H
#include "uv.h"

#include <cstdio>   // fprintf
#include <cstdlib>  // malloc, realloc, free
#include <cstring>  // memcpy
#include <list>
#include <map>
#include <string>
#if !defined(WIN32) && !defined(_WIN32)
#include <unistd.h> // usleep
#endif
- #define BUFFERSIZE (1024*1024)
-
- namespace uv
- {
/// Callback invoked by TCPServer after a new client has been accepted.
/// @param clientid  id the server assigned to the new client.
typedef void (*newconnect)(int clientid);

/// Callback invoked by TCPServer when bytes arrive from a client.
/// @param clientid  id of the client the data came from.
/// @param buf       start of the received bytes.
/// @param bufsize   number of bytes available at buf.
typedef void (*server_recvcb)(int clientid, const char* buf, int bufsize);

/// Callback invoked by TCPClient when bytes arrive from the server.
/// @param buf       start of the received bytes.
/// @param bufsize   number of bytes available at buf.
/// @param userdata  pointer registered together with the callback.
typedef void (*client_recvcb)(const char* buf, int bufsize, void* userdata);
-
- class TCPServer;
- class clientdata
- {
- public:
- clientdata(int clientid):client_id(clientid),recvcb_(nullptr) {
- client_handle = (uv_tcp_t*)malloc(sizeof(*client_handle));
- client_handle->data = this;
- readbuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
- writebuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
- }
- virtual ~clientdata() {
- free(readbuffer.base);
- readbuffer.base = nullptr;
- readbuffer.len = 0;
-
- free(writebuffer.base);
- writebuffer.base = nullptr;
- writebuffer.len = 0;
-
- free(client_handle);
- client_handle = nullptr;
- }
- int client_id;//客户端id,惟一
- uv_tcp_t* client_handle;//客户端句柄
- TCPServer* tcp_server;//服务器句柄(保存是因为某些回调函数需要到)
- uv_buf_t readbuffer;//接受数据的buf
- uv_buf_t writebuffer;//写数据的buf
- uv_write_t write_req;
- server_recvcb recvcb_;//接收数据回调给用户的函数
- };
-
-
- class TCPServer
- {
- public:
- TCPServer(uv_loop_t* loop = uv_default_loop());
- virtual ~TCPServer();
- static void StartLog(const char* logpath = nullptr);//启动日志,必须启动才会生成日志
- public:
- //基本函数
- bool Start(const char *ip, int port);//启动服务器,地址为IP4
- bool Start6(const char *ip, int port);//启动服务器,地址为IP6
- void close();
-
- bool setNoDelay(bool enable);
- bool setKeepAlive(int enable, unsigned int delay);
-
- const char* GetLastErrMsg() const {
- return errmsg_.c_str();
- };
-
- virtual int send(int clientid, const char* data, std::size_t len);
- virtual void setnewconnectcb(newconnect cb);
- virtual void setrecvcb(int clientid,server_recvcb cb);//设置接收回调函数,每个客户端各有一个
- protected:
- int GetAvailaClientID()const;//获取可用的client id
- bool DeleteClient(int clientid);//删除链表中的客户端
- //静态回调函数
- static void AfterServerRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
- static void AfterSend(uv_write_t *req, int status);
- static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
- static void AfterServerClose(uv_handle_t *handle);
- static void AfterClientClose(uv_handle_t *handle);
- static void acceptConnection(uv_stream_t *server, int status);
-
- private:
- bool init();
- bool run(int status = UV_RUN_DEFAULT);
- bool bind(const char* ip, int port);
- bool bind6(const char* ip, int port);
- bool listen(int backlog = 1024);
-
-
- uv_tcp_t server_;//服务器链接
- std::map<int,clientdata*> clients_list_;//子客户端链接
- uv_mutex_t mutex_handle_;//保护clients_list_
- uv_loop_t *loop_;
- std::string errmsg_;
- newconnect newconcb_;
- bool isinit_;//是否已初始化,用于close函数中判断
- };
-
-
-
- class TCPClient
- {
- //直接调用connect/connect6会进行连接
- public:
- TCPClient(uv_loop_t* loop = uv_default_loop());
- virtual ~TCPClient();
- static void StartLog(const char* logpath = nullptr);//启动日志,必须启动才会生成日志
- public:
- //基本函数
- virtual bool connect(const char* ip, int port);//启动connect线程,循环等待直到connect完成
- virtual bool connect6(const char* ip, int port);//启动connect线程,循环等待直到connect完成
- virtual int send(const char* data, std::size_t len);
- virtual void setrecvcb(client_recvcb cb, void* userdata);////设置接收回调函数,只有一个
- void close();
-
- //是否启用Nagle算法
- bool setNoDelay(bool enable);
- bool setKeepAlive(int enable, unsigned int delay);
-
- const char* GetLastErrMsg() const {
- return errmsg_.c_str();
- };
- protected:
- //静态回调函数
- static void AfterConnect(uv_connect_t* handle, int status);
- static void AfterClientRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
- static void AfterSend(uv_write_t *req, int status);
- static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
- static void AfterClose(uv_handle_t *handle);
-
- static void ConnectThread(void* arg);//真正的connect线程
- static void ConnectThread6(void* arg);//真正的connect线程
-
- bool init();
- bool run(int status = UV_RUN_DEFAULT);
- private:
- enum {
- CONNECT_TIMEOUT,
- CONNECT_FINISH,
- CONNECT_ERROR,
- CONNECT_DIS,
- };
- uv_tcp_t client_;//客户端连接
- uv_loop_t *loop_;
- uv_write_t write_req_;//写时请求
- uv_connect_t connect_req_;//连接时请求
- uv_thread_t connect_threadhanlde_;//线程句柄
- std::string errmsg_;//错误信息
- uv_buf_t readbuffer_;//接受数据的buf
- uv_buf_t writebuffer_;//写数据的buf
- uv_mutex_t write_mutex_handle_;//保护write,保存前一write完成才进行下一write
-
- int connectstatus_;//连接状态
- client_recvcb recvcb_;//回调函数
- void* userdata_;//回调函数的用户数据
- std::string connectip_;//连接的服务器IP
- int connectport_;//连接的服务器端口号
- bool isinit_;//是否已初始化,用于close函数中判断
- };
-
- }
复制代码
源文件
- #include "tcpsocket.h"
- #include "log4z.h"
-
- std::string GetUVError(int retcode)
- {
- std::string err;
- err = uv_err_name(retcode);
- err +=":";
- err += uv_strerror(retcode);
- return std::move(err);
- }
-
- namespace uv
- {
- /*****************************************TCP Server*************************************************************/
- TCPServer::TCPServer(uv_loop_t* loop)
- :newconcb_(nullptr), isinit_(false)
- {
- loop_ = loop;
- }
-
-
- TCPServer::~TCPServer()
- {
- close();
- LOGI("tcp server exit.");
- }
-
- //初始化与关闭--服务器与客户端一致
- bool TCPServer::init()
- {
- if (isinit_) {
- return true;
- }
-
- if (!loop_) {
- errmsg_ = "loop is null on tcp init.";
- LOGE(errmsg_);
- return false;
- }
- int iret = uv_mutex_init(&mutex_handle_);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- iret = uv_tcp_init(loop_,&server_);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- isinit_ = true;
- server_.data = this;
- //iret = uv_tcp_keepalive(&server_, 1, 60);//调用此函数后后续函数会调用出错
- //if (iret) {
- // errmsg_ = GetUVError(iret);
- // return false;
- //}
- return true;
- }
-
- void TCPServer::close()
- {
- for (auto it = clients_list_.begin(); it!=clients_list_.end(); ++it) {
- auto data = it->second;
- uv_close((uv_handle_t*)data->client_handle,AfterClientClose);
- }
- clients_list_.clear();
-
- LOGI("close server");
- if (isinit_) {
- uv_close((uv_handle_t*) &server_, AfterServerClose);
- LOGI("close server");
- }
- isinit_ = false;
- uv_mutex_destroy(&mutex_handle_);
- }
-
- bool TCPServer::run(int status)
- {
- LOGI("server runing.");
- int iret = uv_run(loop_,(uv_run_mode)status);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
- //属性设置--服务器与客户端一致
- bool TCPServer::setNoDelay(bool enable)
- {
- int iret = uv_tcp_nodelay(&server_, enable ? 1 : 0);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- bool TCPServer::setKeepAlive(int enable, unsigned int delay)
- {
- int iret = uv_tcp_keepalive(&server_, enable , delay);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- //作为server时的函数
- bool TCPServer::bind(const char* ip, int port)
- {
- struct sockaddr_in bind_addr;
- int iret = uv_ip4_addr(ip, port, &bind_addr);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- LOGI("server bind ip="<<ip<<", port="<<port);
- return true;
- }
-
- bool TCPServer::bind6(const char* ip, int port)
- {
- struct sockaddr_in6 bind_addr;
- int iret = uv_ip6_addr(ip, port, &bind_addr);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr,0);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- LOGI("server bind ip="<<ip<<", port="<<port);
- return true;
- }
-
- bool TCPServer::listen(int backlog)
- {
- int iret = uv_listen((uv_stream_t*) &server_, backlog, acceptConnection);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- LOGI("server listen");
- return true;
- }
-
- bool TCPServer::Start( const char *ip, int port )
- {
- close();
- if (!init()) {
- return false;
- }
- if (!bind(ip,port)) {
- return false;
- }
- if (!listen(SOMAXCONN)) {
- return false;
- }
- if (!run()) {
- return false;
- }
- LOGI("start listen "<<ip<<": "<<port);
- return true;
- }
-
- bool TCPServer::Start6( const char *ip, int port )
- {
- close();
- if (!init()) {
- return false;
- }
- if (!bind6(ip,port)) {
- return false;
- }
- if (!listen(SOMAXCONN)) {
- return false;
- }
- if (!run()) {
- return false;
- }
- return true;
- }
-
- //服务器发送函数
- int TCPServer::send(int clientid, const char* data, std::size_t len)
- {
- auto itfind = clients_list_.find(clientid);
- if (itfind == clients_list_.end()) {
- errmsg_ = "can't find cliendid ";
- errmsg_ += std::to_string((long long)clientid);
- LOGE(errmsg_);
- return -1;
- }
- //自己控制data的生命周期直到write结束
- if (itfind->second->writebuffer.len < len) {
- itfind->second->writebuffer.base = (char*)realloc(itfind->second->writebuffer.base,len);
- itfind->second->writebuffer.len = len;
- }
- memcpy(itfind->second->writebuffer.base,data,len);
- uv_buf_t buf = uv_buf_init((char*)itfind->second->writebuffer.base,len);
- int iret = uv_write(&itfind->second->write_req, (uv_stream_t*)itfind->second->client_handle, &buf, 1, AfterSend);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- //服务器-新客户端函数
- void TCPServer::acceptConnection(uv_stream_t *server, int status)
- {
- if (!server->data) {
- return;
- }
- TCPServer *tcpsock = (TCPServer *)server->data;
- int clientid = tcpsock->GetAvailaClientID();
- clientdata* cdata = new clientdata(clientid);//uv_close回调函数中释放
- cdata->tcp_server = tcpsock;//保存服务器的信息
- int iret = uv_tcp_init(tcpsock->loop_, cdata->client_handle);//析构函数释放
- if (iret) {
- delete cdata;
- tcpsock->errmsg_ = GetUVError(iret);
- LOGE(tcpsock->errmsg_);
- return;
- }
- iret = uv_accept((uv_stream_t*)&tcpsock->server_, (uv_stream_t*) cdata->client_handle);
- if ( iret) {
- tcpsock->errmsg_ = GetUVError(iret);
- uv_close((uv_handle_t*) cdata->client_handle, NULL);
- delete cdata;
- LOGE(tcpsock->errmsg_);
- return;
- }
- tcpsock->clients_list_.insert(std::make_pair(clientid,cdata));//加入到链接队列
- if (tcpsock->newconcb_) {
- tcpsock->newconcb_(clientid);
- }
- LOGI("new client("<<cdata->client_handle<<") id="<< clientid);
- iret = uv_read_start((uv_stream_t*)cdata->client_handle, onAllocBuffer, AfterServerRecv);//服务器开始接收客户端的数据
- return;
- }
-
- //服务器-接收数据回调函数
- void TCPServer::setrecvcb(int clientid, server_recvcb cb )
- {
- auto itfind = clients_list_.find(clientid);
- if (itfind != clients_list_.end()) {
- itfind->second->recvcb_ = cb;
- }
- }
-
- //服务器-新链接回调函数
- void TCPServer::setnewconnectcb(newconnect cb )
- {
- newconcb_ = cb;
- }
-
- //服务器分析空间函数
- void TCPServer::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
- {
- if (!handle->data) {
- return;
- }
- clientdata *client = (clientdata*)handle->data;
- *buf = client->readbuffer;
- }
-
- void TCPServer::AfterServerRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
- {
- if (!handle->data) {
- return;
- }
- clientdata *client = (clientdata*)handle->data;//服务器的recv带的是clientdata
- if (nread < 0) {/* Error or EOF */
- TCPServer *server = (TCPServer *)client->tcp_server;
- if (nread == UV_EOF) {
- fprintf(stdout,"客户端(%d)连接断开,关闭此客户端\n",client->client_id);
- LOGW("客户端("<<client->client_id<<")主动断开");
- } else if (nread == UV_ECONNRESET) {
- fprintf(stdout,"客户端(%d)异常断开\n",client->client_id);
- LOGW("客户端("<<client->client_id<<")异常断开");
- } else {
- fprintf(stdout,"%s\n",GetUVError(nread));
- LOGW("客户端("<<client->client_id<<")异常断开:"<<GetUVError(nread));
- }
- server->DeleteClient(client->client_id);//连接断开,关闭客户端
- return;
- } else if (0 == nread) {/* Everything OK, but nothing read. */
-
- } else if (client->recvcb_) {
- client->recvcb_(client->client_id,buf->base,nread);
- }
- }
-
- //服务器与客户端一致
- void TCPServer::AfterSend(uv_write_t *req, int status)
- {
- if (status < 0) {
- LOGE("发送数据有误:"<<GetUVError(status));
- fprintf(stderr, "Write error %s\n", GetUVError(status));
- }
- }
-
- void TCPServer::AfterServerClose(uv_handle_t *handle)
- {
- //服务器,不需要做什么
- }
-
- void TCPServer::AfterClientClose(uv_handle_t *handle)
- {
- clientdata *cdata = (clientdata*)handle->data;
- LOGI("client "<<cdata->client_id<<" had closed.");
- delete cdata;
- }
-
- int TCPServer::GetAvailaClientID() const
- {
- static int s_id = 0;
- return ++s_id;
- }
-
- bool TCPServer::DeleteClient( int clientid )
- {
- uv_mutex_lock(&mutex_handle_);
- auto itfind = clients_list_.find(clientid);
- if (itfind == clients_list_.end()) {
- errmsg_ = "can't find client ";
- errmsg_ += std::to_string((long long)clientid);
- LOGE(errmsg_);
- uv_mutex_unlock(&mutex_handle_);
- return false;
- }
- if (uv_is_active((uv_handle_t*)itfind->second->client_handle)) {
- uv_read_stop((uv_stream_t*)itfind->second->client_handle);
- }
- uv_close((uv_handle_t*)itfind->second->client_handle,AfterClientClose);
-
- clients_list_.erase(itfind);
- LOGI("删除客户端"<<clientid);
- uv_mutex_unlock(&mutex_handle_);
- return true;
- }
-
-
- void TCPServer::StartLog( const char* logpath /*= nullptr*/ )
- {
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);
- if (logpath) {
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);
- }
- zsummer::log4z::ILog4zManager::GetInstance()->Start();
- }
-
-
- /*****************************************TCP Client*************************************************************/
- TCPClient::TCPClient(uv_loop_t* loop)
- :recvcb_(nullptr),userdata_(nullptr)
- ,connectstatus_(CONNECT_DIS)
- , isinit_(false)
- {
- readbuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);
- writebuffer_ = uv_buf_init((char*) malloc(BUFFERSIZE), BUFFERSIZE);
- loop_ = loop;
- connect_req_.data = this;
- write_req_.data = this;
- }
-
-
- TCPClient::~TCPClient()
- {
- free(readbuffer_.base);
- readbuffer_.base = nullptr;
- readbuffer_.len = 0;
- free(writebuffer_.base);
- writebuffer_.base = nullptr;
- writebuffer_.len = 0;
- close();
- LOGI("客户端("<<this<<")退出");
- }
- //初始化与关闭--服务器与客户端一致
- bool TCPClient::init()
- {
- if (isinit_) {
- return true;
- }
-
- if (!loop_) {
- errmsg_ = "loop is null on tcp init.";
- LOGE(errmsg_);
- return false;
- }
- int iret = uv_tcp_init(loop_,&client_);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- iret = uv_mutex_init(&write_mutex_handle_);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- isinit_ = true;
- fprintf(stdout,"客户端(%p) init type = %d\n",&client_,client_.type);
- client_.data = this;
- //iret = uv_tcp_keepalive(&client_, 1, 60);//
- //if (iret) {
- // errmsg_ = GetUVError(iret);
- // return false;
- //}
- LOGI("客户端("<<this<<")Init");
- return true;
- }
-
- void TCPClient::close()
- {
- if (!isinit_) {
- return;
- }
- uv_mutex_destroy(&write_mutex_handle_);
- uv_close((uv_handle_t*) &client_, AfterClose);
- LOGI("客户端("<<this<<")close");
- isinit_ = false;
- }
-
- bool TCPClient::run(int status)
- {
- LOGI("客户端("<<this<<")run");
- int iret = uv_run(loop_,(uv_run_mode)status);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- //属性设置--服务器与客户端一致
- bool TCPClient::setNoDelay(bool enable)
- {
- //http://blog.csdn.net/u011133100/article/details/21485983
- int iret = uv_tcp_nodelay(&client_, enable ? 1 : 0);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- bool TCPClient::setKeepAlive(int enable, unsigned int delay)
- {
- int iret = uv_tcp_keepalive(&client_, enable , delay);
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- //作为client的connect函数
- bool TCPClient::connect(const char* ip, int port)
- {
- close();
- init();
- connectip_ = ip;
- connectport_ = port;
- LOGI("客户端("<<this<<")start connect to server("<<ip<<":"<<port<<")");
- int iret = uv_thread_create(&connect_threadhanlde_, ConnectThread, this);//触发AfterConnect才算真正连接成功,所以用线程
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- while ( connectstatus_ == CONNECT_DIS) {
- #if defined (WIN32) || defined(_WIN32)
- Sleep(100);
- #else
- usleep((100) * 1000)
- #endif
- }
- return connectstatus_ == CONNECT_FINISH;
- }
-
- bool TCPClient::connect6(const char* ip, int port)
- {
- close();
- init();
- connectip_ = ip;
- connectport_ = port;
- LOGI("客户端("<<this<<")start connect to server("<<ip<<":"<<port<<")");
- int iret = uv_thread_create(&connect_threadhanlde_, ConnectThread6, this);//触发AfterConnect才算真正连接成功,所以用线程
- if (iret) {
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- while ( connectstatus_ == CONNECT_DIS) {
- //fprintf(stdout,"client(%p) wait, connect status %d\n",this,connectstatus_);
- #if defined (WIN32) || defined(_WIN32)
- Sleep(100);
- #else
- usleep((100) * 1000)
- #endif
- }
- return connectstatus_ == CONNECT_FINISH;
- }
-
- void TCPClient::ConnectThread( void* arg )
- {
- TCPClient *pclient = (TCPClient*)arg;
- fprintf(stdout,"client(%p) ConnectThread start\n",pclient);
- struct sockaddr_in bind_addr;
- int iret = uv_ip4_addr(pclient->connectip_.c_str(), pclient->connectport_, &bind_addr);
- if (iret) {
- pclient->errmsg_ = GetUVError(iret);
- LOGE(pclient->errmsg_);
- return;
- }
- iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);
- if (iret) {
- pclient->errmsg_ = GetUVError(iret);
- LOGE(pclient->errmsg_);
- return;
- }
- fprintf(stdout,"client(%p) ConnectThread end, connect status %d\n",pclient, pclient->connectstatus_);
- pclient->run();
- }
-
-
- void TCPClient::ConnectThread6( void* arg )
- {
- TCPClient *pclient = (TCPClient*)arg;
- LOGI("客户端("<<pclient<<")Enter Connect Thread.");
- fprintf(stdout,"client(%p) ConnectThread start\n",pclient);
- struct sockaddr_in6 bind_addr;
- int iret = uv_ip6_addr(pclient->connectip_.c_str(), pclient->connectport_, &bind_addr);
- if (iret) {
- pclient->errmsg_ = GetUVError(iret);
- LOGE(pclient->errmsg_);
- return;
- }
- iret = uv_tcp_connect(&pclient->connect_req_, &pclient->client_, (const sockaddr*)&bind_addr, AfterConnect);
- if (iret) {
- pclient->errmsg_ = GetUVError(iret);
- LOGE(pclient->errmsg_);
- return;
- }
- fprintf(stdout,"client(%p) ConnectThread end, connect status %d\n",pclient, pclient->connectstatus_);
- LOGI("客户端("<<pclient<<")Leave Connect Thread. connect status "<<pclient->connectstatus_);
- pclient->run();
- }
-
- void TCPClient::AfterConnect(uv_connect_t* handle, int status)
- {
- fprintf(stdout,"start after connect\n");
- TCPClient *pclient = (TCPClient*)handle->handle->data;
- if (status) {
- pclient->connectstatus_ = CONNECT_ERROR;
- fprintf(stdout,"connect error:%s\n",GetUVError(status));
- return;
- }
-
- int iret = uv_read_start(handle->handle, onAllocBuffer, AfterClientRecv);//客户端开始接收服务器的数据
- if (iret) {
- fprintf(stdout,"uv_read_start error:%s\n",GetUVError(iret));
- pclient->connectstatus_ = CONNECT_ERROR;
- } else {
- pclient->connectstatus_ = CONNECT_FINISH;
- }
- LOGI("客户端("<<pclient<<")run");
- fprintf(stdout,"end after connect\n");
- }
-
- //客户端的发送函数
- int TCPClient::send(const char* data, std::size_t len)
- {
- //自己控制data的生命周期直到write结束
- if (!data || len <= 0) {
- errmsg_ = "send data is null or len less than zero.";
- return 0;
- }
-
- uv_mutex_lock(&write_mutex_handle_);
- if (writebuffer_.len < len) {
- writebuffer_.base = (char*)realloc(writebuffer_.base,len);
- writebuffer_.len = len;
- }
- memcpy(writebuffer_.base,data,len);
- uv_buf_t buf = uv_buf_init((char*)writebuffer_.base,len);
- int iret = uv_write(&write_req_, (uv_stream_t*)&client_, &buf, 1, AfterSend);
- if (iret) {
- uv_mutex_unlock(&write_mutex_handle_);
- errmsg_ = GetUVError(iret);
- LOGE(errmsg_);
- return false;
- }
- return true;
- }
-
- //客户端-接收数据回调函数
- void TCPClient::setrecvcb(client_recvcb cb, void* userdata )
- {
- recvcb_ = cb;
- userdata_ = userdata;
- }
-
- //客户端分析空间函数
- void TCPClient::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
- {
- if (!handle->data) {
- return;
- }
- TCPClient *client = (TCPClient*)handle->data;
- *buf = client->readbuffer_;
- }
-
-
- void TCPClient::AfterClientRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
- {
- if (!handle->data) {
- return;
- }
- TCPClient *client = (TCPClient*)handle->data;//服务器的recv带的是TCPClient
- if (nread < 0) {
- if (nread == UV_EOF) {
- fprintf(stdout,"服务器(%p)主动断开\n",handle);
- LOGW("服务器主动断开");
- } else if (nread == UV_ECONNRESET) {
- fprintf(stdout,"服务器(%p)异常断开\n",handle);
- LOGW("服务器异常断开");
- } else {
- fprintf(stdout,"服务器(%p)异常断开:%s\n",handle,GetUVError(nread));
- LOGW("服务器异常断开"<<GetUVError(nread));
- }
- uv_close((uv_handle_t*)handle, AfterClose);
- return;
- }
- if (nread > 0 && client->recvcb_) {
- client->recvcb_(buf->base,nread,client->userdata_);
- }
- }
-
- //服务器与客户端一致
- void TCPClient::AfterSend(uv_write_t *req, int status)
- {
- TCPClient *client = (TCPClient *)req->handle->data;
- uv_mutex_unlock(&client->write_mutex_handle_);
- if (status < 0) {
- LOGE("发送数据有误:"<<GetUVError(status));
- fprintf(stderr, "Write error %s\n", GetUVError(status));
- }
- }
- //服务器与客户端一致
- void TCPClient::AfterClose(uv_handle_t *handle)
- {
- fprintf(stdout,"客户端(%p)已close\n",handle);
- LOGI("客户端("<<handle<<")已close");
- }
-
- void TCPClient::StartLog( const char* logpath /*= nullptr*/ )
- {
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerMonthdir(LOG4Z_MAIN_LOGGER_ID, true);
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerDisplay(LOG4Z_MAIN_LOGGER_ID,false);
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLevel(LOG4Z_MAIN_LOGGER_ID,LOG_LEVEL_DEBUG);
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerLimitSize(LOG4Z_MAIN_LOGGER_ID,100);
- if (logpath) {
- zsummer::log4z::ILog4zManager::GetInstance()->SetLoggerPath(LOG4Z_MAIN_LOGGER_ID,logpath);
- }
- zsummer::log4z::ILog4zManager::GetInstance()->Start();
- }
-
- }
复制代码
原文地址:
- http://www.cnblogs.com/wqvbjhc/p/3757582.html
复制代码
|