欢迎访问 生活随笔!

尊龙游戏旗舰厅官网

当前位置: 尊龙游戏旗舰厅官网 > > 数据库 >内容正文

数据库

redis源码学习-尊龙游戏旗舰厅官网

发布时间:2025/1/21 数据库 16 豆豆
尊龙游戏旗舰厅官网 收集整理的这篇文章主要介绍了 redis源码学习-masterslave的命令交互 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

0. 写在前面

        version redis2.2.2

        redis中可以支持主从结构,本文主要从master和slave的心跳机制出发(ping),分析redis的命令行交互。

        在redis中,server为每个连接建立一个redisclient数据对象,来描述对应的连接。其中,redisclient为命令交互设置了缓冲区。querybuf用于存储客户端送过来的命令,buf和reply是用于应答的缓冲。querybuf是在文件事件readqueryfromclient中被填充,每次填充的最大字节数默认为1024b。而应答缓冲区是由addreply()函数填充,并由文件事件sendreplytoclient中发送给客户端。具体数据流如图1所示。masterporcess与slaveprocess进行命令交互。其中,蓝色矩形框代表函数,白色矩形框代表数据,曲线描述数据流,折线描述数据间的从属关系。


图1. master&slave交互的数据流(蓝色矩形框代表函数,白色矩形框代表数据,曲线描述数据流,折线描述数据间的从属关系)

1. 相关数据结构

 

typedef struct redisclient {int fd; //connect fd...sds querybuf; //命令缓冲区,由readqueryfromclient()事件进行填充(sds equals to char*)int argc; //for command;记录参数个数robj **argv; //for command;记录命令行参数int reqtype; //命令解析协议:inline or multibulk...time_t lastinteraction; /* 最近交互时间 */...list *reply; //replay object list/* response buffer */char buf[redis_reply_chunk_bytes]; //reply buffer,由addreply()函数进行填充int bufpos; //记录buf已填充的长度int sentlen; //replay阶段,记录当前buf已发送了多少字节 } redisclient;struct redisserver {...list *clients;dict *commands; /* command table hahs table */...list *slaves, *monitors; //master : slave链表char neterr[anet_err_len];aeeventloop *el; //event listint cronloops; //servercorn 执行次数...redisclient *master; //slave :记录 master 的连接信息的clientint replstate; //slave :当前的状态... };struct rediscommand readonlycommandtable[] = {...{"sync",synccommand,1,0,null,0,0,0},...{"ping",pingcommand,1,0,null,0,0,0},... }

 

2. query的读取和命令的解析

 

        从图1可以看出,命令交互数据query的读取是在文件事件readqueryfromclient中填充到c->querybuf中。之后,querybuf由函数processinputbuffer进行命令的解析。命令的解析过程如图2所示。在函数processinputbuffer中,将缓存与querybuf中的所有命令(命令间按\n\r分隔)进行解析。之后,查询命令hashtabe查找相关命令函数。最后调用相应命令hander执行命令。


图2.querybuf的解析

具体代码分析如下:

 

void readqueryfromclient(aeeventloop *el, int fd, void *privdata, int mask) {redisclient *c = (redisclient*) privdata;char buf[redis_iobuf_len];int nread;redis_notused(el);redis_notused(mask);nread = read(fd, buf, redis_iobuf_len);...check...if (nread) {c->querybuf = sdscatlen(c->querybuf,buf,nread);c->lastinteraction = time(null);//更新时间戳} else {return;}processinputbuffer(c);//处理client传输过来的数据 }void processinputbuffer(redisclient *c) {/* 执行querybub中的所有命令*/while(sdslen(c->querybuf)) {...check.../*判定命令的解析协议 */if (!c->reqtype) {if (c->querybuf[0] == '*') {c->reqtype = redis_req_multibulk;} else {c->reqtype = redis_req_inline;//按行解析}}if (c->reqtype == redis_req_inline) {/*processinlinebuffer: 1. 取出c->querybuf起始端到\r\n位置的字符串,更新c->querybuf2. 将取出的字符串按照“ ”空格进行分段解析,得到命令及其参数格式为: argc,*argv[],其中argv[0]为命令,argv[1~argc-1]为参数*/if (processinlinebuffer(c) != redis_ok) break;} else if (c->reqtype == redis_req_multibulk) {...}/* multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetclient(c);} else {/* only reset the client when the command was executed. */if (processcommand(c) == redis_ok) //执行命令resetclient(c);}} }/* if this function gets called we already read a whole* command, argments are in the client argv/argc fields.* processcommand() execute the command or prepare the* server for a bulk read from the client.*/ int processcommand(redisclient *c) {struct rediscommand *cmd;.../* now lookup the command and check asap about trivial error conditions* such wrong arity, bad command name and so forth. */cmd = lookupcommand(c->argv[0]->ptr);...check.../* exec the command */if (c->flags & redis_multi &&cmd->proc != execcommand && cmd->proc != discardcommand &&cmd->proc != multicommand && cmd->proc != watchcommand){queuemulticommand(c,cmd);addreply(c,shared.queued);} else {if (server.vm_enabled && server.vm_max_threads > 0 &&blockclientonswappedkeys(c,cmd)) return redis_err;call(c,cmd); //执行命令}return redis_ok; }/* call() is the core of redis execution of a command */ void call(redisclient *c, struct rediscommand *cmd) {long long dirty;dirty = server.dirty;cmd->proc(c); //执行命令dirty = server.dirty-dirty;if (server.appendonly && dirty)feedappendonlyfile(cmd,c->db->id,c->argv,c->argc);if ((dirty || cmd->flags & redis_cmd_force_replication) &&listlength(server.slaves))replicationfeedslaves(server.slaves,c->db->id,c->argv,c->argc);if (listlength(server.monitors))replicationfeedmonitors(server.monitors,c->db->id,c->argv,c->argc);server.stat_numcommands ; }

 

3. 具体命令的执行(ping命令)

    其中,addreply将相关命令执行结果放入client的reply缓冲区中。reply缓冲区的发送时机是在事件sendreplytoclient中进行。

 

#define redis_string 0 shared.pong = createobject(redis_string,sdsnew(" pong\r\n")); //{"ping",pingcommand,1,0,null,0,0,0} void pingcommand(redisclient *c) {addreply(c,shared.pong); //ping的回复是pong,打乒乓,呵呵 }//将命令执行的返回结构写入c->buf 或者 c->reply void addreply(redisclient *c, robj *obj) {if (_installwriteevent(c) != redis_ok) return;//创建event sendreplytoclientredisassert(!server.vm_enabled || obj->storage == redis_vm_memory);/* this is an important place where we can avoid copy-on-write* when there is a saving child running, avoiding touching the* refcount field of the object if it's not needed.** if the encoding is raw and there is room in the static buffer* we'll be able to send the object to the client without* messing with its page. */if (obj->encoding == redis_encoding_raw) {if (_addreplytobuffer(c,obj->ptr,sdslen(obj->ptr)) != redis_ok)_addreplyobjecttolist(c,obj);} else {/* fixme: convert the long into string and use _addreplytobuffer()* instead of calling getdecodedobject. as this place in the* code is too performance critical. */obj = getdecodedobject(obj);if (_addreplytobuffer(c,obj->ptr,sdslen(obj->ptr)) != redis_ok)_addreplyobjecttolist(c,obj);decrrefcount(obj);} }

 

4. reply缓冲区数据的发送

 

        将c->buf 和 c->reply中的数据发送到客户端(slave or master)。在每次文件事件中发送所有的reply缓冲区中的数据。

 

void sendreplytoclient(aeeventloop *el, int fd, void *privdata, int mask) {redisclient *c = privdata;int nwritten = 0, totwritten = 0, objlen;robj *o;redis_notused(el);redis_notused(mask);while(c->bufpos > 0 || listlength(c->reply)) {if (c->bufpos > 0) {//发送c->buf中的数据if (c->flags & redis_master) {/* don't reply to a master */nwritten = c->bufpos - c->sentlen;} else {nwritten = write(fd,c->buf c->sentlen,c->bufpos-c->sentlen);if (nwritten <= 0) break;}c->sentlen = nwritten;totwritten = nwritten;/* if the buffer was sent, set bufpos to zero to continue with* the remainder of the reply. */if (c->sentlen == c->bufpos) {c->bufpos = 0;c->sentlen = 0;}} else {//发送c->reply中的数据o = listnodevalue(listfirst(c->reply));objlen = sdslen(o->ptr);if (objlen == 0) {listdelnode(c->reply,listfirst(c->reply));continue;}if (c->flags & redis_master) {/* don't reply to a master */nwritten = objlen - c->sentlen;} else {nwritten = write(fd, ((char*)o->ptr) c->sentlen,objlen-c->sentlen);if (nwritten <= 0) break;}c->sentlen = nwritten;totwritten = nwritten;/* if we fully sent the object on head go to the next one */if (c->sentlen == objlen) {listdelnode(c->reply,listfirst(c->reply));c->sentlen = 0;}}/* note that we avoid to send more thank redis_max_write_per_event* bytes, in a single threaded server it's a good idea to serve* other clients as well, even if a very large request comes from* super fast link that is always able to accept data (in real world* scenario think about 'keys *' against the loopback interfae) */if (totwritten > redis_max_write_per_event) break;}...check...if (totwritten > 0) c->lastinteraction = time(null);//??why delete file event of write ? ?if (listlength(c->reply) == 0) {c->sentlen = 0;aedeletefileevent(server.el,c->fd,ae_writable);/* close connection after entire reply has been sent. */if (c->flags & redis_close_after_reply) freeclient(c);} }

 

5. 总结

        命令行交互过程中,1.为每个连接有相应的数据进行描述(redisclient),这样便于连接的管理。2.命令行交互中,引入命令缓冲区querybuf,这样可以延时处理命令,这在事件轮询机制中,是至关重要的。

      原文链接 http://blog.csdn.net/ordeder/article/details/16105345

 

总结

以上是尊龙游戏旗舰厅官网为你收集整理的redis源码学习-masterslave的命令交互的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得尊龙游戏旗舰厅官网网站内容还不错,欢迎将尊龙游戏旗舰厅官网推荐给好友。

  • 上一篇:
  • 下一篇:
网站地图