Build Redis 4.0.10 from source, run it with the default configuration, and follow in the source code what happens when a GET command is executed. https://github.com/antirez/redis



1. Processing the request sent by the client and storing the reply in the Client Output Buffer
2. Sending the reply stored in the Client Output Buffer back to the client


0. From the main function to aeMain


  • 1 loadServerConfig
  • 2 initServer
  • 3 aeMain

1. Processing the request sent by the client and storing the reply in the Client Output Buffer

  • 0 aeMain at ae.c
  • 1 aeProcessEvents at ae.c
    • 2 aeSearchNearestTimer at ae.c
    • 2 aeApiPoll at ae.c
    • 2 afterSleep at server.c
    • 2 rfileProc -> readQueryFromClient at networking.c
      • 3 processInputBuffer at networking.c
        • 4 processMultibulkBuffer at networking.c
          • 5 createStringObject at object.c
            • 6 createEmbeddedStringObject at object.c
          • 5 createStringObject at object.c
        • 4 processMultibulkBuffer at networking.c
        • 4 processCommand at server.c
          • 5 lookupCommand at server.c
            • 6 dictFetchValue at dict.c
              • 7 dictFind at dict.c
                • 8 dictHashKey -> dictSdsCaseHash at server.c
        • 4 processCommand at server.c
          • 5 call at server.c
            • 6 proc -> getCommand at t_string.c
              • 7 getGenericCommand at t_string.c
                • 8 lookupKeyReadOrReply at db.c
                  • 9 lookupKeyRead at db.c
                    • 10 lookupKeyReadWithFlags at db.c
                • 8 addReplyBulk at networking.c
        • 4 resetClient at networking.c
    • 2 processTimeEvents at ae.c

2. Sending the reply stored in the Client Output Buffer back to the client

  • 0 aeMain at ae.c
    • 1 beforeSleep at server.c
      • 2 activeExpireCycle at expire.c
      • 2 moduleHandleBlockedClients at module.c
      • 2 flushAppendOnlyFile at aof.c
      • 2 handleClientsWithPendingWrites at networking.c
        • 3 writeToClient at networking.c
          • 4 clientHasPendingReplies at networking.c


Processing the request sent by the client and storing the reply in the Client Output Buffer

0 aeMain at ae.c


496│ void aeMain(aeEventLoop *eventLoop) {
497│     eventLoop->stop = 0;
498├>    while (!eventLoop->stop) {
499│         if (eventLoop->beforesleep != NULL)
500│             eventLoop->beforesleep(eventLoop);
501│         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
502│     }
503│ }
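aeMain itself is only a loop. Here is a stripped-down sketch of its shape, using hypothetical names (toy_loop, toy_main) rather than Redis's own types. The beforesleep hook runs before every call that may block in the poll, and the loop ends once something sets stop.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, stripped-down stand-in for aeEventLoop. */
typedef struct toy_loop {
    int stop;                             /* set to 1 to leave the loop */
    int iterations;                       /* passes made by process() */
    int before_calls;                     /* how often beforesleep ran */
    void (*beforesleep)(struct toy_loop *);
    void (*process)(struct toy_loop *);   /* plays the role of aeProcessEvents */
} toy_loop;

/* Same shape as aeMain: run beforesleep, then process events, until stop is set. */
static void toy_main(toy_loop *l) {
    l->stop = 0;
    while (!l->stop) {
        if (l->beforesleep != NULL)
            l->beforesleep(l);
        l->process(l);
    }
}

static void count_before(toy_loop *l) { l->before_calls++; }

/* Stops the loop after three passes, standing in for a shutdown request. */
static void process_three_times(toy_loop *l) {
    if (++l->iterations == 3) l->stop = 1;
}
```

In Redis the hook is beforeSleep() in server.c. This is why phase 2 of this trace (sending replies through handleClientsWithPendingWrites) starts there rather than inside aeProcessEvents.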

1 aeProcessEvents at ae.c

346│  * Without special flags the function sleeps until some file event
347│  * fires, or when the next time event occurs (if any).
348│  *
349│  * If flags is 0, the function does nothing and returns.
350│  * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
351│  * if flags has AE_FILE_EVENTS set, file events are processed.
352│  * if flags has AE_TIME_EVENTS set, time events are processed.
353│  * if flags has AE_DONT_WAIT set the function returns ASAP until all
354│  * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
355│  * the events that's possible to process without to wait are processed.
356│  *
357│  * The function returns the number of events processed. */
358│ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
360│     int processed = 0, numevents;
362│     /* Nothing to do? return ASAP */
363│     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
365│     /* Note that we want call select() even if there are no
366│      * file events to process as long as we want to process time
367│      * events, in order to sleep until the next time event is ready
368│      * to fire. */
369│     if (eventLoop->maxfd != -1 ||
370│         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
371│         int j;
372│         aeTimeEvent *shortest = NULL;
373│         struct timeval tv, *tvp;
375│         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
376├>            shortest = aeSearchNearestTimer(eventLoop);
377│         if (shortest) {
378│             long now_sec, now_ms;
380│             aeGetTime(&now_sec, &now_ms);
381│             tvp = &tv;
383│             /* How many milliseconds we need to wait for the next
384│              * time event to fire? */
385│             long long ms =
386│                 (shortest->when_sec - now_sec)*1000 +
387│                 shortest->when_ms - now_ms;
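The code right after this point turns ms into the timeval that is handed to aeApiPoll. Below is a sketch of that conversion. It assumes, based on my recollection of the 4.0 source and not on the excerpt, that a timer that is already due produces a zero timeout so the poll returns immediately. wait_until is a hypothetical name.

```c
#include <assert.h>
#include <sys/time.h>

/* Hypothetical helper: how long to block until a timer due at
 * (when_sec, when_ms), given the current time (now_sec, now_ms). */
static struct timeval wait_until(long when_sec, long when_ms,
                                 long now_sec, long now_ms) {
    struct timeval tv;
    long long ms = (long long)(when_sec - now_sec) * 1000 + (when_ms - now_ms);
    if (ms > 0) {
        tv.tv_sec = ms / 1000;
        tv.tv_usec = (ms % 1000) * 1000;
    } else {
        /* Timer already due: poll without blocking. */
        tv.tv_sec = 0;
        tv.tv_usec = 0;
    }
    return tv;
}
```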

2 aeSearchNearestTimer at ae.c

243│ /* Search the first timer to fire.
244│  * This operation is useful to know how many time the select can be
245│  * put in sleep without to delay any event.
246│  * If there are no timers NULL is returned.
247│  *
248│  * Note that's O(N) since time events are unsorted.
249│  * Possible optimizations (not needed by Redis so far, but...):
250│  * 1) Insert the event in order, so that the nearest is just the head.
251│  *    Much better but still insertion or deletion of timers is O(N).
252│  * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
253│  */
254│ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
255│ {
256├>    aeTimeEvent *te = eventLoop->timeEventHead;
257│     aeTimeEvent *nearest = NULL;
259│     while(te) {
260│         if (!nearest || te->when_sec < nearest->when_sec ||
261│                 (te->when_sec == nearest->when_sec &&
262│                  te->when_ms < nearest->when_ms))
263│             nearest = te;
264│         te = te->next;
265│     }
266│     return nearest;
267│ }
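Time events sit in an unsorted linked list, so finding the next one to fire requires a full scan. The sketch below applies the same comparison to a hypothetical timer struct that keeps only the fields the scan reads.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical timer node: only the due time and the list link. */
typedef struct timer {
    long when_sec, when_ms;
    struct timer *next;
} timer;

/* Same O(N) scan as aeSearchNearestTimer: compare seconds first,
 * then milliseconds on a tie. Returns NULL for an empty list. */
static timer *nearest_timer(timer *head) {
    timer *nearest = NULL;
    for (timer *te = head; te != NULL; te = te->next) {
        if (!nearest || te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec &&
             te->when_ms < nearest->when_ms))
            nearest = te;
    }
    return nearest;
}
```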

1 aeProcessEvents at ae.c

409│         /* Call the multiplexing API, will return only on timeout or when
410│          * some event fires. */
411├>        numevents = aeApiPoll(eventLoop, tvp);
413│         /* After sleep callback. */
414│         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
415│             eventLoop->aftersleep(eventLoop);
417│         for (j = 0; j < numevents; j++) {
418│             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
419│             int mask = eventLoop->fired[j].mask;
420│             int fd = eventLoop->fired[j].fd;
421│             int fired = 0; /* Number of events fired for current fd. */

2 aeApiPoll at ae.c


108│ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
109│     aeApiState *state = eventLoop->apidata;
110│     int retval, numevents = 0;
112│     retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
113│             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
114│     if (retval > 0) {
115│         int j;
117├>        numevents = retval;
118│         for (j = 0; j < numevents; j++) {
119│             int mask = 0;
120│             struct epoll_event *e = state->events+j;
122│             if (e->events & EPOLLIN) mask |= AE_READABLE;
123│             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
124│             if (e->events & EPOLLERR) mask |= AE_WRITABLE;
125│             if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
126│             eventLoop->fired[j].fd = e->data.fd;
127│             eventLoop->fired[j].mask = mask;
128│         }
129│     }
130├>    return numevents;
131│ }
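aeApiPoll performs two small translations around epoll_wait. First, the timeval becomes a millisecond timeout, or -1 (block indefinitely) when tvp is NULL. Second, each epoll event becomes an AE mask. The sketch below expresses both as pure functions. The AE_* values are copied from ae.h, the helper names are hypothetical, and <sys/epoll.h> makes it Linux-only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/time.h>

/* Values as defined in Redis's ae.h. */
#define AE_READABLE 1
#define AE_WRITABLE 2

/* Same translation aeApiPoll applies to each epoll_event:
 * errors and hang-ups are folded into AE_WRITABLE. */
static int epoll_to_ae_mask(uint32_t events) {
    int mask = 0;
    if (events & EPOLLIN)  mask |= AE_READABLE;
    if (events & EPOLLOUT) mask |= AE_WRITABLE;
    if (events & EPOLLERR) mask |= AE_WRITABLE;
    if (events & EPOLLHUP) mask |= AE_WRITABLE;
    return mask;
}

/* Timeout argument for epoll_wait: NULL means "block forever" (-1). */
static int timeout_ms(const struct timeval *tvp) {
    return tvp ? (int)(tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1;
}
```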

1 aeProcessEvents at ae.c

413│         /* After sleep callback. */
414│         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
415├>            eventLoop->aftersleep(eventLoop);
417│         for (j = 0; j < numevents; j++) {
418│             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
419│             int mask = eventLoop->fired[j].mask;
420│             int fd = eventLoop->fired[j].fd;
421│             int fired = 0; /* Number of events fired for current fd. */

2 afterSleep at server.c

1240│ /* This function is called immadiately after the event loop multiplexing
1241│  * API returned, and the control is going to soon return to Redis by invoking
1242│  * the different events callbacks. */
1243│ void afterSleep(struct aeEventLoop *eventLoop) {
1244│     UNUSED(eventLoop);
1245├>    if (moduleCount()) moduleAcquireGIL();
1246│ }

1 aeProcessEvents at ae.c

Because the GET request has arrived on the client socket, the event is AE_READABLE, so rfileProc is called.

436│             /* Note the "fe->mask & mask & ..." code: maybe an already
437│              * processed event removed an element that fired and we still
438│              * didn't processed, so we check if the event is still valid.
439│              *
440│              * Fire the readable event if the call sequence is not
441│              * inverted. */
442├>            if (!invert && fe->mask & mask & AE_READABLE) {
443│                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
444│                 fired++;
445│             }
447│             /* Fire the writable event. */
448│             if (fe->mask & mask & AE_WRITABLE) {
449│                 if (!fired || fe->wfileProc != fe->rfileProc) {
450│                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
451│                     fired++;
452│                 }
453│             }
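The dispatch rule above fits in a few lines. This sketch uses a hypothetical file_event struct and leaves out the invert flag, so it models only the default order, with readable handlers first. Note the `!fired || wfileProc != rfileProc` test: when one function is registered for both directions, it runs once per iteration even if both events fired.

```c
#include <assert.h>

#define AE_READABLE 1
#define AE_WRITABLE 2

typedef void handler(int fd);

/* Hypothetical registration: one readable and one writable callback. */
typedef struct {
    int mask;            /* events the fd is registered for */
    handler *rfileProc;
    handler *wfileProc;
} file_event;

/* Readable first, then writable, but never the same function twice
 * for one fd in one iteration. Returns how many handlers ran. */
static int fire(file_event *fe, int fd, int fired_mask) {
    int fired = 0;
    if (fe->mask & fired_mask & AE_READABLE) {
        fe->rfileProc(fd);
        fired++;
    }
    if (fe->mask & fired_mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->wfileProc(fd);
            fired++;
        }
    }
    return fired;
}

static int calls;
static void on_read(int fd)  { (void)fd; calls++; }
static void on_write(int fd) { (void)fd; calls++; }
```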

2 rfileProc -> readQueryFromClient at networking.c

1379│ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1380│     client *c = (client*) privdata;
1381│     int nread, readlen;
1382│     size_t qblen;
1383│     UNUSED(el);
1384│     UNUSED(mask);
1386├>    readlen = PROTO_IOBUF_LEN;
1387│     /* If this is a multi bulk request, and we are processing a bulk reply
1388│      * that is large enough, try to maximize the probability that the query
1389│      * buffer contains exactly the SDS string representing the object, even
1390│      * at the risk of requiring more read(2) calls. This way the function
1391│      * processMultiBulkBuffer() can avoid copying buffers to create the
1392│      * Redis Object representing the argument. */
1393│     if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
1394│         && c->bulklen >= PROTO_MBULK_BIG_ARG)
1395│     {
1396│         ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
1398│         if (remaining < readlen) readlen = remaining;
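The readlen logic can be pulled out into a pure function. The sketch below assumes the 4.0 values PROTO_IOBUF_LEN = 16 KB and PROTO_MBULK_BIG_ARG = 32 KB from server.h. choose_readlen is a hypothetical name, and the `remaining > 0` guard is a defensive addition that the excerpt above does not have.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>

/* Values as defined in Redis 4.0's server.h. */
#define PROTO_IOBUF_LEN     (1024*16)
#define PROTO_MBULK_BIG_ARG (1024*32)

/* How many bytes to ask read(2) for. While a big bulk argument is being
 * received, read only up to its end (+2 for the CRLF) so the query buffer
 * ends up holding exactly that argument. */
static size_t choose_readlen(int multibulk, long bulklen, size_t querybuf_len) {
    size_t readlen = PROTO_IOBUF_LEN;
    if (multibulk && bulklen != -1 && bulklen >= PROTO_MBULK_BIG_ARG) {
        ssize_t remaining = (ssize_t)(bulklen + 2) - (ssize_t)querybuf_len;
        if (remaining > 0 && (size_t)remaining < readlen)
            readlen = (size_t)remaining;
    }
    return readlen;
}
```

A small GET like the one traced here never reaches the big-argument branch, so read(2) is asked for a full 16 KB.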


1440│     /* Time to process the buffer. If the client is a master we need to
1441│      * compute the difference between the applied offset before and after
1442│      * processing the buffer, to understand how much of the replication stream
1443│      * was actually applied to the master state: this quantity, and its
1444│      * corresponding part of the replication stream, will be propagated to
1445│      * the sub-slaves and to the replication backlog. */
1446├>    if (!(c->flags & CLIENT_MASTER)) {
1447│         processInputBuffer(c);
1448│     } else {
1449│         size_t prev_offset = c->reploff;
1450│         processInputBuffer(c);
1451│         size_t applied = c->reploff - prev_offset;
1452│         if (applied) {
1453│             replicationFeedSlavesFromMasterStream(server.slaves,
1454│                     c->pending_querybuf, applied);
1455│             sdsrange(c->pending_querybuf,applied,-1);
1456│         }
1457│     }

3 processInputBuffer at networking.c

1314│ /* This function is called every time, in the client structure 'c', there is
1315│  * more query buffer to process, because we read more data from the socket
1316│  * or because a client was blocked and later reactivated, so there could be
1317│  * pending query buffer, already representing a full command, to process. */
1318│ void processInputBuffer(client *c) {
1319│     server.current_client = c;
1320│     /* Keep processing while there is something in the input buffer */
1321├>    while(sdslen(c->querybuf)) {
1322│         /* Return if clients are paused. */
1323│         if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
1325│         /* Immediately abort if the client is in the middle of something. */
1326│         if (c->flags & CLIENT_BLOCKED) break;
1328│         /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
1329│          * written to the client. Make sure to not let the reply grow after
1330│          * this flag has been set (i.e. don't process more commands).
1331│          *
1332│          * The same applies for clients we want to terminate ASAP. */
1333│         if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;


1335│         /* Determine request type when unknown. */
1336│         if (!c->reqtype) {
1337│             if (c->querybuf[0] == '*') {
1338├>                c->reqtype = PROTO_REQ_MULTIBULK;
1339│             } else {
1340│                 c->reqtype = PROTO_REQ_INLINE;
1341│             }
1342│         }


1344│         if (c->reqtype == PROTO_REQ_INLINE) {
1345│             if (processInlineBuffer(c) != C_OK) break;
1346│         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
1347├>            if (processMultibulkBuffer(c) != C_OK) break;
1348│         } else {
1349│             serverPanic("Unknown request type");
1350│         }

4 processMultibulkBuffer at networking.c

1164│ /* Process the query buffer for client 'c', setting up the client argument
1165│  * vector for command execution. Returns C_OK if after running the function
1166│  * the client has a well-formed ready to be processed command, otherwise
1167│  * C_ERR if there is still to read more buffer to get the full command.
1168│  * The function also returns C_ERR when there is a protocol error: in such a
1169│  * case the client structure is setup to reply with the error and close
1170│  * the connection.
1171│  *
1172│  * This function is called if processInputBuffer() detects that the next
1173│  * command is in RESP format, so the first byte in the command is found
1174│  * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
1175├>int processMultibulkBuffer(client *c) {
1176│     char *newline = NULL;
1177│     long pos = 0;
1178│     int ok;
1179│     long long ll;
1181│     if (c->multibulklen == 0) {
1182│         /* The client should have been reset */
1183│         serverAssertWithInfo(c,NULL,c->argc == 0);
1185│         /* Multi bulk length cannot be read without a \r\n */
1186│         newline = strchr(c->querybuf,'\r');
1187│         if (newline == NULL) {

At this point c->querybuf, an sds string, holds the RESP-encoded request "*2\r\n$3\r\nget\r\n$4\r\nhoge\r\n".

1199│         /* We know for sure there is a whole line since newline != NULL,
1200│          * so go ahead and find out the multi bulk length. */
1201│         serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
1202├>        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
1203│         if (!ok || ll > 1024*1024) {
1204│             addReplyError(c,"Protocol error: invalid multibulk length");
1205│             setProtocolError("invalid mbulk count",c,pos);
1206│             return C_ERR;
1207│         }
1209│         pos = (newline-c->querybuf)+2;
1210│         if (ll <= 0) {
1211│             sdsrange(c->querybuf,pos,-1);
1212│             return C_OK;
1213│         }
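The request in c->querybuf is what a client produces when it encodes argv as RESP: one `*<count>\r\n` header followed by one `$<length>\r\n<bytes>\r\n` entry per argument. Below is a minimal encoder sketch. resp_encode is a hypothetical helper, and for brevity it treats arguments as NUL-terminated text, although RESP itself is binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP multibulk request.
 * Returns the encoded length, or -1 if out is too small. */
static int resp_encode(char *out, size_t cap, int argc, const char **argv) {
    size_t used = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used += (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```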

Stepping ahead a little, c->querybuf+pos+1 now points at "3\r\nget\r\n$4\r\nhoge\r\n", i.e. at the length of the first bulk argument, just past its '$'.

1249├>            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
1250│             if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
1251│                 addReplyError(c,"Protocol error: invalid bulk length");
1252│                 setProtocolError("invalid bulk length",c,pos);
1253│                 return C_ERR;
1254│             }
1256│             pos += newline-(c->querybuf+pos)+2;
1257│             if (ll >= PROTO_MBULK_BIG_ARG) {
1258│                 size_t qblen;


1275│         /* Read bulk argument */
1276│         if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) {
1277│             /* Not enough data (+2 == trailing \r\n) */
1278│             break;
1279│         } else {
1280│             /* Optimization: if the buffer contains JUST our bulk element
1281│              * instead of creating a new object by *copying* the sds we
1282│              * just use the current sds string. */
1283│             if (pos == 0 &&
1284│                 c->bulklen >= PROTO_MBULK_BIG_ARG &&
1285│                 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
1286│             {
1287│                 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
1288│                 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
1289│                 /* Assume that if we saw a fat argument we'll see another one
1290│                  * likely... */
1291│                 c->querybuf = sdsnewlen(NULL,c->bulklen+2);
1292│                 sdsclear(c->querybuf);
1293│                 pos = 0;
1294│             } else {
1295│                 c->argv[c->argc++] =
1296├>                    createStringObject(c->querybuf+pos,c->bulklen);
1297│                 pos += c->bulklen+2;
1298│             }

5 createStringObject at object.c


 110│ /* Create a string object with EMBSTR encoding if it is smaller than
 111│  * OBJ_ENCODING_EMBSTR_SIZE_LIMIT, otherwise the RAW encoding is
 112│  * used.
 113│  *
 114│  * The current limit of 39 is chosen so that the biggest string object
 115│  * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
 117│ robj *createStringObject(const char *ptr, size_t len) {
 118├>    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
 119│         return createEmbeddedStringObject(ptr,len);
 120│     else
 121│         return createRawStringObject(ptr,len);
 122│ }
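The 64-byte arena mentioned in the comment can be checked with some arithmetic. This sketch assumes a 64-bit build and GCC or Clang (for `__attribute__((__packed__))`). The mirror_* structs are local copies of the robj and sdshdr8 layouts for illustration, not Redis's headers. Under those assumptions, the overhead is 16 + 3 header bytes plus the terminating '\0', which leaves 44 bytes of payload. The "39" in the comment therefore appears to date from an older sds header, and OBJ_ENCODING_EMBSTR_SIZE_LIMIT in object.c is the value that actually applies. Either way, "hoge" (4 bytes) is stored as EMBSTR.

```c
#include <assert.h>
#include <stdint.h>

/* Local mirror of robj's layout (LRU_BITS = 24). */
struct mirror_robj {
    unsigned type : 4;
    unsigned encoding : 4;
    unsigned lru : 24;
    int refcount;
    void *ptr;
};

/* Local mirror of sds's 8-bit header. */
struct __attribute__((__packed__)) mirror_sdshdr8 {
    uint8_t len;
    uint8_t alloc;
    unsigned char flags;
    char buf[];
};

/* createEmbeddedStringObject allocates
 *   sizeof(robj) + sizeof(struct sdshdr8) + len + 1   (+1 for '\0'),
 * so this is the largest len that still fits in `arena` bytes. */
static long embstr_max_len(long arena) {
    return arena - (long)sizeof(struct mirror_robj)
                 - (long)sizeof(struct mirror_sdshdr8) - 1;
}
```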

6 createEmbeddedStringObject at object.c


  81│ /* Create a string object with encoding OBJ_ENCODING_EMBSTR, that is
  82│  * an object where the sds string is actually an unmodifiable string
  83│  * allocated in the same chunk as the object itself. */
  84│ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
  85│     robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
  86│     struct sdshdr8 *sh = (void*)(o+1);
  88│     o->type = OBJ_STRING;
  89│     o->encoding = OBJ_ENCODING_EMBSTR;
  90│     o->ptr = sh+1;
  91│     o->refcount = 1;
  92│     if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
  93│         o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
  94│     } else {
  95│         o->lru = LRU_CLOCK();
  96│     }
  98│     sh->len = len;
  99│     sh->alloc = len;
 100├>    sh->flags = SDS_TYPE_8;
 101│     if (ptr) {
 102│         memcpy(sh->buf,ptr,len);
 103│         sh->buf[len] = '\0';
 104│     } else {
 105│         memset(sh->buf,0,len+1);
 106│     }
 107│     return o;
 108│ }


For reference, this is the robj type (struct redisObject) as printed by gdb's ptype:

type = struct redisObject {
    unsigned int type : 4;
    unsigned int encoding : 4;
    unsigned int lru : 24;
    int refcount;
    void *ptr;
}

4 processMultibulkBuffer at networking.c

1222│     serverAssertWithInfo(c,NULL,c->multibulklen > 0);
1223│     while(c->multibulklen) {
1224│         /* Read bulk length if unknown */
1225│         if (c->bulklen == -1) {
1226│             newline = strchr(c->querybuf+pos,'\r');
1227├>            if (newline == NULL) {
1228│                 if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
1229│                     addReplyError(c,
1230│                         "Protocol error: too big bulk count string");
1231│                     setProtocolError("too big bulk count string",c,0);
1232│                     return C_ERR;
1233│                 }
1234│                 break;
1235│             }
1237│             /* Buffer should also contain \n */
1238│             if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
1239│                 break;

Earlier, c->querybuf+pos+1 pointed at "3\r\nget\r\n$4\r\nhoge\r\n". Now that pos has advanced past the first argument, it points at "4\r\nhoge\r\n".

1249├>            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
1250│             if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
1251│                 addReplyError(c,"Protocol error: invalid bulk length");
1252│                 setProtocolError("invalid bulk length",c,pos);
1253│                 return C_ERR;
1254│             }


1283│             if (pos == 0 &&
1284│                 c->bulklen >= PROTO_MBULK_BIG_ARG &&
1285│                 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
1286│             {
1287│                 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
1288│                 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
1289│                 /* Assume that if we saw a fat argument we'll see another one
1290│                  * likely... */
1291│                 c->querybuf = sdsnewlen(NULL,c->bulklen+2);
1292│                 sdsclear(c->querybuf);
1293│                 pos = 0;
1294│             } else {
1295│                 c->argv[c->argc++] =
1296├>                    createStringObject(c->querybuf+pos,c->bulklen);
1297│                 pos += c->bulklen+2;
1298│             }
1299│             c->bulklen = -1;
1300│             c->multibulklen--;
1301│         }
1302│     }

5 createStringObject at object.c


  98│     sh->len = len;
  99│     sh->alloc = len;
 100│     sh->flags = SDS_TYPE_8;
 101│     if (ptr) {
 102│         memcpy(sh->buf,ptr,len);
 103│         sh->buf[len] = '\0';
 104│     } else {
 105│         memset(sh->buf,0,len+1);
 106│     }
 107├>    return o;

4 processMultibulkBuffer at networking.c

1304│     /* Trim to pos */
1305├>    if (pos) sdsrange(c->querybuf,pos,-1);
1307│     /* We're done when c->multibulk == 0 */
1308│     if (c->multibulklen == 0) return C_OK;
1310│     /* Still not ready to process the command */
1311│     return C_ERR;
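Stripped of its incremental state, the parsing done by processMultibulkBuffer looks roughly like the sketch below. It is a simplification, not Redis's code. parse_multibulk and parsed_cmd are hypothetical. The sketch re-parses from the start instead of keeping multibulklen/bulklen in the client between calls. It uses strtol without the strict digit checks of string2ll and bounds its searches with memchr instead of strchr. It also rejects anything that does not start with '*', because Redis handles inline commands separately in processInlineBuffer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_ARGS 16

/* Hypothetical parse result; argument pointers point into the input buffer. */
typedef struct {
    int argc;
    const char *argv[MAX_ARGS];
    long argl[MAX_ARGS];
    size_t consumed;   /* bytes used by the complete command */
} parsed_cmd;

/* Returns 1 when a whole command is in buf[0..len), 0 when more data is
 * needed, -1 on a protocol error. */
static int parse_multibulk(const char *buf, size_t len, parsed_cmd *out) {
    const char *nl;
    size_t pos;
    long n, i;

    if (len == 0 || buf[0] != '*') return -1;
    nl = memchr(buf, '\r', len);
    if (nl == NULL || (size_t)(nl - buf) + 1 >= len) return 0; /* need "\r\n" */
    n = strtol(buf + 1, NULL, 10);                  /* "*2" -> 2 */
    if (n < 0 || n > MAX_ARGS) return -1;
    pos = (size_t)(nl - buf) + 2;

    out->argc = 0;
    for (i = 0; i < n; i++) {
        if (pos >= len) return 0;
        if (buf[pos] != '$') return -1;
        nl = memchr(buf + pos, '\r', len - pos);
        if (nl == NULL || (size_t)(nl - buf) + 1 >= len) return 0;
        long blen = strtol(buf + pos + 1, NULL, 10); /* "$3" -> 3 */
        if (blen < 0) return -1;
        pos = (size_t)(nl - buf) + 2;
        if (len - pos < (size_t)blen + 2) return 0;  /* payload + CRLF */
        out->argv[out->argc] = buf + pos;
        out->argl[out->argc] = blen;
        out->argc++;
        pos += (size_t)blen + 2;
    }
    out->consumed = pos;   /* what processMultibulkBuffer trims with sdsrange */
    return 1;
}
```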

3 processInputBuffer at networking.c

1352│         /* Multibulk processing could see a <= 0 length. */
1353│         if (c->argc == 0) {
1354│             resetClient(c);
1355│         } else {
1356│             /* Only reset the client when the command was executed. */
1357├>            if (processCommand(c) == C_OK) {
1358│                 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1359│                     /* Update the applied replication offset of our master. */
1360│                     c->reploff = c->read_reploff - sdslen(c->querybuf);
1361│                 }
1363│                 /* Don't reset the client structure for clients blocked in a
1364│                  * module blocking command, so that the reply callback will
1365│                  * still be able to access the client argv and argc field.
1366│                  * The client will be reset in unblockClientFromModule(). */
1367│                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
1368│                     resetClient(c);
1369│             }

4 processCommand at server.c


2324│ /* If this function gets called we already read a whole
2325│  * command, arguments are in the client argv/argc fields.
2326│  * processCommand() execute the command or prepare the
2327│  * server for a bulk read from the client.
2328│  *
2329│  * If C_OK is returned the client is still alive and valid and
2330│  * other operations can be performed by the caller. Otherwise
2331│  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
2332├>int processCommand(client *c) {
2333│     /* The QUIT command is handled separately. Normal command procs will
2334│      * go through checking for replication and QUIT will cause trouble
2335│      * when FORCE_REPLICATION is enabled and would be implemented in
2336│      * a regular command proc. */
2337│     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2338│         addReply(c,shared.ok);
2339│         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2340│         return C_ERR;
2341│     }
2343│     /* Now lookup the command and check ASAP about trivial error conditions
2344│      * such as wrong arity, bad command name and so forth. */
2345├>    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2346│     if (!c->cmd) {
2347│         flagTransaction(c);
2348│         sds args = sdsempty();
2349│         int i;
2350│         for (i=1; i < c->argc && sdslen(args) < 128; i++)
2351│             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
2352│         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
2353│             (char*)c->argv[0]->ptr, args);
2354│         sdsfree(args);
2355│         return C_OK;
2356│     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2357│                (c->argc < -c->cmd->arity)) {

5 lookupCommand at server.c

2070│ struct redisCommand *lookupCommand(sds name) {
2071├>    return dictFetchValue(server.commands, name);
2072│ }

6 dictFetchValue at dict.c

 497│ void *dictFetchValue(dict *d, const void *key) {
 498│     dictEntry *he;
 500├>    he = dictFind(d,key);
 501│     return he ? dictGetVal(he) : NULL;
 502│ }

7 dictFind at dict.c


 476│ dictEntry *dictFind(dict *d, const void *key)
 477│ {
 478│     dictEntry *he;
 479│     uint64_t h, idx, table;
 481├>    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
 482│     if (dictIsRehashing(d)) _dictRehashStep(d);
 483│     h = dictHashKey(d, key);
 484│     for (table = 0; table <= 1; table++) {
 485│         idx = h & d->ht[table].sizemask;
 486│         he = d->ht[table].table[idx];
 487│         while(he) {
 488│             if (key==he->key || dictCompareKeys(d, key, he->key))
 489│                 return he;
 490│             he = he->next;
 491│         }
 492│         if (!dictIsRehashing(d)) return NULL;
 493│     }

8 dictHashKey -> dictSdsCaseHash at server.c

 498│ uint64_t dictSdsCaseHash(const void *key) {
 499├>    return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
 500│ }
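The command table is a dict whose hash function and key comparison ignore case, which is why GET, get, and Get all resolve to the same entry. Below is a single-table sketch of the dictFind path, without rehashing. The djb2-based case_hash is a stand-in for dictGenCaseHashFunction, which as far as I know uses a case-insensitive SipHash in 4.0. All names are hypothetical.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <stdint.h>
#include <strings.h>

#define TABLE_SIZE 8                 /* power of two, like dict's table sizes */
#define SIZEMASK   (TABLE_SIZE - 1)

typedef struct entry {
    const char *key;
    int val;
    struct entry *next;              /* chaining on collision */
} entry;

/* Case-insensitive djb2: stand-in for dictGenCaseHashFunction. */
static uint64_t case_hash(const char *s) {
    uint64_t h = 5381;
    for (; *s; s++) h = h * 33 + (uint64_t)tolower((unsigned char)*s);
    return h;
}

static void table_insert(entry **table, entry *e) {
    uint64_t idx = case_hash(e->key) & SIZEMASK;
    e->next = table[idx];
    table[idx] = e;
}

/* Same shape as dictFind on one non-rehashing table:
 * hash, mask to a bucket, walk the chain comparing keys. */
static entry *table_find(entry **table, const char *key) {
    entry *he = table[case_hash(key) & SIZEMASK];
    for (; he != NULL; he = he->next)
        if (strcasecmp(key, he->key) == 0) return he;
    return NULL;
}
```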

4 processCommand at server.c

Processing continues with c->cmd pointing to the following redisCommand, as printed by gdb:

{name = 0x561b161f3dd2 "get", proc = 0x561b1611cafa <getCommand>, arity = 2, sflags = 0x561b161f3dd6 "rF", flags = 8194, getkeys_proc = 0x0, firstkey = 1, lastkey = 1, keystep = 1, microseconds = 62, calls = 5}
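The sflags string and the flags integer in this output describe the same thing: at startup, populateCommandTable() in server.c turns each sflags letter into a bit. Assuming the 4.0 values CMD_READONLY = 1<<1 and CMD_FAST = 1<<13, "rF" gives 2 + 8192 = 8194, which matches what gdb printed. Below is a sketch that covers a few of the letters. parse_sflags is hypothetical.

```c
#include <assert.h>

/* A subset of the flag bits from Redis 4.0's server.h. */
#define CMD_WRITE     (1<<0)
#define CMD_READONLY  (1<<1)
#define CMD_DENYOOM   (1<<2)
#define CMD_ADMIN     (1<<4)
#define CMD_FAST      (1<<13)

/* Turn an sflags string such as "rF" into the integer flags field. */
static int parse_sflags(const char *s) {
    int flags = 0;
    for (; *s; s++) {
        switch (*s) {
        case 'w': flags |= CMD_WRITE;    break;
        case 'r': flags |= CMD_READONLY; break;
        case 'm': flags |= CMD_DENYOOM;  break;
        case 'a': flags |= CMD_ADMIN;    break;
        case 'F': flags |= CMD_FAST;     break;
        default:  break;   /* other letters are ignored in this sketch */
        }
    }
    return flags;
}
```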
2343│     /* Now lookup the command and check ASAP about trivial error conditions
2344│      * such as wrong arity, bad command name and so forth. */
2345│     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2346├>    if (!c->cmd) {
2347│         flagTransaction(c);
2348│         sds args = sdsempty();
2349│         int i;
2350│         for (i=1; i < c->argc && sdslen(args) < 128; i++)
2351│             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
2352│         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
2353│             (char*)c->argv[0]->ptr, args);
2354│         sdsfree(args);
2355│         return C_OK;
2356│     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2357│                (c->argc < -c->cmd->arity)) {
2358│         flagTransaction(c);


2372│     /* If cluster is enabled perform the cluster redirection here.
2373│      * However we don't perform the redirection if:
2374│      * 1) The sender of this command is our master.
2375│      * 2) The command has no key arguments. */
2376├>    if (server.cluster_enabled &&
2377│         !(c->flags & CLIENT_MASTER) &&
2378│         !(c->flags & CLIENT_LUA &&
2379│           server.lua_caller->flags & CLIENT_MASTER) &&
2380│         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2381│           c->cmd->proc != execCommand))
2382│     {
2383│         int hashslot;
2384│         int error_code;
2385│         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2386│                                         &hashslot,&error_code);
2387│         if (n == NULL || n != server.cluster->myself) {
2388│             if (c->cmd->proc == execCommand) {

After that, processCommand calls call(), which actually executes the received command.

2507│     /* Exec the command */
2508│     if (c->flags & CLIENT_MULTI &&
2509│         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2510│         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2511│     {
2512│         queueMultiCommand(c);
2513│         addReply(c,shared.queued);
2514│     } else {
2515├>        call(c,CMD_CALL_FULL);
2516│         c->woff = server.master_repl_offset;
2517│         if (listLength(server.ready_keys))
2518│             handleClientsBlockedOnLists();
2519│     }
2520│     return C_OK;
2521│ }

5 call at server.c

call() runs c->cmd->proc(c). proc is the handler function registered for each command in the redisCommandTable array: https://github.com/antirez/redis/blob/4.0.10/src/server.c#L75-L308

2196│  * slaves propagation will never occur.
2197│  *
2198│  * Client flags are modified by the implementation of a given command
2199│  * using the following API:
2200│  *
2201│  * forceCommandPropagation(client *c, int flags);
2202│  * preventCommandPropagation(client *c);
2203│  * preventCommandAOF(client *c);
2204│  * preventCommandReplication(client *c);
2205│  *
2206│  */
2207│ void call(client *c, int flags) {
2208│     long long dirty, start, duration;
2209├>    int client_old_flags = c->flags;
2211│     /* Sent the command to clients in MONITOR mode, only if the commands are
2212│      * not generated from reading an AOF. */
2213│     if (listLength(server.monitors) &&
2214│         !server.loading &&
2215│         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2216│     {
2217│         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2218│     }
2220│     /* Initialization: clear the flags that must be set by the command on
2221│      * demand, and initialize the array for additional commands propagation. */
2223│     redisOpArray prev_also_propagate = server.also_propagate;
2224│     redisOpArrayInit(&server.also_propagate);
2226│     /* Call the command. */
2227│     dirty = server.dirty;
2228│     start = ustime();
2229├>    c->cmd->proc(c);
2230│     duration = ustime()-start;
2231│     dirty = server.dirty-dirty;
2232│     if (dirty < 0) dirty = 0;
2234│     /* When EVAL is called loading the AOF we don't want commands called
2235│      * from Lua to go into the slowlog or to populate statistics. */
2236│     if (server.loading && c->flags & CLIENT_LUA)
2237│         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2239│     /* If the caller is Lua, we want to force the EVAL caller to propagate
2240│      * the script if the command flag or client flag are forcing the
2241│      * propagation. */

6 proc -> getCommand at t_string.c

172│ void getCommand(client *c) {
173├>    getGenericCommand(c);
174│ }

7 getGenericCommand at t_string.c


157│ int getGenericCommand(client *c) {
158│     robj *o;
160├>    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
161│         return C_OK;
163│     if (o->type != OBJ_STRING) {
164│         addReply(c,shared.wrongtypeerr);
165│         return C_ERR;
166│     } else {
167│         addReplyBulk(c,o);
168│         return C_OK;
169│     }
170│ }
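getGenericCommand has three outcomes. The sketch below reproduces the branching over a hypothetical one-key database, with the corresponding wire replies noted in comments. On a miss, lookupKeyReadOrReply sends shared.nullbulk (`$-1\r\n` in RESP2). A non-string value gets the WRONGTYPE error. Otherwise the value goes out as a bulk reply through addReplyBulk.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical value types and a one-key "database". */
enum obj_type { TYPE_STRING, TYPE_LIST };

typedef struct {
    enum obj_type type;
    const char *str;          /* only meaningful for TYPE_STRING */
} obj;

typedef struct {
    const char *key;          /* NULL when empty */
    obj *val;
} one_key_db;

enum get_result { GET_NULL_BULK, GET_WRONGTYPE, GET_BULK };

/* Same branching as getGenericCommand. */
static enum get_result get_generic(const one_key_db *db, const char *key,
                                   const char **value) {
    *value = NULL;
    if (db->key == NULL || strcmp(db->key, key) != 0)
        return GET_NULL_BULK;   /* Redis: shared.nullbulk, "$-1\r\n" */
    if (db->val->type != TYPE_STRING)
        return GET_WRONGTYPE;   /* Redis: addReply(c, shared.wrongtypeerr) */
    *value = db->val->str;
    return GET_BULK;            /* Redis: addReplyBulk(c, o) */
}
```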

8 lookupKeyReadOrReply at db.c


 151│ robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
 152├>    robj *o = lookupKeyRead(c->db, key);
 153│     if (!o) addReply(c,reply);
 154│     return o;
 155│ }

9 lookupKeyRead at db.c

 135│ /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
 136│  * common case. */
 137│ robj *lookupKeyRead(redisDb *db, robj *key) {
 138├>    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
 139│ }

10 lookupKeyReadWithFlags at db.c

  88│  * Flags change the behavior of this command:
  89│  *
  90│  *  LOOKUP_NONE (or zero): no special flags are passed.
  91│  *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
  92│  *
  93│  * Note: this function also returns NULL is the key is logically expired
  94│  * but still existing, in case this is a slave, since this API is called only
  95│  * for read operations. Even if the key expiry is master-driven, we can
  96│  * correctly report a key is expired on slaves even if the master is lagging
  97│  * expiring our key via DELs in the replication link. */
  98│ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
  99│     robj *val;
 101├>    if (expireIfNeeded(db,key) == 1) {
 102│         /* Key expired. If we are in the context of a master, expireIfNeeded()
 103│          * returns 0 only when the key does not exist at all, so it's safe
 104│          * to return NULL ASAP. */
 105│         if (server.masterhost == NULL) return NULL;
 107│         /* However if we are in the context of a slave, expireIfNeeded() will
 108│          * not really try to expire the key, it only returns information
 109│          * about the "logical" status of the key: key expiring is up to the
 110│          * master in order to have a consistent view of master's data set.
 111│          *
 112│          * However, if the command caller is not the master, and as additional
 113│          * safety measure, the command invoked is a read-only command, we can
 127├>    val = lookupKey(db,key,flags);
 128│     if (val == NULL)
 129│         server.stat_keyspace_misses++;
 130│     else
 131│         server.stat_keyspace_hits++;
 132│     return val;
 133│ }
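The lookup path can be approximated with a toy keyspace. This is a sketch that assumes master-side behavior (`server.masterhost == NULL`) and uses a linear array instead of Redis's dict. All `toy_*` names are hypothetical. As in the listing above, an expired key returns NULL before the hit/miss counters are touched.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical keyspace entry: expire_ms is an absolute time in ms,
 * or -1 when the key has no expire. */
struct toy_entry { const char *key; const char *val; long long expire_ms; };

struct toy_db {
    struct toy_entry *entries;
    size_t n;
    long long hits, misses;   /* like stat_keyspace_hits / stat_keyspace_misses */
};

/* Master-side sketch of lookupKeyReadWithFlags(): an expired key is
 * reported as missing without updating the stats (Redis would also
 * delete it inside expireIfNeeded()). */
const char *toy_lookup_read(struct toy_db *db, const char *key, long long now_ms) {
    for (size_t i = 0; i < db->n; i++) {
        struct toy_entry *e = &db->entries[i];
        if (strcmp(e->key, key) != 0) continue;
        if (e->expire_ms != -1 && now_ms > e->expire_ms)
            return NULL;           /* expired: early return, no stat update */
        db->hits++;
        return e->val;
    }
    db->misses++;                  /* key does not exist at all */
    return NULL;
}
```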

8 addReplyBulk at networking.c


 556│ /* Add a Redis Object as a bulk reply */
 557│ void addReplyBulk(client *c, robj *obj) {
 558├>    addReplyBulkLen(c,obj);
 559│     addReply(c,obj);
 560│     addReply(c,shared.crlf);
 561│ }
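addReplyBulk emits a RESP bulk string in three pieces: the `$<length>\r\n` header from addReplyBulkLen, the payload itself, and a trailing CRLF (`shared.crlf`). The following is a minimal sketch that assumes a fixed-size static buffer like the client's `buf`/`bufpos` pair. Redis 4 sizes that buffer with PROTO_REPLY_CHUNK_BYTES and spills over into the `c->reply` list, and this sketch omits that spill-over. The `toy_*` names are hypothetical.

```c
#include <stdio.h>
#include <string.h>

#define TOY_BUF_SIZE 64   /* small on purpose; Redis uses PROTO_REPLY_CHUNK_BYTES */

struct toy_client { char buf[TOY_BUF_SIZE]; int bufpos; };

/* Append raw bytes to the static buffer; -1 if they do not fit
 * (Redis would fall back to the c->reply list instead). */
static int toy_add_reply(struct toy_client *c, const char *s, size_t len) {
    if (len > (size_t)(TOY_BUF_SIZE - c->bufpos)) return -1;
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += (int)len;
    return 0;
}

/* Mirrors addReplyBulk(): "$<len>\r\n", then the payload, then "\r\n". */
int toy_add_reply_bulk(struct toy_client *c, const char *val, size_t len) {
    char hdr[32];
    int n = snprintf(hdr, sizeof(hdr), "$%zu\r\n", len);
    if (toy_add_reply(c, hdr, (size_t)n) != 0) return -1;
    if (toy_add_reply(c, val, len) != 0) return -1;
    return toy_add_reply(c, "\r\n", 2);
}
```

For `GET foo` with the value `bar`, the buffer ends up holding the 9 bytes `$3\r\nbar\r\n`.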

5 call at server.c


2249│     /* Log the command into the Slow log if needed, and populate the
2250│      * per-command statistics that we show in INFO commandstats. */
2251│     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2252│         char *latency_event = (c->cmd->flags & CMD_FAST) ?
2253│                               "fast-command" : "command";
2254│         latencyAddSampleIfNeeded(latency_event,duration/1000);
2255│         slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
2256│     }
2257│     if (flags & CMD_CALL_STATS) {
2258│         c->lastcmd->microseconds += duration;
2259├>        c->lastcmd->calls++;
2260│     }


2262│     /* Propagate the command into the AOF and replication link */
2263│     if (flags & CMD_CALL_PROPAGATE &&
2264│         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2265│     {
2266├>        int propagate_flags = PROPAGATE_NONE;
2268│         /* Check if the command operated changes in the data set. If so
2269│          * set for replication / AOF propagation. */
2270│         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2272│         /* If the client forced AOF / replication of the command, set
2273│          * the flags regardless of the command effects on the data set. */
2274│         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2275│         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2277│         /* However prevent AOF / replication propagation if the command
2278│          * implementatino called preventCommandPropagation() or similar,
2287│         /* Call propagate() only if at least one of AOF / replication
2288│          * propagation is needed. Note that modules commands handle replication
2289│          * in an explicit way, so we never replicate them automatically. */
2290├>        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
2291│             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2292│     }
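The propagation decision in call() is plain bit arithmetic. The sketch reproduces only the lines shown above and leaves out the elided prevent-propagation check. The `TOY_PROPAGATE_*` values are assumptions chosen to mirror Redis's PROPAGATE_NONE/AOF/REPL.

```c
/* Assumed values mirroring PROPAGATE_NONE / PROPAGATE_AOF / PROPAGATE_REPL. */
#define TOY_PROPAGATE_NONE 0
#define TOY_PROPAGATE_AOF  1
#define TOY_PROPAGATE_REPL 2

/* dirty: number of keyspace changes made by the command;
 * force_repl / force_aof: CLIENT_FORCE_REPL / CLIENT_FORCE_AOF set on the client. */
int toy_propagate_flags(long long dirty, int force_repl, int force_aof) {
    int flags = TOY_PROPAGATE_NONE;
    if (dirty) flags |= (TOY_PROPAGATE_AOF | TOY_PROPAGATE_REPL);
    if (force_repl) flags |= TOY_PROPAGATE_REPL;
    if (force_aof) flags |= TOY_PROPAGATE_AOF;
    return flags;
}
```

For GET, `dirty` stays 0 and no force flag is set, so the result is TOY_PROPAGATE_NONE and propagate() is never called.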

4 processCommand at server.c


2507│     /* Exec the command */
2508│     if (c->flags & CLIENT_MULTI &&
2509│         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2510│         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2511│     {
2512│         queueMultiCommand(c);
2513│         addReply(c,shared.queued);
2514│     } else {
2515│         call(c,CMD_CALL_FULL);
2516│         c->woff = server.master_repl_offset;
2517├>        if (listLength(server.ready_keys))
2518│             handleClientsBlockedOnLists();
2519│     }
2520│     return C_OK;
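Inside MULTI, processCommand queues every command except EXEC, DISCARD, MULTI and WATCH and replies `+QUEUED`. Outside MULTI, it runs the command immediately through call(). For readability, the sketch compares lowercase command names. Redis itself compares `c->cmd->proc` function pointers. `toy_should_queue` is hypothetical.

```c
#include <string.h>

/* Returns 1 if processCommand() would queue the command (queueMultiCommand
 * + "+QUEUED"), 0 if it would execute it right away via call(). */
int toy_should_queue(int in_multi, const char *cmd) {
    if (!in_multi) return 0;
    return strcmp(cmd, "exec") != 0 && strcmp(cmd, "discard") != 0 &&
           strcmp(cmd, "multi") != 0 && strcmp(cmd, "watch") != 0;
}
```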

3 processInputBuffer at networking.c


1352│         /* Multibulk processing could see a <= 0 length. */
1353│         if (c->argc == 0) {
1354│             resetClient(c);
1355│         } else {
1356│             /* Only reset the client when the command was executed. */
1357│             if (processCommand(c) == C_OK) {
1358├>                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1359│                     /* Update the applied replication offset of our master. */
1360│                     c->reploff = c->read_reploff - sdslen(c->querybuf);
1361│                 }
1363│                 /* Don't reset the client structure for clients blocked in a
1364│                  * module blocking command, so that the reply callback will
1365│                  * still be able to access the client argv and argc field.
1366│                  * The client will be reset in unblockClientFromModule(). */
1367│                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
1368│                     resetClient(c);
1369│             }

4 resetClient at networking.c

1041│ /* resetClient prepare the client to process the next command */
1042│ void resetClient(client *c) {
1043│     redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1045│     freeClientArgv(c);
1046│     c->reqtype = 0;
1047│     c->multibulklen = 0;
1048│     c->bulklen = -1;
1050│     /* We clear the ASKING flag as well if we are not inside a MULTI, and
1051│      * if what we just executed is not the ASKING command itself. */
1052├>    if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1053│         c->flags &= ~CLIENT_ASKING;
1055│     /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1056│      * to the next command will be sent, but set the flag if the command
1057│      * we just processed was "CLIENT REPLY SKIP". */
1058│     c->flags &= ~CLIENT_REPLY_SKIP;
1059│     if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1060│         c->flags |= CLIENT_REPLY_SKIP;
1061│         c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1062│     }
1063│ }
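The CLIENT_REPLY_SKIP handling at the end of resetClient works as a two-stage, one-shot flag. `CLIENT REPLY SKIP` sets SKIP_NEXT, which becomes SKIP for exactly one command and is then cleared. A sketch with hypothetical flag values:

```c
/* Hypothetical bit values standing in for CLIENT_REPLY_SKIP / _SKIP_NEXT. */
#define TOY_REPLY_SKIP      (1 << 0)
#define TOY_REPLY_SKIP_NEXT (1 << 1)

/* Same flag transitions as the tail of resetClient(). */
int toy_reset_reply_flags(int flags) {
    flags &= ~TOY_REPLY_SKIP;               /* the skipped reply is done */
    if (flags & TOY_REPLY_SKIP_NEXT) {      /* "CLIENT REPLY SKIP" just ran */
        flags |= TOY_REPLY_SKIP;            /* skip the next command's reply */
        flags &= ~TOY_REPLY_SKIP_NEXT;
    }
    return flags;
}
```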

1 aeProcessEvents at ae.c


436│             /* Note the "fe->mask & mask & ..." code: maybe an already
437│              * processed event removed an element that fired and we still
438│              * didn't processed, so we check if the event is still valid.
439│              *
440│              * Fire the readable event if the call sequence is not
441│              * inverted. */
442│             if (!invert && fe->mask & mask & AE_READABLE) {
443│                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
444├>                fired++;
445│             }
447│             /* Fire the writable event. */
448│             if (fe->mask & mask & AE_WRITABLE) {
449│                 if (!fired || fe->wfileProc != fe->rfileProc) {
450│                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
451│                     fired++;
452│                 }
453│             }
467│     /* Check time events */
468├>    if (flags & AE_TIME_EVENTS)
469│         processed += processTimeEvents(eventLoop);
471│     return processed; /* return the number of processed file/time events */
472│ }

2 processTimeEvents at ae.c

269│ /* Process time events */
270│ static int processTimeEvents(aeEventLoop *eventLoop) {
271│     int processed = 0;
272│     aeTimeEvent *te;
273│     long long maxId;
274├>    time_t now = time(NULL);
276│     /* If the system clock is moved to the future, and then set back to the
277│      * right value, time events may be delayed in a random way. Often this
278│      * means that scheduled operations will not be performed soon enough.
279│      *
280│      * Here we try to detect system clock skews, and force all the time
281│      * events to be processed ASAP when this happens: the idea is that
282│      * processing events earlier is less dangerous than delaying them
283│      * indefinitely, and practice suggests it is. */
284│     if (now < eventLoop->lastTime) {
285│         te = eventLoop->timeEventHead;
286│         while(te) {
287│             te->when_sec = 0;
288│             te = te->next;
289│         }
290│     }
291│     eventLoop->lastTime = now;
293│     te = eventLoop->timeEventHead;
294│     maxId = eventLoop->timeEventNextId-1;
295├>    while(te) {
296│         long now_sec, now_ms;
297│         long long id;
299│         /* Remove events scheduled for deletion. */
300│         if (te->id == AE_DELETED_EVENT_ID) {
301│             aeTimeEvent *next = te->next;
302│             if (te->prev)
303│                 te->prev->next = te->next;
304│             else
305│                 eventLoop->timeEventHead = te->next;
306│             if (te->next)
307│                 te->next->prev = te->prev;
320│         if (te->id > maxId) {
321│             te = te->next;
322│             continue;
323│         }
324│         aeGetTime(&now_sec, &now_ms);
325│         if (now_sec > te->when_sec ||
326│             (now_sec == te->when_sec && now_ms >= te->when_ms))
327│         {
328│             int retval;
330├>            id = te->id;
331│             retval = te->timeProc(eventLoop, id, te->clientData);
332│             processed++;
333│             if (retval != AE_NOMORE) {
334│                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
335│             } else {
336│                 te->id = AE_DELETED_EVENT_ID;
337│             }
338│         }
339│         te = te->next;
340│     }
341│     return processed;
342│ }
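Two pieces of timing arithmetic drive processTimeEvents. The first is the "is this event due?" comparison. The second is the rescheduling done by aeAddMillisecondsToNow when `timeProc` returns something other than AE_NOMORE. The sketch takes the current time as parameters instead of calling aeGetTime, so it can be tested deterministically. The function names are hypothetical.

```c
/* Same comparison as processTimeEvents(): the event fires once the
 * current time is at or past (when_sec, when_ms). */
int toy_time_event_due(long now_sec, long now_ms, long when_sec, long when_ms) {
    return now_sec > when_sec || (now_sec == when_sec && now_ms >= when_ms);
}

/* Like aeAddMillisecondsToNow(), but with "now" passed in explicitly. */
void toy_add_ms(long now_sec, long now_ms, long long ms, long *sec, long *msec) {
    long when_sec = now_sec + (long)(ms / 1000);
    long when_ms  = now_ms + (long)(ms % 1000);
    if (when_ms >= 1000) {      /* carry milliseconds into seconds */
        when_sec++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *msec = when_ms;
}
```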

Sending the reply stored in the Client Output Buffer to the client

0 aeMain at ae.c


496│ void aeMain(aeEventLoop *eventLoop) {
497│     eventLoop->stop = 0;
498│     while (!eventLoop->stop) {
499│         if (eventLoop->beforesleep != NULL)
500├>            eventLoop->beforesleep(eventLoop);
501│         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
502│     }
503│ }
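The ordering in aeMain explains why a GET reply is not sent in the same iteration that produces it. The reply is buffered during aeProcessEvents and flushed by beforeSleep at the start of the next iteration. The toy loop below is a hypothetical model, not ae.c. It only records the call order and stops itself after three iterations.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical model of aeMain(): beforesleep runs first in every
 * iteration, then events are processed. */
struct toy_loop {
    int stop;
    int iterations;
    char trace[16];   /* 'B' = beforesleep, 'P' = process events */
    int tlen;
    void (*beforesleep)(struct toy_loop *);
};

/* Stand-in for aeProcessEvents(); stops the loop after 3 iterations. */
static void toy_process_events(struct toy_loop *l) {
    l->trace[l->tlen++] = 'P';
    if (++l->iterations == 3) l->stop = 1;
}

void toy_before_sleep(struct toy_loop *l) {
    l->trace[l->tlen++] = 'B';   /* Redis flushes pending replies here */
}

void toy_main(struct toy_loop *l) {
    l->stop = 0;
    while (!l->stop) {
        if (l->beforesleep != NULL) l->beforesleep(l);
        toy_process_events(l);
    }
    l->trace[l->tlen] = '\0';
}
```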

1 beforeSleep at server.c

The handleClientsWithPendingWrites function sends the result stored in the Client Output Buffer back to the client. Control then returns to the loop in aeMain, which completes the whole sequence.

1183│ /* This function gets called every time Redis is entering the
1184│  * main loop of the event driven library, that is, before to sleep
1185│  * for ready file descriptors. */
1186├>void beforeSleep(struct aeEventLoop *eventLoop) {
1187│     UNUSED(eventLoop);
1189│     /* Call the Redis Cluster before sleep function. Note that this function
1190│      * may change the state of Redis Cluster (from ok to fail or vice versa),
1191│      * so it's a good idea to call it before serving the unblocked clients
1192│      * later in this function. */
1193│     if (server.cluster_enabled) clusterBeforeSleep();
1195│     /* Run a fast expire cycle (the called function will return
1196│      * ASAP if a fast cycle is not needed). */
1197│     if (server.active_expire_enabled && server.masterhost == NULL)
1198│         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1200│     /* Send all the slaves an ACK request if at least one client blocked
1201│      * during the previous event loop iteration. */
1202│     if (server.get_ack_from_slaves) {
1203│         robj *argv[3];
1205│         argv[0] = createStringObject("REPLCONF",8);
1206│         argv[1] = createStringObject("GETACK",6);
1207│         argv[2] = createStringObject("*",1); /* Not used argument. */
1208│         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1209│         decrRefCount(argv[0]);
1210│         decrRefCount(argv[1]);
1211│         decrRefCount(argv[2]);
1212│         server.get_ack_from_slaves = 0;
1213│     }
1215│     /* Unblock all the clients blocked for synchronous replication
1216│      * in WAIT. */
1217│     if (listLength(server.clients_waiting_acks))
1218│         processClientsWaitingReplicas();
1220│     /* Check if there are clients unblocked by modules that implement
1221│      * blocking commands. */
1222├>    moduleHandleBlockedClients();
1224│     /* Try to process pending commands for clients that were just unblocked. */
1225│     if (listLength(server.unblocked_clients))
1226│         processUnblockedClients();
1228│     /* Write the AOF buffer on disk */
1229│     flushAppendOnlyFile(0);
1231│     /* Handle writes with pending output buffers. */
1232│     handleClientsWithPendingWrites();
1234│     /* Before we are going to sleep, let the threads access the dataset by
1235│      * releasing the GIL. Redis main thread will not touch anything at this
1236│      * time. */
1237│     if (moduleCount()) moduleReleaseGIL();
1238│ }

2 activeExpireCycle at expire.c

 93│  * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 94│  * executed, where the time limit is a percentage of the REDIS_HZ period
 95│  * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
 97│ void activeExpireCycle(int type) {
 98│     /* This function has some global state in order to continue the work
 99│      * incrementally across calls. */
100│     static unsigned int current_db = 0; /* Last DB tested. */
101│     static int timelimit_exit = 0;      /* Time limit hit in previous call? */
102│     static long long last_fast_cycle = 0; /* When last fast cycle ran. */
104├>    int j, iteration = 0;
105│     int dbs_per_call = CRON_DBS_PER_CALL;
106│     long long start = ustime(), timelimit, elapsed;
108│     /* When clients are paused the dataset should be static not just from the
109│      * POV of clients not being able to write, but also from the POV of
110│      * expires and evictions of keys not being performed. */
111│     if (clientsArePaused()) return;
113│     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
114│         /* Don't start a fast cycle if the previous cycle did not exit
115│          * for time limt. Also don't repeat a fast cycle for the same period
116│          * as the fast cycle total duration itself. */

2 moduleHandleBlockedClients at module.c

3515│ /* This function will check the moduleUnblockedClients queue in order to
3516│  * call the reply callback and really unblock the client.
3517│  *
3518│  * Clients end into this list because of calls to RM_UnblockClient(),
3519│  * however it is possible that while the module was doing work for the
3520│  * blocked client, it was terminated by Redis (for timeout or other reasons).
3521│  * When this happens the RedisModuleBlockedClient structure in the queue
3522│  * will have the 'client' field set to NULL. */
3523├>void moduleHandleBlockedClients(void) {
3524│     listNode *ln;
3525│     RedisModuleBlockedClient *bc;
3527│     pthread_mutex_lock(&moduleUnblockedClientsMutex);
3528│     /* Here we unblock all the pending clients blocked in modules operations
3529│      * so we can read every pending "awake byte" in the pipe. */
3530│     char buf[1];
3531│     while (read(server.module_blocked_pipe[0],buf,1) == 1);
3532│     while (listLength(moduleUnblockedClients)) {
3533│         ln = listFirst(moduleUnblockedClients);
3534│         bc = ln->value;
3535│         client *c = bc->client;
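The `while (read(...) == 1);` line drains the wake-up pipe that module threads write "awake bytes" into. The loop can only stop on an empty pipe if the read end is non-blocking, so this sketch assumes that and sets it up explicitly. The `toy_*` helpers are hypothetical.

```c
#include <fcntl.h>
#include <unistd.h>

/* Drain every pending byte from the pipe, as moduleHandleBlockedClients()
 * does; returns how many bytes were consumed. */
int toy_drain_pipe(int fd) {
    char buf[1];
    int n = 0;
    while (read(fd, buf, 1) == 1) n++;   /* stops on EAGAIN once empty */
    return n;
}

/* Create a pipe whose read end is non-blocking, so the drain loop
 * returns instead of blocking when no bytes are left. */
int toy_make_wakeup_pipe(int fds[2]) {
    if (pipe(fds) == -1) return -1;
    int fl = fcntl(fds[0], F_GETFL);
    if (fl == -1 || fcntl(fds[0], F_SETFL, fl | O_NONBLOCK) == -1) return -1;
    return 0;
}
```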

2 flushAppendOnlyFile at aof.c


 320│  * About the 'force' argument:
 321│  *
 322│  * When the fsync policy is set to 'everysec' we may delay the flush if there
 323│  * is still an fsync() going on in the background thread, since for instance
 324│  * on Linux write(2) will be blocked by the background fsync anyway.
 325│  * When this happens we remember that there is some aof buffer to be
 326│  * flushed ASAP, and will try to do that in the serverCron() function.
 327│  *
 328│  * However if force is set to 1 we'll write regardless of the background
 329│  * fsync. */
 330│ #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
 331│ void flushAppendOnlyFile(int force) {
 332│     ssize_t nwritten;
 333├>    int sync_in_progress = 0;
 334│     mstime_t latency;
 336│     if (sdslen(server.aof_buf) == 0) return;
 338│     if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
 339│         sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
 341│     if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
 342│         /* With this append fsync policy we do background fsyncing.
 343│          * If the fsync is still in progress we can try to delay
 344│          * the write for a couple of seconds. */
 345│         if (sync_in_progress) {

2 handleClientsWithPendingWrites at networking.c


 999│ /* This function is called just before entering the event loop, in the hope
1000│  * we can just write the replies to the client output buffer without any
1001│  * need to use a syscall in order to install the writable event handler,
1002│  * get it called, and so forth. */
1003│ int handleClientsWithPendingWrites(void) {
1004│     listIter li;
1005│     listNode *ln;
1006│     int processed = listLength(server.clients_pending_write);
1008│     listRewind(server.clients_pending_write,&li);
1009│     while((ln = listNext(&li))) {
1010│         client *c = listNodeValue(ln);
1011│         c->flags &= ~CLIENT_PENDING_WRITE;
1012├>        listDelNode(server.clients_pending_write,ln);
1014│         /* Try to write buffers to the client socket. */
1015│         if (writeToClient(c->fd,c,0) == C_ERR) continue;
1017│         /* If after the synchronous writes above we still have data to
1018│          * output to the client, we need to install the writable handler. */
1019│         if (clientHasPendingReplies(c)) {
1020│             int ae_flags = AE_WRITABLE;
1021│             /* For the fsync=always policy, we want that a given FD is never
1022│              * served for reading and writing in the same event loop iteration,
1023│              * so that in the middle of receiving the query, and serving it
1024│              * to the client, we'll call beforeSleep() that will do the
1025│              * actual fsync of AOF to disk. AE_BARRIER ensures that. */
1026│             if (server.aof_state == AOF_ON &&
1027│                 server.aof_fsync == AOF_FSYNC_ALWAYS)

3 writeToClient at networking.c


 899│ /* Write data in output buffers to client. Return C_OK if the client
 900│  * is still valid after the call, C_ERR if it was freed. */
 901│ int writeToClient(int fd, client *c, int handler_installed) {
 902│     ssize_t nwritten = 0, totwritten = 0;
 903│     size_t objlen;
 904│     sds o;
 906├>    while(clientHasPendingReplies(c)) {
 907│         if (c->bufpos > 0) {
 908│             nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
 909│             if (nwritten <= 0) break;
 910│             c->sentlen += nwritten;
 911│             totwritten += nwritten;
 913│             /* If the buffer was sent, set bufpos to zero to continue with
 914│              * the remainder of the reply. */
 915│             if ((int)c->sentlen == c->bufpos) {
 916│                 c->bufpos = 0;
 917│                 c->sentlen = 0;
 918│             }
 943│         }
 944│         /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
 945│          * bytes, in a single threaded server it's a good idea to serve
 946│          * other clients as well, even if a very large request comes from
 947│          * super fast link that is always able to accept data (in real world
 948│          * scenario think about 'KEYS *' against the loopback interface).
 949│          *
 950│          * However if we are over the maxmemory limit we ignore that and
 951│          * just deliver as much data as it is possible to deliver.
 952│          *
 953│          * Moreover, we also send as much as possible if the client is
 954│          * a slave (otherwise, on high-speed traffic, the replication
 955│          * buffer will grow indefinitely) */
 956├>        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
 957│             (server.maxmemory == 0 ||
 958│              zmalloc_used_memory() < server.maxmemory) &&
 959│             !(c->flags & CLIENT_SLAVE)) break;
 960│     }
 961│     server.stat_net_output_bytes += totwritten;
 962│     if (nwritten == -1) {
 963│         if (errno == EAGAIN) {
 964│             nwritten = 0;
 965│         } else {
 966│             serverLog(LL_VERBOSE,
 967│                 "Error writing to client: %s", strerror(errno));
 968│             freeClient(c);


 972├>    if (totwritten > 0) {
 973│         /* For clients representing masters we don't count sending data
 974│          * as an interaction, since we always send REPLCONF ACK commands
 975│          * that take some time to just fill the socket output buffer.
 976│          * We just rely on data / pings received for timeout detection. */
 977│         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
 978│     }
 979│     if (!clientHasPendingReplies(c)) {
 980│         c->sentlen = 0;
 981│         if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 983│         /* Close connection after entire reply has been sent. */
 984│         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
 985│             freeClient(c);
 986│             return C_ERR;
 987│         }
 988│     }
 989│     return C_OK;
 990│ }
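writeToClient handles partial writes by advancing `sentlen` until it catches up with `bufpos`. The sketch below models only the static-buffer branch and omits the `c->reply` list and error handling. It runs against a fake socket that accepts a limited number of bytes and then reports failure, standing in for EAGAIN. All `toy_*` names are hypothetical. TOY_MAX_WRITES_PER_EVENT mirrors NET_MAX_WRITES_PER_EVENT (64 KB in Redis 4).

```c
#include <string.h>
#include <sys/types.h>

#define TOY_MAX_WRITES_PER_EVENT (1024 * 64)

struct toy_client { char buf[128]; int bufpos; size_t sentlen; };

/* Fake socket: per_call caps each write(), budget caps the total
 * before writes start failing (stand-in for EAGAIN). */
struct toy_sock { char out[256]; size_t outlen; size_t per_call; size_t budget; };

static ssize_t toy_write(struct toy_sock *s, const char *p, size_t len) {
    size_t n = len;
    if (n > s->per_call) n = s->per_call;
    if (n > s->budget) n = s->budget;
    if (n > sizeof(s->out) - s->outlen) n = sizeof(s->out) - s->outlen;
    if (n == 0) return -1;
    memcpy(s->out + s->outlen, p, n);
    s->outlen += n;
    s->budget -= n;
    return (ssize_t)n;
}

/* Static-buffer part of writeToClient(). Returns 1 if bytes are still
 * pending (Redis would then install an AE_WRITABLE handler). */
int toy_write_to_client(struct toy_sock *s, struct toy_client *c) {
    ssize_t nwritten, totwritten = 0;
    while (c->bufpos > 0) {
        nwritten = toy_write(s, c->buf + c->sentlen, (size_t)c->bufpos - c->sentlen);
        if (nwritten <= 0) break;
        c->sentlen += (size_t)nwritten;
        totwritten += nwritten;
        if ((int)c->sentlen == c->bufpos) {   /* whole buffer sent */
            c->bufpos = 0;
            c->sentlen = 0;
        }
        if (totwritten > TOY_MAX_WRITES_PER_EVENT) break;   /* fairness cap */
    }
    return c->bufpos > 0;
}
```

When the first call returns 1, handleClientsWithPendingWrites would install the writable handler so that the remaining bytes go out in a later iteration.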

4 clientHasPendingReplies at networking.c


 607│ /* Return true if the specified client has pending reply buffers to write to
 608│  * the socket. */
 609│ int clientHasPendingReplies(client *c) {
 610├>    return c->bufpos || listLength(c->reply);
 611│ }




$ wget http://download.redis.io/releases/redis-4.0.11.tar.gz
$ tar xzf redis-4.0.11.tar.gz 
$ cd redis-4.0.11/src/
$ vim Makefile # Rewrite from "OPTIMIZATION?=-O2" to "OPTIMIZATION?=-O0"
$ make CFLAGS="-g "
$ sudo make PREFIX=/usr/local/redis install
$ ./redis-server


$ gdb -p `ps aux | grep redis-server | grep -v grep | awk '{print $2}'`

Alternatively, install both with brew install gdb cgdb and use cgdb instead of gdb.


$ redis-cli



$ docker pull futoase/redis-debug-4.0
$ docker run -it -p 9876:9876 \
  --privileged --cap-add=SYS_PTRACE \
  --security-opt seccomp=unconfined \
  futoase/redis-debug-4.0:latest /bin/bash
$ cd /root/redis-4.0.11/src
$ ./redis-server --port 9876 --protected-mode no --daemonize no


$ docker exec -it `docker ps | awk 'NR==2{print $1}'` /bin/bash

$ ps aux | head -n1; ps aux | grep redis-server | grep -v grep
$ cgdb -p <PID>


$ redis-cli -p 9876


$ git clone https://github.com/cgdb/cgdb.git
$ cd cgdb
$ ./autogen.sh
$ CXXFLAGS='-std=c++11' ./configure --prefix=/usr/local
$ make
$ sudo make install

Without -std=c++11, make fails with the error "kui.cpp:310:15: error: ‘it’ does not name a type" (see https://github.com/cgdb/cgdb/issues/184).


When debugging Redis, every command goes through processCommand, so setting a breakpoint on that function is a good way to trace what happens when a command is executed.

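set listsize 50 : change the default number of lines shown by list
ptype <struct name> : show the definition of a struct type
bt : backtrace
info breakpoint : list breakpoints
b main : set a breakpoint on main
b processCommand : set a breakpoint on processCommand
del breakpoints : delete all breakpoints
del <breakpoint ID> : delete the given breakpoint
info threads : list threads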

