Dive Deep Memcached ~ SETコマンド実行時の動作 ~

Redis本を執筆しました。Memcachedとの詳細な比較も付録で取り上げていますので、こちらもぜひ併せてご覧ください。

———————————————————————————————————————————————————

Memcached 1.5.14をソースからビルドし、デフォルト設定で起動した場合のSETコマンド実行時の動作をソースコードで確認します。 https://github.com/memcached/memcached

概要

最初の初期化

  • 1 main at memcached.c
    • 2 sanitycheck at memcached.c
    • 2 settings_init at memcached.c
    • 2 init_lru_maintainer at items.c
    • 2 hash_init at hash.c
    • 2 logger_init at logger.c
      • 3 start_logger_thread at logger.c
      • 3 STATS_LOCK, STATS_UNLOCK at thread.c
    • 2 stats_init at memcached.c
      • 3 stats_prefix_init at stats.c
    • 2 assoc_init at assoc.c
    • 2 conn_init at memcached.c
    • 2 slabs_init at slabs.c
    • 2 memcached_thread_init at thread.c
      • 3 setup_thread at thread.c
        • 4 cq_init at thread.c
        • 4 cache_create at cache.c
      • 3 create_worker at thread.c
    • 2 init_lru_crawler at crawler.c
    • 2 start_assoc_maintenance_thread at assoc.c
    • 2 start_item_crawler_thread at crawler.c
    • 2 start_lru_maintainer_thread at items.c
    • 2 start_slab_maintenance_thread at slabs.c
    • 2 clock_handler at memcached.c
      • 3 assoc_start_expand at assoc.c
    • 2 server_sockets at memcached.c
      • 3 server_socket at memcached.c
        • 4 new_socket at memcached.c
        • 4 conn_new at memcached.c
    • 2 uriencode_init at util.c
    • 2 event_base_loop at memcached.c : main_baseに対するイベント処理のループ

SETコマンドの挙動

  • 1 event_handler at memcached.c
    • 2 drive_machine at memcached.c
      • 3 dispatch_conn_new at thread.c
        • 4 cqi_new at thread.c
        • 4 cq_push at thread.c
      • 3 reset_cmd_handler at memcached.c
        • 4 conn_shrink at memcached.c
        • 4 conn_set_state at memcached.c
      • 3 update_event at memcached.c
      • 3 try_read_network at memcached.c
        • 4 tcp_read at memcached.c
      • 3 try_read_command at memcached.c
        • 4 process_command at memcached.c
          • 5 add_msghdr at memcached.c
          • 5 tokenize_command at memcached.c
          • 5 process_update_command at memcached.c
            • 6 set_noreply_maybe at memcached.c
            • 6 realtime at memcached.c
            • 6 item_alloc at thread.c
              • 7 do_item_alloc at items.c
                • 8 item_make_header at items.c
                • 8 slabs_clsid at slabs.c
                • 8 slabs_alloc at slabs.c
                  • 9 do_slabs_alloc at slabs.c
                    • 10 do_slabs_newslab at slabs.c
                      • 11 grow_slab_list at slabs.c
                      • 11 memory_allocate at slabs.c
                      • 11 split_slab_page_into_freelist at slabs.c
                        • 12 do_slabs_free at slabs.c
      • 3 complete_nread at memcached.c
        • 4 complete_nread_ascii at memcached.c
          • 5 store_item at thread.c
            • 6 hash(MurmurHash3_x86_32)
              • 7 getblock32 at murmur3_hash.c
              • 7 rotl32 at murmur3_hash.c
              • 7 fmix32 at murmur3_hash.c
            • 6 item_lock, item_unlock at thread.c
            • 6 do_store_item at memcached.c
              • 7 do_item_get at items.c
              • 7 do_item_link at items.c
                • 8 get_cas_id at items.c
                • 8 assoc_insert at assoc.c
                • 8 item_link_q at items.c
                  • 9 do_item_link_q at items.c
                • 8 item_stats_sizes_add at items.c
          • 5 out_string at memcached.c
          • 5 item_remove at thread.c
            • 6 do_item_remove at items.c
      • 3 add_iov at memcached.c
        • 4 ensure_iov_space at memcached.c
      • 3 transmit at memcached.c
        • 4 tcp_sendmsg at memcached.c
      • 3 reset_cmd_handler at memcached.c

詳細

最初の初期化

memcached -u ec2-user -vvv コマンドを実行した際の挙動

1 main at memcached.c

6762│ int main (int argc, char **argv) {
6763│     int c;
6764│     bool lock_memory = false;
6765│     bool do_daemonize = false;
6766│     bool preallocate = false;
6767│     int maxcore = 0;
6768│     char *username = NULL;
6769│     char *pid_file = NULL;
6770│     struct passwd *pw;
6771│     struct rlimit rlim;
6772│     char *buf;
6773│     char unit = '\0';
6774│     int size_max = 0;
6775├───> int retval = EXIT_SUCCESS;
6776│     bool protocol_specified = false;
6777│     bool tcp_specified = false;
6778│     bool udp_specified = false;
6779│     bool start_lru_maintainer = true;
6780│     bool start_lru_crawler = true;
6781│     bool start_assoc_maint = true;
6782│     enum hashfunc_type hash_type = MURMUR3_HASH;
6783│     uint32_t tocrawl;
6784│     uint32_t slab_sizes[MAX_NUMBER_OF_SLAB_CLASSES];
6785│     bool use_slab_sizes = false;
6786│     char *slab_sizes_unparsed = NULL;
6787│     bool slab_chunk_size_changed = false;

sanitycheck関数の実行後、signal関数でSIGINT・SIGTERMのハンドラを登録する。その後、settings_init関数を呼び出す。

6929├───> if (!sanitycheck()) {
6930│         return EX_OSERR;
6931│     }
6932│
6933│     /* handle SIGINT, SIGTERM */
6934│     signal(SIGINT, sig_handler);
6935│     signal(SIGTERM, sig_handler);
6936│
6937│     /* init settings */
6938│     settings_init();
6939│ #ifdef EXTSTORE
6940│     settings.ext_item_size = 512;
6941│     settings.ext_item_age = UINT_MAX;

2 sanitycheck at memcached.c

event_get_versionはlibeventの関数。ここでは、"2.0.21-stable"という文字列を取得し、everに格納している。

6702│ /**
6703│  * Do basic sanity check of the runtime environment
6704│  * @return true if no errors found, false if we can't use this env
6705│  */
6706│ static bool sanitycheck(void) {
6707│     /* One of our biggest problems is old and bogus libevents */
6708├───> const char *ever = event_get_version();
6709│     if (ever != NULL) {
6710│         if (strncmp(ever, "1.", 2) == 0) {
6711│             /* Require at least 1.3 (that's still a couple of years old) */
6712│             if (('0' <= ever[2] && ever[2] < '3') && !isdigit(ever[3])) {
6713│                 fprintf(stderr, "You are using libevent %s.\nPlease upgrade to"
6714│                         " a more recent version (1.3 or newer)\n",
6715│                         event_get_version());
6716│                 return false;
6717│             }
6718│         }
6719│     }
6720│

2 settings_init at memcached.c

パラメータのデフォルト値が設定されている

 247│ static void settings_init(void) {
 248│     settings.use_cas = true;
 249│     settings.access = 0700;
 250│     settings.port = 11211;
 251├───> settings.udpport = 0;
 252│ #ifdef TLS
 253│     settings.ssl_enabled = false;
 254│     settings.ssl_ctx = NULL;
 255│     settings.ssl_chain_cert = NULL;
 256│     settings.ssl_key = NULL;
 257│     settings.ssl_verify_mode = SSL_VERIFY_NONE;
 258│     settings.ssl_keyformat = SSL_FILETYPE_PEM;
 259│     settings.ssl_ciphers = NULL;
 260│     settings.ssl_ca_cert = NULL;
 261│     settings.ssl_last_cert_refresh_time = current_time;
 262│     settings.ssl_wbuf_size = 16 * 1024; // default is 16KB (SSL max frame size is 17KB)
 263│ #endif
 264│     /* By default this string should be NULL for getaddrinfo() */
 265│     settings.inter = NULL;
 266│     settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
 267│     settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
 268│     settings.verbose = 0;
 269│     settings.oldest_live = 0;
 270│     settings.oldest_cas = 0;          /* supplements accuracy of oldest_live */
 271│     settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
 272│     settings.socketpath = NULL;       /* by default, not using a unix socket */
 273│     settings.factor = 1.25;
 274│     settings.chunk_size = 48;         /* space for a modest key and value */
 275│     settings.num_threads = 4;         /* N workers */
 276│     settings.num_threads_per_udp = 0;
 277├───> settings.prefix_delimiter = ':';
 278│     settings.detail_enabled = 0;
 279│     settings.reqs_per_event = 20;
 280│     settings.backlog = 1024;
 281│     settings.binding_protocol = negotiating_prot;
 282│     settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
 283│     settings.slab_page_size = 1024 * 1024; /* chunks are split from 1MB pages. */
 284│     settings.slab_chunk_size_max = settings.slab_page_size / 2;
 285│     settings.sasl = false;
 286│     settings.maxconns_fast = true;
 287│     settings.lru_crawler = false;
 288│     settings.lru_crawler_sleep = 100;
 289│     settings.lru_crawler_tocrawl = 0;
 290│     settings.lru_maintainer_thread = false;
 291│     settings.lru_segmented = true;
 292│     settings.hot_lru_pct = 20;
 293│     settings.warm_lru_pct = 40;
 294│     settings.hot_max_factor = 0.2;
 295│     settings.warm_max_factor = 2.0;
 296│     settings.inline_ascii_response = false;
 297│     settings.temp_lru = false;
 298│     settings.temporary_ttl = 61;
 299│     settings.idle_timeout = 0; /* disabled */
 300│     settings.hashpower_init = 0;
 301│     settings.slab_reassign = true;
 302│     settings.slab_automove = 1;
 303├───> settings.slab_automove_ratio = 0.8;
 304│     settings.slab_automove_window = 30;
 305│     settings.shutdown_command = false;
 306│     settings.tail_repair_time = TAIL_REPAIR_TIME_DEFAULT;
 307│     settings.flush_enabled = true;
 308│     settings.dump_enabled = true;
 309│     settings.crawls_persleep = 1000;
 310│     settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE;
 311│     settings.logger_buf_size = LOGGER_BUF_SIZE;
 312│     settings.drop_privileges = false;
 313│ #ifdef MEMCACHED_DEBUG
 314│     settings.relaxed_privileges = false;
 315│ #endif

1 main at memcached.c

6957│
6958│     /* Run regardless of initializing it later */
6959├───> init_lru_maintainer();
6960│
6961│     /* set stderr non-buffering (for running under, say, daemontools) */
6962│     setbuf(stderr, NULL);
6963│
6964│     char *shortopts =
6965│    	"a:"  /* access mask for unix socket */
6966│           "A"  /* enable admin shutdown command */
6967│           "Z"   /* enable SSL */
6968│           "p:"  /* TCP port number to listen on */
6969│           "s:"  /* unix socket path to listen on */
6970│           "U:"  /* UDP port number to listen on */
6971│           "m:"  /* max memory to use for items in megabytes */

2 init_lru_maintainer at items.c

1704│ int init_lru_maintainer(void) {
1705├───> if (lru_maintainer_initialized == 0) {
1706│         pthread_mutex_init(&lru_maintainer_lock, NULL);
1707│         lru_maintainer_initialized = 1;
1708│     }
1709│     return 0;
1710│ }

1 main at memcached.c

6998│     /* process arguments */
6999│ #ifdef HAVE_GETOPT_LONG
7000├───> const struct option longopts[] = {
7001│         {"unix-mask", required_argument, 0, 'a'},
7002│         {"enable-shutdown", no_argument, 0, 'A'},
7003│         {"enable-ssl", no_argument, 0, 'Z'},
7004│         {"port", required_argument, 0, 'p'},
7005│         {"unix-socket", required_argument, 0, 's'},
7006│         {"udp-port", required_argument, 0, 'U'},
7007│         {"memory-limit", required_argument, 0, 'm'},
7008│         {"disable-evictions", no_argument, 0, 'M'},
7009│         {"conn-limit", required_argument, 0, 'c'},
7010│         {"lock-memory", no_argument, 0, 'k'},
7011│         {"help", no_argument, 0, 'h'},
7012│         {"license", no_argument, 0, 'i'},
:
:
7028│         {"max-item-size", required_argument, 0, 'I'},
7029│         {"enable-sasl", no_argument, 0, 'S'},
7030│         {"disable-flush-all", no_argument, 0, 'F'},
7031│         {"disable-dumping", no_argument, 0, 'X'},
7032│         {"extended", required_argument, 0, 'o'},
7033│         {0, 0, 0, 0}
7034│     };
7035│     int optindex;
7036│     while (-1 != (c = getopt_long(argc, argv, shortopts,
7037│                     longopts, &optindex))) {
7038│ #else
7039│     while (-1 != (c = getopt(argc, argv, shortopts))) {
7040│ #endif
7041├───────> switch (c) {
7042│         case 'A':
7043│             /* enables "shutdown" command */
7044│             settings.shutdown_command = true;
7045│             break;
7046│         case 'Z':
7047│             /* enable secure communication*/
7048│ #ifdef TLS
7049│             settings.ssl_enabled = true;
7050│ #else
7051│             fprintf(stderr, "This server is not built with TLS support.\n");
7052│             exit(EX_USAGE);
7053│ #endif

-u ec2-user のようにユーザー名を与えているので、username 変数に"ec2-user"が設定される。

7119│         case 'r':
7120│             maxcore = 1;
7121│             break;
7122│         case 'R':
7123│       	  settings.reqs_per_event = atoi(optarg);
7124│             if (settings.reqs_per_event == 0) {
7125│                 fprintf(stderr, "Number of requests per event must be greater than 0\n");
7126│                 return 1;
7127│      	  }
7128│             break;
7129│         case 'u':
7130├───────────> username = optarg;
7131│             break;
7132│         case 'P':
7133│             pid_file = optarg;
7134│             break;
7135│         case 'f':
7136│             settings.factor = atof(optarg);
7137│             if (settings.factor <= 1.0) {
7138│                 fprintf(stderr, "Factor must be greater than 1\n");
7139│                 return 1;
7140│             }
7141│             break;

-vvv と指定しているので、getopt_longは'v'を3回返し、while文の中でcase 'v'の処理が3回繰り返される（settings.verboseは3になる）。

7083│         case 'h':
7084│             usage();
7085│             exit(EXIT_SUCCESS);
7086│         case 'i':
7087│             usage_license();
7088│             exit(EXIT_SUCCESS);
7089│         case 'V':
7090│             printf(PACKAGE " " VERSION "\n");
7091│             exit(EXIT_SUCCESS);
7092│         case 'k':
7093│      	  lock_memory = true;
7094│             break;
7095│         case 'v':
7096├───────────> settings.verbose++;
7097│             break;
7098│         case 'l':
7099│             if (settings.inter != NULL) {
7100│                 if (strstr(settings.inter, optarg) != NULL) {
7101│                     break;
7102│                 }
7103│                 size_t len = strlen(settings.inter) + strlen(optarg) + 2;
7104│                 char *p = malloc(len);
7105│                 if (p == NULL) {
7106│                     fprintf(stderr, "Failed to allocate memory\n");
7107│                     return 1;
7108│                 }

その後、パラメータに制限範囲外の値が指定されていた場合は、エラーで終了する。

7746│     if (settings.item_size_max < 1024) {
7747│         fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n");
7748│         exit(EX_USAGE);
7749│     }
7750│     if (settings.item_size_max > (settings.maxbytes / 2)) {
7751│         fprintf(stderr, "Cannot set item size limit higher than 1/2 of memory max.\n");
7752│         exit(EX_USAGE);
7753│     }
7754│     if (settings.item_size_max > (1024 * 1024 * 1024)) {
7755│         fprintf(stderr, "Cannot set item size limit higher than a gigabyte.\n");
7756│         exit(EX_USAGE);
7757│     }
7758├───> if (settings.item_size_max > 1024 * 1024) {
7759│         if (!slab_chunk_size_changed) {
7760│             // Ideal new default is 16k, but needs stitching.
7761│             settings.slab_chunk_size_max = settings.slab_page_size / 2;
7762│         }
7763│     }
7764│
7765│     if (settings.slab_chunk_size_max > settings.item_size_max) {
7766│         fprintf(stderr, "slab_chunk_max (bytes: %d) cannot be larger than -I (item_size_max %d)\n",
7767│                 settings.slab_chunk_size_max, settings.item_size_max);
7768│         exit(EX_USAGE);
7769│     }
7770│
7771├───> if (settings.item_size_max % settings.slab_chunk_size_max != 0) {
7772│         fprintf(stderr, "-I (item_size_max: %d) must be evenly divisible by slab_chunk_max (bytes: %d)\n",
7773│                 settings.item_size_max, settings.slab_chunk_size_max);
7774│         exit(EX_USAGE);
7775│     }
7776│
7777│     if (settings.slab_page_size % settings.slab_chunk_size_max != 0) {
7778│         fprintf(stderr, "slab_chunk_max (bytes: %d) must divide evenly into %d (slab_page_size)\n",
7779│                 settings.slab_chunk_size_max, settings.slab_page_size);
7780│         exit(EX_USAGE);
7781│     }
:
:
7811│     if (slab_sizes_unparsed != NULL) {
7812│         if (_parse_slab_sizes(slab_sizes_unparsed, slab_sizes)) {
7813│             use_slab_sizes = true;
7814│         } else {
7815│      	  exit(EX_USAGE);
7816│         }
7817│     }
7818│
7819│     if (settings.hot_lru_pct + settings.warm_lru_pct > 80) {
7820│         fprintf(stderr, "hot_lru_pct + warm_lru_pct cannot be more than 80%% combined\n");
7821│         exit(EX_USAGE);
7822│     }
7823│
7824├───> if (settings.temp_lru && !start_lru_maintainer) {
7825│         fprintf(stderr, "temporary_ttl requires lru_maintainer to be enabled\n");
7826│         exit(EX_USAGE);
7827│     }
7828│
7829│     if (hash_init(hash_type) != 0) {
7830│         fprintf(stderr, "Failed to initialize hash_algorithm!\n");
7831│         exit(EX_USAGE);
7832│     }

hash_init関数が実行されている

2 hash_init at hash.c

typeでMURMUR3_HASHが指定されている。

 7│ int hash_init(enum hashfunc_type type) {
 8├───> switch(type) {
 9│         case JENKINS_HASH:
10│             hash = jenkins_hash;
11│             settings.hash_algorithm = "jenkins";
12│             break;
13│         case MURMUR3_HASH:
14│             hash = MurmurHash3_x86_32;
15│             settings.hash_algorithm = "murmur3";
16│             break;
17│         default:
18│             return -1;
19│     }
20│     return 0;
21│ }

1 main at memcached.c

必要に応じて、より多くのコネクションを許可できるように、getrlimit/setrlimitでRLIMIT_NOFILE（オープン可能なファイル数の上限）を調整する。

7903│     /*
7904│      * If needed, increase rlimits to allow as many connections
7905│      * as needed.
7906│      */
7907│
7908├───> if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7909│         fprintf(stderr, "failed to getrlimit number of files\n");
7910│         exit(EX_OSERR);
7911│     } else {
7912│         rlim.rlim_cur = settings.maxconns;
7913│         rlim.rlim_max = settings.maxconns;
7914│         if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7915│             fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller
7916│             exit(EX_OSERR);
7917│         }
7918│     }

mainスレッドのlibeventインスタンスの初期化

7970│     /* initialize main thread libevent instance */
7971│ #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
7972│     /* If libevent version is larger/equal to 2.0.2-alpha, use newer version */
7973│     struct event_config *ev_config;
7974├───> ev_config = event_config_new();
7975│     event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
7976│     main_base = event_base_new_with_config(ev_config);
7977│     event_config_free(ev_config);
7978│ #else
7979│     /* Otherwise, use older API */
7980│     main_base = event_init();
7981│ #endif

各種スレッド等の初期化

7983│     /* initialize other stuff */
7984├───> logger_init();
7985│     stats_init();
7986│     assoc_init(settings.hashpower_init);
7987│     conn_init();
7988│     slabs_init(settings.maxbytes, settings.factor, preallocate,
7989│             use_slab_sizes ? slab_sizes : NULL);

2 logger_init at logger.c

563│ /*************************
564│  * Public functions for submitting logs and starting loggers from workers.
565│  *************************/
566│
567│ /* Global logger thread start/init */
568│ void logger_init(void) {
569│     /* TODO: auto destructor when threads exit */
570│     /* TODO: error handling */
571│
572│     /* init stack for iterating loggers */
573│     logger_stack_head = 0;
574│     logger_stack_tail = 0;
575├───> pthread_key_create(&logger_key, NULL);
576│
577│     if (start_logger_thread() != 0) {
578│         abort();
579│     }
580│
581│     /* This can be removed once the global stats initializer is improved */
582│     STATS_LOCK();
583│     stats.log_worker_dropped = 0;
584│     stats.log_worker_written = 0;
585│     stats.log_watcher_skipped = 0;
586│     stats.log_watcher_sent = 0;
587│     STATS_UNLOCK();
588│     /* This is what adding a STDERR watcher looks like. should replace old
589│      * "verbose" settings. */
590│     //logger_add_watcher(NULL, 0);
591│     return;
592│ }

3 start_logger_thread at logger.c

545│ static int start_logger_thread(void) {
546│     int ret;
547├───> do_run_logger_thread = 1;
548│     if ((ret = pthread_create(&logger_tid, NULL,
549│                               logger_thread, NULL)) != 0) {
550│         fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
551│         return -1;
552│     }
553│     return 0;
554│ }

3 STATS_LOCK, STATS_UNLOCK at thread.c

696│ /******************************* GLOBAL STATS ******************************/
697│
698│ void STATS_LOCK() {
699│     pthread_mutex_lock(&stats_lock);
700│ }
701│
702│ void STATS_UNLOCK() {
703├───> pthread_mutex_unlock(&stats_lock);
704│ }

2 stats_init at memcached.c

225│ static void stats_init(void) {
226├───> memset(&stats, 0, sizeof(struct stats));
227│     memset(&stats_state, 0, sizeof(struct stats_state));
228│     stats_state.accepting_conns = true; /* assuming we start in this state. */
229│
230│     /* make the time we started always be 2 seconds before we really
231│        did, so time(0) - time.started is never zero.  if so, things
232│        like 'settings.oldest_live' which act as booleans as well as
233│        values are now false in boolean context... */
234│     process_started = time(0) - ITEM_UPDATE_INTERVAL - 2;
235│     stats_prefix_init();
236│ }

3 stats_prefix_init at stats.c

38│ void stats_prefix_init() {
39├───> memset(prefix_stats, 0, sizeof(prefix_stats));
40│ }

2 assoc_init at assoc.c

59│ void assoc_init(const int hashtable_init) {
60├───> if (hashtable_init) {
61│         hashpower = hashtable_init;
62│     }
63│     primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
64│     if (! primary_hashtable) {
65│         fprintf(stderr, "Failed to init hashtable.\n");
66│         exit(EXIT_FAILURE);
67│     }
68│     STATS_LOCK();
69│     stats_state.hash_power_level = hashpower;
70│     stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
71│     STATS_UNLOCK();
72│ }

2 conn_init at memcached.c

448│ /*
449│  * Initializes the connections array. We don't actually allocate connection
450│  * structures until they're needed, so as to avoid wasting memory when the
451│  * maximum connection count is much higher than the actual number of
452│  * connections.
453│  *
454│  * This does end up wasting a few pointers' worth of memory for FDs that are
455│  * used for things other than connections, but that's worth it in exchange for
456│  * being able to directly index the conns array by FD.
457│  */
458│ static void conn_init(void) {
459│     /* We're unlikely to see an FD much higher than maxconns. */
460│     int next_fd = dup(1);
461├───> int headroom = 10;	  /* account for extra unexpected open FDs */
462│     struct rlimit rl;
463│
464│     max_fds = settings.maxconns + headroom + next_fd;
465│
466│     /* But if possible, get the actual highest FD we can possibly ever see. */
467│     if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
468│         max_fds = rl.rlim_max;
469│     } else {
470│         fprintf(stderr, "Failed to query maximum file descriptor; "
471│                         "falling back to maxconns\n");
472│     }
473│

2 slabs_init (limit=67108864, factor=1.25, prealloc=false, slab_sizes=0x0) at slabs.c:158

Slabの初期化 ★5

 153│ /**
 154│  * Determines the chunk sizes and initializes the slab class descriptors
 155│  * accordingly.
 156│  */
 157│ void slabs_init(const size_t limit, const double factor, const bool prealloc, const uint32_t *slab_sizes) {
 158│     int i = POWER_SMALLEST - 1;
 159│     unsigned int size = sizeof(item) + settings.chunk_size;
 160│
 161│     /* Some platforms use runtime transparent hugepages. If for any reason
 162│      * the initial allocation fails, the required settings do not persist
 163│      * for remaining allocations. As such it makes little sense to do slab
 164│      * preallocation. */
 165├───> bool __attribute__ ((unused)) do_slab_prealloc = false;
 166│
 167│     mem_limit = limit;
 168│
 169│     if (prealloc) {
 170│ #if defined(__linux__) && defined(MADV_HUGEPAGE)
 171│         mem_base = alloc_large_chunk_linux(mem_limit);
 172│         if (mem_base)
 173│             do_slab_prealloc = true;
 174│ #else
 175│         /* Allocate everything in a big chunk with malloc */
 176│         mem_base = malloc(mem_limit);
 177│         do_slab_prealloc = true;

slabclass配列の初期化

 190│     while (++i < MAX_NUMBER_OF_SLAB_CLASSES-1) {
 191│         if (slab_sizes != NULL) {
 192│             if (slab_sizes[i-1] == 0)
 193│                 break;
 194│             size = slab_sizes[i-1];
 195│         } else if (size >= settings.slab_chunk_size_max / factor) {
 196│             break;
 197│         }
 198│         /* Make sure items are always n-byte aligned */
 199│         if (size % CHUNK_ALIGN_BYTES)
 200│             size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
 201│
 202├───────> slabclass[i].size = size;
 203│         slabclass[i].perslab = settings.slab_page_size / slabclass[i].size;
 204│         if (slab_sizes == NULL)
 205│             size *= factor;
 206│         if (settings.verbose > 1) {
 207│             fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
 208│                     i, slabclass[i].size, slabclass[i].perslab);
 209│         }
 210│     }
 220│     /* for the test suite:  faking of how much we've already malloc'd */
 221│     {
 222│         char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
 223│         if (t_initial_malloc) {
 224│             mem_malloced = (size_t)atol(t_initial_malloc);
 225│         }
 226│
 227│     }
 228│
 229│     if (prealloc && do_slab_prealloc) {
 230│         slabs_preallocate(power_largest);
 231│     }
 232├>}

1 main at memcached.c

workerスレッドの設定

8025│     /* start up worker threads if MT mode */
8026│ #ifdef EXTSTORE
8027│     slabs_set_storage(storage);
8028│     memcached_thread_init(settings.num_threads, storage);
8029│     init_lru_crawler(storage);
8030│ #else
8031├───> memcached_thread_init(settings.num_threads, NULL);
8032│     init_lru_crawler(NULL);
8033│ #endif
8034│
8035│     if (start_assoc_maint && start_assoc_maintenance_thread() == -1) {
8036│         exit(EXIT_FAILURE);
8037│     }
8038│     if (start_lru_crawler && start_item_crawler_thread() != 0) {
8039│         fprintf(stderr, "Failed to enable LRU crawler thread\n");
8040│         exit(EXIT_FAILURE);
8041│     }
8042│ #ifdef EXTSTORE
8043│     if (storage && start_storage_compact_thread(storage) != 0) {
8044│         fprintf(stderr, "Failed to start storage compaction thread\n");
8045│         exit(EXIT_FAILURE);
8046│     }
8047│     if (storage && start_storage_write_thread(storage) != 0) {
8048│         fprintf(stderr, "Failed to start storage writer thread\n");
8049│         exit(EXIT_FAILURE);
8050│     }
8051│
8052│     if (start_lru_maintainer && start_lru_maintainer_thread(storage) != 0) {
8053│ #else
8054├───> if (start_lru_maintainer && start_lru_maintainer_thread(NULL) != 0) {
8055│ #endif
8056│         fprintf(stderr, "Failed to enable LRU maintainer thread\n");
8057│         return 1;
8058│     }
8059│
8060│     if (settings.slab_reassign &&
8061├───────> start_slab_maintenance_thread() == -1) {
8062│         exit(EXIT_FAILURE);
8063│     }
8064│
8065│     if (settings.idle_timeout && start_conn_timeout_thread() == -1) {
8066│         exit(EXIT_FAILURE);
8067│     }
8068│

2 memcached_thread_init at thread.c

772│ /*
773│  * Initializes the thread subsystem, creating various worker threads.
774│  *
775│  * nthreads  Number of worker event handler threads to spawn
776│  */
777│ void memcached_thread_init(int nthreads, void *arg) {
778│     int         i;
779│     int         power;
780│
781│     for (i = 0; i < POWER_LARGEST; i++) {
782├───────> pthread_mutex_init(&lru_locks[i], NULL);
783│     }
784│     pthread_mutex_init(&worker_hang_lock, NULL);
785│
786│     pthread_mutex_init(&init_lock, NULL);
787│     pthread_cond_init(&init_cond, NULL);
788│
789│     pthread_mutex_init(&cqi_freelist_lock, NULL);
790│     cqi_freelist = NULL;
791│
792│     /* Want a wide lock table, but don't waste memory */
793│     if (nthreads < 3) {
794│         power = 10;
815│     item_lock_count = hashsize(power);
816│     item_lock_hashpower = power;
817│
818│     item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
819│     if (! item_locks) {
820│         perror("Can't allocate item locks");
821│         exit(1);
822│     }
823│     for (i = 0; i < item_lock_count; i++) {
824│         pthread_mutex_init(&item_locks[i], NULL);
825│     }
826│
827├───> threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
828│     if (! threads) {
829│         perror("Can't allocate thread descriptors");
830│         exit(1);
831│     }
832│
833│     for (i = 0; i < nthreads; i++) {
834│         int fds[2];
835│         if (pipe(fds)) {
836│             perror("Can't create notify pipe");
837│             exit(1);
838│         }
839│
:
:
833│     for (i = 0; i < nthreads; i++) {
834│         int fds[2];
835│         if (pipe(fds)) {
836│             perror("Can't create notify pipe");
837│             exit(1);
838│         }
839│
840│         threads[i].notify_receive_fd = fds[0];
841│         threads[i].notify_send_fd = fds[1];
842│ #ifdef EXTSTORE
843│         threads[i].storage = arg;
844│ #endif
845├───────> setup_thread(&threads[i]);
846│         /* Reserve three fds for the libevent base, and two for the pipe */
847│         stats_state.reserved_fds += 5;
848│     }
849│
850│     /* Create threads after we've done all the libevent setup. */
851│     for (i = 0; i < nthreads; i++) {
852│         create_worker(worker_libevent, &threads[i]);
853│     }
854│
855│     /* Wait for all the threads to set themselves up before returning. */
856│     pthread_mutex_lock(&init_lock);
857│     wait_for_thread_registration(nthreads);
858│     pthread_mutex_unlock(&init_lock);
859│ }

3 setup_thread (me=0x6486f0) at thread.c:321

313│ /****************************** LIBEVENT THREADS *****************************/
314│
315│ /*
316│  * Set up a thread's information.
317│  */
318│ static void setup_thread(LIBEVENT_THREAD *me) {
319│ #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
320│     struct event_config *ev_config;
321│     ev_config = event_config_new();
322│     event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
323│     me->base = event_base_new_with_config(ev_config);
324├───> event_config_free(ev_config);
325│ #else
326│     me->base = event_init();
327│ #endif
328│
329│     if (! me->base) {
330│         fprintf(stderr, "Can't allocate event base\n");
331│         exit(1);
332│     }
333│
334│     /* Listen for notifications from other threads */
335│     event_set(&me->notify_event, me->notify_receive_fd,
336│               EV_READ | EV_PERSIST, thread_libevent_process, me);
337│     event_base_set(me->base, &me->notify_event);
338│
339│     if (event_add(&me->notify_event, 0) == -1) {
340│         fprintf(stderr, "Can't monitor libevent notify pipe\n");
341│         exit(1);
342│     }
343│
344│     me->new_conn_queue = malloc(sizeof(struct conn_queue));
345├───> if (me->new_conn_queue == NULL) {
346│         perror("Failed to allocate memory for connection queue");
347│         exit(EXIT_FAILURE);
348│     }
349│     cq_init(me->new_conn_queue);
350│
351│     if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
352│         perror("Failed to initialize mutex");
353│         exit(EXIT_FAILURE);
354│     }
355│
356│     me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
357│                                     NULL, NULL);

4 cq_init (cq=0x64fab0) at thread.c:196

192│ /*
193│  * Initializes a connection queue.
194│  */
195│ static void cq_init(CQ *cq) {
196├───> pthread_mutex_init(&cq->lock, NULL);
197│     cq->head = NULL;
198│     cq->tail = NULL;
199│ }

4 cache_create (name=0x42b442 "suffix", bufsize=50, align=8, constructor=0x0, destructor=0x0) at cache.c:22

 19│ cache_t* cache_create(const char *name, size_t bufsize, size_t align,
 20│                       cache_constructor_t* constructor,
 21│                       cache_destructor_t* destructor) {
 22│     cache_t* ret = calloc(1, sizeof(cache_t));
 23│     char* nm = strdup(name);
 24│     void** ptr = calloc(initial_pool_size, sizeof(void*));
 25├───> if (ret == NULL || nm == NULL || ptr == NULL ||
 26│         pthread_mutex_init(&ret->mutex, NULL) == -1) {
 27│         free(ret);
 28│         free(nm);
 29│         free(ptr);
 30│         return NULL;
 31│     }
 32│
 33│     ret->name = nm;
 34│     ret->ptr = ptr;
 35│     ret->freetotal = initial_pool_size;
 36│     ret->constructor = constructor;
 37│     ret->destructor = destructor;
  38│
 39│ #ifndef NDEBUG
 40│     ret->bufsize = bufsize + 2 * sizeof(redzone_pattern);
 41│ #else
 42│     ret->bufsize = bufsize;
 43│ #endif
 44│
 45├───> return ret;
 46│ }

3 create_worker (func=0x41dddf <worker_libevent>, arg=0x6486f0) at thread.c:296

289│ /*
290│  * Creates a worker thread.
291│  */
292│ static void create_worker(void *(*func)(void *), void *arg) {
293│     pthread_attr_t  attr;
294│     int             ret;
295│
296├───> pthread_attr_init(&attr);
297│
298│     if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
299│         fprintf(stderr, "Can't create thread: %s\n",
300│                 strerror(ret));
301│         exit(1);
302│     }
303│ }

2 init_lru_crawler (arg=0x0) at crawler.c:686

685│ int init_lru_crawler(void *arg) {
686│     if (lru_crawler_initialized == 0) {
687│ #ifdef EXTSTORE
688│         storage = arg;
689│ #endif
690├───────> if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
691│       	 fprintf(stderr, "Can't initialize lru crawler condition\n");
692│             return -1;
693│         }
694│         pthread_mutex_init(&lru_crawler_lock, NULL);
695│         active_crawler_mod.c.c = NULL;
696│         active_crawler_mod.mod = NULL;
697│         active_crawler_mod.data = NULL;
698│         lru_crawler_initialized = 1;
699│     }
700│     return 0;
701│ }

2 start_assoc_maintenance_thread () at assoc.c:270

268│ int start_assoc_maintenance_thread() {
269│     int ret;
270│     char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
271│     if (env != NULL) {
272│         hash_bulk_move = atoi(env);
273│         if (hash_bulk_move == 0) {
274│             hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
275│         }
276│     }
277├───> pthread_mutex_init(&maintenance_lock, NULL);
278│     if ((ret = pthread_create(&maintenance_tid, NULL,
279│                               assoc_maintenance_thread, NULL)) != 0) {
280│         fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
281│         return -1;
282│     }
283│     return 0;
284│ }

2 start_item_crawler_thread () at crawler.c:497

484│  * caller locks mtx. caller spawns thread.
485│  * thread blocks on mutex.
486│  * caller waits on condition, releases lock.
487│  * thread gets lock, sends signal.
488│  * caller can't wait, as thread has lock.
489│  * thread waits on condition, releases lock
490│  * caller wakes on condition, gets lock.
491│  * caller immediately releases lock.
492│  * thread is now safely waiting on condition before the caller returns.
493│  */
494│ int start_item_crawler_thread(void) {
495│     int ret;
496│
497├───> if (settings.lru_crawler)
498│         return -1;
499│     pthread_mutex_lock(&lru_crawler_lock);
500│     do_run_lru_crawler_thread = 1;
501│     if ((ret = pthread_create(&item_crawler_tid, NULL,
502│         item_crawler_thread, NULL)) != 0) {
503│         fprintf(stderr, "Can't create LRU crawler thread: %s\n",
504│             strerror(ret));
505│         pthread_mutex_unlock(&lru_crawler_lock);
506│         return -1;
507│     }
508│     /* Avoid returning until the crawler has actually started */
509│     pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
510│     pthread_mutex_unlock(&lru_crawler_lock);
511│
512│     return 0;
513│ }

2 start_lru_maintainer_thread (arg=0x0) at items.c:1680

1677│ int start_lru_maintainer_thread(void *arg) {
1678│     int ret;
1679│
1680│     pthread_mutex_lock(&lru_maintainer_lock);
1681├───> do_run_lru_maintainer_thread = 1;
1682│     settings.lru_maintainer_thread = true;
1683│     if ((ret = pthread_create(&lru_maintainer_tid, NULL,
1684│         lru_maintainer_thread, arg)) != 0) {
1685│         fprintf(stderr, "Can't create LRU maintainer thread: %s\n",
1686│             strerror(ret));
1687│         pthread_mutex_unlock(&lru_maintainer_lock);
1688│         return -1;
1689│     }
1690│     pthread_mutex_unlock(&lru_maintainer_lock);
1691│
1692│     return 0;
1693│ }

2 start_slab_maintenance_thread () at slabs.c:1311

1309│ int start_slab_maintenance_thread(void) {
1310│     int ret;
1311│     slab_rebalance_signal = 0;
1312│     slab_rebal.slab_start = NULL;
1313│     char *env = getenv("MEMCACHED_SLAB_BULK_CHECK");
1314│     if (env != NULL) {
1315│         slab_bulk_check = atoi(env);
1316│         if (slab_bulk_check == 0) {
1317│      	  slab_bulk_check = DEFAULT_SLAB_BULK_CHECK;
1318│         }
1319│     }
1320│
1321├───> if (pthread_cond_init(&slab_rebalance_cond, NULL) != 0) {
1322│         fprintf(stderr, "Can't initialize rebalance condition\n");
1323│         return -1;
1324│     }
1325│     pthread_mutex_init(&slabs_rebalance_lock, NULL);
1326│
1327│     if ((ret = pthread_create(&rebalance_tid, NULL,
1328│                               slab_rebalance_thread, NULL)) != 0) {
1329│         fprintf(stderr, "Can't create rebal thread: %s\n", strerror(ret));
1330│         return -1;
1331│     }
1332│     return 0;
1333│ }

2 clock_handler (fd=0, which=0, arg=0x0) at memcached.c:6321

6316│ /* libevent uses a monotonic clock when available for event scheduling. Aside
6317│  * from jitter, simply ticking our internal timer here is accurate enough.
6318│  * Note that users who are setting explicit dates for expiration times *must*
6319│  * ensure their clocks are correct before starting memcached. */
6320│ static void clock_handler(const int fd, const short which, void *arg) {
6321├───> struct timeval t = {.tv_sec = 1, .tv_usec = 0};
6322│     static bool initialized = false;
6323│ #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
6324│     static bool monotonic = false;
6325│     static time_t monotonic_start;
6326│ #endif
6327│
6328│     if (initialized) {
6329│         /* only delete the event if it's actually there. */
6330│         evtimer_del(&clockevent);
6331│     } else {
6332│         initialized = true;
6333│         /* process_started is initialized to time() - 2. We initialize to 1 so
6334│          * flush_all won't underflow during tests. */
6335│ #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
6336│         struct timespec ts;
6337│         if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
6338│             monotonic = true;
6339│             monotonic_start = ts.tv_sec - ITEM_UPDATE_INTERVAL - 2;
6340│         }
6341│ #endif
6342│     }
6343│
6344│     // While we're here, check for hash table expansion.
6345│     // This function should be quick to avoid delaying the timer.
6346├───> assoc_start_expand(stats_state.curr_items);
6347│
6348│     evtimer_set(&clockevent, clock_handler, 0);
6349│     event_base_set(main_base, &clockevent);
6350│     evtimer_add(&clockevent, &t);
6351│
6352│ #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
6353│     if (monotonic) {
6354│         struct timespec ts;
6355│         if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
6356│             return;
6357│         current_time = (rel_time_t) (ts.tv_sec - monotonic_start);
6358│         return;
6359│     }
6360│ #endif
6361│     {
6362│         struct timeval tv;
6363│         gettimeofday(&tv, NULL);
6364│         current_time = (rel_time_t) (tv.tv_sec - process_started);
6365│     }
6366│ }

CLOCK_MONOTONICが利用できる環境では、6358行目で関数からreturnする。

3 assoc_start_expand (curr_items=0) at assoc.c:144

143│ void assoc_start_expand(uint64_t curr_items) {
144├───> if (started_expanding)
145│         return;
146│
147│     if (curr_items > (hashsize(hashpower) * 3) / 2 &&
148│           hashpower < HASHPOWER_MAX) {
149│         started_expanding = true;
150│         pthread_cond_signal(&maintenance_cond);
151│     }
152│ }

1 main at memcached.c

8072│     /* create unix mode sockets after dropping privileges */
8073│     if (settings.socketpath != NULL) {
8074│         errno = 0;
8075│         if (server_socket_unix(settings.socketpath,settings.access)) {
8076│             vperror("failed to listen on UNIX socket: %s", settings.socketpath);
8077│             exit(EX_OSERR);
8078│         }
8079│     }
8080│
8081│     /* create the listening socket, bind it, and init */
8082├───> if (settings.socketpath == NULL) {
8083│         const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
8084│         char *temp_portnumber_filename = NULL;
8085│         size_t len;
8086│         FILE *portnumber_file = NULL;
8087│
8088│         if (portnumber_filename != NULL) {
8089│             len = strlen(portnumber_filename)+4+1;
8090│             temp_portnumber_filename = malloc(len);
8091│             snprintf(temp_portnumber_filename,
8092│                      len,
8093│                      "%s.lck", portnumber_filename);
8094│
8095│             portnumber_file = fopen(temp_portnumber_filename, "a");
8096│             if (portnumber_file == NULL) {
8097│                 fprintf(stderr, "Failed to open \"%s\": %s\n",
8098│                         temp_portnumber_filename, strerror(errno));
8099│             }
8100│         }
8101│
8102├───────> errno = 0;
8103│         if (settings.port && server_sockets(settings.port, tcp_transport,
8104│                                            portnumber_file)) {
8105│             vperror("failed to listen on TCP port %d", settings.port);
8106│             exit(EX_OSERR);
8107│         }
8108│
8109│         /*
8110│          * initialization order: first create the listening sockets
8111│          * (may need root on low ports), then drop root if needed,
8112│          * then daemonize if needed, then init libevent (in some cases
8113│          * descriptors created by libevent wouldn't survive forking).
8114│          */
8115│
8116│         /* create the UDP listening socket and bind it */
8117│         errno = 0;
8118├───────> if (settings.udpport && server_sockets(settings.udpport, udp_transport,
8119│                                               portnumber_file)) {
8120│             vperror("failed to listen on UDP port %d", settings.udpport);
8121│             exit(EX_OSERR);
8122│         }
8123│
8124│         if (portnumber_file) {
8125│             fclose(portnumber_file);
8126│      	  rename(temp_portnumber_filename, portnumber_filename);
8127│         }
8128│         if (temp_portnumber_filename)
8129│             free(temp_portnumber_filename);
8130│     }

2 server_sockets (port=11211, transport=tcp_transport, portnumber_file=0x0) at memcached.c:6146

6144│ static int server_sockets(int port, enum network_transport transport,
6145│                           FILE *portnumber_file) {
6146├───> bool ssl_enabled = false;
6147│
6148│ #ifdef TLS
6149│     const char *notls = "notls";
6150│     ssl_enabled = settings.ssl_enabled;
6151│ #endif
6152│
6153│     if (settings.inter == NULL) {
6154│         return server_socket(settings.inter, port, transport, portnumber_file, ssl_enabled);
6155│     } else {
6156│         // tokenize them and bind to each one of them..
6157│         char *b;
6158│         int ret = 0;

3 server_socket (interface=0x0, port=11211, transport=tcp_transport, portnumber_file=0x0, ssl_enabled=false) at memcached.c:5998

5988│  * @param transport the transport protocol (TCP / UDP)
5989│  * @param portnumber_file A filepointer to write the port numbers to
5990│  *        when they are successfully added to the list of ports we
5991│  *        listen on.
5992│  */
5993│ static int server_socket(const char *interface,
5994│                          int port,
5995│                          enum network_transport transport,
5996│                          FILE *portnumber_file, bool ssl_enabled) {
5997│     int sfd;
5998│     struct linger ling = {0, 0};
5999│     struct addrinfo *ai;
6000│     struct addrinfo *next;
6001├───> struct addrinfo hints = { .ai_flags = AI_PASSIVE,
6002│                               .ai_family = AF_UNSPEC };
6003│     char port_buf[NI_MAXSERV];
6004│     int error;
6005│     int success = 0;
6006│     int flags =1;
6007│
6008│     hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
6009│
6010│     if (port == -1) {
6011│         port = 0;
6012│     }
6013│     snprintf(port_buf, sizeof(port_buf), "%d", port);
6014│     error= getaddrinfo(interface, port_buf, &hints, &ai);
6015│     if (error != 0) {
6016│         if (error != EAI_SYSTEM)
6017│           fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
6018│         else
6019│           perror("getaddrinfo()");
6020│         return 1;
6021│     }
6022│
6023├───> for (next= ai; next; next= next->ai_next) {
6024│         conn *listen_conn_add;
6025│         if ((sfd = new_socket(next)) == -1) {
6026│             /* getaddrinfo can return "junk" addresses,
6027│              * we make sure at least one works before erroring.
6028│              */
6029│             if (errno == EMFILE) {
6030│                 /* ...unless we're out of fds */
6031│                 perror("server_socket");
6032│                 exit(EX_OSERR);
6033│             }
6034│             continue;
6035│         }

4 new_socket (ai=0x6520f0) at memcached.c:5936

5932│ static int new_socket(struct addrinfo *ai) {
5933│     int sfd;
5934│     int flags;
5935│
5936├───> if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
5937│         return -1;
5938│     }
5939│
5940│     if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
5941│         fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
5942│         perror("setting O_NONBLOCK");
5943│         close(sfd);
5944│         return -1;
5945│     }
5946│     return sfd;
5947│ }

3 server_socket

ソケットオプションの設定

6048│         setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
6049│         if (IS_UDP(transport)) {
6050│             maximize_sndbuf(sfd);
6051│         } else {
6052├───────────> error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
6053│             if (error != 0)
6054│                 perror("setsockopt");
6055│
6056│             error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
6057│             if (error != 0)
6058│                 perror("setsockopt");
6059│
6060│             error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
6061│             if (error != 0)
6062│                 perror("setsockopt");
6063│         }
6064│

bind処理

6065│         if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
6066│             if (errno != EADDRINUSE) {
6067│                 perror("bind()");
6068│                 close(sfd);
6069│                 freeaddrinfo(ai);
6070│                 return 1;
6071│             }
6072│             close(sfd);
6073│             continue;
6074│         } else {
6075├───────────> success++;
6076│             if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
6077│                 perror("listen()");
6078│                 close(sfd);
6079│                 freeaddrinfo(ai);
6080│                 return 1;
6081│             }
6082│             if (portnumber_file != NULL &&
6083│                 (next->ai_addr->sa_family == AF_INET ||
6084│                  next->ai_addr->sa_family == AF_INET6)) {
6085│                 union {
6086│                     struct sockaddr_in in;
6087│                     struct sockaddr_in6 in6;

conn_new関数が呼ばれる

6109│                  * this allows "stats conns" to separately list multiple
6110│                  * parallel UDP requests in progress.
6111│                  *
6112│                  * The dispatch code round-robins new connection requests
6113│                  * among threads, so this is guaranteed to assign one
6114│                  * FD to each thread.
6115│                  */
6116│                 int per_thread_fd = c ? dup(sfd) : sfd;
6117│                 dispatch_conn_new(per_thread_fd, conn_read,
6118│                                   EV_READ | EV_PERSIST,
6119│                                   UDP_READ_BUFFER_SIZE, transport, NULL);
6120│             }
6121│         } else {
6122├───────────> if (!(listen_conn_add = conn_new(sfd, conn_listening,
6123│                                              EV_READ | EV_PERSIST, 1,
6124│                                              transport, main_base, NULL))) {
6125│                 fprintf(stderr, "failed to create listening connection\n");
6126│                 exit(EXIT_FAILURE);
6127│             }
6128│ #ifdef TLS
6129│             listen_conn_add->ssl_enabled = ssl_enabled;
6130│ #else
6131│             assert(ssl_enabled == false);
6132│ #endif
6133│             listen_conn_add->next = listen_conn;
6134│             listen_conn = listen_conn_add;
6135│         }
6136│     }
6137│
6138│     freeaddrinfo(ai);
6139│
6140│     /* Return zero iff we detected no errors in starting up connections */
6141│     return success == 0;
6142│ }

4 conn_new (sfd=35, init_state=conn_listening, event_flags=18, read_buffer_size=1, transport=tcp_transport, base=0x646040, ssl=0x0) at memcached.c:547

540│ conn *conn_new(const int sfd, enum conn_states init_state,
541│                 const int event_flags,
542│                 const int read_buffer_size, enum network_transport transport,
543│                 struct event_base *base, void *ssl) {
544│     conn *c;
545│
546│     assert(sfd >= 0 && sfd < max_fds);
547├───> c = conns[sfd];
548│
549│     if (NULL == c) {
550│         if (!(c = (conn *)calloc(1, sizeof(conn)))) {
551│             STATS_LOCK();
552│             stats.malloc_fails++;
553│             STATS_UNLOCK();
554│             fprintf(stderr, "Failed to allocate connection object\n");
555│      	  return NULL;
556│         }
557│         MEMCACHED_CONN_CREATE(c);
558│         c->read = NULL;
559│         c->sendmsg = NULL;
560│         c->write = NULL;
561│         c->rbuf = c->wbuf = 0;
562│         c->ilist = 0;
563│         c->suffixlist = 0;
564│         c->iov = 0;
565│         c->msglist = 0;
566│         c->hdrbuf = 0;
567│
568│         c->rsize = read_buffer_size;
569│         c->wsize = DATA_BUFFER_SIZE;
570│         c->isize = ITEM_LIST_INITIAL;
571│         c->suffixsize = SUFFIX_LIST_INITIAL;
572│         c->iovsize = IOV_LIST_INITIAL;
573├───────> c->msgsize = MSG_LIST_INITIAL;
574│         c->hdrsize = 0;
575│
576│         c->rbuf = (char *)malloc((size_t)c->rsize);
577│         c->wbuf = (char *)malloc((size_t)c->wsize);
578│         c->ilist = (item **)malloc(sizeof(item *) * c->isize);
579│         c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
580│         c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
581│         c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
582│
583│         if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
584│                 c->msglist == 0 || c->suffixlist == 0) {
585│             conn_free(c);
586│             STATS_LOCK();
587│             stats.malloc_fails++;
588│             STATS_UNLOCK();
589│             fprintf(stderr, "Failed to allocate buffers for connection\n");
590│             return NULL;
591│         }
592│
593│         STATS_LOCK();
594│         stats_state.conn_structs++;
595│         STATS_UNLOCK();
596│
597│         c->sfd = sfd;
598│         conns[sfd] = c;
599│     }
600│
601│     c->transport = transport;
602│     c->protocol = settings.binding_protocol;
603│
604│     /* unix socket mode doesn't need this, so zeroed out.  but why
605│      * is this done for every command?  presumably for UDP
606│      * mode.  */
607│     if (!settings.socketpath) {
608│         c->request_addr_size = sizeof(c->request_addr);
609│     } else {
610│         c->request_addr_size = 0;
611│     }
612│
613├───> if (transport == tcp_transport && init_state == conn_new_cmd) {
614│         if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
615│                         &c->request_addr_size)) {
616│             perror("getpeername");
617│             memset(&c->request_addr, 0, sizeof(c->request_addr));
618│         }
619│     }
620│
621│     if (settings.verbose > 1) {
622│         if (init_state == conn_listening) {
623│             fprintf(stderr, "<%d server listening (%s)\n", sfd,
624│                 prot_text(c->protocol));
625│         } else if (IS_UDP(transport)) {

ここで event_set により event_handler がイベントのコールバックとして設定されている。この event_handler 関数が、後ほどコマンドが実行される度に呼び出される。

691│
692├───> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
693│     event_base_set(base, &c->event);
694│     c->ev_flags = event_flags;
695│
696│     if (event_add(&c->event, 0) == -1) {
697│         perror("event_add");
698│         return NULL;
699│     }
700│
701│     STATS_LOCK();
702│     stats_state.curr_conns++;
703│     stats.total_conns++;
704│     STATS_UNLOCK();
705│
706│     MEMCACHED_CONN_ALLOCATE(c->sfd);
707│
708│     return c;
709│ }

1 main

8132│     /* Give the sockets a moment to open. I know this is dumb, but the error
8133│      * is only an advisory.
8134│      */
8135│     usleep(1000);
8136├───> if (stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) {
8137│         fprintf(stderr, "Maxconns setting is too low, use -c to increase.\n");
8138│         exit(EXIT_FAILURE);
8139│     }
8140│
8141│     if (pid_file != NULL) {
8142│         save_pid(pid_file);
8143│     }
8144│
8145│     /* Drop privileges no longer needed */
8146│     if (settings.drop_privileges) {
8147│         drop_privileges();
8148│     }
8149│
8150│     /* Initialize the uriencode lookup table. */
8151│     uriencode_init();
8152│
8153│     /* enter the event loop */
8154├───> if (event_base_loop(main_base, 0) != 0) {
8155│         retval = EXIT_FAILURE;
8156│     }
8157│
8158│     stop_assoc_maintenance_thread();
8159│
8160│     /* remove the PID file if we're a daemon */
8161│     if (do_daemonize)
8162│         remove_pidfile(pid_file);
8163│     /* Clean up strdup() call for bind() address */
8164│     if (settings.inter)
8165│       free(settings.inter);
8166│

最後に、event_base_loopによりmain_baseに対するイベントのループ処理が始まる。

2 uriencode_init () at util.c:16

14│ void uriencode_init(void) {
15│     int x;
16├───> char *str = uriencode_str;
17│     for (x = 0; x < 256; x++) {
18│         if (isalnum(x) || x == '-' || x == '.' || x == '_' || x == '~') {
19│             uriencode_map[x] = NULL;
20│         } else {
21│       	 snprintf(str, 4, "%%%02hhX", (unsigned char)x);
22│             uriencode_map[x] = str;
23│             str += 3; /* lobbing off the \0 is fine */
24│         }
25│     }
26│ }

SETコマンドの挙動

1 event_handler (fd=35, which=2, arg=0x6521e0) at memcached.c:5913

5910│ void event_handler(const int fd, const short which, void *arg) {
5911│     conn *c;
5912│
5913├───> c = (conn *)arg;
5914│     assert(c != NULL);
5915│
5916│     c->which = which;
5917│
5918│     /* sanity */
5919│     if (fd != c->sfd) {
5920│         if (settings.verbose > 0)
5921│             fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
5922│         conn_close(c);
5923│         return;
5924│     }
5925│
5926│     drive_machine(c);
5927│
5928│     /* wait for next event */
5929│     return;
5930│ }

2 drive_machine (c=0x6521e0) at memcached.c:5500

5499│ static void drive_machine(conn *c) {
5500│     bool stop = false;
5501│     int sfd;
5502│     socklen_t addrlen;
5503│     struct sockaddr_storage addr;
5504│     int nreqs = settings.reqs_per_event;
5505│     int res;
5506│     const char *str;
5507│ #ifdef HAVE_ACCEPT4
5508│     static int  use_accept4 = 1;
5509│ #else
5510│     static int  use_accept4 = 0;
5511│ #endif
5512│
5513│     assert(c != NULL);
5514│

c->stateがconn_listeningとなっている。

5515├───> while (!stop) {
5517│         switch(c->state) {
5518│         case conn_listening:
5519│             addrlen = sizeof(addr);
5520│ #ifdef HAVE_ACCEPT4
5521│             if (use_accept4) {
5522│                 sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
5523│             } else {
5524│                 sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
5525│             }
5526│ #else
5527│             sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
5528│ #endif
5529├───────────> if (sfd == -1) {
5530│                 if (use_accept4 && errno == ENOSYS) {
5531│                     use_accept4 = 0;
5532│                     continue;
5533│                 }
5534│                 perror(use_accept4 ? "accept4()" : "accept()");
5535│                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
5536│                     /* these are transient, so don't log anything */
5537│                     stop = true;
5538│                 } else if (errno == EMFILE) {
5539│                     if (settings.verbose > 0)
5540│                         fprintf(stderr, "Too many open connections\n");
5541│                     accept_new_conns(false);
5605├───────────────> dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
5606│                                      DATA_BUFFER_SIZE, c->transport, ssl_v);

3 dispatch_conn_new (sfd=37, init_state=conn_new_cmd, event_flags=18, read_buffer_size=2048, transport=tcp_transport, ssl=0x0) at thread.c:491

484│ /*
485│  * Dispatches a new connection to another thread. This is only ever called
486│  * from the main thread, either during initialization (for UDP) or because
487│  * of an incoming connection.
488│  */
489│ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
490│                        int read_buffer_size, enum network_transport transport, void *ssl) {
491├───> CQ_ITEM *item = cqi_new();
492│     char buf[1];
493│     if (item == NULL) {
494│         close(sfd);
495│         /* given that malloc failed this may also fail, but let's try */
496│         fprintf(stderr, "Failed to allocate memory for connection object\n");
497│         return ;
498│     }
499│
500│     int tid = (last_thread + 1) % settings.num_threads;
501│
502│     LIBEVENT_THREAD *thread = threads + tid;
503│
503│
504│     last_thread = tid;
505│
506│     item->sfd = sfd;
507│     item->init_state = init_state;
508│     item->event_flags = event_flags;
509│     item->read_buffer_size = read_buffer_size;
510│     item->transport = transport;
511│     item->mode = queue_new_conn;
512├───> item->ssl = ssl;
513│
514│     cq_push(thread->new_conn_queue, item);
515│
516│     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
517│     buf[0] = 'c';
518│     if (write(thread->notify_send_fd, buf, 1) != 1) {
519│         perror("Writing to thread notify pipe");
520│     }
521│ }

thread->notify_send_fdに、bufに格納された'c'の1バイトをwriteする。

4 cqi_new () at thread.c:240

236│ /*
237│  * Returns a fresh connection queue item.
238│  */
239│ static CQ_ITEM *cqi_new(void) {
240├───> CQ_ITEM *item = NULL;
241│     pthread_mutex_lock(&cqi_freelist_lock);
242│     if (cqi_freelist) {
243│         item = cqi_freelist;
244│         cqi_freelist = item->next;
245│     }
246│     pthread_mutex_unlock(&cqi_freelist_lock);
247│
248│     if (NULL == item) {
249│         int i;
250│
251│         /* Allocate a bunch of items at once to reduce fragmentation */
252│         item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
253├───────> if (NULL == item) {
254│             STATS_LOCK();
255│             stats.malloc_fails++;
256│             STATS_UNLOCK();
257│             return NULL;
258│         }
259│
260│         /*
261│          * Link together all the new items except the first one
262│          * (which we'll return to the caller) for placement on
263│          * the freelist.
264│          */
265│         for (i = 2; i < ITEMS_PER_ALLOC; i++)
266│             item[i - 1].next = &item[i];
267│
268│         pthread_mutex_lock(&cqi_freelist_lock);
269│         item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
270│         cqi_freelist = &item[1];
271│         pthread_mutex_unlock(&cqi_freelist_lock);
272│     }
273│
274│     return item;
275│ }

4 cq_push (cq=0x64f250, item=0x658270) at thread.c:225

221│ /*
222│  * Adds an item to a connection queue.
223│  */
224│ static void cq_push(CQ *cq, CQ_ITEM *item) {
225├───> item->next = NULL;
226│
227│     pthread_mutex_lock(&cq->lock);
228│     if (NULL == cq->tail)
229│         cq->head = item;
230│     else
231│         cq->tail->next = item;
232│     cq->tail = item;
233│     pthread_mutex_unlock(&cq->lock);
234│ }

1 event_handler

2 drive_machine

その後一旦libeventの処理に戻り、再びevent_handlerからdrive_machineが呼び出される。

c->stateがconn_new_cmdとなっている。

5651│         case conn_new_cmd:
5652│             /* Only process nreqs at a time to avoid starving other
5653│                connections */
5654│
5655│             --nreqs;
5656├───────────> if (nreqs >= 0) {
5657│                 reset_cmd_handler(c);
5658│             } else {
5659│                 pthread_mutex_lock(&c->thread->stats.mutex);
5660│                 c->thread->stats.conn_yields++;
5661│                 pthread_mutex_unlock(&c->thread->stats.mutex);
5662│                 if (c->rbytes > 0) {
5663│              	  /* We have already read in data into the input buffer,
5664│                        so libevent will most likely not signal read events
5665│                        on the socket (unless more data is available. As a
5666│                        hack we should just put in a request to write data,
5667│                        because that should be possible ;-)
5668│                     */

3 reset_cmd_handler (c=0x7ffff00109a0) at memcached.c:2766

c->rbytesが0のため、最後にconn_set_state(c, conn_waiting)が実行される。

2765│ static void reset_cmd_handler(conn *c) {
2766├───> c->cmd = -1;
2767│     c->substate = bin_no_state;
2768│     if(c->item != NULL) {
2769│         item_remove(c->item);
2770│         c->item = NULL;
2771│     }
2772│     conn_shrink(c);
2773│     if (c->rbytes > 0) {
2774│         conn_set_state(c, conn_parse_cmd);
2775│     } else {
2776│         conn_set_state(c, conn_waiting);
2777│     }
2778│ }

4 conn_shrink (c=0x7ffff00109a0) at memcached.c:910

899│ /*
900│  * Shrinks a connection's buffers if they're too big.  This prevents
901│  * periodic large "get" requests from permanently chewing lots of server
902│  * memory.
903│  *
904│  * This should only be called in between requests since it can wipe output
905│  * buffers!
906│  */
907│ static void conn_shrink(conn *c) {
908│     assert(c != NULL);
909│
910├───> if (IS_UDP(c->transport))
911│         return;
912│
913│     if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
914│         char *newbuf;
915│
916│         if (c->rcurr != c->rbuf)
917│             memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
918│
919│         newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
920│
921│         if (newbuf) {
922│             c->rbuf = newbuf;
923│             c->rsize = DATA_BUFFER_SIZE;
924│         }
925│         /* TODO check other branch... */
926│         c->rcurr = c->rbuf;
927│     }
928│
929├───> if (c->isize > ITEM_LIST_HIGHWAT) {
930│         item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
931│         if (newbuf) {
932│             c->ilist = newbuf;
933│             c->isize = ITEM_LIST_INITIAL;
934│         }
935│     /* TODO check error condition? */
936│     }
937│
938│     if (c->msgsize > MSG_LIST_HIGHWAT) {
939│         struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->m
940│         if (newbuf) {
941│             c->msglist = newbuf;
942│             c->msgsize = MSG_LIST_INITIAL;
943│         }
944│     /* TODO check error condition? */
945│     }
946│
947├───> if (c->iovsize > IOV_LIST_HIGHWAT) {
948│         struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])
949│         if (newbuf) {
950│             c->iov = newbuf;
951│             c->iovsize = IOV_LIST_INITIAL;
952│         }
953│     /* TODO check return value */
954│     }
955│ }

4 conn_set_state (c=0x7ffff00109a0, state=conn_waiting) at memcached.c:985

 976│ /*
 977│  * Sets a connection's current state in the state machine. Any special
 978│  * processing that needs to happen on certain state transitions can
 979│  * happen here.
 980│  */
 981│ static void conn_set_state(conn *c, enum conn_states state) {
 982│     assert(c != NULL);
 983│     assert(state >= conn_listening && state < conn_max_state);
 984│
 985├───> if (state != c->state) {
 986│         if (settings.verbose > 2) {
 987│             fprintf(stderr, "%d: going from %s to %s\n",
 988│                     c->sfd, state_text(c->state),
 989│                     state_text(state));
 990│         }
 991│
 992│         if (state == conn_write || state == conn_mwrite) {
 993│             MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
 994│         }
 995│         c->state = state;
 996│     }
 997│ }

1 event_handler

2 drive_machine

c->stateがconn_waitingになっている。最後にconn_set_state関数でconn_readに設定している。

5612│         case conn_waiting:
5613├───────────> if (!update_event(c, EV_READ | EV_PERSIST)) {
5614│                 if (settings.verbose > 0)
5615│                     fprintf(stderr, "Couldn't update event\n");
5616│                 conn_set_state(c, conn_closing);
5617│                 break;
5618│             }
5619│
5620│             conn_set_state(c, conn_read);
5621│             stop = true;
5622│      	  break;
5623│

3 update_event (c=0x7ffff00109a0, new_flags=18) at memcached.c:5295

5292│ static bool update_event(conn *c, const int new_flags) {
5293│     assert(c != NULL);
5294│
5295├───> struct event_base *base = c->event.ev_base;
5296│     if (c->ev_flags == new_flags)
5297│         return true;
5298│     if (event_del(&c->event) == -1) return false;
5299│     event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
5300│     event_base_set(base, &c->event);
5301│     c->ev_flags = new_flags;
5302│     if (event_add(&c->event, 0) == -1) return false;
5303│     return true;
5304│ }

1 event_handler

2 drive_machine

再びlibeventの処理に戻る。event.cのevent_process_active関数の中でコールバックが実行されると、再びevent_handlerに戻ってくる。c->stateはconn_readに設定されている。

resがREAD_DATA_RECEIVEDなので、c->stateがconn_parse_cmdに設定される。

5624│         case conn_read:
5625├───────────> res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
5626│
5627│             switch (res) {
5628│             case READ_NO_DATA_RECEIVED:
5629│                 conn_set_state(c, conn_waiting);
5630│                 break;
5631│             case READ_DATA_RECEIVED:
5632│                 conn_set_state(c, conn_parse_cmd);
5633│                 break;
5634│      	      case READ_ERROR:
5635│                 conn_set_state(c, conn_closing);
5636│                 break;
5637│             case READ_MEMORY_ERROR: /* Failed to allocate more memory */
5638│                 /* State already set by try_read_network */
5639│                 break;
5640│             }
5641│             break;
5642│

3 try_read_network (c=0x7ffff00109a0) at memcached.c:5231

5218│ /*
5219│  * read from network as much as we can, handle buffer overflow and connection
5220│  * close.
5221│  * before reading, move the remaining incomplete fragment of a command
5222│  * (if any) to the beginning of the buffer.
5223│  *
5224│  * To protect us from someone flooding a connection with bogus data causing
5225│  * the connection to eat up all available memory, break out and start looking
5226│  * at the data I've got after a number of reallocs...
5227│  *
5228│  * @return enum try_read_result
5229│  */
5230│ static enum try_read_result try_read_network(conn *c) {
5231├───> enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
5232│     int res;
5233│     int num_allocs = 0;
5234│     assert(c != NULL);
5235│
5236│     if (c->rcurr != c->rbuf) {
5237│         if (c->rbytes != 0) /* otherwise there's nothing to copy */
5238│             memmove(c->rbuf, c->rcurr, c->rbytes);
5239│         c->rcurr = c->rbuf;
5240│     }
5241│
5242│     while (1) {
5243│         if (c->rbytes >= c->rsize) {

5276行目でbreak。その後、関数もreturnされる。

5265│         int avail = c->rsize - c->rbytes;
5266├───────> res = c->read(c, c->rbuf + c->rbytes, avail);
5267│         if (res > 0) {
5268│             pthread_mutex_lock(&c->thread->stats.mutex);
5269│             c->thread->stats.bytes_read += res;
5270│             pthread_mutex_unlock(&c->thread->stats.mutex);
5271│             gotdata = READ_DATA_RECEIVED;
5272│             c->rbytes += res;
5273│             if (res == avail) {
5274│                 continue;
5275│             } else {
5276│                 break;
5277│             }
5278│         }
5279│         if (res == 0) {
5280│             return READ_ERROR;
5281│         }
5282│         if (res == -1) {
5283│             if (errno == EAGAIN || errno == EWOULDBLOCK) {
5284│                 break;
5285│             }
5286│             return READ_ERROR;
5287│         }
5288│     }

4 tcp_read (c=0x7ffff00109a0, buf=0x7ffff0010bd0, count=2048) at memcached.c:162

 159│ /* Default methods to read from/ write to a socket */
 160│ ssize_t tcp_read(conn *c, void *buf, size_t count) {
 161│     assert (c != NULL);
 162├───> return read(c->sfd, buf, count);
 163│ }

2

5643│         case conn_parse_cmd :
5644├───────────> if (try_read_command(c) == 0) {
5645│                 /* wee need more data! */
5646│                 conn_set_state(c, conn_waiting);
5647│             }
5648│
5649│             break;
5650│

3 try_read_command (c=0x7ffff00109a0) at memcached.c:5053

実際に入力した内容を読み込む処理。

5045│ /*
5046│  * if we have a complete line in the buffer, process it.
5047│  */
5048│ static int try_read_command(conn *c) {
5049│     assert(c != NULL);
5050│     assert(c->rcurr <= (c->rbuf + c->rsize));
5051│     assert(c->rbytes > 0);
5052│
5053├───> if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
5054│         if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
5055│             c->protocol = binary_prot;
5056│         } else {
5057│             c->protocol = ascii_prot;
5058│         }
5059│
5060│         if (settings.verbose > 1) {
5061│             fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
5062│                     prot_text(c->protocol));
5063│         }
5064│     }
5065│

MemcachedではASCIIモードとバイナリーモードが利用できるが、ここではASCIIモードを利用しているので、そちらの処理を実行。

5132│     } else {
5133│         char *el, *cont;
5134│
5135│         if (c->rbytes == 0)
5136│             return 0;
5137│
5138│         el = memchr(c->rcurr, '\n', c->rbytes);
5139├───────> if (!el) {
5140│             if (c->rbytes > 1024) {
5141│                 /*
5142│                  * We didn't have a '\n' in the first k. This _has_ to be a
5143│                  * large multiget, if not we should just nuke the connection.
5144│                  */
5145│                 char *ptr = c->rcurr;
5146│                 while (*ptr == ' ') { /* ignore leading whitespaces */
5147│                     ++ptr;
5148│                 }
5149│
5150│                 if (ptr - c->rcurr > 100 ||
5151│                     (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
5152│
5153│                     conn_set_state(c, conn_closing);
5154│                     return 1;
5155│                 }
5156│             }
5157│
5158│             return 0;
5159│         }
5160├───────> cont = el + 1;
5161│         if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
5162│             el--;
5163│         }
5164│         *el = '\0';
5165│
5166│         assert(cont <= (c->rcurr + c->rbytes));
5167│
5168│         c->last_cmd_time = current_time;
5169│         process_command(c, c->rcurr);
5170│
5171│         c->rbytes -= (cont - c->rcurr);
5172│         c->rcurr = cont;

4

483│ static const char *prot_text(enum protocol prot) {
484│     char *rv = "unknown";
485├───> switch(prot) {
486│         case ascii_prot:
487│      	  rv = "ascii";
488│             break;
489│         case binary_prot:
490│             rv = "binary";
491│             break;
492│         case negotiating_prot:
493│             rv = "auto-negotiate";
494│             break;
495│     }
496│     return rv;
497│ }

4 process_command (c=0x7ffff00109a0, command=0x7ffff0010bd0 "set test1 0 0 7") at memcached.c:4731

4721│ static void process_command(conn *c, char *command) {
4722│
4723│     token_t tokens[MAX_TOKENS];
4724│     size_t ntokens;
4725│     int comm;
4726│
4727│     assert(c != NULL);
4728│
4729│     MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
4730│
4731├───> if (settings.verbose > 1)
4732│         fprintf(stderr, "<%d %s\n", c->sfd, command);
4733│
4734│     /*
4735│      * for commands set/add/replace, we build an item and read the data
4736│      * directly into it, then continue in nread_complete().
4737│      */
4738│
4739│     c->msgcurr = 0;
4740│     c->msgused = 0;
4741│     c->iovused = 0;
4742│     if (add_msghdr(c) != 0) {
4743│         out_of_memory(c, "SERVER_ERROR out of memory preparing response");
4744│         return;
4745│     }
4746│
4747│     ntokens = tokenize_command(command, tokens, MAX_TOKENS);

5 add_msghdr (c=0x7ffff00109a0) at memcached.c:329

318│ /*
319│  * Adds a message header to a connection.
320│  *
321│  * Returns 0 on success, -1 on out-of-memory.
322│  */
323│ static int add_msghdr(conn *c)
324│ {
325│     struct msghdr *msg;
326│
327│     assert(c != NULL);
328│
329├───> if (c->msgsize == c->msgused) {
330│         msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
331│         if (! msg) {
332│      	  STATS_LOCK();
333│             stats.malloc_fails++;
334│             STATS_UNLOCK();
335│             return -1;
336│         }
337│         c->msglist = msg;
338│         c->msgsize *= 2;
339│     }
340│
341│     msg = c->msglist + c->msgused;
342│
343│     /* this wipes msg_iovlen, msg_control, msg_controllen, and
344│        msg_flags, the last 3 of which aren't defined on solaris: */
345│     memset(msg, 0, sizeof(struct msghdr));
346│
347│     msg->msg_iov = &c->iov[c->iovused];
348│
349│     if (IS_UDP(c->transport) && c->request_addr_size > 0) {
350│         msg->msg_name = &c->request_addr;
351│         msg->msg_namelen = c->request_addr_size;
352│     }
353│
354├───> c->msgbytes = 0;
355│     c->msgused++;
356│
357│     if (IS_UDP(c->transport)) {
358│         /* Leave room for the UDP header, which we'll fill in later. */
359│         return add_iov(c, NULL, UDP_HEADER_SIZE);
360│     }
361│
362│     return 0;
363│ }

5 tokenize_command (command=0x7ffff0010bd0 "set test1 0 0 7", tokens=0x7ffff6da8bd0, max_tokens=8) at memcached.c:3045

ntokensには、"set test1 0 0 7"の5つのトークンに、終端を表す長さ0のトークン1つを加えた6が返される(値である"sample1"はコマンド行には含まれない)。

3032│  *
3033│  * Usage example:
3034│  *
3035│  *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
3036│  *      for(int ix = 0; tokens[ix].length != 0; ix++) {
3037│  *          ...
3038│  *      }
3039│  *      ncommand = tokens[ix].value - command;
3040│  *      command  = tokens[ix].value;
3041│  *   }
3042│  */
3043│ static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
3044│     char *s, *e;
3045├───> size_t ntokens = 0;
3046│     size_t len = strlen(command);
3047│     unsigned int i = 0;
3048│
3049│     assert(command != NULL && tokens != NULL && max_tokens > 1);
3050│
3051│     s = e = command;
3052│     for (i = 0; i < len; i++) {
3053│         if (*e == ' ') {
3054│             if (s != e) {
3055│                 tokens[ntokens].value = s;
3056│                 tokens[ntokens].length = e - s;
3057│                 ntokens++;
3058│                 *e = '\0';
3059│                 if (ntokens == max_tokens - 1) {
3060│                     e++;
3061│                     s = e; /* so we don't add an extra token */
3062│                     break;
3063│                 }
3064│             }
3065│             s = e + 1;
3066│         }
3067├───────> e++;
3068│     }
3069│
3070│     if (s != e) {
3071│         tokens[ntokens].value = s;
3072│         tokens[ntokens].length = e - s;
3073├───────> ntokens++;
3074│     }
3075│
3076│     /*
3077│      * If we scanned the whole string, the terminal value pointer is null,
3078│      * otherwise it is the first unprocessed character.
3079│      */
3080│     tokens[ntokens].value =  *e == '\0' ? NULL : e;
3081│     tokens[ntokens].length = 0;
3082│     ntokens++;
3083│
3084│     return ntokens;
3085│ }

4

tokenize_command関数の実行結果を元に処理が実行される。ここではSETコマンドなのでprocess_update_command関数が実行される。

4747│     ntokens = tokenize_command(command, tokens, MAX_TOKENS);
4748│     if (ntokens >= 3 &&
4749│         ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
4750│          (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
4751│
4752│         process_get_command(c, tokens, ntokens, false, false);
4753│
4754├───> } else if ((ntokens == 6 || ntokens == 7) &&
4755│                ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
4756│                 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
4757│                 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
4758│                 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
4759│                 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
4760│
4761│         process_update_command(c, tokens, ntokens, comm, false);
4762│
4763│     } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm =
4764│
4765│         process_update_command(c, tokens, ntokens, comm, true);
4766│

5 process_update_command (c=0x7ffff00109a0, tokens=0x7ffff6da8bd0, ntokens=6, comm=2, handle_cas=false) at memcached.c:4107

4103│ static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas
4104│     char *key;
4105│     size_t nkey;
4106│     unsigned int flags;
4107│     int32_t exptime_int = 0;
4108│     time_t exptime;
4109│     int vlen;
4110├───> uint64_t req_cas_id=0;
4111│     item *it;
4112│
4113│     assert(c != NULL);
4114│
4115│     set_noreply_maybe(c, tokens, ntokens);
4116│
4117│     if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
4118│         out_string(c, "CLIENT_ERROR bad command line format");
4119│         return;
4120│     }
4121│

6 set_noreply_maybe (c=0x7ffff00109a0, tokens=0x7ffff6da8bd0, ntokens=6) at memcached.c:3102

ここではSETコマンド実行時にnoreplyオプションを指定していないので何も実行されない。

3100│ static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
3101│ {
3102├───> int noreply_index = ntokens - 2;
3103│
3104│     /*
3105│       NOTE: this function is not the first place where we are going to
3106│       send the reply.  We could send it instead from process_command()
3107│       if the request line has wrong number of tokens.  However parsing
3108│       malformed line for "noreply" option is not reliable anyway, so
3109│       it can't be helped.
3110│     */
3111│     if (tokens[noreply_index].value
3112│         && strcmp(tokens[noreply_index].value, "noreply") == 0) {
3113│         c->noreply = true;
3114│     }
3115│     return c->noreply;
3116│ }

5

item_allocが実行される

4122│     key = tokens[KEY_TOKEN].value;
4123│     nkey = tokens[KEY_TOKEN].length;
4124│
4125│     if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
4126│            && safe_strtol(tokens[3].value, &exptime_int)
4127│            && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
4128│         out_string(c, "CLIENT_ERROR bad command line format");
4129│         return;
4130│     }
4131│
4132│     /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
4133├───> exptime = exptime_int;
4134│
4135│     /* Negative exptimes can underflow and end up immortal. realtime() will
4136│        immediately expire values that are greater than REALTIME_MAXDELTA, but less
4137│        than process_started, so lets aim for that. */
4138│     if (exptime < 0)
4139│         exptime = REALTIME_MAXDELTA + 1;
4140│
4141│     // does cas value exist?
4142│     if (handle_cas) {
4143│         if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
4144│             out_string(c, "CLIENT_ERROR bad command line format");
4145│             return;
4146│         }
4147│     }
4148│
4149│     if (vlen < 0 || vlen > (INT_MAX - 2)) {
4150│         out_string(c, "CLIENT_ERROR bad command line format");
4151│         return;
4152│     }
4153│     vlen += 2;
4154│
4155├───> if (settings.detail_enabled) {
4156│         stats_prefix_record_set(key, nkey);
4157│     }
4158│
4159│     it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
4160│
4161│     if (it == 0) {
4162│         enum store_item_type status;
4163│         if (! item_size_ok(nkey, flags, vlen)) {
4164│             out_string(c, "SERVER_ERROR object too large for cache");
4165│             status = TOO_LARGE;
4166│         } else {
4167│             out_of_memory(c, "SERVER_ERROR out of memory storing object");

6 realtime (exptime=0) at memcached.c:208

 200│ /*
 201│  * given time value that's either unix time or delta from current unix time, return
 202│  * unix time. Use the fact that delta can't exceed one month (and real time value can't
 203│  * be that low).
 204│  */
 205│ static rel_time_t realtime(const time_t exptime) {
 206│     /* no. of seconds in 30 days - largest possible delta exptime */
 207│ 
 208│     if (exptime == 0) return 0; /* 0 means never expire */
 209│ 
 210│     if (exptime > REALTIME_MAXDELTA) {
 211│         /* if item expiration is at/before the server started, give it an
 212│            expiration time of 1 second after the server started.
 213│            (because 0 means don't expire).  without this, we'd
 214│            underflow and wrap around to some large value way in the
 215│            future, effectively making items expiring in the past
 216│            really expiring never */
 217│         if (exptime <= process_started)
 218│             return (rel_time_t)1;
 219│         return (rel_time_t)(exptime - process_started);
 220│     } else {
 221│         return (rel_time_t)(exptime + current_time);
 222│     }
 223├>}

6 item_alloc (key=0x7ffff0010bd4 "test1", nkey=5, flags=0, exptime=0, nbytes=9) at thread.c:579

571│ /********************************* ITEM ACCESS *******************************/
572│
573│ /*
574│  * Allocates a new item.
575│  */
576│ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
577│     item *it;
578│     /* do_item_alloc handles its own locks */
579├───> it = do_item_alloc(key, nkey, flags, exptime, nbytes);
580│     return it;
581│ }

7 do_item_alloc (key=0x7ffff0010bd4 "test1", nkey=5, flags=0, exptime=0, nbytes=9) at items.c:260

CASを有効にしていると、uint64_tのサイズ分の8バイトが全体サイズに追加される。

 257│ item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
 258│                     const rel_time_t exptime, const int nbytes) {
 259│     uint8_t nsuffix;
 260│     item *it = NULL;
 261│     char suffix[40];
 262│     // Avoid potential underflows.
 263├───> if (nbytes < 2)
 264│         return 0;
 265│
 266│     size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
 267│     if (settings.use_cas) {
 268│         ntotal += sizeof(uint64_t);
 269│     }
 270│
 271│     unsigned int id = slabs_clsid(ntotal);
 272│     unsigned int hdr_id = 0;
 273│     if (id == 0)
 274│         return 0;
 275│

8 item_make_header (nkey=6 '\006', flags=0, nbytes=9, suffix=0x7ffff6da8a70 "", nsuffix=0x7ffff6da8aa7 "") at items.c:171

 158│  * Generates the variable-sized part of the header for an object.
 159│  *
 160│  * key     - The key
 161│  * nkey    - The length of the key
 162│  * flags   - key flags
 163│  * nbytes  - Number of bytes to hold value and addition CRLF terminator
 164│  * suffix  - Buffer for the "VALUE" line suffix (flags, size).
 165│  * nsuffix - The length of the suffix is stored here.
 166│  *
 167│  * Returns the total size of the header.
 168│  */
 169│ static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
 170│                      char *suffix, uint8_t *nsuffix) {
 171├───> if (settings.inline_ascii_response) {
 172│         /* suffix is defined at 40 chars elsewhere.. */
 173│         *nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
 174│     } else {
 175│         if (flags == 0) {
 176│             *nsuffix = 0;
 177│         } else {
 178│             *nsuffix = sizeof(flags);
 179│         }
 180│     }
 181│     return sizeof(item) + nkey + *nsuffix + nbytes;
 182│ }

8 slabs_clsid (size=71) at slabs.c:92

  83│ /*
  84│  * Figures out which slab class (chunk size) is required to store an item of
  85│  * a given size.
  86│  *
  87│  * Given object size, return id to use when allocating/freeing memory for object
  88│  * 0 means error: can't store such a large object
  89│  */
  90│
  91│ unsigned int slabs_clsid(const size_t size) {
  92├───> int res = POWER_SMALLEST;
  93│
  94│     if (size == 0 || size > settings.item_size_max)
  95│         return 0;
  96│     while (size > slabclass[res].size)
  97│         if (res++ == power_largest)     /* won't fit in the biggest slab */
  98│             return power_largest;
  99│     return res;
 100│ }

7

elseのdo_item_alloc_pullが実行される

 276│     /* This is a large item. Allocate a header object now, lazily allocate
 277│      *  chunks while reading the upload.
 278│      */
 279├───> if (ntotal > settings.slab_chunk_size_max) {
 280│         /* We still link this item into the LRU for the larger slab class, but
 281│          * we're pulling a header from an entirely different slab class. The
 282│          * free routines handle large items specifically.
 283│          */
 284│         int htotal = nkey + 1 + nsuffix + sizeof(item) + sizeof(item_chunk);
 285│         if (settings.use_cas) {
 286│             htotal += sizeof(uint64_t);
 287│         }
 288│ #ifdef NEED_ALIGN
 289│         // header chunk needs to be padded on some systems
 290│         int remain = htotal % 8;
 291│         if (remain != 0) {
 292│             htotal += 8 - remain;
 293│         }
 294│ #endif
 295│         hdr_id = slabs_clsid(htotal);
 296│         it = do_item_alloc_pull(htotal, hdr_id);
 297│         /* setting ITEM_CHUNKED is fine here because we aren't LINKED yet. */
 298│         if (it != NULL)
 299│             it->it_flags |= ITEM_CHUNKED;
 300│     } else {
 301├───────> it = do_item_alloc_pull(ntotal, id);
 302│     }
 303│

8 do_item_alloc (key=0x7ffff0010bd4 "test1", nkey=5, flags=0, exptime=0, nbytes=9) at items.c:272

slabs_allocが実行される。

 184│ item *do_item_alloc_pull(const size_t ntotal, const unsigned int id) {
 185│     item *it = NULL;
 186│     int i;
 187│     /* If no memory is available, attempt a direct LRU juggle/eviction */
 188│     /* This is a race in order to simplify lru_pull_tail; in cases where
 189│      * locked items are on the tail, you want them to fall out and cause
 190│      * occasional OOM's, rather than internally work around them.
 191│      * This also gives one fewer code path for slab alloc/free
 192│      */
 193├───> for (i = 0; i < 10; i++) {
 194│         uint64_t total_bytes;
 195│         /* Try to reclaim memory first */
 196│         if (!settings.lru_segmented) {
 197│             lru_pull_tail(id, COLD_LRU, 0, 0, 0, NULL);
 198│         }
 199│         it = slabs_alloc(ntotal, id, &total_bytes, 0);
 200│
 201│         if (settings.temp_lru)
 202│             total_bytes -= temp_lru_size(id);
 203│
 204│         if (it == NULL) {
 205│             if (lru_pull_tail(id, COLD_LRU, total_bytes, LRU_PULL_EVICT, 0, NULL) <= 0) {
 206│                 if (settings.lru_segmented) {
 207│                     lru_pull_tail(id, HOT_LRU, total_bytes, 0, 0, NULL);
 208│                 } else {
 209│                     break;
 210│                 }
 211│             }

9 slabs_alloc (size=71, id=1, total_bytes=0x7ffff6da8a28, flags=0) at slabs.c:658

 654│ void *slabs_alloc(size_t size, unsigned int id, uint64_t *total_bytes,
 655│         unsigned int flags) {
 656│     void *ret;
 657│
 658├───> pthread_mutex_lock(&slabs_lock);
 659│     ret = do_slabs_alloc(size, id, total_bytes, flags);
 660│     pthread_mutex_unlock(&slabs_lock);
 661│     return ret;
 662│ }

10 do_slabs_alloc (size=71, id=1, total_bytes=0x7ffff6da8a28, flags=0) at slabs.c:337

do_slabs_newslabが実行される。

 333│ /*@null@*/
 334│ static void *do_slabs_alloc(const size_t size, unsigned int id, uint64_t *total_bytes,
 335│         unsigned int flags) {
 336│     slabclass_t *p;
 337│     void *ret = NULL;
 338│     item *it = NULL;
 339│
 340├───> if (id < POWER_SMALLEST || id > power_largest) {
 341│         MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
 342│         return NULL;
 343│     }
 344│     p = &slabclass[id];
 345│     assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0);
 346│     if (total_bytes != NULL) {
 347│         *total_bytes = p->requested;
 348│     }
 349│
 350│     assert(size <= p->size);
 351│     /* fail unless we have space at the end of a recently allocated page,
 352│        we have something on our freelist, or we could allocate a new page */
 353│     if (p->sl_curr == 0 && flags != SLABS_ALLOC_NO_NEWPAGE) {
 354├───────> do_slabs_newslab(id);
 355│     }
 356│
 357│     if (p->sl_curr != 0) {
 358│         /* return off our freelist */
 359│         it = (item *)p->slots;
 360│         p->slots = it->next;
 361│         if (it->next) it->next->prev = 0;
 362│         /* Kill flag and initialize refcount here for lock safety in slab
 363│          * mover's freeness detection. */
 364│         it->it_flags &= ~ITEM_SLABBED;
 365│         it->refcount = 1;
 366│         p->sl_curr--;
:
:
 372│     if (ret) {
 373│         p->requested += size;
 374│         MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
 375│     } else {
 376│         MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
 377│     }

11 do_slabs_newslab (id=1) at slabs.c:302

 301│ static int do_slabs_newslab(const unsigned int id) {
 302│     slabclass_t *p = &slabclass[id];
 303│     slabclass_t *g = &slabclass[SLAB_GLOBAL_PAGE_POOL];
 304│     int len = (settings.slab_reassign || settings.slab_chunk_size_max != settings.slab_page_size)
 305│         ? settings.slab_page_size
 306│         : p->size * p->perslab;
 307│     char *ptr;
 308│
 309├───> if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0
 310│          && g->slabs == 0)) {
 311│         mem_limit_reached = true;
 312│         MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
 313│         return 0;
 314│     }
 315│
 316│     if ((grow_slab_list(id) == 0) ||
 317│         (((ptr = get_page_from_global_pool()) == NULL) &&
 318│         ((ptr = memory_allocate((size_t)len)) == 0))) {
 319│
 320│         MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
 321│         return 0;
 322│     }
 323│
 324│     memset(ptr, 0, (size_t)len);
 325│     split_slab_page_into_freelist(ptr, id);
 326│
 327│     p->slab_list[p->slabs++] = ptr;
 328│     MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);

12 grow_slab_list (id=1) at slabs.c:270

 269│ static int grow_slab_list (const unsigned int id) {
 270├───> slabclass_t *p = &slabclass[id];
 271│     if (p->slabs == p->list_size) {
 272│         size_t new_size =  (p->list_size != 0) ? p->list_size * 2 : 16;
 273│         void *new_list = realloc(p->slab_list, new_size * sizeof(void *));
 274│         if (new_list == 0) return 0;
 275│         p->list_size = new_size;
 276│         p->slab_list = new_list;
 277│     }
 278│     return 1;
 279│ }

12 memory_allocate (size=1048576) at slabs.c:611

 608│ static void *memory_allocate(size_t size) {
 609│     void *ret;
 610│
 611├───> if (mem_base == NULL) {
 612│         /* We are not using a preallocated large memory chunk */
 613│         ret = malloc(size);
 614│     } else {
 615│         ret = mem_current;
 616│
 617│         if (size > mem_avail) {
 618│             return NULL;
 619│         }
 620│
 621│         /* mem_current pointer _must_ be aligned!!! */
 622│         if (size % CHUNK_ALIGN_BYTES) {
 623│             size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

12 split_slab_page_into_freelist (ptr=0x7ffff44a4010 "", id=1) at slabs.c:282

 281│ static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
 282├───> slabclass_t *p = &slabclass[id];
 283│     int x;
 284│     for (x = 0; x < p->perslab; x++) {
 285│         do_slabs_free(ptr, 0, id);
 286│         ptr += p->size;
 287│     }
 288│ }

13 do_slabs_free (ptr=0x7ffff44a4010, size=0, id=1) at slabs.c:447

 442│ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
 443│     slabclass_t *p;
 444│     item *it;
 445│
 446│     assert(id >= POWER_SMALLEST && id <= power_largest);
 447├───> if (id < POWER_SMALLEST || id > power_largest)
 448│         return;
 449│
 450│     MEMCACHED_SLABS_FREE(size, id, ptr);
 451│     p = &slabclass[id];
 452│
 453│     it = (item *)ptr;
 454│     if ((it->it_flags & ITEM_CHUNKED) == 0) {
 455│ #ifdef EXTSTORE
 456│         bool is_hdr = it->it_flags & ITEM_HDR;
 457│ #endif
 458│         it->it_flags = ITEM_SLABBED;
 459│         it->slabs_clsid = 0;
 460│         it->prev = 0;
 461│         it->next = p->slots;
 462│         if (it->next) it->next->prev = it;
 463│         p->slots = it;
 464│
 465├───────> p->sl_curr++;
 466│ #ifdef EXTSTORE
 467│         if (!is_hdr) {
 468│             p->requested -= size;
 469│         } else {
 470│             p->requested -= (size - it->nbytes) + sizeof(item_hdr);
 471│         }
 472│ #else
 473│         p->requested -= size;
 474│ #endif
 475│     } else {
 476│         do_slabs_free_chunked(it, size);
 477│     }
 478│     return;
 479│ }

8

 317│     /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
 318│      * least a note here. Compiler (hopefully?) optimizes this out.
 319│      */
 320│     if (settings.temp_lru &&
 321│             exptime - current_time <= settings.temporary_ttl) {
 322│         id |= TEMP_LRU;
 323│     } else if (settings.lru_segmented) {
 324│         id |= HOT_LRU;
 325│     } else {
 326│         /* There is only COLD in compat-mode */
 327│         id |= COLD_LRU;
 328│     }
 329├───> it->slabs_clsid = id;
 330│
 331│     DEBUG_REFCNT(it, '*');
 332│     it->it_flags |= settings.use_cas ? ITEM_CAS : 0;
 333│     it->nkey = nkey;
 334│     it->nbytes = nbytes;
 335│     memcpy(ITEM_key(it), key, nkey);
 336│     it->exptime = exptime;
 337│     if (settings.inline_ascii_response) {
 338│         memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
 339│     } else if (nsuffix > 0) {
 340│         memcpy(ITEM_suffix(it), &flags, sizeof(flags));
 341│     }
 342│     it->nsuffix = nsuffix;
 343│
 344│     /* Initialize internal chunk. */
 345│     if (it->it_flags & ITEM_CHUNKED) {
 346│         item_chunk *chunk = (item_chunk *) ITEM_schunk(it);
 347│
 348│         chunk->next = 0;
 349│         chunk->prev = 0;
 350│         chunk->used = 0;
 351│         chunk->size = 0;
 352│         chunk->head = it;
 353│         chunk->orig_clsid = hdr_id;
 354│     }
 355├───> it->h_next = 0;
 356│
 357│     return it;
 358│ }

5

conn_nreadの状態に設定される。

4189├───> ITEM_set_cas(it, req_cas_id);
4190│
4191│     c->item = it;
4192│ #ifdef NEED_ALIGN
4193│     if (it->it_flags & ITEM_CHUNKED) {
4194│         c->ritem = ITEM_schunk(it);
4195│     } else {
4196│         c->ritem = ITEM_data(it);
4197│     }
4198│ #else
4199│     c->ritem = ITEM_data(it);
4200│ #endif
4201│     c->rlbytes = it->nbytes;
4202│     c->cmd = comm;
4203│     conn_set_state(c, conn_nread);
4204│ }

2

drive_machineに再度戻ってきて、conn_nreadの処理が実行される。1周目はc->rlbytes > 0で特に何も実行されないが、2周目で再度case文が実行されて、c->rlbytesが0なので、complete_nreadが実行される。

5680│         case conn_nread:
5681├───────────> if (c->rlbytes == 0) {
5682│                 complete_nread(c);
5683│                 break;
5684│             }
5685│
5686│             /* Check if rbytes < 0, to prevent crash */
5687│             if (c->rlbytes < 0) {
5688│                 if (settings.verbose) {
5689│              	  fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
5690│                 }
5691│                 conn_set_state(c, conn_closing);
5692│                 break;
5693│             }

3 complete_nread (c=0x7ffff00109a0) at memcached.c:2785

ASCIIモードなので、 complete_nread_asciiが実行される。

2780│ static void complete_nread(conn *c) {
2781│     assert(c != NULL);
2782│     assert(c->protocol == ascii_prot
2783│            || c->protocol == binary_prot);
2784│
2785├───> if (c->protocol == ascii_prot) {
2786│         complete_nread_ascii(c);
2787│     } else if (c->protocol == binary_prot) {
2788│         complete_nread_binary(c);
2789│     }
2790│ }

4 complete_nread_ascii (c=0x7ffff00109a0) at memcached.c:1231

1224│ /*
1225│  * we get here after reading the value in set/add/replace commands. The command
1226│  * has been stored in c->cmd, and the item is ready in c->item.
1227│  */
1228│ static void complete_nread_ascii(conn *c) {
1229│     assert(c != NULL);
1230│
1231│     item *it = c->item;
1232│     int comm = c->cmd;
1233│     enum store_item_type ret;
1234│     bool is_valid = false;
1235│
1236├───> pthread_mutex_lock(&c->thread->stats.mutex);
1237│     c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
1238│     pthread_mutex_unlock(&c->thread->stats.mutex);
1239│
1240│     if ((it->it_flags & ITEM_CHUNKED) == 0) {
1241│         if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
1242│             is_valid = true;
1243│         }
1244│     } else {
1245│         char buf[2];
1246│         /* should point to the final item chunk */
1247│         item_chunk *ch = (item_chunk *) c->ritem;
1248│         assert(ch->used != 0);

対象のデータをキャッシュに保存するstore_item関数が実行される

1268│     if (!is_valid) {
1269│         out_string(c, "CLIENT_ERROR bad data chunk");
1270│     } else {
1271├─────> ret = store_item(it, comm, c);
1272│

5 store_item (item=0x7ffff45a3f70, comm=2, c=0x7ffff00109a0) at thread.c:689

682│ /*
683│  * Stores an item in the cache (high level, obeys set/add/replace semantics)
684│  */
685│ enum store_item_type store_item(item *item, int comm, conn* c) {
686│     enum store_item_type ret;
687│     uint32_t hv;
688│
689├───> hv = hash(ITEM_key(item), item->nkey);
690│     item_lock(hv);
691│     ret = do_store_item(item, comm, c, hv);
692│     item_unlock(hv);
693│     return ret;
694│ }

6 hash

MurmurHash3_x86_32関数が実行される。

 69│ /* Definition modified slightly from the public domain interface (no seed +
 70│  * return value */
 71│ uint32_t MurmurHash3_x86_32 ( const void * key, size_t length)
 72│ {
 73├─> const uint8_t * data = (const uint8_t*)key;
 74│   const int nblocks = length / 4;
 75│
 76│   uint32_t h1 = 0;
 77│
 78│   uint32_t c1 = 0xcc9e2d51;
 79│   uint32_t c2 = 0x1b873593;
 80│
 81│   //----------
 82│   // body
 83│
 84│   const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
 85│
 86│   for(int i = -nblocks; i; i++)
 87│   {
 88│     uint32_t k1 = getblock32(blocks,i);
 89│
 90│     k1 *= c1;
 91│     k1 = ROTL32(k1,15);
 92│     k1 *= c2;
 93│
 94│     h1 ^= k1;
 95│     h1 = ROTL32(h1,13);
 96│     h1 = h1*5+0xe6546b64;
 97│   }
 98│
 99│   //----------
100│   // tail
101│
102│   const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
103│
104│   uint32_t k1 = 0;
105│
106│   switch(length & 3)
107│   {
108│   case 3: k1 ^= tail[2] << 16;
109│   case 2: k1 ^= tail[1] << 8;
110├─> case 1: k1 ^= tail[0];
111│           k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
112│   };
113│
114│   //----------
115│   // finalization
116│
117│   h1 ^= length;
118│
119│   h1 = fmix32(h1);
120│
121│   //*(uint32_t*)out = h1;
122│   return h1;
123│ }

7 getblock32 (i=-1, p=0x7ffff45a3fac) at murmur3_hash.c:50

 44│ //-----------------------------------------------------------------------------
 45│ // Block read - if your platform needs to do endian-swapping or can only
 46│ // handle aligned reads, do the conversion here
 47│
 48│ static FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
 49│ {
 50├─> return p[i];
 51│ }
 52│
 53│ //-----------------------------------------------------------------------------

7 rotl32 (x=787512756, r=15 ‘\017’) at murmur3_hash.c:35

 33│ static inline uint32_t rotl32 ( uint32_t x, int8_t r )
 34│ {
 35├─> return (x << r) | (x >> (32 - r));
 36│ }
 37│
 38│ #define    ROTL32(x,y)    rotl32(x,y)

7 fmix32 (h=2252073768) at murmur3_hash.c:58

 54│ // Finalization mix - force all bits of a hash block to avalanche
 55│
 56│ static FORCE_INLINE uint32_t fmix32 ( uint32_t h )
 57│ {
 58├─> h ^= h >> 16;
 59│   h *= 0x85ebca6b;
 60│   h ^= h >> 13;
 61│   h *= 0xc2b2ae35;
 62│   h ^= h >> 16;
 63│
 64│   return h;
 65│ }
 66│
 67│ //-----------------------------------------------------------------------------
 68│

6 item_lock (hv=3155333867) at thread.c:104

6 item_unlock (hv=3155333867) at thread.c:120

 95│ /* item_lock() must be held for an item before any modifications to either its
 96│  * associated hash bucket, or the structure itself.
 97│  * LRU modifications must hold the item lock, and the LRU lock.
 98│  * LRU's accessing items must item_trylock() before modifying an item.
 99│  * Items accessible from an LRU must not be freed or modified
100│  * without first locking and removing from the LRU.
101│  */
102│
103│ void item_lock(uint32_t hv) {
104├───> mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
105│ }
119│ void item_unlock(uint32_t hv) {
120├───> mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
121│ }

6 do_store_item (it=0x7ffff45a3f70, comm=2, c=0x7ffff00109a0, hv=3155333867) at memcached.c:2897

2890│ /*
2891│  * Stores an item in the cache according to the semantics of one of the set
2892│  * commands. In threaded mode, this is protected by the cache lock.
2893│  *
2894│  * Returns the state of storage.
2895│  */
2896│ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
2897│     char *key = ITEM_key(it);
2898├───> item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
2899│     enum store_item_type stored = NOT_STORED;
2900│
2901│     item *new_it = NULL;
2902│     uint32_t flags;
2903│
2904│     if (old_it != NULL && comm == NREAD_ADD) {
2905│         /* add only adds a nonexistent item, but promote to head of LRU */
2906│         do_item_update(old_it);
2907│     } else if (!old_it && (comm == NREAD_REPLACE
2908│         || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2909│     {
2910│         /* replace only replaces an existing value; don't store */

7 do_item_get (key=0x7ffff45a3fa8 "test1", nkey=5, hv=3155333867, c=0x7ffff00109a0, do_update=false) at items.c:955

953│ /** wrapper around assoc_find which does the lazy expiration logic */
954│ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
955│     item *it = assoc_find(key, nkey, hv);
956├───> if (it != NULL) {
957│         refcount_incr(it);
958│         /* Optimization for slab reassignment. prevents popular items from
959│          * jamming in busy wait. Can only do this here to satisfy lock order
960│          * of item_lock, slabs_lock. */
961│         /* This was made unsafe by removal of the cache_lock:
962│          * slab_rebalance_signal and slab_rebal.* are modified in a separate
963│          * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
964│          * NULL (0), but slab_end is still equal to some value, this would end
965│          * up unlinking every item fetched.
966│          * This is either an acceptable loss, or if slab_rebalance_signal is
967│          * true, slab_start/slab_end should be put behind the slabs_lock.
968│          * Which would cause a huge potential slowdown.
969│          * Could also use a specific lock for slab_rebal.* and
970│          * slab_rebalance_signal (shorter lock?)
971│          */
972│         /*if (slab_rebalance_signal &&
973│             ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
974│             do_item_unlink(it, hv);
975│             do_item_remove(it);
976│             it = NULL;
977│         }*/
978│     }

Lazy expirationにより、すでに存在するキーのメモリは上書きされる。ここではまだ一度も保存していないキーなので、"> NOT FOUND "が表示される。

979├───> int was_found = 0;
980│
981│     if (settings.verbose > 2) {
982│         int ii;
983│         if (it == NULL) {
984│             fprintf(stderr, "> NOT FOUND ");
985│         } else {
986│             fprintf(stderr, "> FOUND KEY ");
987│         }
988│         for (ii = 0; ii < nkey; ++ii) {
989│             fprintf(stderr, "%c", key[ii]);
990│         }
991│     }

6

2943│     } else {
2944│         int failed_alloc = 0;
2945│         /*
2946│          * Append - combine new and old record into single one. Here it's
2947│          * atomic and thread-safe.
2948│          */
2949├───────> if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2950│             /*
2951│              * Validate CAS
2952│              */
2953│             if (ITEM_get_cas(it) != 0) {
2954│                 // CAS much be equal
2955│                 if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2956│                     stored = EXISTS;
2957│                 }
2958│             }

do_item_linkが実行される。

2987├───────> if (stored == NOT_STORED && failed_alloc == 0) {
2988│             if (old_it != NULL) {
2989│                 STORAGE_delete(c->thread->storage, old_it);
2990│                 item_replace(old_it, it, hv);
2991│             } else {
2992│                 do_item_link(it, hv);
2993│             }
2994│
2995│             c->cas = ITEM_get_cas(it);
2996│
2997│             stored = STORED;
2998│         }
2999│     }

7 do_item_link (it=0x7ffff45a3f70, hv=3155333867) at items.c:474

471│ int do_item_link(item *it, const uint32_t hv) {
472│     MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
473│     assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
474│     it->it_flags |= ITEM_LINKED;
475│     it->time = current_time;
476│
477│     STATS_LOCK();
478├───> stats_state.curr_bytes += ITEM_ntotal(it);
479│     stats_state.curr_items += 1;
480│     stats.total_items += 1;
481│     STATS_UNLOCK();
482│
483│     /* Allocate a new CAS ID on link. */
484│     ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
485│     assoc_insert(it, hv);
486│     item_link_q(it);
487│     refcount_incr(it);
488│     item_stats_sizes_add(it);
489│
490│     return 1;
491│ }

8 get_cas_id () at items.c:112

 108│ /* Get the next CAS id for a new item. */
 109│ /* TODO: refactor some atomics for this. */
 110│ uint64_t get_cas_id(void) {
 111│     static uint64_t cas_id = 0;
 112├───> pthread_mutex_lock(&cas_id_lock);
 113│     uint64_t next_id = ++cas_id;
 114│     pthread_mutex_unlock(&cas_id_lock);
 115│     return next_id;
 116│ }

8 assoc_insert (it=0x7ffff45a3f70, hv=3155333867) at assoc.c:160

154│ /* Note: this isn't an assoc_update.  The key must not already exist to call this */
155│ int assoc_insert(item *it, const uint32_t hv) {
156│     unsigned int oldbucket;
157│
158│ //    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */
159│
160├───> if (expanding &&
161│         (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
162│     {
163│         it->h_next = old_hashtable[oldbucket];
164│         old_hashtable[oldbucket] = it;
165│     } else {
166│         it->h_next = primary_hashtable[hv & hashmask(hashpower)];
167│         primary_hashtable[hv & hashmask(hashpower)] = it;
168│     }
169│
170│     MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
171│     return 1;
172│ }

8 item_link_q (it=0x7ffff45a3f70) at items.c:421

420│ static void item_link_q(item *it) {
421├───> pthread_mutex_lock(&lru_locks[it->slabs_clsid]);
422│     do_item_link_q(it);
423│     pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
424│ }

9 do_item_link_q (it=0x7ffff45a3f70) at items.c:397

393│ static void do_item_link_q(item *it) { /* item is the new head */
394│     item **head, **tail;
395│     assert((it->it_flags & ITEM_SLABBED) == 0);
396│
397│     head = &heads[it->slabs_clsid];
398│     tail = &tails[it->slabs_clsid];
399│     assert(it != *head);
400│     assert((*head && *tail) || (*head == 0 && *tail == 0));
401│     it->prev = 0;
402│     it->next = *head;
403├───> if (it->next) it->next->prev = it;
404│     *head = it;
405│     if (*tail == 0) *tail = it;
406│     sizes[it->slabs_clsid]++;
407│ #ifdef EXTSTORE
408│     if (it->it_flags & ITEM_HDR) {
409│         sizes_bytes[it->slabs_clsid] += (ITEM_ntotal(it) - it->nbytes) + sizeof(item_hdr);
410│     } else {
411│         sizes_bytes[it->slabs_clsid] += ITEM_ntotal(it);
412│     }
413│ #else
414│     sizes_bytes[it->slabs_clsid] += ITEM_ntotal(it);
415│ #endif
416│
417│     return;
418│ }

8 item_stats_sizes_add (it=0x7ffff45a3f70) at items.c:907

906│ void item_stats_sizes_add(item *it) {
907├───> if (stats_sizes_hist == NULL || stats_sizes_cas_min > ITEM_get_cas(it))
908│         return;
909│     int ntotal = ITEM_ntotal(it);
910│     int bucket = ntotal / 32;
911│     if ((ntotal % 32) != 0) bucket++;
912│     if (bucket < stats_sizes_buckets) stats_sizes_hist[bucket]++;
913│ }

6

3001├───> if (old_it != NULL)
3002│         do_item_remove(old_it);         /* release our reference */
3003│     if (new_it != NULL)
3004│         do_item_remove(new_it);
3005│
3006│     if (stored == STORED) {
3007│         c->cas = ITEM_get_cas(it);
3008│     }
3009│     LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
3010│             stored, comm, ITEM_key(it), it->nkey, it->exptime, ITEM_clsid(it));
3011│
3012│     return stored;
3013│ }

4

retがSTOREDとなっている。

1303│       switch (ret) {
1304│       case STORED:
1305├─────────> out_string(c, "STORED");
1306│           break;
1307│       case EXISTS:
1308│           out_string(c, "EXISTS");
1309│           break;
1310│       case NOT_FOUND:
1311│           out_string(c, "NOT_FOUND");
1312│           break;
1313│       case NOT_STORED:
1314│           out_string(c, "NOT_STORED");
1315│           break;
1316│       default:
1317│           out_string(c, "SERVER_ERROR Unhandled storage type.");

5 out_string (c=0x7ffff00109a0, str=0x424f99 "STORED") at memcached.c:1171

conn_writeに設定される。

1166│ static void out_string(conn *c, const char *str) {
1167│     size_t len;
1168│
1169│     assert(c != NULL);
1170│
1171├───> if (c->noreply) {
1172│         if (settings.verbose > 1)
1173│             fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
1174│         c->noreply = false;
1175│         conn_set_state(c, conn_new_cmd);
1176│         return;
1177│     }
1178│
1179│     if (settings.verbose > 1)
1180│         fprintf(stderr, ">%d %s\n", c->sfd, str);
1181│
1182│     /* Nuke a partial output... */
1183│     c->msgcurr = 0;
1184│     c->msgused = 0;
1185│     c->iovused = 0;
1186├───> add_msghdr(c);
1187│
1188│     len = strlen(str);
1189│     if ((len + 2) > c->wsize) {
1190│         /* ought to be always enough. just fail for simplicity */
1191│         str = "SERVER_ERROR output line too long";
1192│         len = strlen(str);
1193│     }
1194│
1195│     memcpy(c->wbuf, str, len);
1196│     memcpy(c->wbuf + len, "\r\n", 2);
1197│     c->wbytes = len + 2;
1198│     c->wcurr = c->wbuf;
1199│
1200│     conn_set_state(c, conn_write);
1201│     c->write_and_go = conn_new_cmd;
1202│     return;
1203│ }

5 item_remove (item=0x7ffff45a3f70) at thread.c:638

hash関数は、先程と同様MurmurHash3_x86_32関数が実行される。

632│ /*
633│  * Decrements the reference count on an item and adds it to the freelist if
634│  * needed.
635│  */
636│ void item_remove(item *item) {
637│     uint32_t hv;
638├───> hv = hash(ITEM_key(item), item->nkey);
639│
640│     item_lock(hv);
641│     do_item_remove(item);
642│     item_unlock(hv);
643│ }
644│

6 do_item_remove (it=0x7ffff45a3f70) at items.c:529

 524│ void do_item_remove(item *it) {
 525│     MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
 526│     assert((it->it_flags & ITEM_SLABBED) == 0);
 527│     assert(it->refcount > 0);
 528│
 529├───> if (refcount_decr(it) == 0) {
 530│         item_free(it);
 531│     }
 532│ }

2

drive_machineに戻ってくる。c->stateはconn_write

5810│         case conn_write:
5811│             /*
5812│              * We want to write out a simple response. If we haven't already,
5813│              * assemble it into a msgbuf list (this will be a single-entry
5814│              * list for TCP or a two-entry list for UDP).
5815│              */
5816├───────────> if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
5817│                 if (add_iov(c, c->wcurr, c->wbytes) != 0) {
5818│                     if (settings.verbose > 0)
5819│                         fprintf(stderr, "Couldn't build response\n");
5820│                     conn_set_state(c, conn_closing);
5821│                     break;
5822│                 }
5823│             }
5824│
5825│      	  /* fall through... */
5826│
:
:
5845├─────────> if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
5846│             if (settings.verbose > 0)
5847│               fprintf(stderr, "Failed to build UDP headers\n");
5848│             conn_set_state(c, conn_closing);
5849│             break;
5850│           }

3 add_iov (c=0x7ffff00109a0, buf=0x7ffff00113e0, len=8) at memcached.c:1048

1035│  *
1036│  * Returns 0 on success, -1 on out-of-memory.
1037│  * Note: This is a hot path for at least ASCII protocol. While there is
1038│  * redundant code in splitting TCP/UDP handling, any reduction in steps has a
1039│  * large impact for TCP connections.
1040│  */
1041│
1042│ static int add_iov(conn *c, const void *buf, int len) {
1043│     struct msghdr *m;
1044│     int leftover;
1045│
1046│     assert(c != NULL);
1047│
1048├───> if (IS_UDP(c->transport)) {
1049│         do {
1050│             m = &c->msglist[c->msgused - 1];
1051│
1052│             /*
1053│              * Limit UDP packets to UDP_MAX_PAYLOAD_SIZE bytes.
1054│              */
1055│
1056│      	  /* We may need to start a new msghdr if this one is full. */
1057│             if (m->msg_iovlen == IOV_MAX ||
1058│                 (c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
1059│                 add_msghdr(c);
1060│                 m = &c->msglist[c->msgused - 1];

その後、ensure_iov_space関数が呼ばれる。ここでは特に何もしない。

1085│     } else {
1086│         /* Optimized path for TCP connections */
1087│         m = &c->msglist[c->msgused - 1];
1088│         if (m->msg_iovlen == IOV_MAX) {
1089│             add_msghdr(c);
1090│             m = &c->msglist[c->msgused - 1];
1091│         }
1092│
1093│         if (ensure_iov_space(c) != 0)
1094│             return -1;
1095│
1096├───────> m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1097│         m->msg_iov[m->msg_iovlen].iov_len = len;
1098│         c->msgbytes += len;
1099│         c->iovused++;
1100│         m->msg_iovlen++;
1101│     }
1102│
1103│     return 0;
1104│ }

4 ensure_iov_space (c=0x7ffff00109a0) at memcached.c:1008

 999│ /*
1000│  * Ensures that there is room for another struct iovec in a connection's
1001│  * iov list.
1002│  *
1003│  * Returns 0 on success, -1 on out-of-memory.
1004│  */
1005│ static int ensure_iov_space(conn *c) {
1006│     assert(c != NULL);
1007│
1008├───> if (c->iovused >= c->iovsize) {
1009│         int i, iovnum;
1010│         struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1011│                                 (c->iovsize * 2) * sizeof(struct iovec));
1012│         if (! new_iov) {
1013│             STATS_LOCK();
1014│             stats.malloc_fails++;
1015│             STATS_UNLOCK();
1016│      	  return -1;
1017│         }
1018│         c->iov = new_iov;
1019│         c->iovsize *= 2;
1020│

3 transmit (c=0x7ffff00109a0) at memcached.c:5361

TRANSMIT_INCOMPLETEをreturnする。その後、この関数を抜け、drive_machineでc->stateがconn_writeのまま再度この関数が実行される。その時は、TRANSMIT_COMPLETEをreturnする。

5349│ /*
5350│  * Transmit the next chunk of data from our list of msgbuf structures.
5351│  *
5352│  * Returns:
5353│  *   TRANSMIT_COMPLETE   All done writing.
5354│  *   TRANSMIT_INCOMPLETE More data remaining to write.
5355│  *   TRANSMIT_SOFT_ERROR Can't write any more right now.
5356│  *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
5357│  */
5358│ static enum transmit_result transmit(conn *c) {
5359│     assert(c != NULL);
5360│
5361├───> if (c->msgcurr < c->msgused &&
5362│             c->msglist[c->msgcurr].msg_iovlen == 0) {
5363│         /* Finished writing the current msg; advance to the next. */
5364│         c->msgcurr++;
5365│     }
5366│     if (c->msgcurr < c->msgused) {
5367│         ssize_t res;
5368│         struct msghdr *m = &c->msglist[c->msgcurr];
5369│
5370│         res = c->sendmsg(c, m, 0);
5371│         if (res > 0) {
5372│             pthread_mutex_lock(&c->thread->stats.mutex);
5373│             c->thread->stats.bytes_written += res;
5374│             pthread_mutex_unlock(&c->thread->stats.mutex);
5375│
5376│             /* We've written some of the data. Remove the completed
5377│                iovec entries from the list of pending writes. */
5378│             while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
5379│                 res -= m->msg_iov->iov_len;
5380│                 m->msg_iovlen--;
5381│                 m->msg_iov++;
5382│             }
5383│
5384│             /* Might have written just part of the last iovec entry;
5385│                adjust it so the next write will do the rest. */
5386│             if (res > 0) {
5387│                 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
5388│                 m->msg_iov->iov_len -= res;
5389│             }
5390├───────────> return TRANSMIT_INCOMPLETE;
5391│         }
5392│         if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5393│             if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5394│                 if (settings.verbose > 0)
5395│                     fprintf(stderr, "Couldn't update event\n");
5396│                 conn_set_state(c, conn_closing);
5397│                 return TRANSMIT_HARD_ERROR;
5398│             }
5399│             return TRANSMIT_SOFT_ERROR;
5400│         }
5401│         /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
5402│            we have a real error, on which we close the connection */

4 tcp_sendmsg (c=0x7ffff00109a0, msg=0x7ffff0013e80, flags=0) at memcached.c:167

 165│ ssize_t tcp_sendmsg(conn *c, struct msghdr *msg, int flags) {
 166│     assert (c != NULL);
 167├───> return sendmsg(c->sfd, msg, flags);
 168│ }

2

c->write_and_goにはconn_new_cmdが設定されており、c->stateはこの状態に設定される。

5845│           if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
5846│             if (settings.verbose > 0)
5847│               fprintf(stderr, "Failed to build UDP headers\n");
5848│             conn_set_state(c, conn_closing);
5849│             break;
5850│           }
5851│             switch (transmit(c)) {
5852│      	  case TRANSMIT_COMPLETE:
5853├───────────────> if (c->state == conn_mwrite) {
5854│                     conn_release_items(c);
5855│                     /* XXX:  I don't know why this wasn't the general case */
5856│                     if(c->protocol == binary_prot) {
5857│                         conn_set_state(c, c->write_and_go);
5858│                     } else {
5859│                         conn_set_state(c, conn_new_cmd);
5860│                     }
5861│                 } else if (c->state == conn_write) {
5862│                     if (c->write_and_free) {
5863│                         free(c->write_and_free);
5864│                         c->write_and_free = 0;
5865│                     }
5866├───────────────────> conn_set_state(c, c->write_and_go);
5867│                 } else {
5868│                     if (settings.verbose > 0)
5869│                         fprintf(stderr, "Unexpected state %d\n", c->state);
5870│                     conn_set_state(c, conn_closing);
5871│                 }
5872│                 break;
5873│

再びループでc->stateの状態に基づいた処理が実行される。ここでは、conn_new_cmdの処理が実行される。

5651│         case conn_new_cmd:
5652│             /* Only process nreqs at a time to avoid starving other
5653│                connections */
5654│
5655│             --nreqs;
5656├───────────> if (nreqs >= 0) {
5657│                 reset_cmd_handler(c);
5658│             } else {
5659│                 pthread_mutex_lock(&c->thread->stats.mutex);
5660│                 c->thread->stats.conn_yields++;
5661│                 pthread_mutex_unlock(&c->thread->stats.mutex);
5662│                 if (c->rbytes > 0) {
5663│              	  /* We have already read in data into the input buffer,
5664│                        so libevent will most likely not signal read events
5665│                        on the socket (unless more data is available. As a
5666│                        hack we should just put in a request to write data,
5667│                        because that should be possible ;-)
5668│                     */

3 reset_cmd_handler (c=0x7ffff00109a0) at memcached.c:2766

conn_waitingの状態に設定。

2765│ static void reset_cmd_handler(conn *c) {
2766├───> c->cmd = -1;
2767│     c->substate = bin_no_state;
2768│     if(c->item != NULL) {
2769│         item_remove(c->item);
2770│         c->item = NULL;
2771│     }
2772│     conn_shrink(c);
2773│     if (c->rbytes > 0) {
2774│         conn_set_state(c, conn_parse_cmd);
2775│     } else {
2776│         conn_set_state(c, conn_waiting);
2777│     }
2778│ }

2

update_event関数ではtrueが返され、c->stateがconn_readに設定される。stop変数にtrueが設定されたことにより、while文のループは終了する。

5612│         case conn_waiting:
5613├───────────> if (!update_event(c, EV_READ | EV_PERSIST)) {
5614│                 if (settings.verbose > 0)
5615│                     fprintf(stderr, "Couldn't update event\n");
5616│                 conn_set_state(c, conn_closing);
5617│                 break;
5618│             }
5619│
5620│             conn_set_state(c, conn_read);
5621│             stop = true;
5622│      	  break;
5623│

事前準備

Memcached

デバッグ用にビルドしておく。また、-O2で最適化しているとgdbで順にコードを追っていくのが困難になるので、-O0に変更しておく。そうでないと、デバッグの際に、順番に実行してもソースコード中の動きがばらばらになる。

$ git clone https://github.com/memcached/memcached.git
$ cd memcached
$ ./autogen.sh && ./configure && make CFLAGS="-g -O0" && sudo make install

cgdb

$ git clone https://github.com/cgdb/cgdb.git
$ cd cgdb
$ ./autogen.sh
$ CXXFLAGS='-std=c++11' ./configure --prefix=/usr/local
$ make
$ sudo make install

-std=c++11をつけないと、make時に"kui.cpp:310:15: error: ‘it’ does not name a type"のエラーが発生する: https://github.com/cgdb/cgdb/issues/184

デバッグ

MemcachedはLibeventを利用しており、コマンド実行時に呼ばれる関数はevent_handlerを起点とする。そのため、ここにブレークポイントを打っておくと良い。

$ cgdb memcached

(gdb) b main
(gdb) b event_handler
(gdb) run -u ec2-user -vvv

event_base_loop の前まで実行

SETコマンド実行時、ここで別ターミナルから $ telnet localhost 11211 を実行。以下のコマンドを実行。

set test1 0 0 7
sample1

スレッド3に切り替えて、nextを続けていると、先程ブレークポイントを設定しておいたevent_handler関数で止まる。

(gdb) info threads
(gdb) thread 3
(gdb) next
:
:

繰り返し処理が多く、ループを飛ばすには以下を実行

(gdb) b linenum
(gdb) c
(gdb) del breaknum

定数確認にはシェルでgrep

(gdb) shell grep -r ITEMS_PER_ALLOC /tmp/memcached

My Twitter & RSS

Leave a Reply

Your email address will not be published. Required fields are marked *