集群和分布式 集群:每台服务器独立运行一个工程的所有模块
分布式:一个工程拆分很多模块,每个模块独立部署运行在一个服务器主机上,所有服务器协同工作共同提供服务,每一台服务器称作分布式的一个节点,根据节点的并发要求,对一个节点可以再做节点模块集群部署。
RPC通信原理 RPC(Remote Procedure Call Protocol)远程过程调用协议
这里为什么使用protobuf而不使用json?
protobuf是二进制存储;xml和json都是文本存储
protobuf不需要存储额外的信息;json怎么存储数据的呢?
json需要同时存储键和值,例如 "name":"zhangsan","pwd":"123456";protobuf只存储值,即 "zhangsan" "123456"。
分布式网络框架开发 配置文件
rpcserverip=127.0.0.1
rpcserverport=8000
zookeeperip=127.0.0.1
zookeeperport=5000
用户可以修改配置文件来修改rpc节点和zookeeper的ip和端口。
框架读取配置文件类 mprpcconfig.h
#pragma once

#include <string>
#include <unordered_map>

/// Loads the framework's `key=value` configuration file and serves lookups
/// against the entries it cached.
class MprpcConfig
{
public:
    /// Parses the file at `config_file` and stores every `key=value` entry.
    void LoadConfigFile(const char *config_file);

    /// Returns the value stored for `key`; the implementation yields an empty
    /// string when the key was never loaded.
    std::string Load(std::string key);

private:
    /// Removes leading and trailing spaces from `src_buf` in place.
    void Trim(std::string &src_buf);

    /// Configuration entries, keyed by option name.
    std::unordered_map<std::string, std::string> m_configMap;
};
mprpcconfig.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 #include "mprpcconfig.h" #include <iostream> #include <string> void MprpcConfig::LoadConfigFile (const char *config_file) { FILE *pf = fopen (config_file, "r" ); if (nullptr == pf) { std::cout << config_file << "is not exist!" << std::endl; exit (EXIT_FAILURE); } while (!feof (pf)) { char buf[512 ] = {0 }; fgets (buf, 512 , pf); std::string read_buf (buf) ; Trim (read_buf); if (read_buf[0 ] == '#' || read_buf.empty ()) { continue ; } int idx = read_buf.find ('=' ); if (idx == -1 ) { continue ; } std::string key; std::string value; key = read_buf.substr (0 , idx); Trim (key); int endidx=read_buf.find ('\n' ,idx); value = read_buf.substr (idx + 1 , endidx - idx-1 ); Trim (value); m_configMap.insert ({key, value}); } } std::string MprpcConfig::Load (std::string key) { auto it = m_configMap.find (key); if (it == m_configMap.end ()) { return "" ; } return it->second; } void MprpcConfig::Trim (std::string &src_buf) { int idx = src_buf.find_first_not_of (' ' ); if (idx != -1 ) { src_buf = src_buf.substr (idx, src_buf.size () - idx); } idx = src_buf.find_last_not_of (' ' ); if (idx != -1 ) { src_buf = src_buf.substr (0 , idx + 1 ); } }
在mprpc框架的基础类MprpcApplication测试
1 2 3 4 5 6 m_config.LoadConfigFile (config_file.c_str ());
在bin目录命令行测试
结果:可以看到使用无序哈希表正确保存了
rpcserverip:127.0.0.1
rpcserverport:8000
zookeeperip:127.0.0.1
zookeeperport:5000
框架提供的专门服务发布rpc服务的网络对象类RpcProvider 专门在run函数中实现网络服务接收消息,使用muduo的网络板块
增加成员变量
1 2 muduo::net::EventLoop m_eventLoop;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void RpcProvider::Run () { std::string ip = MprpcApplication::GetInstance ().GetMprpcConfig ().Load ("rpcserverip" ); uint16_t port = atoi (MprpcApplication::GetInstance ().GetMprpcConfig ().Load ("rpcserverport" ).c_str ()); muduo::net::InetAddress address (ip, port) ; muduo::net::TcpServer server (&m_eventLoop, address, "RpcProvider" ) ; server.setConnectionCallback (std::bind (&RpcProvider::OnConnection, this , std::placeholders::_1)); server.setMessageCallback (std::bind (&RpcProvider::OnMessage, this , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); server.setThreadNum (4 ); std::cout << "RpcProvider start service at ip:" << ip << "port:" << port << std::endl; server.start (); m_eventLoop.loop (); }
提供给外部使用的,可以发布rpc方法的函数接口NotifyService 。
这里把对应服务和方法注册到一张表中。
增加成员变量
1 2 3 4 5 6 7 8 9 10 struct ServiceInfo { google::protobuf::Service * m_service; std::unordered_map < std::string, const google::protobuf::MethodDescriptor * > m_methodMap; }; std::unordered_map < std::string, ServiceInfo > m_serviceMap;
实现NotifyService 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void RpcProvider::NotifyService (google::protobuf::Service * service) { ServiceInfo service_Info; const google::protobuf::ServiceDescriptor * pserviceDesc = service -> GetDescriptor (); std::string service_name = std::string (pserviceDesc -> name ()); int methodCnt = pserviceDesc -> method_count (); std::cout << "service_name:" << service_name << std::endl; for (int i = 0 ; i < methodCnt; ++i) { const google::protobuf::MethodDescriptor * pmethodDesc = pserviceDesc -> method (i); std::string method_name = std::string (pmethodDesc -> name ()); service_Info.m_methodMap.insert ({ method_name, pmethodDesc }); std::cout << "method_name:" << method_name << std::endl; } service_Info.m_service = service; m_serviceMap.insert ({ service_name, service_Info }); }
接下来通过muduo服务器在连接和接收消息产生的回调进行网络信息的解析
这里需要对网络字节流进行解析,所以要约定消息格式:header_size(4字节)+ header_str(序列化后的RpcHeader)+ args_str(序列化后的参数)。header_str的长度不固定,所以要在它前面用4个字节说明其大小;为了避免粘包问题,还要在RpcHeader中记录参数字符串的大小args_size。
rpcheader.proto
syntax = "proto3";

package mprpc;

// Header placed in front of the serialized arguments of an RPC request.
message RpcHeader
{
    bytes service_name = 1; // name of the service being called
    bytes method_name = 2;  // name of the method being called
    uint32 args_size = 3;   // byte length of the serialized arguments
}
// Connection callback registered with the muduo TcpServer.
void OnConnection(const muduo::net::TcpConnectionPtr &conn);

// Message callback registered with the muduo TcpServer; decodes an RPC
// request and dispatches it to the registered service method.
void OnMessage(const muduo::net::TcpConnectionPtr &conn,
               muduo::net::Buffer *buffer,
               muduo::Timestamp receiveTime);

// Completion callback bound into the Closure handed to the service method;
// serializes `response` and sends it back over `conn`.
void SendRpcResponse(const muduo::net::TcpConnectionPtr &conn,
                     google::protobuf::Message *response);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 void RpcProvider::OnConnection (const muduo::net::TcpConnectionPtr &conn) { if (!conn->connected ()) { conn->shutdown (); } } void RpcProvider::OnMessage (const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp) { std::string recv_buf = buffer->retrieveAllAsString (); uint32_t header_size = 0 ; recv_buf.copy ((char *)&header_size, 4 , 0 ); std::string rpc_header_str = recv_buf.substr (4 , header_size); mprpc::RpcHeader rpcHeader; std::string service_name; std::string method_name; uint32_t args_size; if (rpcHeader.ParseFromString (rpc_header_str)) { service_name = rpcHeader.service_name (); method_name = rpcHeader.method_name (); args_size = rpcHeader.args_size (); } else { std::cout << "rpc_header_str:" << rpc_header_str << "parse error!" << std::endl; return ; } std::string args_str = recv_buf.substr (4 + header_size, args_size); std::cout << "====================================" << std::endl; std::cout << "header_size:" << header_size << std::endl; std::cout << "rpc_header_str:" << rpc_header_str << std::endl; std::cout << "service_name:" << service_name << std::endl; std::cout << "method_name:" << method_name << std::endl; std::cout << "args_str:" << args_str << std::endl; std::cout << "====================================" << std::endl; auto it = m_serviceMap.find (service_name); if (it == m_serviceMap.end ()) { std::cout << service_name << "is not exist!" << std::endl; return ; } auto mit = it->second.m_methodMap.find (method_name); if (mit == it->second.m_methodMap.end ()) { std::cout << service_name << " : " << method_name << "is not exist!" 
<< std::endl; return ; } auto service = it->second.m_service; auto method = mit->second; google::protobuf::Message *request = service->GetRequestPrototype (method).New (); if (!request->ParseFromString (args_str)) { std::cout << "request parse error! content:" << args_str << std::endl; return ; } google::protobuf::Message *response = service->GetResponsePrototype (method).New (); google::protobuf::Closure *done = google::protobuf::NewCallback <RpcProvider, const muduo::net::TcpConnectionPtr &, google::protobuf::Message *>(this , &RpcProvider::SendRpcResponse, conn, response); service->CallMethod (method, nullptr , request, response, done); } void RpcProvider::SendRpcResponse (const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response) { std::string response_str; if (response->SerializeToString (&response_str)) { conn->send (response_str); } else { std::cout<<"serialize response_str error!" <<std::endl; } conn->shutdown (); }
客户端发送 RPC 请求的统一接口MprpcChannel mprpcchannel.h
1 2 3 4 5 6 7 8 9 10 11 #pragma once #include <google/protobuf/service.h> class MprpcChannel : public google::protobuf::RpcChannel{ public : void CallMethod (const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, google::protobuf::Closure* done) ;};
mprpcchannel.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 #include "mprpcchannel.h" #include "rpcheader.pb.h" #include "mprpcapplication.h" #include <google/protobuf/descriptor.h> #include <google/protobuf/message.h> #include <string> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <errno.h> void MprpcChannel::CallMethod (const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller, const google::protobuf::Message *request, google::protobuf::Message *response, google::protobuf::Closure *done) { const google::protobuf::ServiceDescriptor *sd=method->service (); std::string service_name=std::string (sd->name ()); std::string method_name=std::string (method->name ()); uint32_t args_size=0 ; std::string args_str; if (request->SerializeToString (&args_str)) { args_size=args_str.size (); } else { controller->SetFailed ("serialize request error!" ); return ; } mprpc::RpcHeader rpcHeader; rpcHeader.set_service_name (service_name); rpcHeader.set_method_name (method_name); rpcHeader.set_args_size (args_size); uint32_t header_size=0 ; std::string rpc_header_str; if (rpcHeader.SerializeToString (&rpc_header_str)) { header_size=rpc_header_str.size (); } else { controller->SetFailed ("serialize rpc header error!" 
); return ; } std::string send_rpc_str; send_rpc_str.insert (0 ,std::string ((char *)&header_size,4 )); send_rpc_str+=rpc_header_str; send_rpc_str+=args_str; std::cout << "====================================" << std::endl; std::cout << "header_size:" << header_size << std::endl; std::cout << "rpc_header_str:" << rpc_header_str << std::endl; std::cout << "service_name:" << service_name << std::endl; std::cout << "method_name:" << method_name << std::endl; std::cout << "args_str:" << args_str << std::endl; std::cout << "====================================" << std::endl; int clientfd=socket (AF_INET,SOCK_STREAM,0 ); if (-1 ==clientfd) { char errtext[512 ]={0 }; sprintf (errtext,"create socket error! errno:%d" ,errno); controller->SetFailed (errtext); return ; } std::string ip = MprpcApplication::GetInstance ().GetMprpcConfig ().Load ("rpcserverip" ); uint16_t port = atoi (MprpcApplication::GetInstance ().GetMprpcConfig ().Load ("rpcserverport" ).c_str ()); struct sockaddr_in server_addr; server_addr.sin_family=AF_INET; server_addr.sin_port=htons (port); server_addr.sin_addr.s_addr=inet_addr (ip.c_str ()); if (-1 ==connect (clientfd,(struct sockaddr *)&server_addr,sizeof (server_addr))) { close (clientfd); char errtext[512 ]={0 }; sprintf (errtext,"connect error! errno:%d" ,errno); controller->SetFailed (errtext); return ; } if (-1 ==send (clientfd,send_rpc_str.c_str (),send_rpc_str.size (),0 )) { close (clientfd); char errtext[512 ]={0 }; sprintf (errtext,"send error! errno:%d" ,errno); controller->SetFailed (errtext); return ; } char recv_buf[1024 ]={0 }; int recv_size=0 ; if (-1 ==(recv_size=recv (clientfd,recv_buf,1024 ,0 ))) { close (clientfd); char errtext[512 ]={0 }; sprintf (errtext,"recv error! errno:%d" ,errno); controller->SetFailed (errtext); return ; } if (!response->ParseFromArray (recv_buf,recv_size)) { close (clientfd); char errtext[2048 ]={0 }; sprintf (errtext,"parse error! 
response_str:%s" ,recv_buf); controller->SetFailed (errtext); return ; } close (clientfd); }
告知客户端RPC调用状态MprpcController mprpccontroller.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #pragma once #include <google/protobuf/service.h> #include <string> class MprpcController : public google::protobuf::RpcController{ public : MprpcController (); void Reset () ; bool Failed () const ; std::string ErrorText () const ; void SetFailed (const std::string& reason) ; void StartCancel () ; bool IsCanceled () const ; void NotifyOnCancel (google::protobuf::Closure *callback) ; private : bool m_failed; std::string m_errText; };
mprpccontroller.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #include "mprpccontroller.h" MprpcController::MprpcController () { m_failed = false ; m_errText = "" ; } void MprpcController::Reset () { m_failed = false ; m_errText = "" ; } bool MprpcController::Failed () const { return m_failed; } std::string MprpcController::ErrorText () const { return m_errText; } void MprpcController::SetFailed (const std::string &reason) { m_failed = true ; m_errText = reason; } void MprpcController::StartCancel () {}bool MprpcController::IsCanceled () const {return false ;}void MprpcController::NotifyOnCancel (google::protobuf::Closure *callback) {}
日志板块
lockqueue.h
#pragma once

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

/**
 * FIFO queue whose Push() and Pop() may be called from different threads.
 * Every access to the underlying std::queue happens with m_mutex held, and
 * Pop() blocks on a condition variable while the queue is empty.
 */
template <typename T>
class LockQueue
{
public:
    /// Appends a copy of `data` and wakes one waiting consumer.
    void Push(const T &data)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(data);
        }
        // Notify after unlocking so the woken thread can take the mutex at once.
        m_condvariable.notify_one();
    }

    /// Appends `data` by moving it in, then wakes one waiting consumer.
    void Push(T &&data)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(data));
        }
        m_condvariable.notify_one();
    }

    /// Blocks until an element is available, then removes and returns the
    /// oldest one.
    T Pop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate form re-checks emptiness after every wake-up, so
        // spurious wake-ups are handled.
        m_condvariable.wait(lock, [this] { return !m_queue.empty(); });

        // Move rather than copy: cheaper for strings, and required for
        // move-only element types.
        T data = std::move(m_queue.front());
        m_queue.pop();
        return data;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condvariable;
};
logger.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #pragma once #include "lockqueue.h" #include <string> enum LogLevel { INFO, ERROR, }; class Logger { public : static Logger &GetInstance () ; void SetLogLevel (LogLevel level) ; void Log (std::string msg) ; private : int m_loglevel; LockQueue<std::string> m_lckQue; Logger (); Logger (const Logger &) = delete ; Logger (Logger &&) = delete ; }; #define LOG_INFO(logmsgformat, ...) \ do \ { \ Logger &logger = Logger::GetInstance(); \ logger.SetLogLevel(INFO); \ char c[1024] = {0}; \ snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \ logger.Log(c); \ } while (0); #define LOG_ERR(logmsgformat, ...) \ do \ { \ Logger &logger = Logger::GetInstance(); \ logger.SetLogLevel(ERROR); \ char c[1024] = {0}; \ snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \ logger.Log(c); \ } while (0);
logger.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 #include "logger.h" #include <time.h> #include <iostream> Logger &Logger::GetInstance () { static Logger logger; return logger; } Logger::Logger () { std::thread writeLogTask ([&](){ for (;;) { time_t now=time(nullptr ); tm *nowtm=localtime(&now); char file_name[128 ]; sprintf(file_name,"%d-%d-%d-log.txt" ,nowtm->tm_year+1900 ,nowtm->tm_mon+1 ,nowtm->tm_mday); FILE *pf=fopen(file_name,"a+" ); if (pf==nullptr ) { std::cout<<"logger file : " <<file_name<<"open error!" <<std::endl; exit(EXIT_FAILURE); } std::string msg=m_lckQue.Pop(); char time_buf[128 ]={0 }; sprintf(time_buf,"%d:%d:%d =>[%s] " ,nowtm->tm_hour,nowtm->tm_min,nowtm->tm_sec,(m_loglevel == INFO ? "info" :"error" )); msg.insert(0 ,time_buf); msg.append("\n" ); fputs(msg.c_str(),pf); fclose(pf); } }) ; writeLogTask.detach (); } void Logger::SetLogLevel (LogLevel level) { m_loglevel=level; } void Logger::Log (std::string msg) { m_lckQue.Push (msg); }
zookeeper在分布式网络通信框架的应用 zookeeper在本项目中起到服务注册与发现的作用:服务端(rpc节点)先把自己提供的服务和方法注册到zookeeper上,客户端再向zookeeper查找目标服务所在rpc节点的地址(ip+port),zookeeper负责协调两者之间的关系。rpc节点与zookeeper之间通过定期心跳维持会话以确认节点存活;一旦会话超时,该服务对应的rpc节点(临时节点)就会从zookeeper上删除,客户端也就查找不到它了。
zookeeperutil.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 #pragma once #include <semaphore.h> #include <zookeeper/zookeeper.h> #include <string> class ZkClient { public : ZkClient (); ~ZkClient (); void Start () ; void Create (const char *path,const char *data,int datalen,int state=0 ) ; std::string GetData (const char *path) ; private : zhandle_t *m_zhangle; };
zookeeperutil.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 #define THREADED #include "zookeeperutil.h" #include "mprpcapplication.h" #include <semaphore.h> #include <iostream> void global_watcher (zhandle_t *zh,int type,int state,const char *path,void *watcherCtx) { if (type==ZOO_SESSION_EVENT) { if (state==ZOO_CONNECTED_STATE) { sem_t *sem=(sem_t *)zoo_get_context (zh); sem_post (sem); } } } ZkClient::ZkClient () : m_zhangle (nullptr ) { } ZkClient::~ZkClient () { if (m_zhangle!=nullptr ) { zookeeper_close (m_zhangle); } } void ZkClient::Start () { std::string host=MprpcApplication::GetInstance ().GetMprpcConfig ().Load ("zookeeperip" ); std::string port=MprpcApplication::GetInstance ().GetMprpcConfig ().Load ("zookeeperport" ); std::string connstr=host+":" +port; m_zhangle=zookeeper_init (connstr.c_str (),global_watcher,30000 ,nullptr ,nullptr ,0 ); if (nullptr ==m_zhangle) { std::cout<<"zookeeper_init error!" <<std::endl; exit (EXIT_FAILURE); } sem_t sem; sem_init (&sem,0 ,0 ); zoo_set_context (m_zhangle,&sem); sem_wait (&sem); std::cout<< "zooKeeper_init success!" <<std::endl; } void ZkClient::Create (const char *path,const char *data,int datalen,int state) { char path_buffer[128 ]; int bufferlen=sizeof (path_buffer); int flag; flag=zoo_exists (m_zhangle,path,0 ,nullptr ); if (ZNONODE==flag) { flag=zoo_create (m_zhangle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen); if (flag==ZOK) { std::cout<<"znode create success...path:" <<path<<std::endl; } else { std::cout<<"flag:" <<flag<<std::endl; std::cout<<"znode create error... 
path:" <<path<<std::endl; exit (EXIT_FAILURE); } } } std::string ZkClient::GetData (const char * path) { char buffer[64 ]; int bufferlen=sizeof (buffer); int flag = zoo_get (m_zhangle,path,0 ,buffer,&bufferlen,nullptr ); if (flag !=ZOK) { std::cout<<"get znode error... path:" <<path<<std::endl; return "" ; } else { return buffer; } }
首先在**RpcProvider::Run()**函数中服务端向zookeeper注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ZkClient zkCli; zkCli.Start (); for (auto &sp:m_serviceMap){ std::string service_path="/" +sp.first; zkCli.Create (service_path.c_str (),nullptr ,0 ); for (auto &mp:sp.second.m_methodMap) { std::string method_path=service_path+"/" +mp.first; char method_path_data[128 ]={0 }; sprintf (method_path_data,"%s:%d" ,ip.c_str (),port); zkCli.Create (method_path.c_str (),method_path_data,strlen (method_path_data),ZOO_EPHEMERAL); } }
在MprpcChannel::CallMethod函数 中客户端向zookeeper找到对应rpc节点的地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ZkClient zkcli; zkcli.Start (); std::string method_path="/" +service_name+"/" +method_name; std::string host_data=zkcli.GetData (method_path.c_str ()); if (host_data == "" ){ controller->SetFailed (method_path+"is not exist!" ); return ; } int idx=host_data.find (":" );if (idx==-1 ){ controller->SetFailed (method_path+"address is invalid!" ); return ; } std::string ip=host_data.substr (0 ,idx); uint16_t port=atoi (host_data.substr (idx+1 ,host_data.size ()-idx).c_str ());
项目思路