Dive Deep Memcached ~ SETコマンド実行時の動作 ~
Redis本を執筆しました。Memcachedとの詳細な比較も付録で取り上げていますので、こちらも是非合わせてご覧ください。
———————————————————————————————————————————————————
Memcached 1.5.14をソースからビルドしてデフォルト設定時、SETコマンド実行時の動作をソースコードで確認します。 https://github.com/memcached/memcached
概要
最初の初期化
- 1 main at memcached.c
- 2 sanitycheck at memcached.c
- 2 settings_init at memcached.c
- 2 init_lru_maintainer at items.c
- 2 hash_init at hash.c
- 2 logger_init at logger.c
- 3 start_logger_thread at logger.c
- 3 STATS_LOCK, STATS_UNLOCK at thread.c
- 2 stats_init at memcached.c
- 3 stats_prefix_init at stats.c
- 3 assoc_init at assoc.c
- 2 conn_init at memcached.c
- 2 slabs_init at slabs.c
- 2 memcached_thread_init at thread.c
- 3 setup_thread at thread.c
- 4 cq_init at thread.c
- 4 cache_create at cache.c
- 3 create_worker at thread.c
- 3 setup_thread at thread.c
- 2 init_lru_crawler at crawler.c
- 2 start_assoc_maintenance_thread at assoc.c
- 2 start_item_crawler_thread at crawler.c
- 2 start_lru_maintainer_thread at items.c
- 2 start_slab_maintenance_thread at slabs.c
- 2 clock_handler at memcached.c
- 3 assoc_start_expand at assoc.c
- 2 server_sockets at memcached.c
- 3 server_socket at memcached.c
- 4 new_socket at memcached.c
- 4 conn_new at memcached.c
- 3 server_socket at memcached.c
- 2 uriencode_init at util.c
- 2 event_base_loop at memcached.c : main_baseに対するイベント処理のループ
SETコマンドの挙動
- 1 event_handler at memcached.c
- 2 drive_machine at memcached.c
- 3 dispatch_conn_new at thread.c
- 4 cqi_new at thread.c
- 4 cq_push at thread.c
- 3 reset_cmd_handler at memcached.c
- 4 conn_shrink at memcached.c
- 4 conn_set_state at memcached.c
- 3 update_event at memcached.c
- 3 try_read_network at memcached.c
- 4 tcp_read at memcached.c
- 3 try_read_command at memcached.c
- 4 process_command at memcached.c
- 5 add_msghdr at memcached.c
- 5 tokenize_command at memcached.c
- 5 process_update_command at memcached.c
- 6 set_noreply_maybe at memcached.c
- 6 realtime at memcached.c
- 6 item_alloc at thread.c
- 7 do_item_alloc at items.c
- 8 item_make_header at items.c
- 8 slabs_clsid at slabs.c
- 8 slabs_alloc at slabs.c
- 9 do_slabs_alloc at slabs.c
- 10 do_slabs_newslab at slabs.c
- 11 grow_slab_list at slabs.c
- 11 memory_allocate at slabs.c
- 11 split_slab_page_into_freelist at slabs.c
- 12 do_slabs_free at slabs.c
- 10 do_slabs_newslab at slabs.c
- 9 do_slabs_alloc at slabs.c
- 7 do_item_alloc at items.c
- 4 process_command at memcached.c
- 3 complete_nread at memcached.c
- 4 complete_nread_ascii at memcached.c
- 5 store_item at thread.c
- 6 hash(MurmurHash3_x86_32)
- 7 getblock32 at murmur3_hash.c
- 7 rotl32 at murmur3_hash.c
- 7 fmix32 at murmur3_hash.c
- 6 item_lock, item_unlock at thread.c
- 6 do_store_item at memcached.c
- 7 do_item_get at items.c
- 7 do_item_link at items.c
- 8 get_cas_id at items.c
- 8 assoc_insert at assoc.c
- 8 item_link_q at items.c
- 9 do_item_link_q at items.c
- 8 item_stats_sizes_add at items.c
- 6 hash(MurmurHash3_x86_32)
- 5 out_string at memcached.c
- 5 item_remove at thread.c
- 6 do_item_remove at items.c
- 5 store_item at thread.c
- 4 complete_nread_ascii at memcached.c
- 3 add_iov at memcached.c
- 4 ensure_iov_space at memcached.c
- 3 transmit at memcached.c
- 4 tcp_sendmsg at memcached.c
- 3 reset_cmd_handler at memcached.c
- 3 dispatch_conn_new at thread.c
- 2 drive_machine at memcached.c
詳細
最初の初期化
memcached -u ec2-user -vvv コマンドを実行した際の挙動
1 main at memcached.c
6762│ int main (int argc, char **argv) {
6763│ int c;
6764│ bool lock_memory = false;
6765│ bool do_daemonize = false;
6766│ bool preallocate = false;
6767│ int maxcore = 0;
6768│ char *username = NULL;
6769│ char *pid_file = NULL;
6770│ struct passwd *pw;
6771│ struct rlimit rlim;
6772│ char *buf;
6773│ char unit = '\0';
6774│ int size_max = 0;
6775├───> int retval = EXIT_SUCCESS;
6776│ bool protocol_specified = false;
6777│ bool tcp_specified = false;
6778│ bool udp_specified = false;
6779│ bool start_lru_maintainer = true;
6780│ bool start_lru_crawler = true;
6781│ bool start_assoc_maint = true;
6782│ enum hashfunc_type hash_type = MURMUR3_HASH;
6783│ uint32_t tocrawl;
6784│ uint32_t slab_sizes[MAX_NUMBER_OF_SLAB_CLASSES];
6785│ bool use_slab_sizes = false;
6786│ char *slab_sizes_unparsed = NULL;
6787│ bool slab_chunk_size_changed = false;
sanitycheck関数後に、signal関数。その後、settings_init
6929├───> if (!sanitycheck()) {
6930│ return EX_OSERR;
6931│ }
6932│
6933│ /* handle SIGINT, SIGTERM */
6934│ signal(SIGINT, sig_handler);
6935│ signal(SIGTERM, sig_handler);
6936│
6937│ /* init settings */
6938│ settings_init();
6939│ #ifdef EXTSTORE
6940│ settings.ext_item_size = 512;
6941│ settings.ext_item_age = UINT_MAX;
2 sanitycheck at memcached.c
event_get_versionはlibeventの関数。ここでは、"2.0.21-stable"という文字列を取得しeverに格納されている。
6702│ /**
6703│ * Do basic sanity check of the runtime environment
6704│ * @return true if no errors found, false if we can't use this env
6705│ */
6706│ static bool sanitycheck(void) {
6707│ /* One of our biggest problems is old and bogus libevents */
6708├───> const char *ever = event_get_version();
6709│ if (ever != NULL) {
6710│ if (strncmp(ever, "1.", 2) == 0) {
6711│ /* Require at least 1.3 (that's still a couple of years old) */
6712│ if (('0' <= ever[2] && ever[2] < '3') && !isdigit(ever[3])) {
6713│ fprintf(stderr, "You are using libevent %s.\nPlease upgrade to"
6714│ " a more recent version (1.3 or newer)\n",
6715│ event_get_version());
6716│ return false;
6717│ }
6718│ }
6719│ }
6720│
2 settings_init at memcached.c
パラメータのデフォルト値が設定されている
247│ static void settings_init(void) {
248│ settings.use_cas = true;
249│ settings.access = 0700;
250│ settings.port = 11211;
251├───> settings.udpport = 0;
252│ #ifdef TLS
253│ settings.ssl_enabled = false;
254│ settings.ssl_ctx = NULL;
255│ settings.ssl_chain_cert = NULL;
256│ settings.ssl_key = NULL;
257│ settings.ssl_verify_mode = SSL_VERIFY_NONE;
258│ settings.ssl_keyformat = SSL_FILETYPE_PEM;
259│ settings.ssl_ciphers = NULL;
260│ settings.ssl_ca_cert = NULL;
261│ settings.ssl_last_cert_refresh_time = current_time;
262│ settings.ssl_wbuf_size = 16 * 1024; // default is 16KB (SSL max frame size is 17KB)
263│ #endif
264│ /* By default this string should be NULL for getaddrinfo() */
265│ settings.inter = NULL;
266│ settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
267│ settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
268│ settings.verbose = 0;
269│ settings.oldest_live = 0;
270│ settings.oldest_cas = 0; /* supplements accuracy of oldest_live */
271│ settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
272│ settings.socketpath = NULL; /* by default, not using a unix socket */
273│ settings.factor = 1.25;
274│ settings.chunk_size = 48; /* space for a modest key and value */
275│ settings.num_threads = 4; /* N workers */
276│ settings.num_threads_per_udp = 0;
277├───> settings.prefix_delimiter = ':';
278│ settings.detail_enabled = 0;
279│ settings.reqs_per_event = 20;
280│ settings.backlog = 1024;
281│ settings.binding_protocol = negotiating_prot;
282│ settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
283│ settings.slab_page_size = 1024 * 1024; /* chunks are split from 1MB pages. */
284│ settings.slab_chunk_size_max = settings.slab_page_size / 2;
285│ settings.sasl = false;
286│ settings.maxconns_fast = true;
287│ settings.lru_crawler = false;
288│ settings.lru_crawler_sleep = 100;
289│ settings.lru_crawler_tocrawl = 0;
290│ settings.lru_maintainer_thread = false;
291│ settings.lru_segmented = true;
292│ settings.hot_lru_pct = 20;
293│ settings.warm_lru_pct = 40;
294│ settings.hot_max_factor = 0.2;
295│ settings.warm_max_factor = 2.0;
296│ settings.inline_ascii_response = false;
297│ settings.temp_lru = false;
298│ settings.temporary_ttl = 61;
299│ settings.idle_timeout = 0; /* disabled */
300│ settings.hashpower_init = 0;
301│ settings.slab_reassign = true;
302│ settings.slab_automove = 1;
303├───> settings.slab_automove_ratio = 0.8;
304│ settings.slab_automove_window = 30;
305│ settings.shutdown_command = false;
306│ settings.tail_repair_time = TAIL_REPAIR_TIME_DEFAULT;
307│ settings.flush_enabled = true;
308│ settings.dump_enabled = true;
309│ settings.crawls_persleep = 1000;
310│ settings.logger_watcher_buf_size = LOGGER_WATCHER_BUF_SIZE;
311│ settings.logger_buf_size = LOGGER_BUF_SIZE;
312│ settings.drop_privileges = false;
313│ #ifdef MEMCACHED_DEBUG
314│ settings.relaxed_privileges = false;
315│ #endif
1 main at memcached.c
6957│
6958│ /* Run regardless of initializing it later */
6959├───> init_lru_maintainer();
6960│
6961│ /* set stderr non-buffering (for running under, say, daemontools) */
6962│ setbuf(stderr, NULL);
6963│
6964│ char *shortopts =
6965│ "a:" /* access mask for unix socket */
6966│ "A" /* enable admin shutdown command */
6967│ "Z" /* enable SSL */
6968│ "p:" /* TCP port number to listen on */
6969│ "s:" /* unix socket path to listen on */
6970│ "U:" /* UDP port number to listen on */
6971│ "m:" /* max memory to use for items in megabytes */
2 init_lru_maintainer at items.c
1704│ int init_lru_maintainer(void) {
1705├───> if (lru_maintainer_initialized == 0) {
1706│ pthread_mutex_init(&lru_maintainer_lock, NULL);
1707│ lru_maintainer_initialized = 1;
1708│ }
1709│ return 0;
1710│ }
1 main at memcached.c
6998│ /* process arguments */
6999│ #ifdef HAVE_GETOPT_LONG
7000├───> const struct option longopts[] = {
7001│ {"unix-mask", required_argument, 0, 'a'},
7002│ {"enable-shutdown", no_argument, 0, 'A'},
7003│ {"enable-ssl", no_argument, 0, 'Z'},
7004│ {"port", required_argument, 0, 'p'},
7005│ {"unix-socket", required_argument, 0, 's'},
7006│ {"udp-port", required_argument, 0, 'U'},
7007│ {"memory-limit", required_argument, 0, 'm'},
7008│ {"disable-evictions", no_argument, 0, 'M'},
7009│ {"conn-limit", required_argument, 0, 'c'},
7010│ {"lock-memory", no_argument, 0, 'k'},
7011│ {"help", no_argument, 0, 'h'},
7012│ {"license", no_argument, 0, 'i'},
:
:
7028│ {"max-item-size", required_argument, 0, 'I'},
7029│ {"enable-sasl", no_argument, 0, 'S'},
7030│ {"disable-flush-all", no_argument, 0, 'F'},
7031│ {"disable-dumping", no_argument, 0, 'X'},
7032│ {"extended", required_argument, 0, 'o'},
7033│ {0, 0, 0, 0}
7034│ };
7035│ int optindex;
7036│ while (-1 != (c = getopt_long(argc, argv, shortopts,
7037│ longopts, &optindex))) {
7038│ #else
7039│ while (-1 != (c = getopt(argc, argv, shortopts))) {
7040│ #endif
7041├───────> switch (c) {
7042│ case 'A':
7043│ /* enables "shutdown" command */
7044│ settings.shutdown_command = true;
7045│ break;
7046│ case 'Z':
7047│ /* enable secure communication*/
7048│ #ifdef TLS
7049│ settings.ssl_enabled = true;
7050│ #else
7051│ fprintf(stderr, "This server is not built with TLS support.\n");
7052│ exit(EX_USAGE);
7053│ #endif
-u ec2-user のようにユーザー名を与えているので、username 変数に"ec2-user"が設定される。
7119│ case 'r':
7120│ maxcore = 1;
7121│ break;
7122│ case 'R':
7123│ settings.reqs_per_event = atoi(optarg);
7124│ if (settings.reqs_per_event == 0) {
7125│ fprintf(stderr, "Number of requests per event must be greater than 0\n");
7126│ return 1;
7127│ }
7128│ break;
7129│ case 'u':
7130├───────────> username = optarg;
7131│ break;
7132│ case 'P':
7133│ pid_file = optarg;
7134│ break;
7135│ case 'f':
7136│ settings.factor = atof(optarg);
7137│ if (settings.factor <= 1.0) {
7138│ fprintf(stderr, "Factor must be greater than 1\n");
7139│ return 1;
7140│ }
7141│ break;
-vvv と指定しているので、getopt_longの取得結果のvの処理はwhile文で3回繰り返される。
7083│ case 'h':
7084│ usage();
7085│ exit(EXIT_SUCCESS);
7086│ case 'i':
7087│ usage_license();
7088│ exit(EXIT_SUCCESS);
7089│ case 'V':
7090│ printf(PACKAGE " " VERSION "\n");
7091│ exit(EXIT_SUCCESS);
7092│ case 'k':
7093│ lock_memory = true;
7094│ break;
7095│ case 'v':
7096├───────────> settings.verbose++;
7097│ break;
7098│ case 'l':
7099│ if (settings.inter != NULL) {
7100│ if (strstr(settings.inter, optarg) != NULL) {
7101│ break;
7102│ }
7103│ size_t len = strlen(settings.inter) + strlen(optarg) + 2;
7104│ char *p = malloc(len);
7105│ if (p == NULL) {
7106│ fprintf(stderr, "Failed to allocate memory\n");
7107│ return 1;
7108│ }
その後、パラメータの値が、制限値外のものが指定されていたらエラーで終了する。
7746│ if (settings.item_size_max < 1024) {
7747│ fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n");
7748│ exit(EX_USAGE);
7749│ }
7750│ if (settings.item_size_max > (settings.maxbytes / 2)) {
7751│ fprintf(stderr, "Cannot set item size limit higher than 1/2 of memory max.\n");
7752│ exit(EX_USAGE);
7753│ }
7754│ if (settings.item_size_max > (1024 * 1024 * 1024)) {
7755│ fprintf(stderr, "Cannot set item size limit higher than a gigabyte.\n");
7756│ exit(EX_USAGE);
7757│ }
7758├───> if (settings.item_size_max > 1024 * 1024) {
7759│ if (!slab_chunk_size_changed) {
7760│ // Ideal new default is 16k, but needs stitching.
7761│ settings.slab_chunk_size_max = settings.slab_page_size / 2;
7762│ }
7763│ }
7764│
7765│ if (settings.slab_chunk_size_max > settings.item_size_max) {
7766│ fprintf(stderr, "slab_chunk_max (bytes: %d) cannot be larger than -I (item_size_max %d)\n",
7767│ settings.slab_chunk_size_max, settings.item_size_max);
7768│ exit(EX_USAGE);
7769│ }
7770│
7771├───> if (settings.item_size_max % settings.slab_chunk_size_max != 0) {
7772│ fprintf(stderr, "-I (item_size_max: %d) must be evenly divisible by slab_chunk_max (bytes: %d)\n",
7773│ settings.item_size_max, settings.slab_chunk_size_max);
7774│ exit(EX_USAGE);
7775│ }
7776│
7777│ if (settings.slab_page_size % settings.slab_chunk_size_max != 0) {
7778│ fprintf(stderr, "slab_chunk_max (bytes: %d) must divide evenly into %d (slab_page_size)\n",
7779│ settings.slab_chunk_size_max, settings.slab_page_size);
7780│ exit(EX_USAGE);
7781│ }
:
:
7811│ if (slab_sizes_unparsed != NULL) {
7812│ if (_parse_slab_sizes(slab_sizes_unparsed, slab_sizes)) {
7813│ use_slab_sizes = true;
7814│ } else {
7815│ exit(EX_USAGE);
7816│ }
7817│ }
7818│
7819│ if (settings.hot_lru_pct + settings.warm_lru_pct > 80) {
7820│ fprintf(stderr, "hot_lru_pct + warm_lru_pct cannot be more than 80%% combined\n");
7821│ exit(EX_USAGE);
7822│ }
7823│
7824├───> if (settings.temp_lru && !start_lru_maintainer) {
7825│ fprintf(stderr, "temporary_ttl requires lru_maintainer to be enabled\n");
7826│ exit(EX_USAGE);
7827│ }
7828│
7829│ if (hash_init(hash_type) != 0) {
7830│ fprintf(stderr, "Failed to initialize hash_algorithm!\n");
7831│ exit(EX_USAGE);
7832│ }
hash_init関数が実行されている
2 hash_init at hash.c
typeでMURMUR3_HASHが指定されている。
7│ int hash_init(enum hashfunc_type type) {
8├───> switch(type) {
9│ case JENKINS_HASH:
10│ hash = jenkins_hash;
11│ settings.hash_algorithm = "jenkins";
12│ break;
13│ case MURMUR3_HASH:
14│ hash = MurmurHash3_x86_32;
15│ settings.hash_algorithm = "murmur3";
16│ break;
17│ default:
18│ return -1;
19│ }
20│ return 0;
21│ }
1 main at memcacached.c
必要に応じて多くのコネクション数を許可するためにgetrlimit
7903│ /*
7904│ * If needed, increase rlimits to allow as many connections
7905│ * as needed.
7906│ */
7907│
7908├───> if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7909│ fprintf(stderr, "failed to getrlimit number of files\n");
7910│ exit(EX_OSERR);
7911│ } else {
7912│ rlim.rlim_cur = settings.maxconns;
7913│ rlim.rlim_max = settings.maxconns;
7914│ if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7915│ fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller
7916│ exit(EX_OSERR);
7917│ }
7918│ }
mainスレッドのlibeventインスタンスの初期化
7970│ /* initialize main thread libevent instance */
7971│ #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
7972│ /* If libevent version is larger/equal to 2.0.2-alpha, use newer version */
7973│ struct event_config *ev_config;
7974├───> ev_config = event_config_new();
7975│ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
7976│ main_base = event_base_new_with_config(ev_config);
7977│ event_config_free(ev_config);
7978│ #else
7979│ /* Otherwise, use older API */
7980│ main_base = event_init();
7981│ #endif
各種スレッド等の初期化
7983│ /* initialize other stuff */
7984├───> logger_init();
7985│ stats_init();
7986│ assoc_init(settings.hashpower_init);
7987│ conn_init();
7988│ slabs_init(settings.maxbytes, settings.factor, preallocate,
7989│ use_slab_sizes ? slab_sizes : NULL);
2 logger_init at logger.c
563│ /*************************
564│ * Public functions for submitting logs and starting loggers from workers.
565│ *************************/
566│
567│ /* Global logger thread start/init */
568│ void logger_init(void) {
569│ /* TODO: auto destructor when threads exit */
570│ /* TODO: error handling */
571│
572│ /* init stack for iterating loggers */
573│ logger_stack_head = 0;
574│ logger_stack_tail = 0;
575├───> pthread_key_create(&logger_key, NULL);
576│
577│ if (start_logger_thread() != 0) {
578│ abort();
579│ }
580│
581│ /* This can be removed once the global stats initializer is improved */
582│ STATS_LOCK();
583│ stats.log_worker_dropped = 0;
584│ stats.log_worker_written = 0;
585│ stats.log_watcher_skipped = 0;
586│ stats.log_watcher_sent = 0;
587│ STATS_UNLOCK();
588│ /* This is what adding a STDERR watcher looks like. should replace old
589│ * "verbose" settings. */
590│ //logger_add_watcher(NULL, 0);
591│ return;
592│ }
3 start_logger_thread at logger.c
545│ static int start_logger_thread(void) {
546│ int ret;
547├───> do_run_logger_thread = 1;
548│ if ((ret = pthread_create(&logger_tid, NULL,
549│ logger_thread, NULL)) != 0) {
550│ fprintf(stderr, "Can't start logger thread: %s\n", strerror(ret));
551│ return -1;
552│ }
553│ return 0;
554│ }
3 STATS_LOCK, STATS_UNLOCK at thread.c
696│ /******************************* GLOBAL STATS ******************************/
697│
698│ void STATS_LOCK() {
699│ pthread_mutex_lock(&stats_lock);
700│ }
701│
702│ void STATS_UNLOCK() {
703├───> pthread_mutex_unlock(&stats_lock);
704│ }
2 stats_init at memcached.c
225│ static void stats_init(void) {
226├───> memset(&stats, 0, sizeof(struct stats));
227│ memset(&stats_state, 0, sizeof(struct stats_state));
228│ stats_state.accepting_conns = true; /* assuming we start in this state. */
229│
230│ /* make the time we started always be 2 seconds before we really
231│ did, so time(0) - time.started is never zero. if so, things
232│ like 'settings.oldest_live' which act as booleans as well as
233│ values are now false in boolean context... */
234│ process_started = time(0) - ITEM_UPDATE_INTERVAL - 2;
235│ stats_prefix_init();
236│ }
3 stats_prefix_init at stats.c
38│ void stats_prefix_init() {
39├───> memset(prefix_stats, 0, sizeof(prefix_stats));
40│ }
3 assoc_init at assoc.c
59│ void assoc_init(const int hashtable_init) {
60├───> if (hashtable_init) {
61│ hashpower = hashtable_init;
62│ }
63│ primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
64│ if (! primary_hashtable) {
65│ fprintf(stderr, "Failed to init hashtable.\n");
66│ exit(EXIT_FAILURE);
67│ }
68│ STATS_LOCK();
69│ stats_state.hash_power_level = hashpower;
70│ stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);
71│ STATS_UNLOCK();
72│ }
2 conn_init at memcached.c
448│ /*
449│ * Initializes the connections array. We don't actually allocate connection
450│ * structures until they're needed, so as to avoid wasting memory when the
451│ * maximum connection count is much higher than the actual number of
452│ * connections.
453│ *
454│ * This does end up wasting a few pointers' worth of memory for FDs that are
455│ * used for things other than connections, but that's worth it in exchange for
456│ * being able to directly index the conns array by FD.
457│ */
458│ static void conn_init(void) {
459│ /* We're unlikely to see an FD much higher than maxconns. */
460│ int next_fd = dup(1);
461├───> int headroom = 10; /* account for extra unexpected open FDs */
462│ struct rlimit rl;
463│
464│ max_fds = settings.maxconns + headroom + next_fd;
465│
466│ /* But if possible, get the actual highest FD we can possibly ever see. */
467│ if (getrlimit(RLIMIT_NOFILE, &rl) == 0) {
468│ max_fds = rl.rlim_max;
469│ } else {
470│ fprintf(stderr, "Failed to query maximum file descriptor; "
471│ "falling back to maxconns\n");
472│ }
473│
2 slabs_init (limit=67108864, factor=1.25, prealloc=false, slab_sizes=0x0) at slabs.c:158
Slabの初期化 ★5
153│ /**
154│ * Determines the chunk sizes and initializes the slab class descriptors
155│ * accordingly.
156│ */
157│ void slabs_init(const size_t limit, const double factor, const bool prealloc, const uint32_t *slab_sizes) {
158│ int i = POWER_SMALLEST - 1;
159│ unsigned int size = sizeof(item) + settings.chunk_size;
160│
161│ /* Some platforms use runtime transparent hugepages. If for any reason
162│ * the initial allocation fails, the required settings do not persist
163│ * for remaining allocations. As such it makes little sense to do slab
164│ * preallocation. */
165├───> bool __attribute__ ((unused)) do_slab_prealloc = false;
166│
167│ mem_limit = limit;
168│
169│ if (prealloc) {
170│ #if defined(__linux__) && defined(MADV_HUGEPAGE)
171│ mem_base = alloc_large_chunk_linux(mem_limit);
172│ if (mem_base)
173│ do_slab_prealloc = true;
174│ #else
175│ /* Allocate everything in a big chunk with malloc */
176│ mem_base = malloc(mem_limit);
177│ do_slab_prealloc = true;
slabclass配列の初期化
190│ while (++i < MAX_NUMBER_OF_SLAB_CLASSES-1) {
191│ if (slab_sizes != NULL) {
192│ if (slab_sizes[i-1] == 0)
193│ break;
194│ size = slab_sizes[i-1];
195│ } else if (size >= settings.slab_chunk_size_max / factor) {
196│ break;
197│ }
198│ /* Make sure items are always n-byte aligned */
199│ if (size % CHUNK_ALIGN_BYTES)
200│ size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
201│
202├───────> slabclass[i].size = size;
203│ slabclass[i].perslab = settings.slab_page_size / slabclass[i].size;
204│ if (slab_sizes == NULL)
205│ size *= factor;
206│ if (settings.verbose > 1) {
207│ fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
208│ i, slabclass[i].size, slabclass[i].perslab);
209│ }
210│ }
220│ /* for the test suite: faking of how much we've already malloc'd */
221│ {
222│ char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
223│ if (t_initial_malloc) {
224│ mem_malloced = (size_t)atol(t_initial_malloc);
225│ }
226│
227│ }
228│
229│ if (prealloc && do_slab_prealloc) {
230│ slabs_preallocate(power_largest);
231│ }
232├>}
1 main at memcacached.c
workerスレッドの設定
8025│ /* start up worker threads if MT mode */
8026│ #ifdef EXTSTORE
8027│ slabs_set_storage(storage);
8028│ memcached_thread_init(settings.num_threads, storage);
8029│ init_lru_crawler(storage);
8030│ #else
8031├───> memcached_thread_init(settings.num_threads, NULL);
8032│ init_lru_crawler(NULL);
8033│ #endif
8034│
8035│ if (start_assoc_maint && start_assoc_maintenance_thread() == -1) {
8036│ exit(EXIT_FAILURE);
8037│ }
8038│ if (start_lru_crawler && start_item_crawler_thread() != 0) {
8039│ fprintf(stderr, "Failed to enable LRU crawler thread\n");
8040│ exit(EXIT_FAILURE);
8041│ }
8042│ #ifdef EXTSTORE
8043│ if (storage && start_storage_compact_thread(storage) != 0) {
8044│ fprintf(stderr, "Failed to start storage compaction thread\n");
8045│ exit(EXIT_FAILURE);
8046│ }
8047│ if (storage && start_storage_write_thread(storage) != 0) {
8048│ fprintf(stderr, "Failed to start storage writer thread\n");
8049│ exit(EXIT_FAILURE);
8050│ }
8051│
8052│ if (start_lru_maintainer && start_lru_maintainer_thread(storage) != 0) {
8053│ #else
8054├───> if (start_lru_maintainer && start_lru_maintainer_thread(NULL) != 0) {
8055│ #endif
8056│ fprintf(stderr, "Failed to enable LRU maintainer thread\n");
8057│ return 1;
8058│ }
8059│
8060│ if (settings.slab_reassign &&
8061├───────> start_slab_maintenance_thread() == -1) {
8062│ exit(EXIT_FAILURE);
8063│ }
8064│
8065│ if (settings.idle_timeout && start_conn_timeout_thread() == -1) {
8066│ exit(EXIT_FAILURE);
8067│ }
8068│
2 memcached_thread_init at thread.c
772│ /*
773│ * Initializes the thread subsystem, creating various worker threads.
774│ *
775│ * nthreads Number of worker event handler threads to spawn
776│ */
777│ void memcached_thread_init(int nthreads, void *arg) {
778│ int i;
779│ int power;
780│
781│ for (i = 0; i < POWER_LARGEST; i++) {
782├───────> pthread_mutex_init(&lru_locks[i], NULL);
783│ }
784│ pthread_mutex_init(&worker_hang_lock, NULL);
785│
786│ pthread_mutex_init(&init_lock, NULL);
787│ pthread_cond_init(&init_cond, NULL);
788│
789│ pthread_mutex_init(&cqi_freelist_lock, NULL);
790│ cqi_freelist = NULL;
791│
792│ /* Want a wide lock table, but don't waste memory */
793│ if (nthreads < 3) {
794│ power = 10;
815│ item_lock_count = hashsize(power);
816│ item_lock_hashpower = power;
817│
818│ item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
819│ if (! item_locks) {
820│ perror("Can't allocate item locks");
821│ exit(1);
822│ }
823│ for (i = 0; i < item_lock_count; i++) {
824│ pthread_mutex_init(&item_locks[i], NULL);
825│ }
826│
827├───> threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
828│ if (! threads) {
829│ perror("Can't allocate thread descriptors");
830│ exit(1);
831│ }
832│
833│ for (i = 0; i < nthreads; i++) {
834│ int fds[2];
835│ if (pipe(fds)) {
836│ perror("Can't create notify pipe");
837│ exit(1);
838│ }
839│
:
:
833│ for (i = 0; i < nthreads; i++) {
834│ int fds[2];
835│ if (pipe(fds)) {
836│ perror("Can't create notify pipe");
837│ exit(1);
838│ }
839│
840│ threads[i].notify_receive_fd = fds[0];
841│ threads[i].notify_send_fd = fds[1];
842│ #ifdef EXTSTORE
843│ threads[i].storage = arg;
844│ #endif
845├───────> setup_thread(&threads[i]);
846│ /* Reserve three fds for the libevent base, and two for the pipe */
847│ stats_state.reserved_fds += 5;
848│ }
849│
850│ /* Create threads after we've done all the libevent setup. */
851│ for (i = 0; i < nthreads; i++) {
852│ create_worker(worker_libevent, &threads[i]);
853│ }
854│
855│ /* Wait for all the threads to set themselves up before returning. */
856│ pthread_mutex_lock(&init_lock);
857│ wait_for_thread_registration(nthreads);
858│ pthread_mutex_unlock(&init_lock);
859│ }
3 setup_thread (me=0x6486f0) at thread.c:321
313│ /****************************** LIBEVENT THREADS *****************************/
314│
315│ /*
316│ * Set up a thread's information.
317│ */
318│ static void setup_thread(LIBEVENT_THREAD *me) {
319│ #if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
320│ struct event_config *ev_config;
321│ ev_config = event_config_new();
322│ event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
323│ me->base = event_base_new_with_config(ev_config);
324├───> event_config_free(ev_config);
325│ #else
326│ me->base = event_init();
327│ #endif
328│
329│ if (! me->base) {
330│ fprintf(stderr, "Can't allocate event base\n");
331│ exit(1);
332│ }
333│
334│ /* Listen for notifications from other threads */
335│ event_set(&me->notify_event, me->notify_receive_fd,
336│ EV_READ | EV_PERSIST, thread_libevent_process, me);
337│ event_base_set(me->base, &me->notify_event);
338│
339│ if (event_add(&me->notify_event, 0) == -1) {
340│ fprintf(stderr, "Can't monitor libevent notify pipe\n");
341│ exit(1);
342│ }
343│
344│ me->new_conn_queue = malloc(sizeof(struct conn_queue));
345├───> if (me->new_conn_queue == NULL) {
346│ perror("Failed to allocate memory for connection queue");
347│ exit(EXIT_FAILURE);
348│ }
349│ cq_init(me->new_conn_queue);
350│
351│ if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
352│ perror("Failed to initialize mutex");
353│ exit(EXIT_FAILURE);
354│ }
355│
356│ me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
357│ NULL, NULL);
4 cq_init (cq=0x64fab0) at thread.c:196
192│ /*
193│ * Initializes a connection queue.
194│ */
195│ static void cq_init(CQ *cq) {
196├───> pthread_mutex_init(&cq->lock, NULL);
197│ cq->head = NULL;
198│ cq->tail = NULL;
199│ }
4 cache_create (name=0x42b442 "suffix", bufsize=50, align=8, constructor=0x0, destructor=0x0) at cache.c:22
19│ cache_t* cache_create(const char *name, size_t bufsize, size_t align,
20│ cache_constructor_t* constructor,
21│ cache_destructor_t* destructor) {
22│ cache_t* ret = calloc(1, sizeof(cache_t));
23│ char* nm = strdup(name);
24│ void** ptr = calloc(initial_pool_size, sizeof(void*));
25├───> if (ret == NULL || nm == NULL || ptr == NULL ||
26│ pthread_mutex_init(&ret->mutex, NULL) == -1) {
27│ free(ret);
28│ free(nm);
29│ free(ptr);
30│ return NULL;
31│ }
32│
33│ ret->name = nm;
34│ ret->ptr = ptr;
35│ ret->freetotal = initial_pool_size;
36│ ret->constructor = constructor;
37│ ret->destructor = destructor;
38│
39│ #ifndef NDEBUG
40│ ret->bufsize = bufsize + 2 * sizeof(redzone_pattern);
41│ #else
42│ ret->bufsize = bufsize;
43│ #endif
44│
45├───> return ret;
46│ }
3 create_worker (func=0x41dddf <worker_libevent>, arg=0x6486f0) at thread.c:296
289│ /*
290│ * Creates a worker thread.
291│ */
292│ static void create_worker(void *(*func)(void *), void *arg) {
293│ pthread_attr_t attr;
294│ int ret;
295│
296├───> pthread_attr_init(&attr);
297│
298│ if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
299│ fprintf(stderr, "Can't create thread: %s\n",
300│ strerror(ret));
301│ exit(1);
302│ }
303│ }
2 init_lru_crawler (arg=0x0) at crawler.c:686
685│ int init_lru_crawler(void *arg) {
686│ if (lru_crawler_initialized == 0) {
687│ #ifdef EXTSTORE
688│ storage = arg;
689│ #endif
690├───────> if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
691│ fprintf(stderr, "Can't initialize lru crawler condition\n");
692│ return -1;
693│ }
694│ pthread_mutex_init(&lru_crawler_lock, NULL);
695│ active_crawler_mod.c.c = NULL;
696│ active_crawler_mod.mod = NULL;
697│ active_crawler_mod.data = NULL;
698│ lru_crawler_initialized = 1;
699│ }
700│ return 0;
701│ }
2 start_assoc_maintenance_thread () at assoc.c:270
268│ int start_assoc_maintenance_thread() {
269│ int ret;
270│ char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
271│ if (env != NULL) {
272│ hash_bulk_move = atoi(env);
273│ if (hash_bulk_move == 0) {
274│ hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
275│ }
276│ }
277├───> pthread_mutex_init(&maintenance_lock, NULL);
278│ if ((ret = pthread_create(&maintenance_tid, NULL,
279│ assoc_maintenance_thread, NULL)) != 0) {
280│ fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
281│ return -1;
282│ }
283│ return 0;
284│ }
2 start_item_crawler_thread () at crawler.c:497
484│ * caller locks mtx. caller spawns thread.
485│ * thread blocks on mutex.
486│ * caller waits on condition, releases lock.
487│ * thread gets lock, sends signal.
488│ * caller can't wait, as thread has lock.
489│ * thread waits on condition, releases lock
490│ * caller wakes on condition, gets lock.
491│ * caller immediately releases lock.
492│ * thread is now safely waiting on condition before the caller returns.
493│ */
494│ int start_item_crawler_thread(void) {
495│ int ret;
496│
497├───> if (settings.lru_crawler)
498│ return -1;
499│ pthread_mutex_lock(&lru_crawler_lock);
500│ do_run_lru_crawler_thread = 1;
501│ if ((ret = pthread_create(&item_crawler_tid, NULL,
502│ item_crawler_thread, NULL)) != 0) {
503│ fprintf(stderr, "Can't create LRU crawler thread: %s\n",
504│ strerror(ret));
505│ pthread_mutex_unlock(&lru_crawler_lock);
506│ return -1;
507│ }
508│ /* Avoid returning until the crawler has actually started */
509│ pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
510│ pthread_mutex_unlock(&lru_crawler_lock);
511│
512│ return 0;
513│ }
2 start_lru_maintainer_thread (arg=0x0) at items.c:1680
1677│ int start_lru_maintainer_thread(void *arg) {
1678│ int ret;
1679│
1680│ pthread_mutex_lock(&lru_maintainer_lock);
1681├───> do_run_lru_maintainer_thread = 1;
1682│ settings.lru_maintainer_thread = true;
1683│ if ((ret = pthread_create(&lru_maintainer_tid, NULL,
1684│ lru_maintainer_thread, arg)) != 0) {
1685│ fprintf(stderr, "Can't create LRU maintainer thread: %s\n",
1686│ strerror(ret));
1687│ pthread_mutex_unlock(&lru_maintainer_lock);
1688│ return -1;
1689│ }
1690│ pthread_mutex_unlock(&lru_maintainer_lock);
1691│
1692│ return 0;
1693│ }
2 start_slab_maintenance_thread () at slabs.c:1311
1309│ int start_slab_maintenance_thread(void) {
1310│ int ret;
1311│ slab_rebalance_signal = 0;
1312│ slab_rebal.slab_start = NULL;
1313│ char *env = getenv("MEMCACHED_SLAB_BULK_CHECK");
1314│ if (env != NULL) {
1315│ slab_bulk_check = atoi(env);
1316│ if (slab_bulk_check == 0) {
1317│ slab_bulk_check = DEFAULT_SLAB_BULK_CHECK;
1318│ }
1319│ }
1320│
1321├───> if (pthread_cond_init(&slab_rebalance_cond, NULL) != 0) {
1322│ fprintf(stderr, "Can't initialize rebalance condition\n");
1323│ return -1;
1324│ }
1325│ pthread_mutex_init(&slabs_rebalance_lock, NULL);
1326│
1327│ if ((ret = pthread_create(&rebalance_tid, NULL,
1328│ slab_rebalance_thread, NULL)) != 0) {
1329│ fprintf(stderr, "Can't create rebal thread: %s\n", strerror(ret));
1330│ return -1;
1331│ }
1332│ return 0;
1333│ }
2 clock_handler (fd=0, which=0, arg=0x0) at memcached.c:6321
6316│ /* libevent uses a monotonic clock when available for event scheduling. Aside
6317│ * from jitter, simply ticking our internal timer here is accurate enough.
6318│ * Note that users who are setting explicit dates for expiration times *must*
6319│ * ensure their clocks are correct before starting memcached. */
6320│ static void clock_handler(const int fd, const short which, void *arg) {
6321├───> struct timeval t = {.tv_sec = 1, .tv_usec = 0};
6322│ static bool initialized = false;
6323│ #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
6324│ static bool monotonic = false;
6325│ static time_t monotonic_start;
6326│ #endif
6327│
6328│ if (initialized) {
6329│ /* only delete the event if it's actually there. */
6330│ evtimer_del(&clockevent);
6331│ } else {
6332│ initialized = true;
6333│ /* process_started is initialized to time() - 2. We initialize to 1 so
6334│ * flush_all won't underflow during tests. */
6335│ #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
6336│ struct timespec ts;
6337│ if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
6338│ monotonic = true;
6339│ monotonic_start = ts.tv_sec - ITEM_UPDATE_INTERVAL - 2;
6340│ }
6341│ #endif
6342│ }
6343│
6344│ // While we're here, check for hash table expansion.
6345│ // This function should be quick to avoid delaying the timer.
6346├───> assoc_start_expand(stats_state.curr_items);
6347│
6348│ evtimer_set(&clockevent, clock_handler, 0);
6349│ event_base_set(main_base, &clockevent);
6350│ evtimer_add(&clockevent, &t);
6351│
6352│ #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
6353│ if (monotonic) {
6354│ struct timespec ts;
6355│ if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
6356│ return;
6357│ current_time = (rel_time_t) (ts.tv_sec - monotonic_start);
6358│ return;
6359│ }
6360│ #endif
6361│ {
6362│ struct timeval tv;
6363│ gettimeofday(&tv, NULL);
6364│ current_time = (rel_time_t) (tv.tv_sec - process_started);
6365│ }
6366│ }
6358行目で関数がreturnされる。
3 assoc_start_expand (curr_items=0) at assoc.c:144
143│ void assoc_start_expand(uint64_t curr_items) {
144├───> if (started_expanding)
145│ return;
146│
147│ if (curr_items > (hashsize(hashpower) * 3) / 2 &&
148│ hashpower < HASHPOWER_MAX) {
149│ started_expanding = true;
150│ pthread_cond_signal(&maintenance_cond);
151│ }
152│ }
1 main at memcached.c
8072│ /* create unix mode sockets after dropping privileges */
8073│ if (settings.socketpath != NULL) {
8074│ errno = 0;
8075│ if (server_socket_unix(settings.socketpath,settings.access)) {
8076│ vperror("failed to listen on UNIX socket: %s", settings.socketpath);
8077│ exit(EX_OSERR);
8078│ }
8079│ }
8080│
8081│ /* create the listening socket, bind it, and init */
8082├───> if (settings.socketpath == NULL) {
8083│ const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
8084│ char *temp_portnumber_filename = NULL;
8085│ size_t len;
8086│ FILE *portnumber_file = NULL;
8087│
8088│ if (portnumber_filename != NULL) {
8089│ len = strlen(portnumber_filename)+4+1;
8090│ temp_portnumber_filename = malloc(len);
8091│ snprintf(temp_portnumber_filename,
8092│ len,
8093│ "%s.lck", portnumber_filename);
8094│
8095│ portnumber_file = fopen(temp_portnumber_filename, "a");
8096│ if (portnumber_file == NULL) {
8097│ fprintf(stderr, "Failed to open \"%s\": %s\n",
8098│ temp_portnumber_filename, strerror(errno));
8099│ }
8100│ }
8101│
8102├───────> errno = 0;
8103│ if (settings.port && server_sockets(settings.port, tcp_transport,
8104│ portnumber_file)) {
8105│ vperror("failed to listen on TCP port %d", settings.port);
8106│ exit(EX_OSERR);
8107│ }
8108│
8109│ /*
8110│ * initialization order: first create the listening sockets
8111│ * (may need root on low ports), then drop root if needed,
8112│ * then daemonize if needed, then init libevent (in some cases
8113│ * descriptors created by libevent wouldn't survive forking).
8114│ */
8115│
8116│ /* create the UDP listening socket and bind it */
8117│ errno = 0;
8118├───────> if (settings.udpport && server_sockets(settings.udpport, udp_transport,
8119│ portnumber_file)) {
8120│ vperror("failed to listen on UDP port %d", settings.udpport);
8121│ exit(EX_OSERR);
8122│ }
8123│
8124│ if (portnumber_file) {
8125│ fclose(portnumber_file);
8126│ rename(temp_portnumber_filename, portnumber_filename);
8127│ }
8128│ if (temp_portnumber_filename)
8129│ free(temp_portnumber_filename);
8130│ }
2 server_sockets (port=11211, transport=tcp_transport, portnumber_file=0x0) at memcached.c:6146
6144│ static int server_sockets(int port, enum network_transport transport,
6145│ FILE *portnumber_file) {
6146├───> bool ssl_enabled = false;
6147│
6148│ #ifdef TLS
6149│ const char *notls = "notls";
6150│ ssl_enabled = settings.ssl_enabled;
6151│ #endif
6152│
6153│ if (settings.inter == NULL) {
6154│ return server_socket(settings.inter, port, transport, portnumber_file, ssl_enabled);
6155│ } else {
6156│ // tokenize them and bind to each one of them..
6157│ char *b;
6158│ int ret = 0;
3 server_socket (interface=0x0, port=11211, transport=tcp_transport, portnumber_file=0x0, ssl_enabled=false) at memcached.c:5998
5988│ * @param transport the transport protocol (TCP / UDP)
5989│ * @param portnumber_file A filepointer to write the port numbers to
5990│ * when they are successfully added to the list of ports we
5991│ * listen on.
5992│ */
5993│ static int server_socket(const char *interface,
5994│ int port,
5995│ enum network_transport transport,
5996│ FILE *portnumber_file, bool ssl_enabled) {
5997│ int sfd;
5998│ struct linger ling = {0, 0};
5999│ struct addrinfo *ai;
6000│ struct addrinfo *next;
6001├───> struct addrinfo hints = { .ai_flags = AI_PASSIVE,
6002│ .ai_family = AF_UNSPEC };
6003│ char port_buf[NI_MAXSERV];
6004│ int error;
6005│ int success = 0;
6006│ int flags =1;
6007│
6008│ hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
6009│
6010│ if (port == -1) {
6011│ port = 0;
6012│ }
6013│ snprintf(port_buf, sizeof(port_buf), "%d", port);
6014│ error= getaddrinfo(interface, port_buf, &hints, &ai);
6015│ if (error != 0) {
6016│ if (error != EAI_SYSTEM)
6017│ fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
6018│ else
6019│ perror("getaddrinfo()");
6020│ return 1;
6021│ }
6022│
6023├───> for (next= ai; next; next= next->ai_next) {
6024│ conn *listen_conn_add;
6025│ if ((sfd = new_socket(next)) == -1) {
6026│ /* getaddrinfo can return "junk" addresses,
6027│ * we make sure at least one works before erroring.
6028│ */
6029│ if (errno == EMFILE) {
6030│ /* ...unless we're out of fds */
6031│ perror("server_socket");
6032│ exit(EX_OSERR);
6033│ }
6034│ continue;
6035│ }
4 new_socket (ai=0x6520f0) at memcached.c:5936
5932│ static int new_socket(struct addrinfo *ai) {
5933│ int sfd;
5934│ int flags;
5935│
5936├───> if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
5937│ return -1;
5938│ }
5939│
5940│ if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
5941│ fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
5942│ perror("setting O_NONBLOCK");
5943│ close(sfd);
5944│ return -1;
5945│ }
5946│ return sfd;
5947│ }
3
ソケットオプションの設定
6048│ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
6049│ if (IS_UDP(transport)) {
6050│ maximize_sndbuf(sfd);
6051│ } else {
6052├───────────> error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
6053│ if (error != 0)
6054│ perror("setsockopt");
6055│
6056│ error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
6057│ if (error != 0)
6058│ perror("setsockopt");
6059│
6060│ error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
6061│ if (error != 0)
6062│ perror("setsockopt");
6063│ }
6064│
bind処理
6065│ if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
6066│ if (errno != EADDRINUSE) {
6067│ perror("bind()");
6068│ close(sfd);
6069│ freeaddrinfo(ai);
6070│ return 1;
6071│ }
6072│ close(sfd);
6073│ continue;
6074│ } else {
6075├───────────> success++;
6076│ if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
6077│ perror("listen()");
6078│ close(sfd);
6079│ freeaddrinfo(ai);
6080│ return 1;
6081│ }
6082│ if (portnumber_file != NULL &&
6083│ (next->ai_addr->sa_family == AF_INET ||
6084│ next->ai_addr->sa_family == AF_INET6)) {
6085│ union {
6086│ struct sockaddr_in in;
6087│ struct sockaddr_in6 in6;
conn_new関数が呼ばれる
6109│ * this allows "stats conns" to separately list multiple
6110│ * parallel UDP requests in progress.
6111│ *
6112│ * The dispatch code round-robins new connection requests
6113│ * among threads, so this is guaranteed to assign one
6114│ * FD to each thread.
6115│ */
6116│ int per_thread_fd = c ? dup(sfd) : sfd;
6117│ dispatch_conn_new(per_thread_fd, conn_read,
6118│ EV_READ | EV_PERSIST,
6119│ UDP_READ_BUFFER_SIZE, transport, NULL);
6120│ }
6121│ } else {
6122├───────────> if (!(listen_conn_add = conn_new(sfd, conn_listening,
6123│ EV_READ | EV_PERSIST, 1,
6124│ transport, main_base, NULL))) {
6125│ fprintf(stderr, "failed to create listening connection\n");
6126│ exit(EXIT_FAILURE);
6127│ }
6128│ #ifdef TLS
6129│ listen_conn_add->ssl_enabled = ssl_enabled;
6130│ #else
6131│ assert(ssl_enabled == false);
6132│ #endif
6133│ listen_conn_add->next = listen_conn;
6134│ listen_conn = listen_conn_add;
6135│ }
6136│ }
6137│
6138│ freeaddrinfo(ai);
6139│
6140│ /* Return zero iff we detected no errors in starting up connections */
6141│ return success == 0;
6142│ }
4 conn_new (sfd=35, init_state=conn_listening, event_flags=18, read_buffer_size=1, transport=tcp_transport, base=0x646040, ssl=0x0) at memcached.c:547
540│ conn *conn_new(const int sfd, enum conn_states init_state,
541│ const int event_flags,
542│ const int read_buffer_size, enum network_transport transport,
543│ struct event_base *base, void *ssl) {
544│ conn *c;
545│
546│ assert(sfd >= 0 && sfd < max_fds);
547├───> c = conns[sfd];
548│
549│ if (NULL == c) {
550│ if (!(c = (conn *)calloc(1, sizeof(conn)))) {
551│ STATS_LOCK();
552│ stats.malloc_fails++;
553│ STATS_UNLOCK();
554│ fprintf(stderr, "Failed to allocate connection object\n");
555│ return NULL;
556│ }
557│ MEMCACHED_CONN_CREATE(c);
558│ c->read = NULL;
559│ c->sendmsg = NULL;
560│ c->write = NULL;
561│ c->rbuf = c->wbuf = 0;
562│ c->ilist = 0;
563│ c->suffixlist = 0;
564│ c->iov = 0;
565│ c->msglist = 0;
566│ c->hdrbuf = 0;
567│
568│ c->rsize = read_buffer_size;
569│ c->wsize = DATA_BUFFER_SIZE;
570│ c->isize = ITEM_LIST_INITIAL;
571│ c->suffixsize = SUFFIX_LIST_INITIAL;
572│ c->iovsize = IOV_LIST_INITIAL;
573├───────> c->msgsize = MSG_LIST_INITIAL;
574│ c->hdrsize = 0;
575│
576│ c->rbuf = (char *)malloc((size_t)c->rsize);
577│ c->wbuf = (char *)malloc((size_t)c->wsize);
578│ c->ilist = (item **)malloc(sizeof(item *) * c->isize);
579│ c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
580│ c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
581│ c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
582│
583│ if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
584│ c->msglist == 0 || c->suffixlist == 0) {
585│ conn_free(c);
586│ STATS_LOCK();
587│ stats.malloc_fails++;
588│ STATS_UNLOCK();
589│ fprintf(stderr, "Failed to allocate buffers for connection\n");
590│ return NULL;
591│ }
592│
593│ STATS_LOCK();
594│ stats_state.conn_structs++;
595│ STATS_UNLOCK();
596│
597│ c->sfd = sfd;
598│ conns[sfd] = c;
599│ }
600│
601│ c->transport = transport;
602│ c->protocol = settings.binding_protocol;
603│
604│ /* unix socket mode doesn't need this, so zeroed out. but why
605│ * is this done for every command? presumably for UDP
606│ * mode. */
607│ if (!settings.socketpath) {
608│ c->request_addr_size = sizeof(c->request_addr);
609│ } else {
610│ c->request_addr_size = 0;
611│ }
612│
613├───> if (transport == tcp_transport && init_state == conn_new_cmd) {
614│ if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
615│ &c->request_addr_size)) {
616│ perror("getpeername");
617│ memset(&c->request_addr, 0, sizeof(c->request_addr));
618│ }
619│ }
620│
621│ if (settings.verbose > 1) {
622│ if (init_state == conn_listening) {
623│ fprintf(stderr, "<%d server listening (%s)\n", sfd,
624│ prot_text(c->protocol));
625│ } else if (IS_UDP(transport)) {
ここで event_set で event_handler が設定されている。この event_handler関数が、後ほどコマンド実行される度に呼び出される関数。
691│
692├───> event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
693│ event_base_set(base, &c->event);
694│ c->ev_flags = event_flags;
695│
696│ if (event_add(&c->event, 0) == -1) {
697│ perror("event_add");
698│ return NULL;
699│ }
700│
701│ STATS_LOCK();
702│ stats_state.curr_conns++;
703│ stats.total_conns++;
704│ STATS_UNLOCK();
705│
706│ MEMCACHED_CONN_ALLOCATE(c->sfd);
707│
708│ return c;
709│ }
1
8132│ /* Give the sockets a moment to open. I know this is dumb, but the error
8133│ * is only an advisory.
8134│ */
8135│ usleep(1000);
8136├───> if (stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) {
8137│ fprintf(stderr, "Maxconns setting is too low, use -c to increase.\n");
8138│ exit(EXIT_FAILURE);
8139│ }
8140│
8141│ if (pid_file != NULL) {
8142│ save_pid(pid_file);
8143│ }
8144│
8145│ /* Drop privileges no longer needed */
8146│ if (settings.drop_privileges) {
8147│ drop_privileges();
8148│ }
8149│
8150│ /* Initialize the uriencode lookup table. */
8151│ uriencode_init();
8152│
8153│ /* enter the event loop */
8154├───> if (event_base_loop(main_base, 0) != 0) {
8155│ retval = EXIT_FAILURE;
8156│ }
8157│
8158│ stop_assoc_maintenance_thread();
8159│
8160│ /* remove the PID file if we're a daemon */
8161│ if (do_daemonize)
8162│ remove_pidfile(pid_file);
8163│ /* Clean up strdup() call for bind() address */
8164│ if (settings.inter)
8165│ free(settings.inter);
8166│
最後にmain_baseがevent_base_loopによりイベントのループ処理が始まる。
2 uriencode_init () at util.c:16
14│ void uriencode_init(void) {
15│ int x;
16├───> char *str = uriencode_str;
17│ for (x = 0; x < 256; x++) {
18│ if (isalnum(x) || x == '-' || x == '.' || x == '_' || x == '~') {
19│ uriencode_map[x] = NULL;
20│ } else {
21│ snprintf(str, 4, "%%%02hhX", (unsigned char)x);
22│ uriencode_map[x] = str;
23│ str += 3; /* lobbing off the \0 is fine */
24│ }
25│ }
26│ }
SETコマンドの挙動
1 event_handler (fd=35, which=2, arg=0x6521e0) at memcached.c:5913
5910│ void event_handler(const int fd, const short which, void *arg) {
5911│ conn *c;
5912│
5913├───> c = (conn *)arg;
5914│ assert(c != NULL);
5915│
5916│ c->which = which;
5917│
5918│ /* sanity */
5919│ if (fd != c->sfd) {
5920│ if (settings.verbose > 0)
5921│ fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
5922│ conn_close(c);
5923│ return;
5924│ }
5925│
5926│ drive_machine(c);
5927│
5928│ /* wait for next event */
5929│ return;
5930│ }
2 drive_machine (c=0x6521e0) at memcached.c:5500
5499│ static void drive_machine(conn *c) {
5500│ bool stop = false;
5501│ int sfd;
5502│ socklen_t addrlen;
5503│ struct sockaddr_storage addr;
5504│ int nreqs = settings.reqs_per_event;
5505│ int res;
5506│ const char *str;
5507│ #ifdef HAVE_ACCEPT4
5508│ static int use_accept4 = 1;
5509│ #else
5510│ static int use_accept4 = 0;
5511│ #endif
5512│
5513│ assert(c != NULL);
5514│
c->stateがconn_listening
5515├───> while (!stop) {
5517│ switch(c->state) {
5518│ case conn_listening:
5519│ addrlen = sizeof(addr);
5520│ #ifdef HAVE_ACCEPT4
5521│ if (use_accept4) {
5522│ sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);
5523│ } else {
5524│ sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
5525│ }
5526│ #else
5527│ sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
5528│ #endif
5529├───────────> if (sfd == -1) {
5530│ if (use_accept4 && errno == ENOSYS) {
5531│ use_accept4 = 0;
5532│ continue;
5533│ }
5534│ perror(use_accept4 ? "accept4()" : "accept()");
5535│ if (errno == EAGAIN || errno == EWOULDBLOCK) {
5536│ /* these are transient, so don't log anything */
5537│ stop = true;
5538│ } else if (errno == EMFILE) {
5539│ if (settings.verbose > 0)
5540│ fprintf(stderr, "Too many open connections\n");
5541│ accept_new_conns(false);
5605├───────────────> dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
5606│ DATA_BUFFER_SIZE, c->transport, ssl_v);
3 dispatch_conn_new (sfd=37, init_state=conn_new_cmd, event_flags=18, read_buffer_size=2048, transport=tcp_transport
, ssl=0x0) at thread.c:491
484│ /*
485│ * Dispatches a new connection to another thread. This is only ever called
486│ * from the main thread, either during initialization (for UDP) or because
487│ * of an incoming connection.
488│ */
489│ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
490│ int read_buffer_size, enum network_transport transport, void *ssl) {
491├───> CQ_ITEM *item = cqi_new();
492│ char buf[1];
493│ if (item == NULL) {
494│ close(sfd);
495│ /* given that malloc failed this may also fail, but let's try */
496│ fprintf(stderr, "Failed to allocate memory for connection object\n");
497│ return ;
498│ }
499│
500│ int tid = (last_thread + 1) % settings.num_threads;
501│
502│ LIBEVENT_THREAD *thread = threads + tid;
503│
503│
504│ last_thread = tid;
505│
506│ item->sfd = sfd;
507│ item->init_state = init_state;
508│ item->event_flags = event_flags;
509│ item->read_buffer_size = read_buffer_size;
510│ item->transport = transport;
511│ item->mode = queue_new_conn;
512├───> item->ssl = ssl;
513│
514│ cq_push(thread->new_conn_queue, item);
515│
516│ MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
517│ buf[0] = 'c';
518│ if (write(thread->notify_send_fd, buf, 1) != 1) {
519│ perror("Writing to thread notify pipe");
520│ }
521│ }
thread->notify_send_fdにはbufに格納された"c"をwriteする。
4 cqi_new () at thread.c:240
236│ /*
237│ * Returns a fresh connection queue item.
238│ */
239│ static CQ_ITEM *cqi_new(void) {
240├───> CQ_ITEM *item = NULL;
241│ pthread_mutex_lock(&cqi_freelist_lock);
242│ if (cqi_freelist) {
243│ item = cqi_freelist;
244│ cqi_freelist = item->next;
245│ }
246│ pthread_mutex_unlock(&cqi_freelist_lock);
247│
248│ if (NULL == item) {
249│ int i;
250│
251│ /* Allocate a bunch of items at once to reduce fragmentation */
252│ item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
253├───────> if (NULL == item) {
254│ STATS_LOCK();
255│ stats.malloc_fails++;
256│ STATS_UNLOCK();
257│ return NULL;
258│ }
259│
260│ /*
261│ * Link together all the new items except the first one
262│ * (which we'll return to the caller) for placement on
263│ * the freelist.
264│ */
265│ for (i = 2; i < ITEMS_PER_ALLOC; i++)
266│ item[i - 1].next = &item[i];
267│
268│ pthread_mutex_lock(&cqi_freelist_lock);
269│ item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
270│ cqi_freelist = &item[1];
271│ pthread_mutex_unlock(&cqi_freelist_lock);
272│ }
273│
274│ return item;
275│ }
4 cq_push (cq=0x64f250, item=0x658270) at thread.c:225
221│ /*
222│ * Adds an item to a connection queue.
223│ */
224│ static void cq_push(CQ *cq, CQ_ITEM *item) {
225├───> item->next = NULL;
226│
227│ pthread_mutex_lock(&cq->lock);
228│ if (NULL == cq->tail)
229│ cq->head = item;
230│ else
231│ cq->tail->next = item;
232│ cq->tail = item;
233│ pthread_mutex_unlock(&cq->lock);
234│ }
1 event_handler
2 drive_machine
その後一旦libeventの処理に戻り、event_handlerからdrive_machine
c->stateがconn_new_cmdとなっている。
5651│ case conn_new_cmd:
5652│ /* Only process nreqs at a time to avoid starving other
5653│ connections */
5654│
5655│ --nreqs;
5656├───────────> if (nreqs >= 0) {
5657│ reset_cmd_handler(c);
5658│ } else {
5659│ pthread_mutex_lock(&c->thread->stats.mutex);
5660│ c->thread->stats.conn_yields++;
5661│ pthread_mutex_unlock(&c->thread->stats.mutex);
5662│ if (c->rbytes > 0) {
5663│ /* We have already read in data into the input buffer,
5664│ so libevent will most likely not signal read events
5665│ on the socket (unless more data is available. As a
5666│ hack we should just put in a request to write data,
5667│ because that should be possible ;-)
5668│ */
3 reset_cmd_handler (c=0x7ffff00109a0) at memcached.c:2766
最後にconn_set_state(c, conn_waiting)が実行される。
2765│ static void reset_cmd_handler(conn *c) {
2766├───> c->cmd = -1;
2767│ c->substate = bin_no_state;
2768│ if(c->item != NULL) {
2769│ item_remove(c->item);
2770│ c->item = NULL;
2771│ }
2772│ conn_shrink(c);
2773│ if (c->rbytes > 0) {
2774│ conn_set_state(c, conn_parse_cmd);
2775│ } else {
2776│ conn_set_state(c, conn_waiting);
2777│ }
2778│ }
4 conn_shrink (c=0x7ffff00109a0) at memcached.c:910
899│ /*
900│ * Shrinks a connection's buffers if they're too big. This prevents
901│ * periodic large "get" requests from permanently chewing lots of server
902│ * memory.
903│ *
904│ * This should only be called in between requests since it can wipe output
905│ * buffers!
906│ */
907│ static void conn_shrink(conn *c) {
908│ assert(c != NULL);
909│
910├───> if (IS_UDP(c->transport))
911│ return;
912│
913│ if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
914│ char *newbuf;
915│
916│ if (c->rcurr != c->rbuf)
917│ memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
918│
919│ newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
920│
921│ if (newbuf) {
922│ c->rbuf = newbuf;
923│ c->rsize = DATA_BUFFER_SIZE;
924│ }
925│ /* TODO check other branch... */
926│ c->rcurr = c->rbuf;
927│ }
928│
929├───> if (c->isize > ITEM_LIST_HIGHWAT) {
930│ item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
931│ if (newbuf) {
932│ c->ilist = newbuf;
933│ c->isize = ITEM_LIST_INITIAL;
934│ }
935│ /* TODO check error condition? */
936│ }
937│
938│ if (c->msgsize > MSG_LIST_HIGHWAT) {
939│ struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->m
940│ if (newbuf) {
941│ c->msglist = newbuf;
942│ c->msgsize = MSG_LIST_INITIAL;
943│ }
944│ /* TODO check error condition? */
945│ }
946│
947├───> if (c->iovsize > IOV_LIST_HIGHWAT) {
948│ struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0])
949│ if (newbuf) {
950│ c->iov = newbuf;
951│ c->iovsize = IOV_LIST_INITIAL;
952│ }
953│ /* TODO check return value */
954│ }
955│ }
4 conn_set_state (c=0x7ffff00109a0, state=conn_waiting) at memcached.c:985
976│ /*
977│ * Sets a connection's current state in the state machine. Any special
978│ * processing that needs to happen on certain state transitions can
979│ * happen here.
980│ */
981│ static void conn_set_state(conn *c, enum conn_states state) {
982│ assert(c != NULL);
983│ assert(state >= conn_listening && state < conn_max_state);
984│
985├───> if (state != c->state) {
986│ if (settings.verbose > 2) {
987│ fprintf(stderr, "%d: going from %s to %s\n",
988│ c->sfd, state_text(c->state),
989│ state_text(state));
990│ }
991│
992│ if (state == conn_write || state == conn_mwrite) {
993│ MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
994│ }
995│ c->state = state;
996│ }
997│ }
1 event_handler
2 drive_machine
c->stateがconn_waitingになっている。最後にconn_set_state関数でconn_readに設定している。
5612│ case conn_waiting:
5613├───────────> if (!update_event(c, EV_READ | EV_PERSIST)) {
5614│ if (settings.verbose > 0)
5615│ fprintf(stderr, "Couldn't update event\n");
5616│ conn_set_state(c, conn_closing);
5617│ break;
5618│ }
5619│
5620│ conn_set_state(c, conn_read);
5621│ stop = true;
5622│ break;
5623│
3 update_event (c=0x7ffff00109a0, new_flags=18) at memcached.c:5295
5292│ static bool update_event(conn *c, const int new_flags) {
5293│ assert(c != NULL);
5294│
5295├───> struct event_base *base = c->event.ev_base;
5296│ if (c->ev_flags == new_flags)
5297│ return true;
5298│ if (event_del(&c->event) == -1) return false;
5299│ event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
5300│ event_base_set(base, &c->event);
5301│ c->ev_flags = new_flags;
5302│ if (event_add(&c->event, 0) == -1) return false;
5303│ return true;
5304│ }
1 event_handler
2 drive_machine
再びlibeventの処理に戻る。event.cのevent_process_active関数中でevent_process_activeが実行されると再びevent_handlerに戻ってくる c->stateがconn_readに設定されている。
resがREAD_DATA_RECEIVEDより、conn_parse_cmdに設定される。
5624│ case conn_read:
5625├───────────> res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
5626│
5627│ switch (res) {
5628│ case READ_NO_DATA_RECEIVED:
5629│ conn_set_state(c, conn_waiting);
5630│ break;
5631│ case READ_DATA_RECEIVED:
5632│ conn_set_state(c, conn_parse_cmd);
5633│ break;
5634│ case READ_ERROR:
5635│ conn_set_state(c, conn_closing);
5636│ break;
5637│ case READ_MEMORY_ERROR: /* Failed to allocate more memory */
5638│ /* State already set by try_read_network */
5639│ break;
5640│ }
5641│ break;
5642│
3 try_read_network (c=0x7ffff00109a0) at memcached.c:5231
5218│ /*
5219│ * read from network as much as we can, handle buffer overflow and connection
5220│ * close.
5221│ * before reading, move the remaining incomplete fragment of a command
5222│ * (if any) to the beginning of the buffer.
5223│ *
5224│ * To protect us from someone flooding a connection with bogus data causing
5225│ * the connection to eat up all available memory, break out and start looking
5226│ * at the data I've got after a number of reallocs...
5227│ *
5228│ * @return enum try_read_result
5229│ */
5230│ static enum try_read_result try_read_network(conn *c) {
5231├───> enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
5232│ int res;
5233│ int num_allocs = 0;
5234│ assert(c != NULL);
5235│
5236│ if (c->rcurr != c->rbuf) {
5237│ if (c->rbytes != 0) /* otherwise there's nothing to copy */
5238│ memmove(c->rbuf, c->rcurr, c->rbytes);
5239│ c->rcurr = c->rbuf;
5240│ }
5241│
5242│ while (1) {
5243│ if (c->rbytes >= c->rsize) {
5276行目でbreak。その後、関数もretrunされる。
5265│ int avail = c->rsize - c->rbytes;
5266├───────> res = c->read(c, c->rbuf + c->rbytes, avail);
5267│ if (res > 0) {
5268│ pthread_mutex_lock(&c->thread->stats.mutex);
5269│ c->thread->stats.bytes_read += res;
5270│ pthread_mutex_unlock(&c->thread->stats.mutex);
5271│ gotdata = READ_DATA_RECEIVED;
5272│ c->rbytes += res;
5273│ if (res == avail) {
5274│ continue;
5275│ } else {
5276│ break;
5277│ }
5278│ }
5279│ if (res == 0) {
5280│ return READ_ERROR;
5281│ }
5282│ if (res == -1) {
5283│ if (errno == EAGAIN || errno == EWOULDBLOCK) {
5284│ break;
5285│ }
5286│ return READ_ERROR;
5287│ }
5288│ }
4 tcp_read (c=0x7ffff00109a0, buf=0x7ffff0010bd0, count=2048) at memcached.c:162
159│ /* Default methods to read from/ write to a socket */
160│ ssize_t tcp_read(conn *c, void *buf, size_t count) {
161│ assert (c != NULL);
162├───> return read(c->sfd, buf, count);
163│ }
2
5643│ case conn_parse_cmd :
5644├───────────> if (try_read_command(c) == 0) {
5645│ /* wee need more data! */
5646│ conn_set_state(c, conn_waiting);
5647│ }
5648│
5649│ break;
5650│
3 try_read_command (c=0x7ffff00109a0) at memcached.c:5053
実際に入力した内容を読み込む処理。
5045│ /*
5046│ * if we have a complete line in the buffer, process it.
5047│ */
5048│ static int try_read_command(conn *c) {
5049│ assert(c != NULL);
5050│ assert(c->rcurr <= (c->rbuf + c->rsize));
5051│ assert(c->rbytes > 0);
5052│
5053├───> if (c->protocol == negotiating_prot || c->transport == udp_transport) {
5054│ if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
5055│ c->protocol = binary_prot;
5056│ } else {
5057│ c->protocol = ascii_prot;
5058│ }
5059│
5060│ if (settings.verbose > 1) {
5061│ fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
5062│ prot_text(c->protocol));
5063│ }
5064│ }
5065│
MemcachedではASCIIモードとバイナリーモードが利用できるが、ここではASCIIモードを利用しているので、そちらの処理を実行。
5132│ } else {
5133│ char *el, *cont;
5134│
5135│ if (c->rbytes == 0)
5136│ return 0;
5137│
5138│ el = memchr(c->rcurr, '\n', c->rbytes);
5139├───────> if (!el) {
5140│ if (c->rbytes > 1024) {
5141│ /*
5142│ * We didn't have a '\n' in the first k. This _has_ to be a
5143│ * large multiget, if not we should just nuke the connection.
5144│ */
5145│ char *ptr = c->rcurr;
5146│ while (*ptr == ' ') { /* ignore leading whitespaces */
5147│ ++ptr;
5148│ }
5149│
5150│ if (ptr - c->rcurr > 100 ||
5151│ (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
5152│
5153│ conn_set_state(c, conn_closing);
5154│ return 1;
5155│ }
5156│ }
5157│
5158│ return 0;
5159│ }
5160├───────> cont = el + 1;
5161│ if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
5162│ el--;
5163│ }
5164│ *el = '\0';
5165│
5166│ assert(cont <= (c->rcurr + c->rbytes));
5167│
5168│ c->last_cmd_time = current_time;
5169│ process_command(c, c->rcurr);
5170│
5171│ c->rbytes -= (cont - c->rcurr);
5172│ c->rcurr = cont;
4
483│ static const char *prot_text(enum protocol prot) {
484│ char *rv = "unknown";
485├───> switch(prot) {
486│ case ascii_prot:
487│ rv = "ascii";
488│ break;
489│ case binary_prot:
490│ rv = "binary";
491│ break;
492│ case negotiating_prot:
493│ rv = "auto-negotiate";
494│ break;
495│ }
496│ return rv;
497│ }
4 process_command (c=0x7ffff00109a0, command=0x7ffff0010bd0 "set test1 0 0 7") at memcached.c:4731
4721│ static void process_command(conn *c, char *command) {
4722│
4723│ token_t tokens[MAX_TOKENS];
4724│ size_t ntokens;
4725│ int comm;
4726│
4727│ assert(c != NULL);
4728│
4729│ MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
4730│
4731├───> if (settings.verbose > 1)
4732│ fprintf(stderr, "<%d %s\n", c->sfd, command);
4733│
4734│ /*
4735│ * for commands set/add/replace, we build an item and read the data
4736│ * directly into it, then continue in nread_complete().
4737│ */
4738│
4739│ c->msgcurr = 0;
4740│ c->msgused = 0;
4741│ c->iovused = 0;
4742│ if (add_msghdr(c) != 0) {
4743│ out_of_memory(c, "SERVER_ERROR out of memory preparing response");
4744│ return;
4745│ }
4746│
4747│ ntokens = tokenize_command(command, tokens, MAX_TOKENS);
5 add_msghdr (c=0x7ffff00109a0) at memcached.c:329
318│ /*
319│ * Adds a message header to a connection.
320│ *
321│ * Returns 0 on success, -1 on out-of-memory.
322│ */
323│ static int add_msghdr(conn *c)
324│ {
325│ struct msghdr *msg;
326│
327│ assert(c != NULL);
328│
329├───> if (c->msgsize == c->msgused) {
330│ msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
331│ if (! msg) {
332│ STATS_LOCK();
333│ stats.malloc_fails++;
334│ STATS_UNLOCK();
335│ return -1;
336│ }
337│ c->msglist = msg;
338│ c->msgsize *= 2;
339│ }
340│
341│ msg = c->msglist + c->msgused;
342│
343│ /* this wipes msg_iovlen, msg_control, msg_controllen, and
344│ msg_flags, the last 3 of which aren't defined on solaris: */
345│ memset(msg, 0, sizeof(struct msghdr));
346│
347│ msg->msg_iov = &c->iov[c->iovused];
348│
349│ if (IS_UDP(c->transport) && c->request_addr_size > 0) {
350│ msg->msg_name = &c->request_addr;
351│ msg->msg_namelen = c->request_addr_size;
352│ }
353│
354├───> c->msgbytes = 0;
355│ c->msgused++;
356│
357│ if (IS_UDP(c->transport)) {
358│ /* Leave room for the UDP header, which we'll fill in later. */
359│ return add_iov(c, NULL, UDP_HEADER_SIZE);
360│ }
361│
362│ return 0;
363│ }
5 tokenize_command (command=0x7ffff0010bd0 "set test1 0 0 7", tokens=0x7ffff6da8bd0, max_tokens=8) at memcached.c:3045
ntokensでは、"set test1 0 0 7 sample1"の要素数である6が返される。
3032│ *
3033│ * Usage example:
3034│ *
3035│ * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
3036│ * for(int ix = 0; tokens[ix].length != 0; ix++) {
3037│ * ...
3038│ * }
3039│ * ncommand = tokens[ix].value - command;
3040│ * command = tokens[ix].value;
3041│ * }
3042│ */
3043│ static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
3044│ char *s, *e;
3045├───> size_t ntokens = 0;
3046│ size_t len = strlen(command);
3047│ unsigned int i = 0;
3048│
3049│ assert(command != NULL && tokens != NULL && max_tokens > 1);
3050│
3051│ s = e = command;
3052│ for (i = 0; i < len; i++) {
3053│ if (*e == ' ') {
3054│ if (s != e) {
3055│ tokens[ntokens].value = s;
3056│ tokens[ntokens].length = e - s;
3057│ ntokens++;
3058│ *e = '\0';
3059│ if (ntokens == max_tokens - 1) {
3060│ e++;
3061│ s = e; /* so we don't add an extra token */
3062│ break;
3063│ }
3064│ }
3065│ s = e + 1;
3066│ }
3067├───────> e++;
3068│ }
3069│
3070│ if (s != e) {
3071│ tokens[ntokens].value = s;
3072│ tokens[ntokens].length = e - s;
3073├───────> ntokens++;
3074│ }
3075│
3076│ /*
3077│ * If we scanned the whole string, the terminal value pointer is null,
3078│ * otherwise it is the first unprocessed character.
3079│ */
3080│ tokens[ntokens].value = *e == '\0' ? NULL : e;
3081│ tokens[ntokens].length = 0;
3082│ ntokens++;
3083│
3084│ return ntokens;
3085│ }
4
tokenize_command関数の実行結果を元に処理が実行される。ここではSETコマンドなのでprocess_update_command関数の実行
4747│ ntokens = tokenize_command(command, tokens, MAX_TOKENS);
4748│ if (ntokens >= 3 &&
4749│ ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
4750│ (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
4751│
4752│ process_get_command(c, tokens, ntokens, false, false);
4753│
4754├───> } else if ((ntokens == 6 || ntokens == 7) &&
4755│ ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
4756│ (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
4757│ (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
4758│ (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
4759│ (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
4760│
4761│ process_update_command(c, tokens, ntokens, comm, false);
4762│
4763│ } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm =
4764│
4765│ process_update_command(c, tokens, ntokens, comm, true);
4766│
5 process_update_command (c=0x7ffff00109a0, tokens=0x7ffff6da8bd0, ntokens=6, comm=2, handle_cas=false) at memcached.c:4107
4103│ static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas
4104│ char *key;
4105│ size_t nkey;
4106│ unsigned int flags;
4107│ int32_t exptime_int = 0;
4108│ time_t exptime;
4109│ int vlen;
4110├───> uint64_t req_cas_id=0;
4111│ item *it;
4112│
4113│ assert(c != NULL);
4114│
4115│ set_noreply_maybe(c, tokens, ntokens);
4116│
4117│ if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
4118│ out_string(c, "CLIENT_ERROR bad command line format");
4119│ return;
4120│ }
4121│
6 set_noreply_maybe (c=0x7ffff00109a0, tokens=0x7ffff6da8bd0, ntokens=6) at memcached.c:3102
ここではSETコマンド実行時にnoreplyオプションを指定していないので何も実行されない。
3100│ static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
3101│ {
3102├───> int noreply_index = ntokens - 2;
3103│
3104│ /*
3105│ NOTE: this function is not the first place where we are going to
3106│ send the reply. We could send it instead from process_command()
3107│ if the request line has wrong number of tokens. However parsing
3108│ malformed line for "noreply" option is not reliable anyway, so
3109│ it can't be helped.
3110│ */
3111│ if (tokens[noreply_index].value
3112│ && strcmp(tokens[noreply_index].value, "noreply") == 0) {
3113│ c->noreply = true;
3114│ }
3115│ return c->noreply;
3116│ }
5
item_allocが実行される
4122│ key = tokens[KEY_TOKEN].value;
4123│ nkey = tokens[KEY_TOKEN].length;
4124│
4125│ if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
4126│ && safe_strtol(tokens[3].value, &exptime_int)
4127│ && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
4128│ out_string(c, "CLIENT_ERROR bad command line format");
4129│ return;
4130│ }
4131│
4132│ /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
4133├───> exptime = exptime_int;
4134│
4135│ /* Negative exptimes can underflow and end up immortal. realtime() will
4136│ immediately expire values that are greater than REALTIME_MAXDELTA, but less
4137│ than process_started, so lets aim for that. */
4138│ if (exptime < 0)
4139│ exptime = REALTIME_MAXDELTA + 1;
4140│
4141│ // does cas value exist?
4142│ if (handle_cas) {
4143│ if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
4144│ out_string(c, "CLIENT_ERROR bad command line format");
4145│ return;
4146│ }
4147│ }
4148│
4149│ if (vlen < 0 || vlen > (INT_MAX - 2)) {
4150│ out_string(c, "CLIENT_ERROR bad command line format");
4151│ return;
4152│ }
4153│ vlen += 2;
4154│
4155├───> if (settings.detail_enabled) {
4156│ stats_prefix_record_set(key, nkey);
4157│ }
4158│
4159│ it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
4160│
4161│ if (it == 0) {
4162│ enum store_item_type status;
4163│ if (! item_size_ok(nkey, flags, vlen)) {
4164│ out_string(c, "SERVER_ERROR object too large for cache");
4165│ status = TOO_LARGE;
4166│ } else {
4167│ out_of_memory(c, "SERVER_ERROR out of memory storing object");
6 realtime (exptime=0) at memcached.c:208
200│ /*
201│ * given time value that's either unix time or delta from current unix time, return
202│ * unix time. Use the fact that delta can't exceed one month (and real time value can't
203│ * be that low).
204│ */
205│ static rel_time_t realtime(const time_t exptime) {
206│ /* no. of seconds in 30 days - largest possible delta exptime */
207│
208│ if (exptime == 0) return 0; /* 0 means never expire */
209│
210│ if (exptime > REALTIME_MAXDELTA) {
211│ /* if item expiration is at/before the server started, give it an
212│ expiration time of 1 second after the server started.
213│ (because 0 means don't expire). without this, we'd
214│ underflow and wrap around to some large value way in the
215│ future, effectively making items expiring in the past
216│ really expiring never */
217│ if (exptime <= process_started)
218│ return (rel_time_t)1;
219│ return (rel_time_t)(exptime - process_started);
220│ } else {
221│ return (rel_time_t)(exptime + current_time);
222│ }
223├>}
6 item_alloc (key=0x7ffff0010bd4 "test1", nkey=5, flags=0, exptime=0, nbytes=9) at thread.c:579
571│ /********************************* ITEM ACCESS *******************************/
572│
573│ /*
574│ * Allocates a new item.
575│ */
576│ item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
577│ item *it;
578│ /* do_item_alloc handles its own locks */
579├───> it = do_item_alloc(key, nkey, flags, exptime, nbytes);
580│ return it;
581│ }
7 do_item_alloc (key=0x7ffff0010bd4 "test1", nkey=5, flags=0, exptime=0, nbytes=9) at items.c:260
CASを有効にしていると、uint64_tのサイズ分の8バイトが全体サイズに追加されている処理も行われている。
257│ item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
258│ const rel_time_t exptime, const int nbytes) {
259│ uint8_t nsuffix;
260│ item *it = NULL;
261│ char suffix[40];
262│ // Avoid potential underflows.
263├───> if (nbytes < 2)
264│ return 0;
265│
266│ size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
267│ if (settings.use_cas) {
268│ ntotal += sizeof(uint64_t);
269│ }
270│
271│ unsigned int id = slabs_clsid(ntotal);
272│ unsigned int hdr_id = 0;
273│ if (id == 0)
274│ return 0;
275│
8 item_make_header (nkey=6 ‘\006’, flags=0, nbytes=9, suffix=0x7ffff6da8a70 "", nsuffix=0x7ffff6da8aa7 "") at items.c:171
158│ * Generates the variable-sized part of the header for an object.
159│ *
160│ * key - The key
161│ * nkey - The length of the key
162│ * flags - key flags
163│ * nbytes - Number of bytes to hold value and addition CRLF terminator
164│ * suffix - Buffer for the "VALUE" line suffix (flags, size).
165│ * nsuffix - The length of the suffix is stored here.
166│ *
167│ * Returns the total size of the header.
168│ */
169│ static size_t item_make_header(const uint8_t nkey, const unsigned int flags, const int nbytes,
170│ char *suffix, uint8_t *nsuffix) {
171├───> if (settings.inline_ascii_response) {
172│ /* suffix is defined at 40 chars elsewhere.. */
173│ *nsuffix = (uint8_t) snprintf(suffix, 40, " %u %d\r\n", flags, nbytes - 2);
174│ } else {
175│ if (flags == 0) {
176│ *nsuffix = 0;
177│ } else {
178│ *nsuffix = sizeof(flags);
179│ }
180│ }
181│ return sizeof(item) + nkey + *nsuffix + nbytes;
182│ }
8 slabs_clsid (size=71) at slabs.c:92
83│ /*
84│ * Figures out which slab class (chunk size) is required to store an item of
85│ * a given size.
86│ *
87│ * Given object size, return id to use when allocating/freeing memory for object
88│ * 0 means error: can't store such a large object
89│ */
90│
91│ unsigned int slabs_clsid(const size_t size) {
92├───> int res = POWER_SMALLEST;
93│
94│ if (size == 0 || size > settings.item_size_max)
95│ return 0;
96│ while (size > slabclass[res].size)
97│ if (res++ == power_largest) /* won't fit in the biggest slab */
98│ return power_largest;
99│ return res;
100│ }
7
elseのdo_item_alloc_pullが実行される
276│ /* This is a large item. Allocate a header object now, lazily allocate
277│ * chunks while reading the upload.
278│ */
279├───> if (ntotal > settings.slab_chunk_size_max) {
280│ /* We still link this item into the LRU for the larger slab class, but
281│ * we're pulling a header from an entirely different slab class. The
282│ * free routines handle large items specifically.
283│ */
284│ int htotal = nkey + 1 + nsuffix + sizeof(item) + sizeof(item_chunk);
285│ if (settings.use_cas) {
286│ htotal += sizeof(uint64_t);
287│ }
288│ #ifdef NEED_ALIGN
289│ // header chunk needs to be padded on some systems
290│ int remain = htotal % 8;
291│ if (remain != 0) {
292│ htotal += 8 - remain;
293│ }
294│ #endif
295│ hdr_id = slabs_clsid(htotal);
296│ it = do_item_alloc_pull(htotal, hdr_id);
297│ /* setting ITEM_CHUNKED is fine here because we aren't LINKED yet. */
298│ if (it != NULL)
299│ it->it_flags |= ITEM_CHUNKED;
300│ } else {
301├───────> it = do_item_alloc_pull(ntotal, id);
302│ }
303│
8 do_item_alloc (key=0x7ffff0010bd4 "test1", nkey=5, flags=0, exptime=0, nbytes=9) at items.c:272
slabs_allocが実行される。
184│ item *do_item_alloc_pull(const size_t ntotal, const unsigned int id) {
185│ item *it = NULL;
186│ int i;
187│ /* If no memory is available, attempt a direct LRU juggle/eviction */
188│ /* This is a race in order to simplify lru_pull_tail; in cases where
189│ * locked items are on the tail, you want them to fall out and cause
190│ * occasional OOM's, rather than internally work around them.
191│ * This also gives one fewer code path for slab alloc/free
192│ */
193├───> for (i = 0; i < 10; i++) {
194│ uint64_t total_bytes;
195│ /* Try to reclaim memory first */
196│ if (!settings.lru_segmented) {
197│ lru_pull_tail(id, COLD_LRU, 0, 0, 0, NULL);
198│ }
199│ it = slabs_alloc(ntotal, id, &total_bytes, 0);
200│
201│ if (settings.temp_lru)
202│ total_bytes -= temp_lru_size(id);
203│
204│ if (it == NULL) {
205│ if (lru_pull_tail(id, COLD_LRU, total_bytes, LRU_PULL_EVICT, 0, NULL) <= 0) {
206│ if (settings.lru_segmented) {
207│ lru_pull_tail(id, HOT_LRU, total_bytes, 0, 0, NULL);
208│ } else {
209│ break;
210│ }
211│ }
9 slabs_alloc (size=71, id=1, total_bytes=0x7ffff6da8a28, flags=0) at slabs.c:658
654│ void *slabs_alloc(size_t size, unsigned int id, uint64_t *total_bytes,
655│ unsigned int flags) {
656│ void *ret;
657│
658├───> pthread_mutex_lock(&slabs_lock);
659│ ret = do_slabs_alloc(size, id, total_bytes, flags);
660│ pthread_mutex_unlock(&slabs_lock);
661│ return ret;
662│ }
10 do_slabs_alloc (size=71, id=1, total_bytes=0x7ffff6da8a28, flags=0) at slabs.c:337
do_slabs_newslabが実行される。
333│ /*@null@*/
334│ static void *do_slabs_alloc(const size_t size, unsigned int id, uint64_t *total_bytes,
335│ unsigned int flags) {
336│ slabclass_t *p;
337│ void *ret = NULL;
338│ item *it = NULL;
339│
340├───> if (id < POWER_SMALLEST || id > power_largest) {
341│ MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);
342│ return NULL;
343│ }
344│ p = &slabclass[id];
345│ assert(p->sl_curr == 0 || ((item *)p->slots)->slabs_clsid == 0);
346│ if (total_bytes != NULL) {
347│ *total_bytes = p->requested;
348│ }
349│
350│ assert(size <= p->size);
351│ /* fail unless we have space at the end of a recently allocated page,
352│ we have something on our freelist, or we could allocate a new page */
353│ if (p->sl_curr == 0 && flags != SLABS_ALLOC_NO_NEWPAGE) {
354├───────> do_slabs_newslab(id);
355│ }
356│
357│ if (p->sl_curr != 0) {
358│ /* return off our freelist */
359│ it = (item *)p->slots;
360│ p->slots = it->next;
361│ if (it->next) it->next->prev = 0;
362│ /* Kill flag and initialize refcount here for lock safety in slab
363│ * mover's freeness detection. */
364│ it->it_flags &= ~ITEM_SLABBED;
365│ it->refcount = 1;
366│ p->sl_curr--;
:
:
372│ if (ret) {
373│ p->requested += size;
374│ MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);
375│ } else {
376│ MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);
377│ }
11 do_slabs_newslab (id=1) at slabs.c:302
301│ static int do_slabs_newslab(const unsigned int id) {
302│ slabclass_t *p = &slabclass[id];
303│ slabclass_t *g = &slabclass[SLAB_GLOBAL_PAGE_POOL];
304│ int len = (settings.slab_reassign || settings.slab_chunk_size_max != settings.slab_page_size)
305│ ? settings.slab_page_size
306│ : p->size * p->perslab;
307│ char *ptr;
308│
309├───> if ((mem_limit && mem_malloced + len > mem_limit && p->slabs > 0
310│ && g->slabs == 0)) {
311│ mem_limit_reached = true;
312│ MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
313│ return 0;
314│ }
315│
316│ if ((grow_slab_list(id) == 0) ||
317│ (((ptr = get_page_from_global_pool()) == NULL) &&
318│ ((ptr = memory_allocate((size_t)len)) == 0))) {
319│
320│ MEMCACHED_SLABS_SLABCLASS_ALLOCATE_FAILED(id);
321│ return 0;
322│ }
323│
324│ memset(ptr, 0, (size_t)len);
325│ split_slab_page_into_freelist(ptr, id);
326│
327│ p->slab_list[p->slabs++] = ptr;
328│ MEMCACHED_SLABS_SLABCLASS_ALLOCATE(id);
12 grow_slab_list (id=1) at slabs.c:270
269│ static int grow_slab_list (const unsigned int id) {
270├───> slabclass_t *p = &slabclass[id];
271│ if (p->slabs == p->list_size) {
272│ size_t new_size = (p->list_size != 0) ? p->list_size * 2 : 16;
273│ void *new_list = realloc(p->slab_list, new_size * sizeof(void *));
274│ if (new_list == 0) return 0;
275│ p->list_size = new_size;
276│ p->slab_list = new_list;
277│ }
278│ return 1;
279│ }
12 memory_allocate (size=1048576) at slabs.c:611
608│ static void *memory_allocate(size_t size) {
609│ void *ret;
610│
611├───> if (mem_base == NULL) {
612│ /* We are not using a preallocated large memory chunk */
613│ ret = malloc(size);
614│ } else {
615│ ret = mem_current;
616│
617│ if (size > mem_avail) {
618│ return NULL;
619│ }
620│
621│ /* mem_current pointer _must_ be aligned!!! */
622│ if (size % CHUNK_ALIGN_BYTES) {
623│ size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);
12 split_slab_page_into_freelist (ptr=0x7ffff44a4010 "", id=1) at slabs.c:282
281│ static void split_slab_page_into_freelist(char *ptr, const unsigned int id) {
282├───> slabclass_t *p = &slabclass[id];
283│ int x;
284│ for (x = 0; x < p->perslab; x++) {
285│ do_slabs_free(ptr, 0, id);
286│ ptr += p->size;
287│ }
288│ }
13 do_slabs_free (ptr=0x7ffff44a4010, size=0, id=1) at slabs.c:447
442│ static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {
443│ slabclass_t *p;
444│ item *it;
445│
446│ assert(id >= POWER_SMALLEST && id <= power_largest);
447├───> if (id < POWER_SMALLEST || id > power_largest)
448│ return;
449│
450│ MEMCACHED_SLABS_FREE(size, id, ptr);
451│ p = &slabclass[id];
452│
453│ it = (item *)ptr;
454│ if ((it->it_flags & ITEM_CHUNKED) == 0) {
455│ #ifdef EXTSTORE
456│ bool is_hdr = it->it_flags & ITEM_HDR;
457│ #endif
458│ it->it_flags = ITEM_SLABBED;
459│ it->slabs_clsid = 0;
460│ it->prev = 0;
461│ it->next = p->slots;
462│ if (it->next) it->next->prev = it;
463│ p->slots = it;
464│
465├───────> p->sl_curr++;
466│ #ifdef EXTSTORE
467│ if (!is_hdr) {
468│ p->requested -= size;
469│ } else {
470│ p->requested -= (size - it->nbytes) + sizeof(item_hdr);
471│ }
472│ #else
473│ p->requested -= size;
474│ #endif
475│ } else {
476│ do_slabs_free_chunked(it, size);
477│ }
478│ return;
479│ }
8
317│ /* Items are initially loaded into the HOT_LRU. This is '0' but I want at
318│ * least a note here. Compiler (hopefully?) optimizes this out.
319│ */
320│ if (settings.temp_lru &&
321│ exptime - current_time <= settings.temporary_ttl) {
322│ id |= TEMP_LRU;
323│ } else if (settings.lru_segmented) {
324│ id |= HOT_LRU;
325│ } else {
326│ /* There is only COLD in compat-mode */
327│ id |= COLD_LRU;
328│ }
329├───> it->slabs_clsid = id;
330│
331│ DEBUG_REFCNT(it, '*');
332│ it->it_flags |= settings.use_cas ? ITEM_CAS : 0;
333│ it->nkey = nkey;
334│ it->nbytes = nbytes;
335│ memcpy(ITEM_key(it), key, nkey);
336│ it->exptime = exptime;
337│ if (settings.inline_ascii_response) {
338│ memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
339│ } else if (nsuffix > 0) {
340│ memcpy(ITEM_suffix(it), &flags, sizeof(flags));
341│ }
342│ it->nsuffix = nsuffix;
343│
344│ /* Initialize internal chunk. */
345│ if (it->it_flags & ITEM_CHUNKED) {
346│ item_chunk *chunk = (item_chunk *) ITEM_schunk(it);
347│
348│ chunk->next = 0;
349│ chunk->prev = 0;
350│ chunk->used = 0;
351│ chunk->size = 0;
352│ chunk->head = it;
353│ chunk->orig_clsid = hdr_id;
354│ }
355├───> it->h_next = 0;
356│
357│ return it;
358│ }
5
conn_nreadの状態に設定される。
4189├───> ITEM_set_cas(it, req_cas_id);
4190│
4191│ c->item = it;
4192│ #ifdef NEED_ALIGN
4193│ if (it->it_flags & ITEM_CHUNKED) {
4194│ c->ritem = ITEM_schunk(it);
4195│ } else {
4196│ c->ritem = ITEM_data(it);
4197│ }
4198│ #else
4199│ c->ritem = ITEM_data(it);
4200│ #endif
4201│ c->rlbytes = it->nbytes;
4202│ c->cmd = comm;
4203│ conn_set_state(c, conn_nread);
4204│ }
2
drive_machineに再度戻ってきて、conn_nreadの処理が実行される。1週目はc->rlbytes > 0で特に何も実行されないが、2週目で再度case文が実行されて、c->rlbytesが0なので、complete_nreadが実行される
5680│ case conn_nread:
5681├───────────> if (c->rlbytes == 0) {
5682│ complete_nread(c);
5683│ break;
5684│ }
5685│
5686│ /* Check if rbytes < 0, to prevent crash */
5687│ if (c->rlbytes < 0) {
5688│ if (settings.verbose) {
5689│ fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);
5690│ }
5691│ conn_set_state(c, conn_closing);
5692│ break;
5693│ }
3 complete_nread (c=0x7ffff00109a0) at memcached.c:2785
ASCIIモードなので、 complete_nread_asciiが実行される。
2780│ static void complete_nread(conn *c) {
2781│ assert(c != NULL);
2782│ assert(c->protocol == ascii_prot
2783│ || c->protocol == binary_prot);
2784│
2785├───> if (c->protocol == ascii_prot) {
2786│ complete_nread_ascii(c);
2787│ } else if (c->protocol == binary_prot) {
2788│ complete_nread_binary(c);
2789│ }
2790│ }
4 complete_nread_ascii (c=0x7ffff00109a0) at memcached.c:1231
1224│ /*
1225│ * we get here after reading the value in set/add/replace commands. The command
1226│ * has been stored in c->cmd, and the item is ready in c->item.
1227│ */
1228│ static void complete_nread_ascii(conn *c) {
1229│ assert(c != NULL);
1230│
1231│ item *it = c->item;
1232│ int comm = c->cmd;
1233│ enum store_item_type ret;
1234│ bool is_valid = false;
1235│
1236├───> pthread_mutex_lock(&c->thread->stats.mutex);
1237│ c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;
1238│ pthread_mutex_unlock(&c->thread->stats.mutex);
1239│
1240│ if ((it->it_flags & ITEM_CHUNKED) == 0) {
1241│ if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {
1242│ is_valid = true;
1243│ }
1244│ } else {
1245│ char buf[2];
1246│ /* should point to the final item chunk */
1247│ item_chunk *ch = (item_chunk *) c->ritem;
1248│ assert(ch->used != 0);
対象のデータをキャッシュに保存するstore_item関数が実行される
1268│ if (!is_valid) {
1269│ out_string(c, "CLIENT_ERROR bad data chunk");
1270│ } else {
1271├─────> ret = store_item(it, comm, c);
1272│
5 store_item (item=0x7ffff45a3f70, comm=2, c=0x7ffff00109a0) at thread.c:689
682│ /*
683│ * Stores an item in the cache (high level, obeys set/add/replace semantics)
684│ */
685│ enum store_item_type store_item(item *item, int comm, conn* c) {
686│ enum store_item_type ret;
687│ uint32_t hv;
688│
689├───> hv = hash(ITEM_key(item), item->nkey);
690│ item_lock(hv);
691│ ret = do_store_item(item, comm, c, hv);
692│ item_unlock(hv);
693│ return ret;
694│ }
6 hash
MurmurHash3_x86_32関数が実行される。
69│ /* Definition modified slightly from the public domain interface (no seed +
70│ * return value */
71│ uint32_t MurmurHash3_x86_32 ( const void * key, size_t length)
72│ {
73├─> const uint8_t * data = (const uint8_t*)key;
74│ const int nblocks = length / 4;
75│
76│ uint32_t h1 = 0;
77│
78│ uint32_t c1 = 0xcc9e2d51;
79│ uint32_t c2 = 0x1b873593;
80│
81│ //----------
82│ // body
83│
84│ const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
85│
86│ for(int i = -nblocks; i; i++)
87│ {
88│ uint32_t k1 = getblock32(blocks,i);
89│
90│ k1 *= c1;
91│ k1 = ROTL32(k1,15);
92│ k1 *= c2;
93│
94│ h1 ^= k1;
95│ h1 = ROTL32(h1,13);
96│ h1 = h1*5+0xe6546b64;
97│ }
98│
99│ //----------
100│ // tail
101│
102│ const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
103│
104│ uint32_t k1 = 0;
105│
106│ switch(length & 3)
107│ {
108│ case 3: k1 ^= tail[2] << 16;
109│ case 2: k1 ^= tail[1] << 8;
110├─> case 1: k1 ^= tail[0];
111│ k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
112│ };
113│
114│ //----------
115│ // finalization
116│
117│ h1 ^= length;
118│
119│ h1 = fmix32(h1);
120│
121│ //*(uint32_t*)out = h1;
122│ return h1;
123│ }
7 getblock32 (i=-1, p=0x7ffff45a3fac) at murmur3_hash.c:50
44│ //-----------------------------------------------------------------------------
45│ // Block read - if your platform needs to do endian-swapping or can only
46│ // handle aligned reads, do the conversion here
47│
48│ static FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
49│ {
50├─> return p[i];
51│ }
52│
53│ //-----------------------------------------------------------------------------
7 rotl32 (x=787512756, r=15 ‘\017’) at murmur3_hash.c:35
33│ static inline uint32_t rotl32 ( uint32_t x, int8_t r )
34│ {
35├─> return (x << r) | (x >> (32 - r));
36│ }
37│
38│ #define ROTL32(x,y) rotl32(x,y)
7 fmix32 (h=2252073768) at murmur3_hash.c:58
54│ // Finalization mix - force all bits of a hash block to avalanche
55│
56│ static FORCE_INLINE uint32_t fmix32 ( uint32_t h )
57│ {
58├─> h ^= h >> 16;
59│ h *= 0x85ebca6b;
60│ h ^= h >> 13;
61│ h *= 0xc2b2ae35;
62│ h ^= h >> 16;
63│
64│ return h;
65│ }
66│
67│ //-----------------------------------------------------------------------------
68│
6 item_lock (hv=3155333867) at thread.c:104
6 item_unlock (hv=3155333867) at thread.c:120
95│ /* item_lock() must be held for an item before any modifications to either its
96│ * associated hash bucket, or the structure itself.
97│ * LRU modifications must hold the item lock, and the LRU lock.
98│ * LRU's accessing items must item_trylock() before modifying an item.
99│ * Items accessible from an LRU must not be freed or modified
100│ * without first locking and removing from the LRU.
101│ */
102│
103│ void item_lock(uint32_t hv) {
104├───> mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
105│ }
119│ void item_unlock(uint32_t hv) {
120├───> mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
121│ }
6 do_store_item (it=0x7ffff45a3f70, comm=2, c=0x7ffff00109a0, hv=3155333867) at memcached.c:2897
2890│ /*
2891│ * Stores an item in the cache according to the semantics of one of the set
2892│ * commands. In threaded mode, this is protected by the cache lock.
2893│ *
2894│ * Returns the state of storage.
2895│ */
2896│ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
2897│ char *key = ITEM_key(it);
2898├───> item *old_it = do_item_get(key, it->nkey, hv, c, DONT_UPDATE);
2899│ enum store_item_type stored = NOT_STORED;
2900│
2901│ item *new_it = NULL;
2902│ uint32_t flags;
2903│
2904│ if (old_it != NULL && comm == NREAD_ADD) {
2905│ /* add only adds a nonexistent item, but promote to head of LRU */
2906│ do_item_update(old_it);
2907│ } else if (!old_it && (comm == NREAD_REPLACE
2908│ || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2909│ {
2910│ /* replace only replaces an existing value; don't store */
7 do_item_get (key=0x7ffff45a3fa8 "test1", nkey=5, hv=3155333867, c=0x7ffff00109a0, do_update=false) at items.c:955
953│ /** wrapper around assoc_find which does the lazy expiration logic */
954│ item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {
955│ item *it = assoc_find(key, nkey, hv);
956├───> if (it != NULL) {
957│ refcount_incr(it);
958│ /* Optimization for slab reassignment. prevents popular items from
959│ * jamming in busy wait. Can only do this here to satisfy lock order
960│ * of item_lock, slabs_lock. */
961│ /* This was made unsafe by removal of the cache_lock:
962│ * slab_rebalance_signal and slab_rebal.* are modified in a separate
963│ * thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =
964│ * NULL (0), but slab_end is still equal to some value, this would end
965│ * up unlinking every item fetched.
966│ * This is either an acceptable loss, or if slab_rebalance_signal is
967│ * true, slab_start/slab_end should be put behind the slabs_lock.
968│ * Which would cause a huge potential slowdown.
969│ * Could also use a specific lock for slab_rebal.* and
970│ * slab_rebalance_signal (shorter lock?)
971│ */
972│ /*if (slab_rebalance_signal &&
973│ ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
974│ do_item_unlink(it, hv);
975│ do_item_remove(it);
976│ it = NULL;
977│ }*/
978│ }
Lazy expirationですでにあるキー中のメモリは上書きされる。ここでは特に保存したこともないので、"> NOT FOUND ")が表示される。
979├───> int was_found = 0;
980│
981│ if (settings.verbose > 2) {
982│ int ii;
983│ if (it == NULL) {
984│ fprintf(stderr, "> NOT FOUND ");
985│ } else {
986│ fprintf(stderr, "> FOUND KEY ");
987│ }
988│ for (ii = 0; ii < nkey; ++ii) {
989│ fprintf(stderr, "%c", key[ii]);
990│ }
991│ }
6
2943│ } else {
2944│ int failed_alloc = 0;
2945│ /*
2946│ * Append - combine new and old record into single one. Here it's
2947│ * atomic and thread-safe.
2948│ */
2949├───────> if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2950│ /*
2951│ * Validate CAS
2952│ */
2953│ if (ITEM_get_cas(it) != 0) {
2954│ // CAS much be equal
2955│ if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2956│ stored = EXISTS;
2957│ }
2958│ }
do_item_linkが実行される。
2987├───────> if (stored == NOT_STORED && failed_alloc == 0) {
2988│ if (old_it != NULL) {
2989│ STORAGE_delete(c->thread->storage, old_it);
2990│ item_replace(old_it, it, hv);
2991│ } else {
2992│ do_item_link(it, hv);
2993│ }
2994│
2995│ c->cas = ITEM_get_cas(it);
2996│
2997│ stored = STORED;
2998│ }
2999│ }
7 do_item_link (it=0x7ffff45a3f70, hv=3155333867) at items.c:474
471│ int do_item_link(item *it, const uint32_t hv) {
472│ MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
473│ assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
474│ it->it_flags |= ITEM_LINKED;
475│ it->time = current_time;
476│
477│ STATS_LOCK();
478├───> stats_state.curr_bytes += ITEM_ntotal(it);
479│ stats_state.curr_items += 1;
480│ stats.total_items += 1;
481│ STATS_UNLOCK();
482│
483│ /* Allocate a new CAS ID on link. */
484│ ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
485│ assoc_insert(it, hv);
486│ item_link_q(it);
487│ refcount_incr(it);
488│ item_stats_sizes_add(it);
489│
490│ return 1;
491│ }
8 get_cas_id () at items.c:112
108│ /* Get the next CAS id for a new item. */
109│ /* TODO: refactor some atomics for this. */
110│ uint64_t get_cas_id(void) {
111│ static uint64_t cas_id = 0;
112├───> pthread_mutex_lock(&cas_id_lock);
113│ uint64_t next_id = ++cas_id;
114│ pthread_mutex_unlock(&cas_id_lock);
115│ return next_id;
116│ }
8 assoc_insert (it=0x7ffff45a3f70, hv=3155333867) at assoc.c:160
154│ /* Note: this isn't an assoc_update. The key must not already exist to call this */
155│ int assoc_insert(item *it, const uint32_t hv) {
156│ unsigned int oldbucket;
157│
158│ // assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */
159│
160├───> if (expanding &&
161│ (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
162│ {
163│ it->h_next = old_hashtable[oldbucket];
164│ old_hashtable[oldbucket] = it;
165│ } else {
166│ it->h_next = primary_hashtable[hv & hashmask(hashpower)];
167│ primary_hashtable[hv & hashmask(hashpower)] = it;
168│ }
169│
170│ MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
171│ return 1;
172│ }
8 item_link_q (it=0x7ffff45a3f70) at items.c:421
420│ static void item_link_q(item *it) {
421├───> pthread_mutex_lock(&lru_locks[it->slabs_clsid]);
422│ do_item_link_q(it);
423│ pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
424│ }
9 do_item_link_q (it=0x7ffff45a3f70) at items.c:397
393│ static void do_item_link_q(item *it) { /* item is the new head */
394│ item **head, **tail;
395│ assert((it->it_flags & ITEM_SLABBED) == 0);
396│
397│ head = &heads[it->slabs_clsid];
398│ tail = &tails[it->slabs_clsid];
399│ assert(it != *head);
400│ assert((*head && *tail) || (*head == 0 && *tail == 0));
401│ it->prev = 0;
402│ it->next = *head;
403├───> if (it->next) it->next->prev = it;
404│ *head = it;
405│ if (*tail == 0) *tail = it;
406│ sizes[it->slabs_clsid]++;
407│ #ifdef EXTSTORE
408│ if (it->it_flags & ITEM_HDR) {
409│ sizes_bytes[it->slabs_clsid] += (ITEM_ntotal(it) - it->nbytes) + sizeof(item_hdr);
410│ } else {
411│ sizes_bytes[it->slabs_clsid] += ITEM_ntotal(it);
412│ }
413│ #else
414│ sizes_bytes[it->slabs_clsid] += ITEM_ntotal(it);
415│ #endif
416│
417│ return;
418│ }
8 item_stats_sizes_add (it=0x7ffff45a3f70) at items.c:907
906│ void item_stats_sizes_add(item *it) {
907├───> if (stats_sizes_hist == NULL || stats_sizes_cas_min > ITEM_get_cas(it))
908│ return;
909│ int ntotal = ITEM_ntotal(it);
910│ int bucket = ntotal / 32;
911│ if ((ntotal % 32) != 0) bucket++;
912│ if (bucket < stats_sizes_buckets) stats_sizes_hist[bucket]++;
913│ }
6
3001├───> if (old_it != NULL)
3002│ do_item_remove(old_it); /* release our reference */
3003│ if (new_it != NULL)
3004│ do_item_remove(new_it);
3005│
3006│ if (stored == STORED) {
3007│ c->cas = ITEM_get_cas(it);
3008│ }
3009│ LOGGER_LOG(c->thread->l, LOG_MUTATIONS, LOGGER_ITEM_STORE, NULL,
3010│ stored, comm, ITEM_key(it), it->nkey, it->exptime, ITEM_clsid(it));
3011│
3012│ return stored;
3013│ }
4
retがSTOREDとなっている。
1303│ switch (ret) {
1304│ case STORED:
1305├─────────> out_string(c, "STORED");
1306│ break;
1307│ case EXISTS:
1308│ out_string(c, "EXISTS");
1309│ break;
1310│ case NOT_FOUND:
1311│ out_string(c, "NOT_FOUND");
1312│ break;
1313│ case NOT_STORED:
1314│ out_string(c, "NOT_STORED");
1315│ break;
1316│ default:
1317│ out_string(c, "SERVER_ERROR Unhandled storage type.");
5 out_string (c=0x7ffff00109a0, str=0x424f99 "STORED") at memcached.c:1171
conn_writeに設定される。
1166│ static void out_string(conn *c, const char *str) {
1167│ size_t len;
1168│
1169│ assert(c != NULL);
1170│
1171├───> if (c->noreply) {
1172│ if (settings.verbose > 1)
1173│ fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
1174│ c->noreply = false;
1175│ conn_set_state(c, conn_new_cmd);
1176│ return;
1177│ }
1178│
1179│ if (settings.verbose > 1)
1180│ fprintf(stderr, ">%d %s\n", c->sfd, str);
1181│
1182│ /* Nuke a partial output... */
1183│ c->msgcurr = 0;
1184│ c->msgused = 0;
1185│ c->iovused = 0;
1186├───> add_msghdr(c);
1187│
1188│ len = strlen(str);
1189│ if ((len + 2) > c->wsize) {
1190│ /* ought to be always enough. just fail for simplicity */
1191│ str = "SERVER_ERROR output line too long";
1192│ len = strlen(str);
1193│ }
1194│
1195│ memcpy(c->wbuf, str, len);
1196│ memcpy(c->wbuf + len, "\r\n", 2);
1197│ c->wbytes = len + 2;
1198│ c->wcurr = c->wbuf;
1199│
1200│ conn_set_state(c, conn_write);
1201│ c->write_and_go = conn_new_cmd;
1202│ return;
1203│ }
5 item_remove (item=0x7ffff45a3f70) at thread.c:638
hash関数は、先程と同様MurmurHash3_x86_32関数が実行される。
632│ /*
633│ * Decrements the reference count on an item and adds it to the freelist if
634│ * needed.
635│ */
636│ void item_remove(item *item) {
637│ uint32_t hv;
638├───> hv = hash(ITEM_key(item), item->nkey);
639│
640│ item_lock(hv);
641│ do_item_remove(item);
642│ item_unlock(hv);
643│ }
644│
6 do_item_remove (it=0x7ffff45a3f70) at items.c:529
524│ void do_item_remove(item *it) {
525│ MEMCACHED_ITEM_REMOVE(ITEM_key(it), it->nkey, it->nbytes);
526│ assert((it->it_flags & ITEM_SLABBED) == 0);
527│ assert(it->refcount > 0);
528│
529├───> if (refcount_decr(it) == 0) {
530│ item_free(it);
531│ }
532│ }
2
drive_machineに戻ってくる。c->stateはconn_write
5810│ case conn_write:
5811│ /*
5812│ * We want to write out a simple response. If we haven't already,
5813│ * assemble it into a msgbuf list (this will be a single-entry
5814│ * list for TCP or a two-entry list for UDP).
5815│ */
5816├───────────> if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
5817│ if (add_iov(c, c->wcurr, c->wbytes) != 0) {
5818│ if (settings.verbose > 0)
5819│ fprintf(stderr, "Couldn't build response\n");
5820│ conn_set_state(c, conn_closing);
5821│ break;
5822│ }
5823│ }
5824│
5825│ /* fall through... */
5826│
:
:
5845├─────────> if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
5846│ if (settings.verbose > 0)
5847│ fprintf(stderr, "Failed to build UDP headers\n");
5848│ conn_set_state(c, conn_closing);
5849│ break;
5850│ }
3 add_iov (c=0x7ffff00109a0, buf=0x7ffff00113e0, len=8) at memcached.c:1048
1035│ *
1036│ * Returns 0 on success, -1 on out-of-memory.
1037│ * Note: This is a hot path for at least ASCII protocol. While there is
1038│ * redundant code in splitting TCP/UDP handling, any reduction in steps has a
1039│ * large impact for TCP connections.
1040│ */
1041│
1042│ static int add_iov(conn *c, const void *buf, int len) {
1043│ struct msghdr *m;
1044│ int leftover;
1045│
1046│ assert(c != NULL);
1047│
1048├───> if (IS_UDP(c->transport)) {
1049│ do {
1050│ m = &c->msglist[c->msgused - 1];
1051│
1052│ /*
1053│ * Limit UDP packets to UDP_MAX_PAYLOAD_SIZE bytes.
1054│ */
1055│
1056│ /* We may need to start a new msghdr if this one is full. */
1057│ if (m->msg_iovlen == IOV_MAX ||
1058│ (c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
1059│ add_msghdr(c);
1060│ m = &c->msglist[c->msgused - 1];
その後、ensure_iov_space関数が呼ばれる。ここでは特に何もしない。
1085│ } else {
1086│ /* Optimized path for TCP connections */
1087│ m = &c->msglist[c->msgused - 1];
1088│ if (m->msg_iovlen == IOV_MAX) {
1089│ add_msghdr(c);
1090│ m = &c->msglist[c->msgused - 1];
1091│ }
1092│
1093│ if (ensure_iov_space(c) != 0)
1094│ return -1;
1095│
1096├───────> m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
1097│ m->msg_iov[m->msg_iovlen].iov_len = len;
1098│ c->msgbytes += len;
1099│ c->iovused++;
1100│ m->msg_iovlen++;
1101│ }
1102│
1103│ return 0;
1104│ }
4 ensure_iov_space (c=0x7ffff00109a0) at memcached.c:1008
999│ /*
1000│ * Ensures that there is room for another struct iovec in a connection's
1001│ * iov list.
1002│ *
1003│ * Returns 0 on success, -1 on out-of-memory.
1004│ */
1005│ static int ensure_iov_space(conn *c) {
1006│ assert(c != NULL);
1007│
1008├───> if (c->iovused >= c->iovsize) {
1009│ int i, iovnum;
1010│ struct iovec *new_iov = (struct iovec *)realloc(c->iov,
1011│ (c->iovsize * 2) * sizeof(struct iovec));
1012│ if (! new_iov) {
1013│ STATS_LOCK();
1014│ stats.malloc_fails++;
1015│ STATS_UNLOCK();
1016│ return -1;
1017│ }
1018│ c->iov = new_iov;
1019│ c->iovsize *= 2;
1020│
3 transmit (c=0x7ffff00109a0) at memcached.c:5361
TRANSMIT_INCOMPLETEをreturnする。その後、抜けて、drive_machineからc->stateはconn_writeで再度この関数が実行される。その時は、TRANSMIT_COMPLETEをreturnする。
5349│ /*
5350│ * Transmit the next chunk of data from our list of msgbuf structures.
5351│ *
5352│ * Returns:
5353│ * TRANSMIT_COMPLETE All done writing.
5354│ * TRANSMIT_INCOMPLETE More data remaining to write.
5355│ * TRANSMIT_SOFT_ERROR Can't write any more right now.
5356│ * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
5357│ */
5358│ static enum transmit_result transmit(conn *c) {
5359│ assert(c != NULL);
5360│
5361├───> if (c->msgcurr < c->msgused &&
5362│ c->msglist[c->msgcurr].msg_iovlen == 0) {
5363│ /* Finished writing the current msg; advance to the next. */
5364│ c->msgcurr++;
5365│ }
5366│ if (c->msgcurr < c->msgused) {
5367│ ssize_t res;
5368│ struct msghdr *m = &c->msglist[c->msgcurr];
5369│
5370│ res = c->sendmsg(c, m, 0);
5371│ if (res > 0) {
5372│ pthread_mutex_lock(&c->thread->stats.mutex);
5373│ c->thread->stats.bytes_written += res;
5374│ pthread_mutex_unlock(&c->thread->stats.mutex);
5375│
5376│ /* We've written some of the data. Remove the completed
5377│ iovec entries from the list of pending writes. */
5378│ while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
5379│ res -= m->msg_iov->iov_len;
5380│ m->msg_iovlen--;
5381│ m->msg_iov++;
5382│ }
5383│
5384│ /* Might have written just part of the last iovec entry;
5385│ adjust it so the next write will do the rest. */
5386│ if (res > 0) {
5387│ m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
5388│ m->msg_iov->iov_len -= res;
5389│ }
5390├───────────> return TRANSMIT_INCOMPLETE;
5391│ }
5392│ if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5393│ if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5394│ if (settings.verbose > 0)
5395│ fprintf(stderr, "Couldn't update event\n");
5396│ conn_set_state(c, conn_closing);
5397│ return TRANSMIT_HARD_ERROR;
5398│ }
5399│ return TRANSMIT_SOFT_ERROR;
5400│ }
5401│ /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
5402│ we have a real error, on which we close the connection */
4 tcp_sendmsg (c=0x7ffff00109a0, msg=0x7ffff0013e80, flags=0) at memcached.c:167
165│ ssize_t tcp_sendmsg(conn *c, struct msghdr *msg, int flags) {
166│ assert (c != NULL);
167├───> return sendmsg(c->sfd, msg, flags);
168│ }
2
c->write_and_goにはconn_new_cmdが設定されており、c->stateはこの状態に設定される。
5845│ if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
5846│ if (settings.verbose > 0)
5847│ fprintf(stderr, "Failed to build UDP headers\n");
5848│ conn_set_state(c, conn_closing);
5849│ break;
5850│ }
5851│ switch (transmit(c)) {
5852│ case TRANSMIT_COMPLETE:
5853├───────────────> if (c->state == conn_mwrite) {
5854│ conn_release_items(c);
5855│ /* XXX: I don't know why this wasn't the general case */
5856│ if(c->protocol == binary_prot) {
5857│ conn_set_state(c, c->write_and_go);
5858│ } else {
5859│ conn_set_state(c, conn_new_cmd);
5860│ }
5861│ } else if (c->state == conn_write) {
5862│ if (c->write_and_free) {
5863│ free(c->write_and_free);
5864│ c->write_and_free = 0;
5865│ }
5866├───────────────────> conn_set_state(c, c->write_and_go);
5867│ } else {
5868│ if (settings.verbose > 0)
5869│ fprintf(stderr, "Unexpected state %d\n", c->state);
5870│ conn_set_state(c, conn_closing);
5871│ }
5872│ break;
5873│
再びループでc->stateの状態に基づいた処理が実行され、ここでは、conn_new_cmd
5651│ case conn_new_cmd:
5652│ /* Only process nreqs at a time to avoid starving other
5653│ connections */
5654│
5655│ --nreqs;
5656├───────────> if (nreqs >= 0) {
5657│ reset_cmd_handler(c);
5658│ } else {
5659│ pthread_mutex_lock(&c->thread->stats.mutex);
5660│ c->thread->stats.conn_yields++;
5661│ pthread_mutex_unlock(&c->thread->stats.mutex);
5662│ if (c->rbytes > 0) {
5663│ /* We have already read in data into the input buffer,
5664│ so libevent will most likely not signal read events
5665│ on the socket (unless more data is available. As a
5666│ hack we should just put in a request to write data,
5667│ because that should be possible ;-)
5668│ */
3 reset_cmd_handler (c=0x7ffff00109a0) at memcached.c:2766
conn_waitingの状態に設定。
2765│ static void reset_cmd_handler(conn *c) {
2766├───> c->cmd = -1;
2767│ c->substate = bin_no_state;
2768│ if(c->item != NULL) {
2769│ item_remove(c->item);
2770│ c->item = NULL;
2771│ }
2772│ conn_shrink(c);
2773│ if (c->rbytes > 0) {
2774│ conn_set_state(c, conn_parse_cmd);
2775│ } else {
2776│ conn_set_state(c, conn_waiting);
2777│ }
2778│ }
2
update_event関数ではtrueが返され、conn_readが実行される。stop変数のフラグにtrueが設定されたことによりwhile文のループは終了する。
5612│ case conn_waiting:
5613├───────────> if (!update_event(c, EV_READ | EV_PERSIST)) {
5614│ if (settings.verbose > 0)
5615│ fprintf(stderr, "Couldn't update event\n");
5616│ conn_set_state(c, conn_closing);
5617│ break;
5618│ }
5619│
5620│ conn_set_state(c, conn_read);
5621│ stop = true;
5622│ break;
5623│
事前準備
Memcached
デバッグ用にビルドしておく。また、-O2で最適化しているとgdbの際に順にコードを追っていくと困難になるので-O0に変更しておく。そうでないと、デバッグの際に、順番に実行してもソースコード中の動きがばらばらになる。
$ git clone https://github.com/memcached/memcached.git
$ cd memcached
$ ./autogen.sh && ./configure && make CFLAGS="-g -O0" && sudo make install
cgdb
$ git clone https://github.com/cgdb/cgdb.git
$ cd cgdb
$ ./autogen.sh
$ CXXFLAGS='-std=c++11' ./configure --prefix=/usr/local
$ make
$ sudo make install
-std=c++11をつけないと、make時に"kui.cpp:310:15: error: ‘it’ does not name a type"のエラー https://github.com/cgdb/cgdb/issues/184
デバッグ
MemcachedはLibeventを利用しており、コマンド実行時に呼ばれる関数はevent_handlerを起点とする。そのため、ここにブレークポイントを打っておくと良い。
$ cgdb memcached
(gdb) b main
(gdb) b event_handler
(gdb) run -u ec2-user -vvv
event_base_loop の前まで実行
SETコマンド実行時、ここで別ターミナルから $ telnet localhost 11211 を実行。以下のコマンドを実行。
set test1 0 0 7
sample1
スレッド3に切り替えて、nextを続けていると、先程ブレークポイントを設定しておいたevent_handler関数で止まる。
(gdb) info threads
(gdb) thread 3
(gdb) next
:
:
繰り返し処理が多く、ループを飛ばすには以下を実行
(gdb) b linenum
(gdb) c
(gdb) del breaknum
定数確認にはシェルでgrep
(gdb) shell grep -r ITEMS_PER_ALLOC /tmp/memcached