Dive Deep Redis Internals ~ What Happens When the GET Command Runs ~

I have also written a book on Redis. Please take a look at it as well.

———————————————————————————————————————————————————

This article traces, in the source code, what happens when a GET command is executed on Redis 4.0.10 built from source and run with the default configuration. https://github.com/antirez/redis

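Behavior when the GET command is executed

Processing happens in the following two stages.

1. Processing the request sent by the client and storing the reply in the Client Output Buffer
2. Sending the reply stored in the Client Output Buffer to the client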

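Overview

0. From the main function to the aeMain function

After main in server.c starts, the following functions run, broadly in this order, up to aeMain. The configuration parameters are loaded first, and then the Redis server is initialized.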

1. loadServerConfig
2. initServer
3. aeMain

1. Processing the request sent by the client and storing the reply in the Client Output Buffer

  • 0 aeMain at ae.c
  • 1 aeProcessEvents at ae.c
    • 2 aeSearchNearestTimer at ae.c
    • 2 aeApiPoll at ae_epoll.c
    • 2 afterSleep at server.c
    • 2 rfileProc -> readQueryFromClient at networking.c
      • 3 processInputBuffer at networking.c
        • 4 processMultibulkBuffer at networking.c
          • 5 createStringObject at object.c
            • 6 createEmbeddedStringObject at object.c
          • 5 createStringObject at object.c
        • 4 processMultibulkBuffer at networking.c
        • 4 processCommand at server.c
          • 5 lookupCommand at server.c
            • 6 dictFetchValue at dict.c
              • 7 dictFind at dict.c
                • 8 dictHashKey -> dictSdsCaseHash at server.c
        • 4 processCommand at server.c
          • 5 call at server.c
            • 6 proc -> getCommand at t_string.c
              • 7 getGenericCommand at t_string.c
                • 8 lookupKeyReadOrReply at db.c
                  • 9 lookupKeyRead at db.c
                    • 10 lookupKeyReadWithFlags at db.c
                • 8 addReplyBulk at networking.c
        • 4 resetClient at networking.c
    • 2 processTimeEvents at ae.c

2. Sending the reply stored in the Client Output Buffer to the client

  • 0 aeMain at ae.c
    • 1 beforeSleep at server.c
      • 2 activeExpireCycle at expire.c
      • 2 moduleHandleBlockedClients at module.c
      • 2 flushAppendOnlyFile at aof.c
      • 2 handleClientsWithPendingWrites at networking.c
        • 3 writeToClient at networking.c
          • 4 clientHasPendingReplies at networking.c

Details

Processing the request sent by the client and storing the reply in the Client Output Buffer

0 aeMain at ae.c

Execution waits inside the while loop of aeMain, which calls aeProcessEvents on every iteration.

496│ void aeMain(aeEventLoop *eventLoop) {
497│     eventLoop->stop = 0;
498├>    while (!eventLoop->stop) {
499│         if (eventLoop->beforesleep != NULL)
500│             eventLoop->beforesleep(eventLoop);
501│         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
502│     }
503│ }
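
The shape of this loop can be imitated with a toy event loop. The following is a minimal sketch, not Redis code: `toy_loop`, `toy_run`, `toy_beforesleep` and `toy_process` are hypothetical names, and the "process events" step is only a function pointer that asks the loop to stop after three iterations.

```c
#include <stddef.h>

/* Hypothetical stand-in for aeEventLoop: only what the loop itself needs. */
typedef struct toy_loop {
    int stop;                                   /* set to 1 to leave the loop */
    int iterations;                             /* times the event step ran */
    int before_calls;                           /* times the beforesleep hook ran */
    void (*beforesleep)(struct toy_loop *loop); /* optional hook, may be NULL */
    void (*process)(struct toy_loop *loop);     /* stand-in for aeProcessEvents */
} toy_loop;

static void toy_beforesleep(toy_loop *loop) { loop->before_calls++; }

/* Pretend to handle events; request a stop on the third pass. */
static void toy_process(toy_loop *loop) {
    loop->iterations++;
    if (loop->iterations == 3) loop->stop = 1;
}

/* Same control flow as aeMain: hook first, then the event step, until stopped. */
static void toy_run(toy_loop *loop) {
    loop->stop = 0;
    while (!loop->stop) {
        if (loop->beforesleep != NULL) loop->beforesleep(loop);
        loop->process(loop);
    }
}
```

Initializing a `toy_loop` with both callbacks and calling `toy_run` runs the hook and the event step three times each, which mirrors how beforeSleep gets a chance to run before every call to aeProcessEvents.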

1 aeProcessEvents at ae.c

346│  * Without special flags the function sleeps until some file event
347│  * fires, or when the next time event occurs (if any).
348│  *
349│  * If flags is 0, the function does nothing and returns.
350│  * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
351│  * if flags has AE_FILE_EVENTS set, file events are processed.
352│  * if flags has AE_TIME_EVENTS set, time events are processed.
353│  * if flags has AE_DONT_WAIT set the function returns ASAP until all
354│  * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
355│  * the events that's possible to process without to wait are processed.
356│  *
357│  * The function returns the number of events processed. */
358│ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
359├>{
360│     int processed = 0, numevents;
361│
362│     /* Nothing to do? return ASAP */
363│     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
364│
365│     /* Note that we want call select() even if there are no
366│      * file events to process as long as we want to process time
367│      * events, in order to sleep until the next time event is ready
368│      * to fire. */
369│     if (eventLoop->maxfd != -1 ||
370│         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
371│         int j;

aeSearchNearestTimer finds the time event that is due to fire soonest.

365│     /* Note that we want call select() even if there are no
366│      * file events to process as long as we want to process time
367│      * events, in order to sleep until the next time event is ready
368│      * to fire. */
369│     if (eventLoop->maxfd != -1 ||
370│         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
371│         int j;
372│         aeTimeEvent *shortest = NULL;
373│         struct timeval tv, *tvp;
374│
375│         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
376├>            shortest = aeSearchNearestTimer(eventLoop);
377│         if (shortest) {
378│             long now_sec, now_ms;
379│
380│             aeGetTime(&now_sec, &now_ms);
381│             tvp = &tv;
382│
383│             /* How many milliseconds we need to wait for the next
384│              * time event to fire? */
385│             long long ms =
386│                 (shortest->when_sec - now_sec)*1000 +
387│                 shortest->when_ms - now_ms;
388│

2 aeSearchNearestTimer at ae.c

243│ /* Search the first timer to fire.
244│  * This operation is useful to know how many time the select can be
245│  * put in sleep without to delay any event.
246│  * If there are no timers NULL is returned.
247│  *
248│  * Note that's O(N) since time events are unsorted.
249│  * Possible optimizations (not needed by Redis so far, but...):
250│  * 1) Insert the event in order, so that the nearest is just the head.
251│  *    Much better but still insertion or deletion of timers is O(N).
252│  * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
253│  */
254│ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
255│ {
256├>    aeTimeEvent *te = eventLoop->timeEventHead;
257│     aeTimeEvent *nearest = NULL;
258│
259│     while(te) {
260│         if (!nearest || te->when_sec < nearest->when_sec ||
261│                 (te->when_sec == nearest->when_sec &&
262│                  te->when_ms < nearest->when_ms))
263│             nearest = te;
264│         te = te->next;
265│     }
266│     return nearest;
267│ }
268│
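
To make the timer search and the timeout arithmetic concrete, here is a small sketch under stated assumptions: `toy_timer`, `toy_nearest` and `toy_ms_until` are hypothetical names, the list is a plain singly linked list, and a deadline that has already passed is clamped to zero so that the poll call would not block.

```c
#include <stddef.h>

/* Hypothetical timer node carrying the same two time fields as aeTimeEvent. */
typedef struct toy_timer {
    long when_sec;
    long when_ms;
    struct toy_timer *next;
} toy_timer;

/* O(N) scan over an unsorted list, returning the earliest timer or NULL. */
static toy_timer *toy_nearest(toy_timer *head) {
    toy_timer *nearest = NULL;
    for (toy_timer *te = head; te != NULL; te = te->next) {
        if (nearest == NULL || te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec &&
             te->when_ms < nearest->when_ms))
            nearest = te;
    }
    return nearest;
}

/* Milliseconds from (now_sec, now_ms) until the timer fires, never negative. */
static long long toy_ms_until(const toy_timer *t, long now_sec, long now_ms) {
    long long ms = (long long)(t->when_sec - now_sec) * 1000 +
                   (t->when_ms - now_ms);
    return ms > 0 ? ms : 0;
}
```

The result of `toy_ms_until` plays the role of the timeout handed to the multiplexing call: the loop sleeps at most that long, so the nearest time event is not delayed.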

1 aeProcessEvents at ae.c

409│         /* Call the multiplexing API, will return only on timeout or when
410│          * some event fires. */
411├>        numevents = aeApiPoll(eventLoop, tvp);
412│
413│         /* After sleep callback. */
414│         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
415│             eventLoop->aftersleep(eventLoop);
416│
417│         for (j = 0; j < numevents; j++) {
418│             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
419│             int mask = eventLoop->fired[j].mask;
420│             int fd = eventLoop->fired[j].fd;
421│             int fired = 0; /* Number of events fired for current fd. */
422│

2 aeApiPoll at ae_epoll.c (included from ae.c)

The function returns with numevents set to 1.

108│ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
109│     aeApiState *state = eventLoop->apidata;
110│     int retval, numevents = 0;
111│
112│     retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
113│             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
114│     if (retval > 0) {
115│         int j;
116│
117├>        numevents = retval;
118│         for (j = 0; j < numevents; j++) {
119│             int mask = 0;
120│             struct epoll_event *e = state->events+j;
121│
122│             if (e->events & EPOLLIN) mask |= AE_READABLE;
123│             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
124│             if (e->events & EPOLLERR) mask |= AE_WRITABLE;
125│             if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
126│             eventLoop->fired[j].fd = e->data.fd;
127│             eventLoop->fired[j].mask = mask;
128│         }
129│     }
130├>    return numevents;
131│ }
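
The "one event fired" situation can be reproduced outside Redis. The sketch below is Linux-only and uses a pipe instead of a client socket; `toy_mask_from_epoll`, `toy_poll_pipe_once` and the `TOY_*` constants are hypothetical names, while `pipe`, `epoll_create1`, `epoll_ctl` and `epoll_wait` are the ordinary system calls.

```c
#include <sys/epoll.h>
#include <unistd.h>

#define TOY_READABLE 1
#define TOY_WRITABLE 2

/* Translate epoll event bits into a readable/writable mask, like aeApiPoll. */
static int toy_mask_from_epoll(unsigned int events) {
    int mask = 0;
    if (events & EPOLLIN)  mask |= TOY_READABLE;
    if (events & EPOLLOUT) mask |= TOY_WRITABLE;
    if (events & EPOLLERR) mask |= TOY_WRITABLE;
    if (events & EPOLLHUP) mask |= TOY_WRITABLE;
    return mask;
}

/* Register the read end of a pipe, write one byte, and poll once.
 * Returns the number of fired events, or -1 if any setup step failed.
 * On success *mask_out receives the mask of the first fired event. */
static int toy_poll_pipe_once(int *mask_out) {
    int fds[2], epfd, n;
    struct epoll_event ev = {0}, fired[8];

    if (pipe(fds) == -1) return -1;
    epfd = epoll_create1(0);
    if (epfd == -1) { close(fds[0]); close(fds[1]); return -1; }

    ev.events = EPOLLIN;
    ev.data.fd = fds[0];
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) == -1 ||
        write(fds[1], "x", 1) != 1) {
        n = -1;
    } else {
        n = epoll_wait(epfd, fired, 8, 1000);   /* wait at most one second */
        if (n > 0) *mask_out = toy_mask_from_epoll(fired[0].events);
    }
    close(epfd);
    close(fds[0]);
    close(fds[1]);
    return n;
}
```

With a single registered descriptor that has data waiting, `epoll_wait` reports exactly one ready descriptor, which corresponds to numevents being 1 in the trace.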

1 aeProcessEvents at ae.c

413│         /* After sleep callback. */
414│         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
415├>            eventLoop->aftersleep(eventLoop);
416│
417│         for (j = 0; j < numevents; j++) {
418│             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
419│             int mask = eventLoop->fired[j].mask;
420│             int fd = eventLoop->fired[j].fd;
421│             int fired = 0; /* Number of events fired for current fd. */
422│

2 afterSleep at server.c

1240│ /* This function is called immadiately after the event loop multiplexing
1241│  * API returned, and the control is going to soon return to Redis by invoking
1242│  * the different events callbacks. */
1243│ void afterSleep(struct aeEventLoop *eventLoop) {
1244│     UNUSED(eventLoop);
1245├>    if (moduleCount()) moduleAcquireGIL();
1246│ }

1 aeProcessEvents at ae.c

The client has sent a GET request, so its socket is readable (AE_READABLE) and rfileProc is called.

436│             /* Note the "fe->mask & mask & ..." code: maybe an already
437│              * processed event removed an element that fired and we still
438│              * didn't processed, so we check if the event is still valid.
439│              *
440│              * Fire the readable event if the call sequence is not
441│              * inverted. */
442├>            if (!invert && fe->mask & mask & AE_READABLE) {
443│                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
444│                 fired++;
445│             }
446│
447│             /* Fire the writable event. */
448│             if (fe->mask & mask & AE_WRITABLE) {
449│                 if (!fired || fe->wfileProc != fe->rfileProc) {
450│                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
451│                     fired++;
452│                 }
453│             }

2 rfileProc -> readQueryFromClient at networking.c

1379│ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1380│     client *c = (client*) privdata;
1381│     int nread, readlen;
1382│     size_t qblen;
1383│     UNUSED(el);
1384│     UNUSED(mask);
1385│
1386├>    readlen = PROTO_IOBUF_LEN;
1387│     /* If this is a multi bulk request, and we are processing a bulk reply
1388│      * that is large enough, try to maximize the probability that the query
1389│      * buffer contains exactly the SDS string representing the object, even
1390│      * at the risk of requiring more read(2) calls. This way the function
1391│      * processMultiBulkBuffer() can avoid copying buffers to create the
1392│      * Redis Object representing the argument. */
1393│     if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
1394│         && c->bulklen >= PROTO_MBULK_BIG_ARG)
1395│     {
1396│         ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
1397│
1398│         if (remaining < readlen) readlen = remaining;

processInputBuffer is called.

1440│     /* Time to process the buffer. If the client is a master we need to
1441│      * compute the difference between the applied offset before and after
1442│      * processing the buffer, to understand how much of the replication stream
1443│      * was actually applied to the master state: this quantity, and its
1444│      * corresponding part of the replication stream, will be propagated to
1445│      * the sub-slaves and to the replication backlog. */
1446├>    if (!(c->flags & CLIENT_MASTER)) {
1447│         processInputBuffer(c);
1448│     } else {
1449│         size_t prev_offset = c->reploff;
1450│         processInputBuffer(c);
1451│         size_t applied = c->reploff - prev_offset;
1452│         if (applied) {
1453│             replicationFeedSlavesFromMasterStream(server.slaves,
1454│                     c->pending_querybuf, applied);
1455│             sdsrange(c->pending_querybuf,applied,-1);
1456│         }
1457│     }

3 processInputBuffer at networking.c

1314│ /* This function is called every time, in the client structure 'c', there is
1315│  * more query buffer to process, because we read more data from the socket
1316│  * or because a client was blocked and later reactivated, so there could be
1317│  * pending query buffer, already representing a full command, to process. */
1318│ void processInputBuffer(client *c) {
1319│     server.current_client = c;
1320│     /* Keep processing while there is something in the input buffer */
1321├>    while(sdslen(c->querybuf)) {
1322│         /* Return if clients are paused. */
1323│         if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
1324│
1325│         /* Immediately abort if the client is in the middle of something. */
1326│         if (c->flags & CLIENT_BLOCKED) break;
1327│
1328│         /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
1329│          * written to the client. Make sure to not let the reply grow after
1330│          * this flag has been set (i.e. don't process more commands).
1331│          *
1332│          * The same applies for clients we want to terminate ASAP. */
1333│         if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

Redis accepts two kinds of requests, INLINE and MULTIBULK. Because the request begins with "*", MULTIBULK is selected.

1335│         /* Determine request type when unknown. */
1336│         if (!c->reqtype) {
1337│             if (c->querybuf[0] == '*') {
1338├>                c->reqtype = PROTO_REQ_MULTIBULK;
1339│             } else {
1340│                 c->reqtype = PROTO_REQ_INLINE;
1341│             }
1342│         }
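
The first-byte check is small enough to restate as a standalone function. This is only an illustrative sketch: `toy_reqtype` and `toy_detect_reqtype` are hypothetical names, and returning an "unknown" value for an empty buffer is an assumption of the sketch (the real loop simply does not run when the buffer is empty).

```c
#include <stddef.h>

enum toy_reqtype {
    TOY_REQ_UNKNOWN = 0,    /* nothing to inspect yet */
    TOY_REQ_INLINE = 1,     /* e.g. "get hoge\r\n" typed over telnet */
    TOY_REQ_MULTIBULK = 2   /* RESP array, starts with '*' */
};

/* Classify a request by looking only at its first byte. */
static enum toy_reqtype toy_detect_reqtype(const char *buf, size_t len) {
    if (len == 0) return TOY_REQ_UNKNOWN;
    return buf[0] == '*' ? TOY_REQ_MULTIBULK : TOY_REQ_INLINE;
}
```

For the request in this walkthrough, `toy_detect_reqtype("*2\r\n$3\r\nget\r\n$4\r\nhoge\r\n", 23)` yields `TOY_REQ_MULTIBULK`, while `"get hoge\r\n"` would be classified as inline.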

processMultibulkBuffer is then executed.

1344│         if (c->reqtype == PROTO_REQ_INLINE) {
1345│             if (processInlineBuffer(c) != C_OK) break;
1346│         } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
1347├>            if (processMultibulkBuffer(c) != C_OK) break;
1348│         } else {
1349│             serverPanic("Unknown request type");
1350│         }

4 processMultibulkBuffer at networking.c

1164│ /* Process the query buffer for client 'c', setting up the client argument
1165│  * vector for command execution. Returns C_OK if after running the function
1166│  * the client has a well-formed ready to be processed command, otherwise
1167│  * C_ERR if there is still to read more buffer to get the full command.
1168│  * The function also returns C_ERR when there is a protocol error: in such a
1169│  * case the client structure is setup to reply with the error and close
1170│  * the connection.
1171│  *
1172│  * This function is called if processInputBuffer() detects that the next
1173│  * command is in RESP format, so the first byte in the command is found
1174│  * to be '*'. Otherwise for inline commands processInlineBuffer() is called. */
1175├>int processMultibulkBuffer(client *c) {
1176│     char *newline = NULL;
1177│     long pos = 0;
1178│     int ok;
1179│     long long ll;
1180│
1181│     if (c->multibulklen == 0) {
1182│         /* The client should have been reset */
1183│         serverAssertWithInfo(c,NULL,c->argc == 0);
1184│
1185│         /* Multi bulk length cannot be read without a \r\n */
1186│         newline = strchr(c->querybuf,'\r');
1187│         if (newline == NULL) {

At this point c->querybuf (an sds) holds the RESP-encoded string "*2\r\n$3\r\nget\r\n$4\r\nhoge\r\n".

1199│         /* We know for sure there is a whole line since newline != NULL,
1200│          * so go ahead and find out the multi bulk length. */
1201│         serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
1202├>        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
1203│         if (!ok || ll > 1024*1024) {
1204│             addReplyError(c,"Protocol error: invalid multibulk length");
1205│             setProtocolError("invalid mbulk count",c,pos);
1206│             return C_ERR;
1207│         }
1208│
1209│         pos = (newline-c->querybuf)+2;
1210│         if (ll <= 0) {
1211│             sdsrange(c->querybuf,pos,-1);
1212│             return C_OK;
1213│         }

Stepping a little further, c->querybuf+pos+1 now points at "3\r\nget\r\n$4\r\nhoge\r\n".

1249├>            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
1250│             if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
1251│                 addReplyError(c,"Protocol error: invalid bulk length");
1252│                 setProtocolError("invalid bulk length",c,pos);
1253│                 return C_ERR;
1254│             }
1255│
1256│             pos += newline-(c->querybuf+pos)+2;
1257│             if (ll >= PROTO_MBULK_BIG_ARG) {
1258│                 size_t qblen;

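The buffer holds more than just the string for this one object, and the argument is far smaller than PROTO_MBULK_BIG_ARG, so the zero-copy optimization is skipped and createStringObject is called.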

1275│         /* Read bulk argument */
1276│         if (sdslen(c->querybuf)-pos < (size_t)(c->bulklen+2)) {
1277│             /* Not enough data (+2 == trailing \r\n) */
1278│             break;
1279│         } else {
1280│             /* Optimization: if the buffer contains JUST our bulk element
1281│              * instead of creating a new object by *copying* the sds we
1282│              * just use the current sds string. */
1283│             if (pos == 0 &&
1284│                 c->bulklen >= PROTO_MBULK_BIG_ARG &&
1285│                 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
1286│             {
1287│                 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
1288│                 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
1289│                 /* Assume that if we saw a fat argument we'll see another one
1290│                  * likely... */
1291│                 c->querybuf = sdsnewlen(NULL,c->bulklen+2);
1292│                 sdsclear(c->querybuf);
1293│                 pos = 0;
1294│             } else {
1295│                 c->argv[c->argc++] =
1296├>                    createStringObject(c->querybuf+pos,c->bulklen);
1297│                 pos += c->bulklen+2;
1298│             }

5 createStringObject at object.c

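createStringObject decides whether to store the string with the embstr or the raw encoding, based on the length of the string and OBJ_ENCODING_EMBSTR_SIZE_LIMIT. Here createEmbeddedStringObject is called.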

 110│ /* Create a string object with EMBSTR encoding if it is smaller than
 111│  * OBJ_ENCODING_EMBSTR_SIZE_LIMIT, otherwise the RAW encoding is
 112│  * used.
 113│  *
 114│  * The current limit of 39 is chosen so that the biggest string object
 115│  * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
 116│ #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44
 117│ robj *createStringObject(const char *ptr, size_t len) {
 118├>    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
 119│         return createEmbeddedStringObject(ptr,len);
 120│     else
 121│         return createRawStringObject(ptr,len);
 122│ }
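
The value 44 can be checked with a little arithmetic. The sketch below assumes a typical 64-bit build and uses hypothetical mirror types (`toy_robj`, `toy_sdshdr8`) rather than the real Redis headers: a 16-byte object header, a 3-byte sds header and one byte for the terminating NUL leave 44 bytes of payload in a 64-byte allocation. The comment in the listing still mentions 39, which presumably predates the smaller sds header.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical mirror of redisObject: 4 + 4 + 8 = 16 bytes on common
 * 64-bit ABIs (bit-fields share one 32-bit unit). */
typedef struct toy_robj {
    unsigned type : 4;
    unsigned encoding : 4;
    unsigned lru : 24;
    int refcount;
    void *ptr;
} toy_robj;

/* Hypothetical mirror of the fixed part of sdshdr8: three 1-byte fields.
 * The character buffer follows immediately after it. */
typedef struct toy_sdshdr8 {
    uint8_t len;
    uint8_t alloc;
    unsigned char flags;
} toy_sdshdr8;

/* Largest string that fits in one allocation of `arena` bytes together
 * with both headers and the trailing NUL byte. */
static size_t toy_embstr_limit(size_t arena) {
    return arena - sizeof(toy_robj) - sizeof(toy_sdshdr8) - 1;
}
```

On such a build `toy_embstr_limit(64)` evaluates to 44, matching OBJ_ENCODING_EMBSTR_SIZE_LIMIT. Both "get" and "hoge" are far below that, which is why the embstr path is taken in this trace.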

6 createEmbeddedStringObject at object.c

The string object is built with the embstr encoding. sh->buf ends up holding "get".

  81│ /* Create a string object with encoding OBJ_ENCODING_EMBSTR, that is
  82│  * an object where the sds string is actually an unmodifiable string
  83│  * allocated in the same chunk as the object itself. */
  84│ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
  85│     robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1);
  86│     struct sdshdr8 *sh = (void*)(o+1);
  87│
  88│     o->type = OBJ_STRING;
  89│     o->encoding = OBJ_ENCODING_EMBSTR;
  90│     o->ptr = sh+1;
  91│     o->refcount = 1;
  92│     if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
  93│         o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
  94│     } else {
  95│         o->lru = LRU_CLOCK();
  96│     }
  97│
  98│     sh->len = len;
  99│     sh->alloc = len;
 100├>    sh->flags = SDS_TYPE_8;
 101│     if (ptr) {
 102│         memcpy(sh->buf,ptr,len);
 103│         sh->buf[len] = '\0';
 104│     } else {
 105│         memset(sh->buf,0,len+1);
 106│     }
 107│     return o;
 108│ }

The object created here is a structure of the following form.

type = struct redisObject {
    unsigned int type : 4;
    unsigned int encoding : 4;
    unsigned int lru : 24;
    int refcount;
    void *ptr;
}
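
What "embedded" means is that the object header, the string header and the bytes themselves live in one allocation. The following is a simplified sketch of that layout, not the Redis implementation: `toy_obj`, `toy_hdr`, `toy_create_embedded` and `TOY_TYPE_8` are hypothetical, `malloc` stands in for zmalloc, and the LRU/LFU bookkeeping is left out.

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define TOY_TYPE_8 1   /* hypothetical tag playing the role of SDS_TYPE_8 */

typedef struct toy_hdr {
    uint8_t len;
    uint8_t alloc;
    unsigned char flags;
} toy_hdr;

typedef struct toy_obj {
    int refcount;
    char *ptr;   /* points at the string bytes inside the same allocation */
} toy_obj;

/* Allocate object, string header and bytes as one chunk.
 * Returns NULL if len does not fit in the 8-bit header or malloc fails. */
static toy_obj *toy_create_embedded(const char *s, size_t len) {
    toy_obj *o;
    toy_hdr *h;
    char *buf;

    if (len > UINT8_MAX) return NULL;
    o = malloc(sizeof(toy_obj) + sizeof(toy_hdr) + len + 1);
    if (o == NULL) return NULL;

    h = (toy_hdr *)(o + 1);      /* header sits right after the object */
    buf = (char *)(h + 1);       /* bytes sit right after the header */
    o->refcount = 1;
    o->ptr = buf;
    h->len = (uint8_t)len;
    h->alloc = (uint8_t)len;
    h->flags = TOY_TYPE_8;
    memcpy(buf, s, len);
    buf[len] = '\0';
    return o;
}
```

Calling `toy_create_embedded("get", 3)` returns an object whose `ptr` lies inside the same chunk, so a single `free(o)` releases everything. One allocation instead of two is the point of this layout.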

4 processMultibulkBuffer at networking.c

1222│     serverAssertWithInfo(c,NULL,c->multibulklen > 0);
1223│     while(c->multibulklen) {
1224│         /* Read bulk length if unknown */
1225│         if (c->bulklen == -1) {
1226│             newline = strchr(c->querybuf+pos,'\r');
1227├>            if (newline == NULL) {
1228│                 if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
1229│                     addReplyError(c,
1230│                         "Protocol error: too big bulk count string");
1231│                     setProtocolError("too big bulk count string",c,0);
1232│                     return C_ERR;
1233│                 }
1234│                 break;
1235│             }
1236│
1237│             /* Buffer should also contain \n */
1238│             if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
1239│                 break;

c->querybuf+pos+1 earlier pointed at "3\r\nget\r\n$4\r\nhoge\r\n"; it now points at "4\r\nhoge\r\n".

1249├>            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
1250│             if (!ok || ll < 0 || ll > server.proto_max_bulk_len) {
1251│                 addReplyError(c,"Protocol error: invalid bulk length");
1252│                 setProtocolError("invalid bulk length",c,pos);
1253│                 return C_ERR;
1254│             }

pos is advanced so that c->querybuf+pos points at "hoge\r\n", and createStringObject is called again.

1283│             if (pos == 0 &&
1284│                 c->bulklen >= PROTO_MBULK_BIG_ARG &&
1285│                 sdslen(c->querybuf) == (size_t)(c->bulklen+2))
1286│             {
1287│                 c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
1288│                 sdsIncrLen(c->querybuf,-2); /* remove CRLF */
1289│                 /* Assume that if we saw a fat argument we'll see another one
1290│                  * likely... */
1291│                 c->querybuf = sdsnewlen(NULL,c->bulklen+2);
1292│                 sdsclear(c->querybuf);
1293│                 pos = 0;
1294│             } else {
1295│                 c->argv[c->argc++] =
1296├>                    createStringObject(c->querybuf+pos,c->bulklen);
1297│                 pos += c->bulklen+2;
1298│             }
1299│             c->bulklen = -1;
1300│             c->multibulklen--;
1301│         }
1302│     }
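
The parsing steps walked through above can be condensed into a small standalone parser. This is a simplified sketch, not the Redis code: `toy_request`, `toy_read_len` and `toy_parse_multibulk` are hypothetical names, arguments are copied into fixed-size arrays, `strtol` replaces string2ll, and an incomplete buffer is reported as an error instead of being retried after the next read.

```c
#include <stdlib.h>
#include <string.h>

#define TOY_MAX_ARGS 8
#define TOY_MAX_ARG_LEN 64

typedef struct toy_request {
    int argc;
    char argv[TOY_MAX_ARGS][TOY_MAX_ARG_LEN];
} toy_request;

/* Read a decimal number terminated by "\r\n" starting at *pos.
 * On success store it in *out, move *pos past the CRLF and return 0. */
static int toy_read_len(const char *buf, size_t len, size_t *pos, long *out) {
    const char *start = buf + *pos;
    const char *cr = memchr(start, '\r', len - *pos);
    char *end;

    if (cr == NULL || cr == start) return -1;
    if ((size_t)(cr - buf) + 1 >= len || cr[1] != '\n') return -1;
    *out = strtol(start, &end, 10);
    if (end != cr) return -1;                 /* junk before the CR */
    *pos = (size_t)(cr - buf) + 2;
    return 0;
}

/* Parse one "*<count>\r\n($<len>\r\n<bytes>\r\n)*" request.
 * Returns the number of bytes consumed, or -1 if malformed or incomplete. */
static long toy_parse_multibulk(const char *buf, size_t len, toy_request *req) {
    size_t pos = 1;
    long count, bulklen;

    req->argc = 0;
    if (len == 0 || buf[0] != '*') return -1;
    if (toy_read_len(buf, len, &pos, &count) != 0) return -1;
    if (count <= 0 || count > TOY_MAX_ARGS) return -1;

    for (long i = 0; i < count; i++) {
        if (pos >= len || buf[pos] != '$') return -1;
        pos++;                                            /* skip '$' */
        if (toy_read_len(buf, len, &pos, &bulklen) != 0) return -1;
        if (bulklen < 0 || bulklen >= TOY_MAX_ARG_LEN) return -1;
        if (len - pos < (size_t)bulklen + 2) return -1;   /* +2 == CRLF */
        memcpy(req->argv[req->argc], buf + pos, (size_t)bulklen);
        req->argv[req->argc][bulklen] = '\0';
        req->argc++;
        pos += (size_t)bulklen + 2;
    }
    return (long)pos;
}
```

Feeding it "*2\r\n$3\r\nget\r\n$4\r\nhoge\r\n" fills `argv[0]` with "get" and `argv[1]` with "hoge" and consumes all 23 bytes, visiting the same offsets (4, 8, 13, 17, 23) as pos does in the trace.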

5 createStringObject at object.c

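The object is returned with sh->buf holding "hoge". (The listing shows createEmbeddedStringObject, which createStringObject calls.)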

  98│     sh->len = len;
  99│     sh->alloc = len;
 100│     sh->flags = SDS_TYPE_8;
 101│     if (ptr) {
 102│         memcpy(sh->buf,ptr,len);
 103│         sh->buf[len] = '\0';
 104│     } else {
 105│         memset(sh->buf,0,len+1);
 106│     }
 107├>    return o;

4 processMultibulkBuffer at networking.c

1304│     /* Trim to pos */
1305├>    if (pos) sdsrange(c->querybuf,pos,-1);
1306│
1307│     /* We're done when c->multibulk == 0 */
1308│     if (c->multibulklen == 0) return C_OK;
1309│
1310│     /* Still not ready to process the command */
1311│     return C_ERR;

3 processInputBuffer at networking.c

1352│         /* Multibulk processing could see a <= 0 length. */
1353│         if (c->argc == 0) {
1354│             resetClient(c);
1355│         } else {
1356│             /* Only reset the client when the command was executed. */
1357├>            if (processCommand(c) == C_OK) {
1358│                 if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1359│                     /* Update the applied replication offset of our master. */
1360│                     c->reploff = c->read_reploff - sdslen(c->querybuf);
1361│                 }
1362│
1363│                 /* Don't reset the client structure for clients blocked in a
1364│                  * module blocking command, so that the reply callback will
1365│                  * still be able to access the client argv and argc field.
1366│                  * The client will be reset in unblockClientFromModule(). */
1367│                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
1368│                     resetClient(c);
1369│             }

4 processCommand at server.c

lookupCommand is called.

2324│ /* If this function gets called we already read a whole
2325│  * command, arguments are in the client argv/argc fields.
2326│  * processCommand() execute the command or prepare the
2327│  * server for a bulk read from the client.
2328│  *
2329│  * If C_OK is returned the client is still alive and valid and
2330│  * other operations can be performed by the caller. Otherwise
2331│  * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
2332├>int processCommand(client *c) {
2333│     /* The QUIT command is handled separately. Normal command procs will
2334│      * go through checking for replication and QUIT will cause trouble
2335│      * when FORCE_REPLICATION is enabled and would be implemented in
2336│      * a regular command proc. */
2337│     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
2338│         addReply(c,shared.ok);
2339│         c->flags |= CLIENT_CLOSE_AFTER_REPLY;
2340│         return C_ERR;
2341│     }
2342│
2343│     /* Now lookup the command and check ASAP about trivial error conditions
2344│      * such as wrong arity, bad command name and so forth. */
2345├>    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2346│     if (!c->cmd) {
2347│         flagTransaction(c);
2348│         sds args = sdsempty();
2349│         int i;
2350│         for (i=1; i < c->argc && sdslen(args) < 128; i++)
2351│             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
2352│         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
2353│             (char*)c->argv[0]->ptr, args);
2354│         sdsfree(args);
2355│         return C_OK;
2356│     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2357│                (c->argc < -c->cmd->arity)) {

5 lookupCommand at server.c

2070│ struct redisCommand *lookupCommand(sds name) {
2071├>    return dictFetchValue(server.commands, name);
2072│ }

6 dictFetchValue at dict.c

 497│ void *dictFetchValue(dict *d, const void *key) {
 498│     dictEntry *he;
 499│
 500├>    he = dictFind(d,key);
 501│     return he ? dictGetVal(he) : NULL;
 502│ }

7 dictFind at dict.c

Calling dictHashKey invokes dictSdsCaseHash.

 476│ dictEntry *dictFind(dict *d, const void *key)
 477│ {
 478│     dictEntry *he;
 479│     uint64_t h, idx, table;
 480│
 481├>    if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
 482│     if (dictIsRehashing(d)) _dictRehashStep(d);
 483│     h = dictHashKey(d, key);
 484│     for (table = 0; table <= 1; table++) {
 485│         idx = h & d->ht[table].sizemask;
 486│         he = d->ht[table].table[idx];
 487│         while(he) {
 488│             if (key==he->key || dictCompareKeys(d, key, he->key))
 489│                 return he;
 490│             he = he->next;
 491│         }
 492│         if (!dictIsRehashing(d)) return NULL;
 493│     }

8 dictHashKey -> dictSdsCaseHash at server.c

 498│ uint64_t dictSdsCaseHash(const void *key) {
 499├>    return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
 500│ }
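
The effect of a case-insensitive hash on command lookup can be shown with a tiny chained hash table. This is a sketch under loud assumptions: `toy_table`, `toy_add`, `toy_lookup` and `toy_case_hash` are hypothetical, and the hash is a simple djb2-style function over lowercased bytes, not the hash function Redis uses. `strcasecmp` is the POSIX function from `<strings.h>`.

```c
#include <ctype.h>
#include <stddef.h>
#include <strings.h>

#define TOY_BUCKETS 16   /* power of two, so hash & (size - 1) picks a bucket */

typedef struct toy_command {
    const char *name;
    int arity;
    struct toy_command *next;   /* chaining for colliding names */
} toy_command;

typedef struct toy_table {
    toy_command *buckets[TOY_BUCKETS];
} toy_table;

/* Hash the lowercased bytes so "GET", "Get" and "get" share a bucket. */
static unsigned long toy_case_hash(const char *s) {
    unsigned long h = 5381;
    for (; *s != '\0'; s++)
        h = h * 33 + (unsigned long)tolower((unsigned char)*s);
    return h;
}

static void toy_add(toy_table *t, toy_command *cmd) {
    size_t idx = toy_case_hash(cmd->name) & (TOY_BUCKETS - 1);
    cmd->next = t->buckets[idx];
    t->buckets[idx] = cmd;
}

/* Walk the bucket chain and compare names without regard to case. */
static toy_command *toy_lookup(const toy_table *t, const char *name) {
    size_t idx = toy_case_hash(name) & (TOY_BUCKETS - 1);
    for (toy_command *c = t->buckets[idx]; c != NULL; c = c->next)
        if (strcasecmp(c->name, name) == 0) return c;
    return NULL;
}
```

After adding an entry named "get", both `toy_lookup(&t, "GET")` and `toy_lookup(&t, "get")` return it, which is why clients may send command names in any case.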

4 processCommand at server.c

Processing continues with c->cmd holding a pointer to the following value.

{name = 0x561b161f3dd2 "get", proc = 0x561b1611cafa <getCommand>, arity = 2, sflags = 0x561b161f3dd6 "rF", flags = 8194, getkeys_proc = 0x0, firstkey = 1, lastkey = 1, keystep = 1, microseconds = 62, calls = 5}

2343│     /* Now lookup the command and check ASAP about trivial error conditions
2344│      * such as wrong arity, bad command name and so forth. */
2345│     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
2346├>    if (!c->cmd) {
2347│         flagTransaction(c);
2348│         sds args = sdsempty();
2349│         int i;
2350│         for (i=1; i < c->argc && sdslen(args) < 128; i++)
2351│             args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
2352│         addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
2353│             (char*)c->argv[0]->ptr, args);
2354│         sdsfree(args);
2355│         return C_OK;
2356│     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
2357│                (c->argc < -c->cmd->arity)) {
2358│         flagTransaction(c);
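
The arity condition in the else-if branch is compact, so here it is restated as a predicate. `toy_arity_ok` is a hypothetical helper: a positive arity means the argument count (command name included) must match exactly, and a negative arity means at least that many arguments are required.

```c
/* Returns 1 when argc satisfies the command's arity, 0 otherwise.
 * arity > 0: exactly `arity` arguments, counting the command name.
 * arity < 0: at least `-arity` arguments. */
static int toy_arity_ok(int arity, int argc) {
    if (arity > 0) return argc == arity;
    return argc >= -arity;
}
```

GET has arity 2 in the dump above, so "get hoge" (argc 2) passes while "get hoge extra" (argc 3) would be rejected before the command is ever called.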

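When cluster mode is enabled and, for example, the slot of the target key is not served by the current node, the client is redirected. That does not apply here, so this step is skipped.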

2372│     /* If cluster is enabled perform the cluster redirection here.
2373│      * However we don't perform the redirection if:
2374│      * 1) The sender of this command is our master.
2375│      * 2) The command has no key arguments. */
2376├>    if (server.cluster_enabled &&
2377│         !(c->flags & CLIENT_MASTER) &&
2378│         !(c->flags & CLIENT_LUA &&
2379│           server.lua_caller->flags & CLIENT_MASTER) &&
2380│         !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
2381│           c->cmd->proc != execCommand))
2382│     {
2383│         int hashslot;
2384│         int error_code;
2385│         clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
2386│                                         &hashslot,&error_code);
2387│         if (n == NULL || n != server.cluster->myself) {
2388│             if (c->cmd->proc == execCommand) {

After that, call is executed to process the command that was actually sent.

2507│     /* Exec the command */
2508│     if (c->flags & CLIENT_MULTI &&
2509│         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2510│         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2511│     {
2512│         queueMultiCommand(c);
2513│         addReply(c,shared.queued);
2514│     } else {
2515├>        call(c,CMD_CALL_FULL);
2516│         c->woff = server.master_repl_offset;
2517│         if (listLength(server.ready_keys))
2518│             handleClientsBlockedOnLists();
2519│     }
2520│     return C_OK;
2521│ }

5 call at server.c

c->cmd->proc(c) is executed. proc points to the function that redisCommandTable defines for each command. https://github.com/antirez/redis/blob/4.0.10/src/server.c#L75-L308

2196│  * slaves propagation will never occur.
2197│  *
2198│  * Client flags are modified by the implementation of a given command
2199│  * using the following API:
2200│  *
2201│  * forceCommandPropagation(client *c, int flags);
2202│  * preventCommandPropagation(client *c);
2203│  * preventCommandAOF(client *c);
2204│  * preventCommandReplication(client *c);
2205│  *
2206│  */
2207│ void call(client *c, int flags) {
2208│     long long dirty, start, duration;
2209├>    int client_old_flags = c->flags;
2210│
2211│     /* Sent the command to clients in MONITOR mode, only if the commands are
2212│      * not generated from reading an AOF. */
2213│     if (listLength(server.monitors) &&
2214│         !server.loading &&
2215│         !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
2216│     {
2217│         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
2218│     }
2219│
2220│     /* Initialization: clear the flags that must be set by the command on
2221│      * demand, and initialize the array for additional commands propagation. */
2222│     c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
2223│     redisOpArray prev_also_propagate = server.also_propagate;
2224│     redisOpArrayInit(&server.also_propagate);
2225│
2226│     /* Call the command. */
2227│     dirty = server.dirty;
2228│     start = ustime();
2229├>    c->cmd->proc(c);
2230│     duration = ustime()-start;
2231│     dirty = server.dirty-dirty;
2232│     if (dirty < 0) dirty = 0;
2233│
2234│     /* When EVAL is called loading the AOF we don't want commands called
2235│      * from Lua to go into the slowlog or to populate statistics. */
2236│     if (server.loading && c->flags & CLIENT_LUA)
2237│         flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
2238│
2239│     /* If the caller is Lua, we want to force the EVAL caller to propagate
2240│      * the script if the command flag or client flag are forcing the
2241│      * propagation. */
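
Dispatch through a function pointer, timed on both sides, can be sketched as follows. Everything here is hypothetical (`toy_cmd`, `toy_client`, `toy_call`, `toy_get`, `toy_ustime`), and the sketch assumes that the microseconds and calls fields seen in the redisCommand dump are per-command statistics updated around the proc call. The flag handling and propagation logic of the real function are left out.

```c
#include <stddef.h>
#include <sys/time.h>

typedef struct toy_client toy_client;
typedef void toy_proc(toy_client *c);

/* Hypothetical command entry: a name, its handler and two counters. */
typedef struct toy_cmd {
    const char *name;
    toy_proc *proc;
    long long microseconds;   /* total time spent in proc */
    long long calls;          /* number of invocations */
} toy_cmd;

struct toy_client {
    toy_cmd *cmd;     /* command selected by the lookup step */
    int replies;      /* how many replies the handler queued */
};

/* Current wall-clock time in microseconds. */
static long long toy_ustime(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (long long)tv.tv_sec * 1000000 + tv.tv_usec;
}

/* Stand-in for getCommand: just record that a reply was produced. */
static void toy_get(toy_client *c) { c->replies++; }

/* Invoke the handler through the pointer and update the statistics. */
static void toy_call(toy_client *c) {
    long long start = toy_ustime();
    c->cmd->proc(c);
    c->cmd->microseconds += toy_ustime() - start;
    c->cmd->calls++;
}
```

The dispatcher never names `toy_get` directly. Whatever the table entry points at gets called, which is how one generic call path can serve every command.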

6 proc -> getCommand at t_string.c

172│ void getCommand(client *c) {
173├>    getGenericCommand(c);
174│ }

7 getGenericCommand at t_string.c

lookupKeyReadOrReply fetches the value, and then addReplyBulk is executed.

157│ int getGenericCommand(client *c) {
158│     robj *o;
159│
160├>    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
161│         return C_OK;
162│
163│     if (o->type != OBJ_STRING) {
164│         addReply(c,shared.wrongtypeerr);
165│         return C_ERR;
166│     } else {
167│         addReplyBulk(c,o);
168│         return C_OK;
169│     }
170│ }

8 lookupKeyReadOrReply at db.c

Calls lookupKeyRead. The key is found here, so addReply is not called and a pointer to the robj object is returned.

 151│ robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
 152├>    robj *o = lookupKeyRead(c->db, key);
 153│     if (!o) addReply(c,reply);
 154│     return o;
 155│ }

9 lookupKeyRead at db.c

 135│ /* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
 136│  * common case. */
 137│ robj *lookupKeyRead(redisDb *db, robj *key) {
 138├>    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
 139│ }

10 lookupKeyReadWithFlags at db.c

  88│  * Flags change the behavior of this command:
  89│  *
  90│  *  LOOKUP_NONE (or zero): no special flags are passed.
  91│  *  LOOKUP_NOTOUCH: don't alter the last access time of the key.
  92│  *
  93│  * Note: this function also returns NULL is the key is logically expired
  94│  * but still existing, in case this is a slave, since this API is called only
  95│  * for read operations. Even if the key expiry is master-driven, we can
  96│  * correctly report a key is expired on slaves even if the master is lagging
  97│  * expiring our key via DELs in the replication link. */
  98│ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
  99│     robj *val;
 100│
 101├>    if (expireIfNeeded(db,key) == 1) {
 102│         /* Key expired. If we are in the context of a master, expireIfNeeded()
 103│          * returns 0 only when the key does not exist at all, so it's safe
 104│          * to return NULL ASAP. */
 105│         if (server.masterhost == NULL) return NULL;
 106│
 107│         /* However if we are in the context of a slave, expireIfNeeded() will
 108│          * not really try to expire the key, it only returns information
 109│          * about the "logical" status of the key: key expiring is up to the
 110│          * master in order to have a consistent view of master's data set.
 111│          *
 112│          * However, if the command caller is not the master, and as additional
 113│          * safety measure, the command invoked is a read-only command, we can
:
:
 127├>    val = lookupKey(db,key,flags);
 128│     if (val == NULL)
 129│         server.stat_keyspace_misses++;
 130│     else
 131│         server.stat_keyspace_hits++;
 132│     return val;
 133│ }
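The read path above (check expiry first, then look the key up and count a hit or a miss) can be sketched roughly as follows. This is a simplified model under stated assumptions: the toy_* types are hypothetical, a linear array stands in for the dict, and only the master-side behavior is modeled, where an expired key returns NULL early without touching the counters.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy stand-ins for illustration only; none of these are Redis types. */
typedef struct {
    const char *key;        /* NULL marks a removed entry */
    const char *val;
    long long expire_at_ms; /* 0 means no TTL */
} toy_entry;

typedef struct {
    toy_entry *entries;
    size_t n;
    long long hits, misses;
} toy_db;

/* Return the value for key, or NULL if it is missing or expired. */
const char *toy_lookup_read(toy_db *db, const char *key, long long now_ms) {
    assert(db != NULL && key != NULL);
    for (size_t i = 0; i < db->n; i++) {
        toy_entry *e = &db->entries[i];
        if (e->key == NULL || strcmp(e->key, key) != 0) continue;
        if (e->expire_at_ms != 0 && e->expire_at_ms <= now_ms) {
            e->key = NULL; /* lazily drop the expired entry */
            return NULL;   /* early return, counters untouched */
        }
        db->hits++;
        return e->val;
    }
    db->misses++;
    return NULL;
}
```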

8 addReplyBulk at networking.c

Builds the reply in bulk string format: the length line ($<length>), the value itself, then CRLF.

 556│ /* Add a Redis Object as a bulk reply */
 557│ void addReplyBulk(client *c, robj *obj) {
 558├>    addReplyBulkLen(c,obj);
 559│     addReply(c,obj);
 560│     addReply(c,shared.crlf);
 561│ }
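The three calls above produce one bulk string on the wire. As a rough illustration, the helper below encodes the same layout into a flat buffer. format_bulk_reply is a hypothetical name made up for this sketch; Redis itself appends each piece to the Client Output Buffer rather than formatting into a caller-supplied array.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not a Redis function): encode len bytes of data
 * as a bulk string "$<len>\r\n<data>\r\n" into out.
 * Returns the number of bytes written, or -1 if out is too small.
 * The output is not NUL-terminated. */
int format_bulk_reply(char *out, size_t outsz, const char *data, size_t len) {
    assert(out != NULL && data != NULL);
    /* Length header, the part addReplyBulkLen contributes. */
    int hdr = snprintf(out, outsz, "$%zu\r\n", len);
    if (hdr < 0 || (size_t)hdr + len + 2 > outsz) return -1;
    /* Payload, then the trailing CRLF. */
    memcpy(out + hdr, data, len);
    memcpy(out + hdr + len, "\r\n", 2);
    return hdr + (int)len + 2;
}
```

For the value "bar" this yields the 9 bytes $3\r\nbar\r\n, which is what redis-cli receives for a successful GET.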

5 call at server.c

Execution continues in call. The command is recorded in the slow log if needed, and the per-command statistics are updated as well.

2249│     /* Log the command into the Slow log if needed, and populate the
2250│      * per-command statistics that we show in INFO commandstats. */
2251│     if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
2252│         char *latency_event = (c->cmd->flags & CMD_FAST) ?
2253│                               "fast-command" : "command";
2254│         latencyAddSampleIfNeeded(latency_event,duration/1000);
2255│         slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
2256│     }
2257│     if (flags & CMD_CALL_STATS) {
2258│         c->lastcmd->microseconds += duration;
2259├>        c->lastcmd->calls++;
2260│     }

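Next comes propagation to the AOF and the replication link. GET does not modify the dataset, so dirty is 0; unless the client forces propagation, propagate_flags stays PROPAGATE_NONE and propagate is not called.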

2262│     /* Propagate the command into the AOF and replication link */
2263│     if (flags & CMD_CALL_PROPAGATE &&
2264│         (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
2265│     {
2266├>        int propagate_flags = PROPAGATE_NONE;
2267│
2268│         /* Check if the command operated changes in the data set. If so
2269│          * set for replication / AOF propagation. */
2270│         if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
2271│
2272│         /* If the client forced AOF / replication of the command, set
2273│          * the flags regardless of the command effects on the data set. */
2274│         if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
2275│         if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
2276│
2277│         /* However prevent AOF / replication propagation if the command
2278│          * implementatino called preventCommandPropagation() or similar,
:
:
2287│         /* Call propagate() only if at least one of AOF / replication
2288│          * propagation is needed. Note that modules commands handle replication
2289│          * in an explicit way, so we never replicate them automatically. */
2290├>        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
2291│             propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
2292│     }
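The decision made in this block can be condensed into a small pure function. The sketch below is a simplification under stated assumptions: the TOY_* constants are invented for illustration (their names and values are not taken from server.h), and a single "prevent" flag stands in for the elided part of the listing.

```c
#include <assert.h>

/* Illustrative constants only; not the values defined in server.h. */
enum { TOY_PROPAGATE_NONE = 0, TOY_PROPAGATE_AOF = 1, TOY_PROPAGATE_REPL = 2 };
enum { TOY_FORCE_AOF = 1 << 0, TOY_FORCE_REPL = 1 << 1, TOY_PREVENT_PROP = 1 << 2 };

/* Decide where a command should be propagated, given how many keys it
 * dirtied and which (toy) client flags are set. */
int toy_propagate_flags(long long dirty, int client_flags) {
    assert(dirty >= 0);
    /* Propagation fully prevented: nothing to do. */
    if (client_flags & TOY_PREVENT_PROP) return TOY_PROPAGATE_NONE;

    int flags = TOY_PROPAGATE_NONE;
    /* A command that changed the dataset goes to both targets. */
    if (dirty) flags |= TOY_PROPAGATE_AOF | TOY_PROPAGATE_REPL;
    /* Forced propagation applies regardless of the effect on the dataset. */
    if (client_flags & TOY_FORCE_REPL) flags |= TOY_PROPAGATE_REPL;
    if (client_flags & TOY_FORCE_AOF) flags |= TOY_PROPAGATE_AOF;
    return flags;
}
```

A read-only command such as GET corresponds to toy_propagate_flags(0, 0), which returns TOY_PROPAGATE_NONE.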

4 processCommand at server.c

After call returns, execution comes back to processCommand. handleClientsBlockedOnLists is not executed, and C_OK is returned as is.

2507│     /* Exec the command */
2508│     if (c->flags & CLIENT_MULTI &&
2509│         c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
2510│         c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
2511│     {
2512│         queueMultiCommand(c);
2513│         addReply(c,shared.queued);
2514│     } else {
2515│         call(c,CMD_CALL_FULL);
2516│         c->woff = server.master_repl_offset;
2517├>        if (listLength(server.ready_keys))
2518│             handleClientsBlockedOnLists();
2519│     }
2520│     return C_OK;

3 processInputBuffer at networking.c

After processCommand returns, execution comes back to processInputBuffer. The client is not blocked, so resetClient is executed and the end of the while loop is reached. Control then leaves processInputBuffer, leaves readQueryFromClient, and returns to aeProcessEvents.

1352│         /* Multibulk processing could see a <= 0 length. */
1353│         if (c->argc == 0) {
1354│             resetClient(c);
1355│         } else {
1356│             /* Only reset the client when the command was executed. */
1357│             if (processCommand(c) == C_OK) {
1358├>                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
1359│                     /* Update the applied replication offset of our master. */
1360│                     c->reploff = c->read_reploff - sdslen(c->querybuf);
1361│                 }
1362│
1363│                 /* Don't reset the client structure for clients blocked in a
1364│                  * module blocking command, so that the reply callback will
1365│                  * still be able to access the client argv and argc field.
1366│                  * The client will be reset in unblockClientFromModule(). */
1367│                 if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
1368│                     resetClient(c);
1369│             }

4 resetClient at networking.c

1041│ /* resetClient prepare the client to process the next command */
1042│ void resetClient(client *c) {
1043│     redisCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
1044│
1045│     freeClientArgv(c);
1046│     c->reqtype = 0;
1047│     c->multibulklen = 0;
1048│     c->bulklen = -1;
1049│
1050│     /* We clear the ASKING flag as well if we are not inside a MULTI, and
1051│      * if what we just executed is not the ASKING command itself. */
1052├>    if (!(c->flags & CLIENT_MULTI) && prevcmd != askingCommand)
1053│         c->flags &= ~CLIENT_ASKING;
1054│
1055│     /* Remove the CLIENT_REPLY_SKIP flag if any so that the reply
1056│      * to the next command will be sent, but set the flag if the command
1057│      * we just processed was "CLIENT REPLY SKIP". */
1058│     c->flags &= ~CLIENT_REPLY_SKIP;
1059│     if (c->flags & CLIENT_REPLY_SKIP_NEXT) {
1060│         c->flags |= CLIENT_REPLY_SKIP;
1061│         c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
1062│     }
1063│ }

1 aeProcessEvents at ae.c

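After the read handler has run there is nothing in particular to write, so processing simply continues. Finally, timer events are checked: once processTimeEvents has run, aeProcessEvents finishes and control returns to aeMain.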

436│             /* Note the "fe->mask & mask & ..." code: maybe an already
437│              * processed event removed an element that fired and we still
438│              * didn't processed, so we check if the event is still valid.
439│              *
440│              * Fire the readable event if the call sequence is not
441│              * inverted. */
442│             if (!invert && fe->mask & mask & AE_READABLE) {
443│                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
444├>                fired++;
445│             }
446│
447│             /* Fire the writable event. */
448│             if (fe->mask & mask & AE_WRITABLE) {
449│                 if (!fired || fe->wfileProc != fe->rfileProc) {
450│                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);
451│                     fired++;
452│                 }
453│             }
:
:
467│     /* Check time events */
468├>    if (flags & AE_TIME_EVENTS)
469│         processed += processTimeEvents(eventLoop);
470│
471│     return processed; /* return the number of processed file/time events */
472│ }

2 processTimeEvents at ae.c

269│ /* Process time events */
270│ static int processTimeEvents(aeEventLoop *eventLoop) {
271│     int processed = 0;
272│     aeTimeEvent *te;
273│     long long maxId;
274├>    time_t now = time(NULL);
275│
276│     /* If the system clock is moved to the future, and then set back to the
277│      * right value, time events may be delayed in a random way. Often this
278│      * means that scheduled operations will not be performed soon enough.
279│      *
280│      * Here we try to detect system clock skews, and force all the time
281│      * events to be processed ASAP when this happens: the idea is that
282│      * processing events earlier is less dangerous than delaying them
283│      * indefinitely, and practice suggests it is. */
284│     if (now < eventLoop->lastTime) {
285│         te = eventLoop->timeEventHead;
286│         while(te) {
287│             te->when_sec = 0;
288│             te = te->next;
289│         }
290│     }
291│     eventLoop->lastTime = now;
292│
293│     te = eventLoop->timeEventHead;
294│     maxId = eventLoop->timeEventNextId-1;
295├>    while(te) {
296│         long now_sec, now_ms;
297│         long long id;
298│
299│         /* Remove events scheduled for deletion. */
300│         if (te->id == AE_DELETED_EVENT_ID) {
301│             aeTimeEvent *next = te->next;
302│             if (te->prev)
303│                 te->prev->next = te->next;
304│             else
305│                 eventLoop->timeEventHead = te->next;
306│             if (te->next)
307│                 te->next->prev = te->prev;
:
:
320│         if (te->id > maxId) {
321│             te = te->next;
322│             continue;
323│         }
324│         aeGetTime(&now_sec, &now_ms);
325│         if (now_sec > te->when_sec ||
326│             (now_sec == te->when_sec && now_ms >= te->when_ms))
327│         {
328│             int retval;
329│
330├>            id = te->id;
331│             retval = te->timeProc(eventLoop, id, te->clientData);
332│             processed++;
333│             if (retval != AE_NOMORE) {
334│                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
335│             } else {
336│                 te->id = AE_DELETED_EVENT_ID;
337│             }
338│         }
339│         te = te->next;
340│     }
341│     return processed;
342│ }
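The core of the loop above is: fire every timer that is due, then either reschedule it by the number of milliseconds its callback returned or mark it for deletion. A minimal sketch of that rule follows, assuming an array instead of a linked list and a single millisecond clock value passed in by the caller. The toy_* names and TOY_NOMORE are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_NOMORE (-1) /* callback return value meaning "do not reschedule" */

typedef struct toy_timer {
    long long when_ms;                /* next time the timer is due */
    int deleted;                      /* set once the timer is finished */
    int (*proc)(struct toy_timer *t); /* returns ms until next run, or TOY_NOMORE */
    int fired;                        /* how many times proc has run */
} toy_timer;

/* Fire all due timers once; return how many were processed. */
int toy_process_timers(toy_timer *timers, size_t n, long long now_ms) {
    assert(timers != NULL || n == 0);
    int processed = 0;
    for (size_t i = 0; i < n; i++) {
        toy_timer *t = &timers[i];
        if (t->deleted || now_ms < t->when_ms) continue;
        int retval = t->proc(t);
        processed++;
        if (retval != TOY_NOMORE)
            t->when_ms = now_ms + retval; /* periodic: schedule the next run */
        else
            t->deleted = 1;               /* one-shot: mark as finished */
    }
    return processed;
}

/* Example callbacks: a periodic timer and a one-shot timer. */
int toy_every_100ms(toy_timer *t) { t->fired++; return 100; }
int toy_once(toy_timer *t) { t->fired++; return TOY_NOMORE; }
```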

Sending the reply stored in the Client Output Buffer to the client

0 aeMain at ae.c

Once control is back in aeMain, the beforesleep callback (beforeSleep in server.c) is called at the start of the next loop iteration.

496│ void aeMain(aeEventLoop *eventLoop) {
497│     eventLoop->stop = 0;
498│     while (!eventLoop->stop) {
499│         if (eventLoop->beforesleep != NULL)
500├>            eventLoop->beforesleep(eventLoop);
501│         aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
502│     }
503│ }
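Because beforesleep runs at the top of each iteration, a reply queued while processing events in one iteration is flushed at the start of the next one. The sketch below models only that ordering. The toy_loop type and its callbacks are hypothetical, and counters stand in for real sockets and buffers.

```c
#include <assert.h>
#include <stddef.h>

struct toy_loop;
typedef void toy_before_sleep(struct toy_loop *el);
typedef int toy_process(struct toy_loop *el);

typedef struct toy_loop {
    int stop;
    int iterations;
    int pending; /* replies queued but not yet sent */
    int flushed; /* replies sent by the beforesleep hook */
    toy_before_sleep *beforesleep;
    toy_process *process;
} toy_loop;

/* Same shape as aeMain: run the hook, then process events, until stopped. */
void toy_main(toy_loop *el) {
    assert(el != NULL && el->process != NULL);
    el->stop = 0;
    while (!el->stop) {
        if (el->beforesleep != NULL) el->beforesleep(el);
        el->process(el);
        el->iterations++;
    }
}

/* Hook: send everything that was queued in the previous iteration. */
void toy_flush(toy_loop *el) {
    el->flushed += el->pending;
    el->pending = 0;
}

/* Event processing: queue one reply per iteration, stop on the third. */
int toy_handle_one(toy_loop *el) {
    el->pending++;
    if (el->iterations >= 2) el->stop = 1;
    return 1;
}
```

With these callbacks the loop runs three iterations; the replies queued in the first two are flushed, while the one queued in the last iteration is still pending because the loop stopped before the hook could run again.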

1 beforeSleep at server.c

handleClientsWithPendingWrites sends the result stored in the Client Output Buffer back to the client. Control then returns to the aeMain loop once more, which completes the whole sequence.

1183│ /* This function gets called every time Redis is entering the
1184│  * main loop of the event driven library, that is, before to sleep
1185│  * for ready file descriptors. */
1186├>void beforeSleep(struct aeEventLoop *eventLoop) {
1187│     UNUSED(eventLoop);
1188│
1189│     /* Call the Redis Cluster before sleep function. Note that this function
1190│      * may change the state of Redis Cluster (from ok to fail or vice versa),
1191│      * so it's a good idea to call it before serving the unblocked clients
1192│      * later in this function. */
1193│     if (server.cluster_enabled) clusterBeforeSleep();
1194│
1195│     /* Run a fast expire cycle (the called function will return
1196│      * ASAP if a fast cycle is not needed). */
1197│     if (server.active_expire_enabled && server.masterhost == NULL)
1198│         activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
1199│
1200│     /* Send all the slaves an ACK request if at least one client blocked
1201│      * during the previous event loop iteration. */
1202│     if (server.get_ack_from_slaves) {
1203│         robj *argv[3];
1204│
1205│         argv[0] = createStringObject("REPLCONF",8);
1206│         argv[1] = createStringObject("GETACK",6);
1207│         argv[2] = createStringObject("*",1); /* Not used argument. */
1208│         replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
1209│         decrRefCount(argv[0]);
1210│         decrRefCount(argv[1]);
1211│         decrRefCount(argv[2]);
1212│         server.get_ack_from_slaves = 0;
1213│     }
1214│
1215│     /* Unblock all the clients blocked for synchronous replication
1216│      * in WAIT. */
1217│     if (listLength(server.clients_waiting_acks))
1218│         processClientsWaitingReplicas();
1219│
1220│     /* Check if there are clients unblocked by modules that implement
1221│      * blocking commands. */
1222├>    moduleHandleBlockedClients();
1223│
1224│     /* Try to process pending commands for clients that were just unblocked. */
1225│     if (listLength(server.unblocked_clients))
1226│         processUnblockedClients();
1227│
1228│     /* Write the AOF buffer on disk */
1229│     flushAppendOnlyFile(0);
1230│
1231│     /* Handle writes with pending output buffers. */
1232│     handleClientsWithPendingWrites();
1233│
1234│     /* Before we are going to sleep, let the threads access the dataset by
1235│      * releasing the GIL. Redis main thread will not touch anything at this
1236│      * time. */
1237│     if (moduleCount()) moduleReleaseGIL();
1238│ }

2 activeExpireCycle at expire.c

 93│  * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 94│  * executed, where the time limit is a percentage of the REDIS_HZ period
 95│  * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
 96│
 97│ void activeExpireCycle(int type) {
 98│     /* This function has some global state in order to continue the work
 99│      * incrementally across calls. */
100│     static unsigned int current_db = 0; /* Last DB tested. */
101│     static int timelimit_exit = 0;      /* Time limit hit in previous call? */
102│     static long long last_fast_cycle = 0; /* When last fast cycle ran. */
103│
104├>    int j, iteration = 0;
105│     int dbs_per_call = CRON_DBS_PER_CALL;
106│     long long start = ustime(), timelimit, elapsed;
107│
108│     /* When clients are paused the dataset should be static not just from the
109│      * POV of clients not being able to write, but also from the POV of
110│      * expires and evictions of keys not being performed. */
111│     if (clientsArePaused()) return;
112│
113│     if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
114│         /* Don't start a fast cycle if the previous cycle did not exit
115│          * for time limt. Also don't repeat a fast cycle for the same period
116│          * as the fast cycle total duration itself. */

2 moduleHandleBlockedClients at module.c

3515│ /* This function will check the moduleUnblockedClients queue in order to
3516│  * call the reply callback and really unblock the client.
3517│  *
3518│  * Clients end into this list because of calls to RM_UnblockClient(),
3519│  * however it is possible that while the module was doing work for the
3520│  * blocked client, it was terminated by Redis (for timeout or other reasons).
3521│  * When this happens the RedisModuleBlockedClient structure in the queue
3522│  * will have the 'client' field set to NULL. */
3523├>void moduleHandleBlockedClients(void) {
3524│     listNode *ln;
3525│     RedisModuleBlockedClient *bc;
3526│
3527│     pthread_mutex_lock(&moduleUnblockedClientsMutex);
3528│     /* Here we unblock all the pending clients blocked in modules operations
3529│      * so we can read every pending "awake byte" in the pipe. */
3530│     char buf[1];
3531│     while (read(server.module_blocked_pipe[0],buf,1) == 1);
3532│     while (listLength(moduleUnblockedClients)) {
3533│         ln = listFirst(moduleUnblockedClients);
3534│         bc = ln->value;
3535│         client *c = bc->client;

2 flushAppendOnlyFile at aof.c

AOF is not configured here, so nothing in particular happens: server.aof_buf is empty and the function returns at the sdslen check.

 320│  * About the 'force' argument:
 321│  *
 322│  * When the fsync policy is set to 'everysec' we may delay the flush if there
 323│  * is still an fsync() going on in the background thread, since for instance
 324│  * on Linux write(2) will be blocked by the background fsync anyway.
 325│  * When this happens we remember that there is some aof buffer to be
 326│  * flushed ASAP, and will try to do that in the serverCron() function.
 327│  *
 328│  * However if force is set to 1 we'll write regardless of the background
 329│  * fsync. */
 330│ #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
 331│ void flushAppendOnlyFile(int force) {
 332│     ssize_t nwritten;
 333├>    int sync_in_progress = 0;
 334│     mstime_t latency;
 335│
 336│     if (sdslen(server.aof_buf) == 0) return;
 337│
 338│     if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
 339│         sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;
 340│
 341│     if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
 342│         /* With this append fsync policy we do background fsyncing.
 343│          * If the fsync is still in progress we can try to delay
 344│          * the write for a couple of seconds. */
 345│         if (sync_in_progress) {

2 handleClientsWithPendingWrites at networking.c

Executes writeToClient.

 999│ /* This function is called just before entering the event loop, in the hope
1000│  * we can just write the replies to the client output buffer without any
1001│  * need to use a syscall in order to install the writable event handler,
1002│  * get it called, and so forth. */
1003│ int handleClientsWithPendingWrites(void) {
1004│     listIter li;
1005│     listNode *ln;
1006│     int processed = listLength(server.clients_pending_write);
1007│
1008│     listRewind(server.clients_pending_write,&li);
1009│     while((ln = listNext(&li))) {
1010│         client *c = listNodeValue(ln);
1011│         c->flags &= ~CLIENT_PENDING_WRITE;
1012├>        listDelNode(server.clients_pending_write,ln);
1013│
1014│         /* Try to write buffers to the client socket. */
1015│         if (writeToClient(c->fd,c,0) == C_ERR) continue;
1016│
1017│         /* If after the synchronous writes above we still have data to
1018│          * output to the client, we need to install the writable handler. */
1019│         if (clientHasPendingReplies(c)) {
1020│             int ae_flags = AE_WRITABLE;
1021│             /* For the fsync=always policy, we want that a given FD is never
1022│              * served for reading and writing in the same event loop iteration,
1023│              * so that in the middle of receiving the query, and serving it
1024│              * to the client, we'll call beforeSleep() that will do the
1025│              * actual fsync of AOF to disk. AE_BARRIER ensures that. */
1026│             if (server.aof_state == AOF_ON &&
1027│                 server.aof_fsync == AOF_FSYNC_ALWAYS)

3 writeToClient at networking.c

Calls clientHasPendingReplies. Once write has run, the result has been returned to the client.

 899│ /* Write data in output buffers to client. Return C_OK if the client
 900│  * is still valid after the call, C_ERR if it was freed. */
 901│ int writeToClient(int fd, client *c, int handler_installed) {
 902│     ssize_t nwritten = 0, totwritten = 0;
 903│     size_t objlen;
 904│     sds o;
 905│
 906├>    while(clientHasPendingReplies(c)) {
 907│         if (c->bufpos > 0) {
 908│             nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
 909│             if (nwritten <= 0) break;
 910│             c->sentlen += nwritten;
 911│             totwritten += nwritten;
 912│
 913│             /* If the buffer was sent, set bufpos to zero to continue with
 914│              * the remainder of the reply. */
 915│             if ((int)c->sentlen == c->bufpos) {
 916│                 c->bufpos = 0;
 917│                 c->sentlen = 0;
 918│             }
:
:
 943│         }
 944│         /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
 945│          * bytes, in a single threaded server it's a good idea to serve
 946│          * other clients as well, even if a very large request comes from
 947│          * super fast link that is always able to accept data (in real world
 948│          * scenario think about 'KEYS *' against the loopback interface).
 949│          *
 950│          * However if we are over the maxmemory limit we ignore that and
 951│          * just deliver as much data as it is possible to deliver.
 952│          *
 953│          * Moreover, we also send as much as possible if the client is
 954│          * a slave (otherwise, on high-speed traffic, the replication
 955│          * buffer will grow indefinitely) */
 956├>        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
 957│             (server.maxmemory == 0 ||
 958│              zmalloc_used_memory() < server.maxmemory) &&
 959│             !(c->flags & CLIENT_SLAVE)) break;
 960│     }
 961│     server.stat_net_output_bytes += totwritten;
 962│     if (nwritten == -1) {
 963│         if (errno == EAGAIN) {
 964│             nwritten = 0;
 965│         } else {
 966│             serverLog(LL_VERBOSE,
 967│                 "Error writing to client: %s", strerror(errno));
 968│             freeClient(c);

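After that, clientHasPendingReplies confirms that no pending reply is left for the client, and c->sentlen is reset. On this path writeToClient was called with handler_installed set to 0, so aeDeleteFileEvent is skipped.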

 972├>    if (totwritten > 0) {
 973│         /* For clients representing masters we don't count sending data
 974│          * as an interaction, since we always send REPLCONF ACK commands
 975│          * that take some time to just fill the socket output buffer.
 976│          * We just rely on data / pings received for timeout detection. */
 977│         if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
 978│     }
 979│     if (!clientHasPendingReplies(c)) {
 980│         c->sentlen = 0;
 981│         if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 982│
 983│         /* Close connection after entire reply has been sent. */
 984│         if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
 985│             freeClient(c);
 986│             return C_ERR;
 987│         }
 988│     }
 989│     return C_OK;
 990│ }
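The bufpos / sentlen bookkeeping in the loop above is what lets a reply survive a short write. Below is a rough, self-contained model of it. Everything here is hypothetical: toy_client holds only the static buffer (the reply list is omitted), and toy_sink_write plays the role of write(2), accepting a limited number of bytes and returning -1 when it cannot take more.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct { char buf[64]; int bufpos; size_t sentlen; } toy_client;
typedef long toy_write_fn(void *ctx, const char *data, size_t len);

/* Send as much of c->buf as the writer accepts; return bytes written.
 * sentlen remembers progress so a later call resumes where this one stopped. */
long toy_write_to_client(toy_client *c, toy_write_fn *wr, void *ctx) {
    assert(c != NULL && wr != NULL);
    long tot = 0;
    while (c->bufpos > 0) {
        long n = wr(ctx, c->buf + c->sentlen, (size_t)c->bufpos - c->sentlen);
        if (n <= 0) break;
        c->sentlen += (size_t)n;
        tot += n;
        /* Whole buffer sent: reset both cursors. */
        if ((int)c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; }
    }
    return tot;
}

/* Fake socket: takes at most chunk bytes per call and budget bytes overall. */
typedef struct { char out[64]; size_t used, chunk, budget; } toy_sink;

long toy_sink_write(void *ctx, const char *data, size_t len) {
    toy_sink *s = ctx;
    size_t n = len < s->chunk ? len : s->chunk;
    if (n > s->budget) n = s->budget;
    if (s->used + n > sizeof(s->out)) n = sizeof(s->out) - s->used;
    if (n == 0) return -1; /* stands in for a write that would block */
    memcpy(s->out + s->used, data, n);
    s->used += n;
    s->budget -= n;
    return (long)n;
}
```

If the sink accepts only 6 of the 9 reply bytes, the first call leaves bufpos at 9 and sentlen at 6; a later call sends the remaining 3 bytes and resets both fields to 0.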

4 clientHasPendingReplies at networking.c

There is a reply to return to the client, so this returns true.

 607│ /* Return true if the specified client has pending reply buffers to write to
 608│  * the socket. */
 609│ int clientHasPendingReplies(client *c) {
 610├>    return c->bufpos || listLength(c->reply);
 611│ }

Preparation

Redis

Build Redis for debugging, with optimization disabled.

$ wget http://download.redis.io/releases/redis-4.0.11.tar.gz
$ tar xzf redis-4.0.11.tar.gz 
$ cd redis-4.0.11/src/
$ vim Makefile # Rewrite from "OPTIMIZATION?=-O2" to "OPTIMIZATION?=-O0"
$ make CFLAGS="-g "
$ sudo make PREFIX=/usr/local/redis install
$ ./redis-server

In another terminal

$ gdb -p `ps aux | grep redis-server | grep -v grep | awk '{print $2}'`

Alternatively, install with brew install gdb cgdb and use cgdb instead of gdb.

In a second terminal

$ redis-cli

Docker

Alternatively, you can use the prepared Docker container.

$ docker pull futoase/redis-debug-4.0
$ docker run -it -p 9876:9876 \
  --privileged --cap-add=SYS_PTRACE \
  --security-opt seccomp=unconfined \
  futoase/redis-debug-4.0:latest /bin/bash
$ cd /root/redis-4.0.11/src
$ ./redis-server --port 9876 --protected-mode no --daemonize no

In another terminal

$ docker exec -it `docker ps | awk 'NR==2{print $1}'` /bin/bash

$ ps aux | head -n1; ps aux | grep redis-server | grep -v grep
$ cgdb -p <PID>

In a second terminal

$ redis-cli -p 9876

cgdb

$ git clone https://github.com/cgdb/cgdb.git
$ cd cgdb
$ ./autogen.sh
$ CXXFLAGS='-std=c++11' ./configure --prefix=/usr/local
$ make
$ sudo make install

Without -std=c++11, make fails with the error "kui.cpp:310:15: error: ‘it’ does not name a type". https://github.com/cgdb/cgdb/issues/184

gdb

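When debugging Redis, every command passes through processCommand, so setting a breakpoint there is a convenient way to follow what happens when a command runs.

set listsize 50 : adjust the default number of lines shown by list
From here on, use step and next as appropriate to follow the flow
ptype <struct name> : show the definition of a type
bt : backtrace
info breakpoints : list breakpoints
b main
b processCommand
delete breakpoints : delete all breakpoints
del <breakpoint ID> : delete a single breakpoint
info threads : list threads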
