Dive Deep Memcached ~ 入門から実装の確認まで ~

Redis本を執筆しました。Memcachedとの詳細な比較も付録で取り上げていますので、こちらも是非合わせてご覧ください。

---------------------------------------------------------------------------------------------------------------------------------------------------------

Memcached

Memcachedは以下の特徴を持ったインメモリのKVSとなります。

  • シンプル
  • マルチスレッド処理
  • ロジックの半分はサーバで、半分はクライアント
  • サーバ同士では通信は行わず、互いに独立
  • コマンドはO(1)
  • TTLに従った失効と、LRUによるeviction
  • libeventによるイベント駆動処理

なお、合わせてMemcachedのソースコードの各種ファイルの概要についての詳細は以下の記事をご覧ください。

MemcachedのSETコマンド実行時のソースコード中のフローの詳細については、以下の記事をご覧ください。

レポジトリはこちらにあります。

コマンド

一覧

  • コマンド 24個 (ASCIIモード。バイナリモードの場合は後述)
  1. set
  2. add/replace
  3. append/prepend
  4. get/gets
  5. gat/gats
  6. delete
  7. flush_all
  8. incr/decr
  9. cas
  10. touch
  11. stats
  12. slabs
  13. lru <tune|mode|temp_ttl> <option list>
  14. lru_crawler
  15. watch <fetchers|mutations|evictions>
  16. version
  17. verbosity
  18. misbehave
  19. quit
  • 統計確認

    • stats
    1. stats items
    2. stats slabs
    3. stats sizes
    4. stats detail on|off|dump
    5. stats settings
    6. stats cachedump
    7. stats conns
    8. stats extstore
    9. stats reset
  • Slabs

    1. slabs reassign <source class> <dest class>\r\n
    2. slabs automove <0|1|2>
  • LRU Crawler

    1. lru_crawler <enable|disable>
    2. lru_crawler sleep <microseconds>
    3. lru_crawler tocrawl <32u>
    4. lru_crawler crawl <classid,classid,classid|all>
    5. lru_crawler metadump <classid,classid,classid|all>

実行例

set hoge 0 900 4
fuga
STORED
get hoge
VALUE hoge 0 4
fuga
END
add hoge1 0 900 5
fuga1
STORED
replace hoge1 0 900 5
fuga2
STORED
append hoge1 0 900 4
test
STORED
prepend hoge1 0 900 6
sample
STORED
get hoge1      
VALUE hoge1 0 15
samplefuga2test
END

incr/decrは引数で与えた数値の分だけ値を増加/減少させる。

set hoge2 0 0 2
10
STORED
incr hoge2 3
13
decr hoge2 4
9

あるユーザーがデータを取得してから書き換えるまでに、他のユーザーによって値が書き換えられていないことを確認するためには、getsでCAS IDを取得しておき、casコマンドでCAS IDを添えて実行することで、書き換えられてないことを確認しながら値を設定できる。CAS(Check-and-Set, Compare-and-Swap)の略。

set hoge2 0 0 5
fuga2
STORED
gets hoge2
VALUE hoge2 0 5 13
fuga2
END
cas hoge2 0 0 5 13
piyo2
STORED

Memcachedのflush_allは、TTLを付与するだけなので、サーバーに影響は与えない。

delete hoge2
DELETED
flush_all
OK

gat(Get And Touch)コマンド、gets、touchコマンドはTTLの更新。ASCIIモードでは1.5.3で追加されました。バイナリモードでは1.4.8(1.6からのバックポート)で利用できるようになっています。

set hoge1 0 900 5
fuga1
STORED
touch hoge1 60
TOUCHED
gat 700 hoge1
VALUE hoge1 0 5
fuga1
END
gats 800 hoge1
VALUE hoge1 0 5 1
fuga1
END

メモリ管理

Slab Allocation

動作

Slab Allocationとは、予め決められた大きさごとにSlab Classというグループを用意して、ある大きさのメモリーをSlab class毎に決められた大きさをChunkとして分割していきます。

アイテムを保存する際には、そのサイズより大きい、最も小さいChunkのサイズをもったSlab classに保存されます。Slab classの対象のページに空きChunkがあれば保存され、なければメモリープール(-mで指定したバイト数)からそのSlab class用にページ(-Iで指定したバイト数)を確保する動作となります。

確保したメモリは基本的に解放せずにchunkを再利用する挙動になります。

Chunkの様子は、memcachedコマンドの-vvオプション、もしくは-vvvオプションで確認することができます。また、付属のmemcached-toolを利用することでも確認することができます。

Slab classのChunkのサイズ

Chunkの大きさについては、最小のChunkのサイズ(chunk_size)からSlab Classが変わる毎に、前のクラスの大きさからchunk_size_growth_factorを掛けた大きさとなっていきます。 Chunkのサイズがmax_item_sizeの値を超えない最も大きいところまでクラスが用意されることになります。 なお、データとして利用できるメモリー量は、Memcached全体として利用できるメモリー(max_cache_memory)と接続に伴うオーバーヘッド(memcached_connections_overhead)を除いた分となり、そちらも考慮されます。

背景

Slab Allocation導入前のメモリ確保では、アイテムの保存の度に、mallocとfreeを行うもので、メモリーフラグメンテーションを引き起こしやすいものでした。そのため、フラグメンテーションの対策として、アプリケーションレイヤーでメモリーの管理しながらメモリーを再利用するようになりました。

上記より、メリット/デメリットとして以下のことが挙げられます。

  • メリット : メモリー再利用によるメモリーフラグメンテーションの抑制
  • デメリット : 固定長による無駄なメモリーを消費することがある

Redisではメモリ確保で頻繁にmallocの呼び出しを行っており、OS内部の状況によってはレスポンスの処理速度にブレが出てくる可能性が大きくなっていましたが、Memcachedではメモリ管理はOSに依存しすぎず自身でメモリ管理を行うようにし、フラグメンテーション肥大化防止のために取られたSlab Allocationで予め用意したスラブのメモリにデータを保存するので、レスポンスに安定感が期待できます。

Linux kernelのSlab Allocation

Linux kernelでもSlab Allocationという仕組みがあり、これはページ単位でメモリ確保を行うBuddy allocationとは別に、より小さな単位でメモリを管理を行うものとなっております。

Linuxでは3つの種類があり、現在は主にデフォルトでslub allocatorがデフォルトで使用されるようになっています。

  • slub allocator
  • slab allocator
  • slob allocator

Slab Allocator

Linuxカーネル4.1のSLUBアローケータ(ドラフト) https://kernhack.hatenablog.com/entry/2019/05/10/001425

LRU

Memcachedには、Redis同様にアイテムの削除はActive/Passiveな方法があり、以下の方法となります。

  • Active: Lazy Expiration
  • Passive: LRU Crawler

ただし、メモリーの再利用が間に合わず、メモリー確保の際に不足した場合には、削除対象としてLRUの挙動に従ってevictionする挙動となります。 MemcachedのLRUは、Slab class単位となるため、全体としてみるとLRUの挙動から離れることもあります。 また、MemcachedはTTLの切れたキーをevictionするようにベストエフォートで行われますが、キューの末尾の削除されることもあります。

管理方法

1.4以前

双方向Linked Listを使用。 この構成では8スレッドまでが限界で、read intensiveな状況で48スレッドまででLRUロックの都合上スケールの限界。 また、一度に数百のキーを取り出すと、mutex競合や、不均一なレイテンシー、CPUを多く使用する状況が発生した。

最適化として60秒に1回、アイテムをbumpさせ、頻繁にアクセスするアイテムのmutexロックやmutationを避ける工夫がされていました。

それ以降

サブLRUとしてHOT/WARM/COLDのようにセグメント化されたLRUを使用。 各セグメントが自身のmutexロックを持ちます。

各アイテムの状態は、以下の2つに分類されます。

  • FETCHED : キーがリクエストされたことがある。
  • ACTIVE : 1秒以内にアクセスされ、アイテムがはじき出されたか移動するときに取り除かれる

各セグメントの状態は、以下の通りとなります。これらの状態遷移の管理は、LRU Maintainerが1つのバックグラウンドスレッドとして実行します。

  • HOT : 新規アイテムが最初に分類される状態。アイテムは強い局所性と短いTTLを持つことが多いので、一度キューの最後まで達すると、WARMでアイテムがbumpされず、アイテムがACTIVEであればWARMへ移動し、INACTIVEであればCOLDに移動する。確率的なキューとして振る舞う。
  • WARM : ワークロードをスキャンするバッファとして振る舞う。2度以上アクセスされた場合にこの状態になる。ロックの競合を減らしている状態であればTTLだけ生き残る。キューの末尾まで移動したアイテムがACTIVEであればキューの先頭に移動。そうでなければCOLDに移動
  • COLD : 最もINACTIVEなアイテムを含む。HOT/WARMの状態から移動してくる。メモリーがいっぱいのときはevictされる。

また、TEMPセグメントもあります。

  • TEMP : 新規アイテムが分類できる状態。TEMPに入れられたアイテムは指定した時間だけbumpさせないように設定することができる。

上記のままでは、アクセスパターンに一貫性がない場合、キューの真ん中の状態のアイテムが失効しているとサイズが特に大きな場合にはスペースが無駄になり、サイジングが難しい。

それを克服するためにLRU Crawlerというバックグラウンドスレッドにより、非同期にアイテムを一通り確認し、失効しているものはreclaimする。 Slab Classが大きいほど、アイテム数が少ないので、すばやくスキャンできる。 LRU CrawlerはHOT状態はスキャンする価値がないことを知っていることもある一方で、HOTに大量の短いTTLのキーがあれば、比較的大きなCOLDをスキャンすることを避けている間は、消去することもある。 10秒以上TTLのある1MBのアイテムは参照されるまで、LRU Crawlerは見ない動作をします。

HOT/WARM/COLD LRUのメリット/デメリット

  • メリット

    • LRUロックの影響が小さい : アイテムが直接取り出したときにLRUのキューから他のアイテムがdumpされない。一度に数百のキーでも取り出せる。
    • アイテムが非同期にbump : 短時間のSET/GETはHOT/WARMのバランスの悪さが発生
    • 追加のmutexでwrite操作のスケール
    • アイテムあたりのメタデータサイズが増加しない
  • デメリット

    • あるSlab classで大量のSETがあったときに、COLDがオーバーフローし、“direct reclaim”モードが発生する。HOTやWARMからアイテムを移動している間、ワーカースレッドがブロックされる。
    • TTLの異なるアイテムが混在すると、アイテムが取り出されるかキューの末尾に移動するまで短いTTLのメモリーが無駄になる

Replacing the cache replacement algorithm in memcached - Dormando (October 15, 2018)

Memcached for Dummies

https://work.tinou.com/2011/04/memcached-for-dummies.html

memcachedを知り尽くす http://gihyo.jp/dev/feature/01/memcached

キー分散

Memcachedでは、クラスター内の各ノードはそれぞれ独立しており、互いにレプリケーションといった挙動はしません。キャッシュノード間で、対象のキーが保存されたノードの指定は、クライアント側に依存しています。クライアント側からのキーの分散方法としては、主に以下の2つの方法が考えられます。

  • Modulo
  • Consistent Hashing

Modulo

単純なハッシュ計算で hash(key) mod n によりノードを決めます。 ノードの追加/削除時に、移動する必要があるキーの数は、n は新しいノード数として、(n - 1)/n となります。

Consistent Hashing

円上の0から2^32までの均等な点に対して、各キャッシュノードのハッシュ値を求め、配置します。キーに対しても同様にハッシュ値を求め、キーの保存の際には最も近いサーバのところに保存されます。 ノードの追加、削除の際には、そのノードに入れ替えに伴い、アイテムを移動させます。

ノードの追加/削除時に、移動する必要のあるキーの数は約 1/nとなります。

イベント駆動処理

Memcachedのイベント駆動処理には、libeventが利用されています。 主に memcached.c/thread.c で使われており、以下の種類の関数が利用されています。

event_init() : event API の初期化。event APIを利用する前にはこの関数を実行する必要がある。

evtimer_set(&maxconnsevent, maxconns_handler, 0) タイマーイベントの設定。タイムアウトが必要な場面のみで使用
evtimer_add(&maxconnsevent, &t) : タイマーイベントの追加
evtimer_del(&maxconnsevent) : タイマーイベントの削除

event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c) : event構造体の準備。現在は、新規コードでは多くは安全に利用するためにはevent_base_setが必要となり推奨されていない。代わりに、event_assign(), event_new()を使うことが推奨されている。
event_base_set(main_base, &maxconnsevent) : eventと異なるevent baseの関連付け
event_add(&c->event, 0) : イベントの実行をスケジュール
event_del(&c->event) : イベントの削除

event_config_new() : イベント設定オブジェクトの割当て。event baseの挙動を変更することに利用可能
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK): event baseのどの部分を結果的に初期化するか、挙動をフラグで設定
event_base_new_with_config(ev_config) : event baseの初期化
event_config_free(ev_config) : 新しいevent baseをevent_base_new_with_configで取得後、event_configをこの関数で解放する必要がある
event_base_loop(main_base, 0) : イベント処理
event_base_free(main_base) : event baseに割り当てられた全メモリの割当てを解放し、baseを解放

event_get_version() : libeventのバージョン取得

関数の詳細は、ソースコード

libevent/include/event2/event.h at master · libevent/libevent · GitHub

Event notification library. Contribute to libevent/libevent development by creating an account on GitHub.

https://github.com/libevent/libevent/blob/master/include/event2/event_compat.h

event.h File Reference

http://www.wangafu.net/~nickm/libevent-2.0/doxygen/html/event_8h.html

EVENT(3) http://man7.org/linux/man-pages/man3/event.3.html

libevent – an event notification library https://libevent.org/

Memcachedのマルチスレッド処理

POSIXスレッド(pthread)

Memcachedではマルチスレッド処理にpthreadが利用されており、以下の関数が使用されています。

pthread_mutex_init(&e->io_threads[i].mutex, NULL) : mutexロックの初期化
pthread_create(&thread, NULL, extstore_io_thread, &e->io_threads[i]) : スレッド生成

pthread_cond_init(&e->io_threads[i].cond, NULL) : 条件変数の初期化
pthread_cond_signal(&e->maint_thread->cond) : 条件変数のシグナル
pthread_cond_wait(&me->cond, &me->mutex) : 条件変数で待機

pthread_mutex_lock(x) : mutexロックの獲得(ブロック)
pthread_mutex_trylock(&slabs_rebalance_lock) : mutexロックの獲得(ノンブロック)
pthread_mutex_unlock(x) : mutexロックの解放

pthread_join(logger_tid, NULL) : 現在のスレッドを他のスレッドが終了するまでブロック
pthread_key_create(&logger_key, NULL) : スレッドローカルデータと関連付けるキーを生成
pthread_getspecific(logger_key) : キーを指定してスレッドローカルデータを検索
pthread_setspecific(logger_key, l) : キーとスレッドローカルデータの関連付け

pthread_mutex_destroy(&cache->mutex) : mutexロックの破棄

pthread_once(&crc32c_once_sw, crc32c_init_sw) : 登録された関数はプロセスで必ず1回だけ呼ばれる事を保証
pthread_self() : 自身のスレッドID取得

処理内容

デフォルト状態でworkerスレッドの起動数が4つのため、以下のようにスレッドIDが順番に割り当てられていき、workerスレッド間はリクエストをラウンドロビンで割り当てられます。特に連続した処理でなければ、手元の環境ではコマンド処理がThread 3で実行されることが多いことを確認しています。

Thread  1		: event_base_loop : mainスレッド
Thread  2		: logger
Thread  3,4,5,6	: memcached_thread : workerスレッド, デフォルトで4つ
Thread  7		: assoc_maintenance
Thread  8		: item_crawler
Thread  9		: lru_maintainer
Thread 10		: slab_maintenance

Redisのリクエストのシングルスレッドで処理する性質とは対照的に、Memcachedではリクエストをマルチスレッドで処理を行い、デフォルトでは4つのworkerスレッド(memcached_thread)を起動するようになっています。 ワーカースレッドが多すぎてもロックの取り合いで、パフォーマンスが上がるわけではありません。

Memcachedは1つの大きなHash Tableでデータを管理しているが、拡張の際に、assoc_maintenanceスレッドが使用されている。それ以外のスレッドについては後述。

info threadsの出力例

(gdb) info threads
  Id   Target Id         Frame
  10   Thread 0x7fffe6ffd700 (LWP 4892) "memcached" pthread_cond_wait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv
/linux/x86_64/pthread_cond_wait.S:185
  9    Thread 0x7fffe77fe700 (LWP 4800) "memcached" 0x00007ffff766fe9d in nanosleep () at ../sysdeps/unix/syscall-
template.S:81
  8    Thread 0x7fffe7fff700 (LWP 4730) "memcached" pthread_cond_wait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv
/linux/x86_64/pthread_cond_wait.S:185
  7    Thread 0x7ffff4da5700 (LWP 4676) "memcached" pthread_cond_wait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv
/linux/x86_64/pthread_cond_wait.S:185
  6    Thread 0x7ffff55a6700 (LWP 4500) "memcached" 0x00007ffff76a94f3 in epoll_wait () at ../sysdeps/unix/syscall
-template.S:81
  5    Thread 0x7ffff5da7700 (LWP 4499) "memcached" 0x00007ffff76a94f3 in epoll_wait () at ../sysdeps/unix/syscall
-template.S:81
  4    Thread 0x7ffff65a8700 (LWP 4498) "memcached" 0x00007ffff76a94f3 in epoll_wait () at ../sysdeps/unix/syscall
-template.S:81
  3    Thread 0x7ffff6da9700 (LWP 4497) "memcached" 0x00007ffff76a94f3 in epoll_wait () at ../sysdeps/unix/syscall
-template.S:81
  2    Thread 0x7ffff75aa700 (LWP 3099) "memcached" 0x00007ffff766fe9d in nanosleep () at ../sysdeps/unix/syscall-
template.S:81
* 1    Thread 0x7ffff7fe7740 (LWP 2871) "memcached" main (argc=4, argv=0x7fffffffe198) at memcached.c:8154

ロック種類

Memcachedではスレッドセーフな実装のために以下のロックを使い分けています。

  • tls.c

    • ssl_ctx_lock
  • logger.h/logger.c

    • mutex in _logger struct
    • logger_stack_lock
    • logger_atomics_mutex
  • extstore.c

    • mutex in _store_page
    • mutex in store_io_thread
    • mutex in store_maint_thread
    • mutex in store_engine
    • stats_mutex in store_engine
  • slabs.c

    • slabs_lock
    • slabs_rebalance_lock
  • cache.h

    • mutex in cache_t
  • memcached.h/memcached.c

    • mutex in thread_stats
    • conn_lock
  • storage.c

    • storage_write_plock
    • storage_compact_plock
    • lock in storage_compact_wrap
  • assoc.c

    • maintenance_lock
  • crawler.h/crawler.c

    • lock in crawler_expired_data
    • lru_crawler_lock
  • thread.c

    • lock in conn_queue
    • lru_locks[POWER_LARGEST]
    • conn_lock
    • atomics_mutex
    • stats_lock
    • worker_hang_lock
    • cqi_freelist_lock
    • item_locks
    • init_lock
  • items.h/items.c

    • lru_locks[POWER_LARGEST]
    • lru_maintainer_lock
    • cas_id_lock
    • stats_sizes_lock
    • mutex in _lru_bump_buf
    • bump_buf_lock

以下も参考にしていただけるかと存じます。

Multithreading in memcached was originally simple: https://github.com/memcached/memcached/blob/master/doc/threads.txt

スレッド内容

Slabs Automove

Slab class間でメモリを移動するバックグラウンドスレッド。以下のコマンドで実行。slab_maintenanceスレッドで実行。

slabs automove <0|1|2>
  • 0: Slabs Automoveのスレッドの無効化
  • 1: Slab class中で、空きchunkが3ページ以上あるときに、ページをグローバルプールに返す。ページは必要に応じて他のSlab classに再割当てが行われるようになる。
  • 2: evictionがある度にメモリを移動するという非常にアグレッシブな方法。ワークロードのアクセスパターンをよく理解していない限り、こちらを有効にしたままにすることは推奨されない。

Slabs Reassign のコマンドにより手動でSlab class間でメモリの再分散させることも可能。その場合、パラメータ slab_reassign を有効にしておく必要がある。

LRU Tuning

  • ~1.4.14: 以前のLRUの管理方法は異なり、1つの双方向Linked Listにより、末尾からLRUで追い出される対象。
  • 1.4.24~: LRUをHOT,WARM,COLDのセグメントに分割して、lru_maintainerがバックグラウンドスレッドで、アイテムをセグメント間で移動させる。COLDがevictionの対象となります。HOT/WARMのセグメントの領域の割合はチューニング可能。lru_maintainer, lru_crawlerがデフォルトで無効
  • 1.4.33~: lru_maintainer, lru_crawlerがデフォルトで有効

temporary_ttlで4つ目のTEMP領域を作ることも可能。TEMP領域でLRU内のアイテムを追い出したり、他のSlab classに移動することを防ぎ、LRU crawlerの問題や負荷を減らす。

詳細は以下をご覧ください。 https://github.com/memcached/memcached/blob/master/doc/new_lru.txt

LRU Crawler

要求されたSlab classの末尾から先頭に向けてアイテムを確認していき、失効したものがあればメモリを解放します。TTLで長いものと短いものが混在している状況だが滅多にアクセスされない場合に特に有効です。こちらの機能を使用するとレイテンシーやCPU使用量が増加する可能性があります。lru_crawlerのパラメータでLRU Crawlerは有効になります。ソースコード中では、スレッド名はitem_crawlerという名前が使われています。

  • メリット: ActiveにTTLが切れたアイテムのメモリーを解放することができる。
  • デメリット: レイテンシーとCPU使用率の増加

Watchers

Memcachedへ接続し内部で起こっていることの調査に利用できる。以下のようにして有効化する。loggerスレッドで実行されている。

watch <fetchers|mutations|evictions>
  • fetchers : アイテムを内部でfetchする度にログ出力。setコマンドでもアイテムの置き換え前にitem_getするので出力する。Multigetsだと複数回出力
  • mutations : アイテムを保存するほとんどの場合に出力
  • evictions : キャッシュからアイテムがevictionされたときに情報を出力。

仕組み

Memcachedでは1つのMain baseとデフォルトで4つのWorker Threadのthread baseがあり、処理が行われます。 Slabという低レイヤーでのメモリー管理とItemという高レイヤーでの管理の層に分けて管理を行い、それらをコネクションを管理する部分を通してこれらを扱います。 また、Memcachedでは処理中、内部で状態遷移の仕組みを使って各状態における処理完了後に別の状態に遷移して条件に応じた処理を行っています。

状態遷移

Memcachedで利用される状態遷移の各状態は以下のものがあります。

  • conn_listening
  • conn_new_cmd
  • conn_waiting
  • conn_read
  • conn_parse_cmd
  • conn_write
  • conn_nread
  • conn_swallow
  • conn_closing
  • conn_mwrite
  • conn_closed
  • conn_watch
  • conn_max_state

https://github.com/memcached/memcached/blob/master/memcached.h#L171-L191 上記状態は、conn_statesのenumとして、memcached.hで定義されています。

enum conn_states {
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd,    /**< Prepare connection for next command */
    conn_waiting,    /**< waiting for a readable socket */
    conn_read,       /**< reading in a command line */
    conn_parse_cmd,  /**< try to parse a command from the input buffer */
    conn_write,      /**< writing out a simple response */
    conn_nread,      /**< reading in a fixed number of bytes */
    conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
    conn_closing,    /**< closing this connection */
    conn_mwrite,     /**< writing out many items sequentially */
    conn_closed,     /**< connection is closed */
    conn_watch,      /**< held by the logger thread as a watcher */
    conn_max_state   /**< Max state value (used for assertion) */
};

リクエスト受付時

処理の流れとしては以下の通りとなります。

  1. リクエスト受付時にはMain baseでサーバソケット上でListenしてlibevent経由でWorker Threadにコネクションを渡します。入ってきた接続はラウンドロビンで各Worker Threadのコネクションのキューへと入れます。
  2. コネクションキューに新規アイテムが追加されるとreadイベントがアクティベートされ、Worker Thread上でコールバック関数(event_handler)が呼ばれます。
  3. event_handler関数からdrive_machine関数が実行され、ここでコネクションにおける状態遷移等を管理しながら、各種コマンドの実行の呼び出し等を行います。この仕組みにより1つのWorker Thread上でも複数のコネクションを処理することができます。

コマンド処理時

main baseでの処理

main baseでのイベントループの処理について、リクエストを受け付ける際には、最初にdrive_machine関数が実行し、コネクションの状態(conn_state)がconn_listeningになっています。ここでコネクションの受付とworker threadの1つのコネクションキューへタスクの追加を行います。

Worker Threadのbaseでの処理

ここではGETコマンド処理の例とします。

  1. Worker Thread上では、ソケット受付時のreadイベント作成時、コネクションキューからタスクを取り出し後、conn_stateの状態をconn_new_cmdに変更します。
  2. その後、コネクションの枯渇を避けるために少しずつ様子を見ながら、conn_stateをconn_waitingへ変更。
  3. conn_stateをconn_readへと変更します。
  4. conn_stateがconn_readへと変更されたら、ソケットからの読み込みを行い、conn_stateをconn_parse_cmdへと変更します。
  5. conn_stateがconn_parse_cmdと変更されたら、入力されたコマンドのパース処理を行い、入力された命令毎に関数の実行を行います。
  6. その後、conn_stateがconn_mwriteへ遷移し、クライアントへリクエストされたアイテムを返す準備ができる状態となります。
  7. conn_stateがconn_mwriteへ変更されたらsendmsgのシステムコール関数を使ってクライアントへ結果を返します。
  8. conn_stateをconn_new_cmdへ変更し、新規コマンドを受け付ける状態となります。

SETコマンド処理も類似した内容となります。読み込み完了時にはcomplete_nread関数から(ASCIIモードを利用時には)complete_nread_ascii関数が実行され、ここでout_string関数を通して"STORED"の文字が返される挙動となります。

フロー

リクエスト受付処理

TCP/UDP利用時

ソケット処理時にはserver_socket関数を通してソケット関数の初期化が実行されます。具体的には以下の流れとなります。

  1. memsetでaddrinfo構造体の初期化
  2. TCPかUDPで条件分岐
    • 2-1. TCPの場合、socktypeをSOCK_STREAM
    • 2-2. UDPの場合、socktypeをSOCK_DGRAM
  3. getaddrinfo関数
  4. new_socket関数
  5. TCPかUDPで条件分岐
    • 2-1. TCPの場合、setsocketops関数
    • 2-2. UDPの場合、maximize_sndbuf関数
  6. bind関数
  7. listen関数
  8. TCPかUDPで条件分岐
    • 2-1. TCPの場合、conn_new関数
    • 2-2. UDPの場合、dispatch_conn_new関数
  9. listen_conn構造体の設定

Unix Socket

Unix Socketを利用する場合は、server_socket_unix関数を通して初期化が実行されます。クライアントとサーバが同一ホスト上にある場合はこちらを利用することで効率よく処理ができます。

  1. socket関数からnew_socket関数を実行
  2. fcntl関数でノンブロッキングO_NONBLOCKの設定
  3. 古いソケットファイルが残っている場合は、unlink関数を実行して削除
  4. flag変数の値がゼロ以外の場合は、setsockopt関数を実行して、アドレスをbindで再利用し、定期的なKeep_aliveメッセージを再送信し、送信されていないメッセージがある場合には、ソケットがクローズさせないようにする
  5. bind関数
  6. listen関数
  7. conn_new関数

コマンド実行時

conn_stateがconn_read時には以下のフローで関数が実行されます。

  1. try_read_command関数
  2. process_command関数
  3. 実行するコマンドに応じて条件分岐
    • 3-1. get/gets 4. process_get_command関数 5. item_get関数 6. add_iov関数 7. item_update関数(process_get_commandでtrue時) 8. conn_stateをconn_mwriteへ変更
    • 3-2. add/set/replace, prepend/append, cas 4. process_update_command 5. item_alloc 6. CAS IDの設定(process_get_commandでtrue時) 7. conn_stateをconn_nreadへ変更
    • 3-3. incr/decr 4. process_arithmetic_command関数 5. add_delta関数
    • 3-4. flush_all 4. item_flush_expired関数
    • 3-5. delete 4. process_delete_command関数
    • 3-6. stats 4. process_stat関数
    • 3-7. quit 4. conn_stateをconn_closingへ変更

O(1)

Memcachedのコマンドは計算量がO(1)、すなわちキーの数によらず実行時間が一定になるようにされています。一方で、全キーを操作することが出来ないデメリットもありました(*)。

(*) Memcached 1.4.18~では、LRU Crawlerを有効にしていれば、lru_crawler metadump all で取得することができます。 デバッグインターフェイスとして stats cachedump が用意されており取得可能ですが、スラブクラスを指定して取得する形となります。また、部分的な取得(上限2MB)で、かつメモリーをロックする上に取得も重いものとなります。

No reply/Quiet

No reply/QuietオプションはASCIIコマンドで利用すると、リクエストに対するエラーが適切に表示されないため使うべきではないとされています。 mutativeなコマンドでパケットが返ってくるのを待つのを避ける必要がある場面で利用することはできます。 バイナリモードでは適切に実装されています。

Commands https://github.com/memcached/memcached/wiki/Commands

stats

Memcachedでは、サーバ内部の状態を調査するツールとしてstatsコマンドを提供しています。以下コマンドの実行結果の例となります。

stats

STAT pid 24875
STAT uptime 513
STAT time 1557149463
STAT version 1.5.14
STAT libevent 2.0.21-stable
STAT pointer_size 64
STAT rusage_user 26.598229
STAT rusage_system 86.178184
STAT max_connections 1024
STAT curr_connections 2
STAT total_connections 66
STAT rejected_connections 0
STAT connection_structures 34
STAT reserved_fds 20
STAT cmd_get 0
STAT cmd_set 600000
STAT cmd_flush 3
STAT cmd_touch 0
STAT get_hits 0
STAT get_misses 0
STAT get_expired 0
STAT get_flushed 0
STAT delete_misses 0
STAT delete_hits 0
STAT incr_misses 0
STAT incr_hits 0
STAT decr_misses 0
STAT decr_hits 0
STAT cas_misses 0
STAT cas_hits 0
STAT cas_badval 0
STAT touch_hits 0
STAT touch_misses 0
STAT auth_cmds 0
STAT auth_errors 0
STAT bytes_read 309601911
STAT bytes_written 4802522
STAT limit_maxbytes 67108864
STAT accepting_conns 1
STAT listen_disabled_num 0
STAT time_in_listen_disabled_us 0
STAT threads 4
STAT conn_yields 0
STAT hash_power_level 17
STAT hash_bytes 1048576
STAT hash_is_expanding 0
STAT slab_reassign_rescues 0
STAT slab_reassign_chunk_rescues 0
STAT slab_reassign_evictions_nomem 0
STAT slab_reassign_inline_reclaim 0
STAT slab_reassign_busy_items 144
STAT slab_reassign_busy_deletes 0
STAT slab_reassign_running 0
STAT slabs_moved 62
STAT lru_crawler_running 0
STAT lru_crawler_starts 1020
STAT lru_maintainer_juggles 178227
STAT malloc_fails 0
STAT log_worker_dropped 0
STAT log_worker_written 0
STAT log_watcher_skipped 0
STAT log_watcher_sent 0
STAT bytes 62500672
STAT curr_items 111808
STAT total_items 600000
STAT slab_global_page_pool 0
STAT expired_unfetched 111664
STAT evicted_unfetched 376384
STAT evicted_active 0
STAT evictions 376384
STAT reclaimed 111664
STAT crawler_reclaimed 0
STAT crawler_items_checked 224505
STAT lrutail_reflocked 1
STAT moves_to_cold 600000
STAT moves_to_warm 0
STAT moves_within_lru 0
STAT direct_reclaims 376384
STAT lru_bumps_dropped 0

stats items

STAT items:9:number 111808
STAT items:9:number_hot 0
STAT items:9:number_warm 0
STAT items:9:number_cold 111808
STAT items:9:age_hot 0
STAT items:9:age_warm 0
STAT items:9:age 142
STAT items:9:evicted 376384
STAT items:9:evicted_nonzero 0
STAT items:9:evicted_time 39
STAT items:9:outofmemory 0
STAT items:9:tailrepairs 0
STAT items:9:reclaimed 111664
STAT items:9:expired_unfetched 111664
STAT items:9:evicted_unfetched 376384
STAT items:9:evicted_active 0
STAT items:9:crawler_reclaimed 0
STAT items:9:crawler_items_checked 224505
STAT items:9:lrutail_reflocked 1
STAT items:9:moves_to_cold 600000
STAT items:9:moves_to_warm 0
STAT items:9:moves_within_lru 0
STAT items:9:direct_reclaims 376384
STAT items:9:hits_to_hot 0
STAT items:9:hits_to_warm 0
STAT items:9:hits_to_cold 0
STAT items:9:hits_to_temp 0

stats sizes

各アイテムのサイズとそれに対応する数。stats sizesを行うためには、事前にstats sizes_enableを実行しておく必要があります。パラメータでtrack_sizesを指定することでも可能です。

STAT 576 111808

stats slabs

(total_chunks * chunk_size) - mem_requestedがSlab class中で無駄なメモリー量となります。

STAT 9:chunk_size 600
STAT 9:chunks_per_page 1747
STAT 9:total_pages 64
STAT 9:total_chunks 111808
STAT 9:used_chunks 111808
STAT 9:free_chunks 0
STAT 9:free_chunks_end 0
STAT 9:mem_requested 62500672
STAT 9:get_hits 0
STAT 9:cmd_set 600000
STAT 9:delete_hits 0
STAT 9:incr_hits 0
STAT 9:decr_hits 0
STAT 9:cas_hits 0
STAT 9:cas_badval 0
STAT 9:touch_hits 0
STAT active_slabs 1
STAT total_malloced 67108864

stats conns

STAT 26:addr tcp:0.0.0.0:11211
STAT 26:state conn_listening
STAT 26:secs_since_last_cmd 584
STAT 27:addr tcp6:[::]:11211
STAT 27:state conn_listening
STAT 27:secs_since_last_cmd 584
STAT 28:addr tcp:127.0.0.1:56626
STAT 28:state conn_parse_cmd
STAT 28:secs_since_last_cmd 0

stats settings

domain_socketには、-s でUnixソケットファイルを指定した際に値が入る。(Netcatでは、-U /tmp/memcached.sock のように指定して接続する)

STAT maxbytes 67108864
STAT maxconns 1024
STAT tcpport 11211
STAT udpport 0
STAT inter NULL
STAT verbosity 3
STAT oldest 406
STAT evictions on
STAT domain_socket NULL
STAT umask 700
STAT growth_factor 1.25
STAT chunk_size 48
STAT num_threads 4
STAT num_threads_per_udp 4
STAT stat_key_prefix :
STAT detail_enabled no
STAT reqs_per_event 20
STAT cas_enabled yes
STAT tcp_backlog 1024
STAT binding_protocol auto-negotiate
STAT auth_enabled_sasl no
STAT item_size_max 1048576
STAT maxconns_fast yes
STAT hashpower_init 0
STAT slab_reassign yes
STAT slab_automove 1
STAT slab_automove_ratio 0.80
STAT slab_automove_window 30
STAT slab_chunk_max 524288
STAT lru_crawler yes
STAT lru_crawler_sleep 100
STAT lru_crawler_tocrawl 0
STAT tail_repair_time 0
STAT flush_enabled yes
STAT dump_enabled yes
STAT hash_algorithm murmur3
STAT lru_maintainer_thread yes
STAT lru_segmented yes
STAT hot_lru_pct 20
STAT warm_lru_pct 40
STAT hot_max_factor 0.20
STAT warm_max_factor 2.00
STAT temp_lru no
STAT temporary_ttl 61
STAT idle_timeout 0
STAT watcher_logbuf_size 262144
STAT worker_logbuf_size 65536
STAT track_sizes yes
STAT inline_ascii_response no

stats cachedump

stats cachedump 12 100 コマンドのようにSlab Class ID(stats itemsなどで確認可能)と取得数を指定します。

stats cachedumpは指定したスラブクラスの表示件数だけ出力しますが、最大で2MBでのレスポンスの制限があります。 https://github.com/memcached/memcached/blob/master/items.c#L596 また、実行時間が長く、キャッシュをロックする挙動となります。その他、Slab Class単位でしか取得できない制約もあります。

ITEM memtier-memtier-memtier-89695 [1000 b; 0 s]
ITEM memtier-memtier-memtier-88214 [1000 b; 0 s]
ITEM memtier-memtier-memtier-27656 [1000 b; 0 s]
ITEM memtier-memtier-memtier-21451 [1000 b; 0 s]
ITEM memtier-memtier-memtier-33994 [1000 b; 0 s]
ITEM memtier-memtier-memtier-1110 [1000 b; 0 s]
ITEM memtier-memtier-memtier-14418 [1000 b; 0 s]

一方で、lru_crawler metadump allはサイズ制限もなくキャッシュをロックせずに全キーをリスト化して出力します。 以下、lru_crawler metadump all の実行結果となります。

key=memtier-memtier-memtier-89695 exp=-1 la=1558165988 cas=6533 fetch=no cls=12 size=1088
key=memtier-memtier-memtier-14418 exp=-1 la=1558163939 cas=1992 fetch=no cls=12 size=1088
key=memtier-memtier-memtier-1110 exp=-1 la=1558163941 cas=2988 fetch=no cls=12 size=1087
key=memtier-memtier-memtier-33994 exp=-1 la=1558163942 cas=3984 fetch=no cls=12 size=1088
key=memtier-memtier-memtier-21451 exp=-1 la=1558163944 cas=4980 fetch=no cls=12 size=1088
key=memtier-memtier-memtier-27656 exp=-1 la=1558163946 cas=5976 fetch=no cls=12 size=1088
key=memtier-memtier-memtier-88214 exp=-1 la=1558163947 cas=6532 fetch=no cls=12 size=1088

statsコマンド詳細 https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L613-L1089

その他詳細

memcached/memcached.c at master · memcached/memcached · GitHub

memcached development tree. Contribute to memcached/memcached development by creating an account on GitHub.

https://github.com/memcached/memcached/blob/master/slabs.c#L516-L549

付属コマンド

./sizes

統計情報やアイテムのサイズ等を表示

Slab Stats	64
Thread stats	-6480
Global stats	168
Settings	248
Item (no cas)	48
Item (cas)	56
Libevent thread	192
Connection	544
----------------------------------------
libevent thread cumulative	6512
Thread stats cumulative		6320

./timedrun 3 memcached

第一引数で指定した時間後の第二引数で指定したプロセスを中断

Timeout.. killing the process

./testapp

テストコード。assert関数で意図した変数の値になっているか確認

1..52
ok 1 - cache_create
ok 2 - cache_constructor
ok 3 - cache_constructor_fail
ok 4 - cache_destructor
ok 5 - cache_reuse
ok 6 - cache_redzone
ok 7 - issue_161
ok 8 - strtol
ok 9 - strtoll
ok 10 - strtoul
ok 11 - strtoull
ok 12 - issue_44
ok 13 - vperror
ok 14 - issue_101
Signal handled: Terminated.
ok 15 - start_server
ok 16 - issue_92
ok 17 - issue_102
ok 18 - binary_noop
ok 19 - binary_quit
ok 20 - binary_quitq
ok 21 - binary_set
ok 22 - binary_setq
ok 23 - binary_add
ok 24 - binary_addq
ok 25 - binary_replace
ok 26 - binary_replaceq
ok 27 - binary_delete
ok 28 - binary_deleteq
ok 29 - binary_get
ok 30 - binary_getq
ok 31 - binary_getk
ok 32 - binary_getkq
ok 33 - binary_gat
ok 34 - binary_gatq
ok 35 - binary_gatk
ok 36 - binary_gatkq
ok 37 - binary_incr
ok 38 - binary_incrq
ok 39 - binary_decr
ok 40 - binary_decrq
ok 41 - binary_version
ok 42 - binary_flush
ok 43 - binary_flushq
ok 44 - binary_append
ok 45 - binary_appendq
ok 46 - binary_prepend
ok 47 - binary_prependq
ok 48 - binary_stat
ok 49 - binary_illegal
ok 50 - binary_pipeline_hickup
Signal handled: Interrupt.
ok 51 - shutdown
ok 52 - stop_server

memcached-tool

  #  Item_Size  Max_age   Pages   Count   Full?  Evicted Evict_Time OOM
  1      96B         0s       1       0     yes        0        0    0
 12     1.2K        33s       1      26     yes        0        0    0

dumpオプションを指定すると以下のようになります。

Dumping memcache contents
  Number of buckets: 2
  Number of items  : 28
Dumping bucket 1 - 2 total items
Dumping bucket 12 - 26 total items
add memtier-memtier-memtier-1213 0 0 1000
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
add memtier-memtier-memtier-60328 0 0 1000
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
add memtier-memtier-memtier
:
:

その他

scripts/ 以下に以下のようなコマンドがあります。

  • damemtop
  • memcached-automove
  • memcached-automove-extstore
  • memcached-init
  • memcached.sysv
  • start-memcached

ベンチマーク

ベンチマークには、memslap/memaslap/mcperf/memtier_benchmarkなどのものがあります。

$ memslap --servers localhost:11211 --concurrency 300 --execute-number=10000 --flush --verbose
$ memaslap --servers localhost:11211 --concurrency 300 --execute_number=10000 --verbose
$ mcperf --sizes=d300 -s localhost -p 11211 -e 900 -n 10000 -N 100000000
$ memtier_benchmark -s localhost -p 11211 -P memcache_text --threads 4 -c 250  --test-time=10 --ratio=1:10 --data-size-list=1000:60,5000:30,100000:10  --key-prefix=memtier-memtier-memtier- --key-maximum=100000

memslap/memaslapはlibmemcachedで提供されています。そのため、yum install libmemcachedを実行後に利用できるようになります。

twemperf(mcperf)は以下のコマンドでインストールすることが可能です。

$ git clone git://github.com/twitter/twemperf.git
$ cd twemperf/
$ autoreconf -fvi
$ CFLAGS="-ggdb3 -O0" ./configure --enable-debug
$ make && sudo make install

memtier_benchmarkのインストール方法は以下の記事をご覧ください。

memtier_benchmark の導入 (memcached-tool で Slab の内容確認) https://hayashier.com/memtier_benchmark-introduction/

バイナリプロトコル

テキストプロトコルのパース処理の省略による処理の高速化とテキスト形式である脆弱性の問題に対応するものとなっています。24バイトの固定長を利用します。

フォーマット

パケットフォーマット

 Byte/     0       |       1       |       2       |       3       |
    /              |               |               |               |
   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
   +---------------+---------------+---------------+---------------+
  0/ HEADER                                                        /
   /                                                               /
   /                                                               /
   /                                                               /
   +---------------+---------------+---------------+---------------+
 24/ COMMAND-SPECIFIC EXTRAS (as needed)                           /
  +/  (note length in the extras length header field)              /
   +---------------+---------------+---------------+---------------+
  m/ Key (as needed)                                               /
  +/  (note length in key length header field)                     /
   +---------------+---------------+---------------+---------------+
  n/ Value (as needed)                                             /
  +/  (note length is total body length header field, minus        /
  +/   sum of the extras and key length body fields)               /
   +---------------+---------------+---------------+---------------+

上記24バイトのヘッダー部分について

リクエストヘッダー

 Byte/     0       |       1       |       2       |       3       |
    /              |               |               |               |
   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
   +---------------+---------------+---------------+---------------+
  0| Magic         | Opcode        | Key length                    |
   +---------------+---------------+---------------+---------------+
  4| Extras length | Data type     | vbucket id                    |
   +---------------+---------------+---------------+---------------+
  8| Total body length                                             |
   +---------------+---------------+---------------+---------------+
 12| Opaque                                                        |
   +---------------+---------------+---------------+---------------+
 16| CAS                                                           |
   |                                                               |
   +---------------+---------------+---------------+---------------+

レスポンスヘッダー

 Byte/     0       |       1       |       2       |       3       |
    /              |               |               |               |
   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
   +---------------+---------------+---------------+---------------+
  0| Magic         | Opcode        | Key Length                    |
   +---------------+---------------+---------------+---------------+
  4| Extras length | Data type     | Status                        |
   +---------------+---------------+---------------+---------------+
  8| Total body length                                             |
   +---------------+---------------+---------------+---------------+
 12| Opaque                                                        |
   +---------------+---------------+---------------+---------------+
 16| CAS                                                           |
   |                                                               |
   +---------------+---------------+---------------+---------------+

ASCIIモードでは、キー長の制限が250バイトでしたが、バイナリモードでは、2^16(=65,536)バイトまで扱えるようになっています。 また、コマンドの種類増加など拡張性も上がっています。 リクエストとレスポンスの違いは、vbucket id(以前は予約済み領域) と Status となります。

各項目

Magic : マジックナンバー。リクエストパケットは 0x80、レスポンスパケットは0x81 Opcode : コマンドを表すコード Key length : key長 Status : レスポンスのステータスコード Extras length : Extras長 Data type : 予約済み領域 vbucket id : コマンドの仮想バケット Total body length : extra + key + value のバイト数 Opaque : リクエストで送った値と同じ値をレスポンスを返すことで対応付けることに利用 Will be copied back to you in the response CAS : CAS機能で使用するデータのバージョン

詳細

Opcodeはコマンドのコードに相当し下記コマンドがあります。

0x00	Get
0x01	Set
0x02	Add
0x03	Replace
0x04	Delete
0x05	Increment
0x06	Decrement
0x07	Quit
0x08	Flush
0x09	GetQ
0x0a	No-op
0x0b	Version
0x0c	GetK
0x0d	GetKQ
0x0e	Append
0x0f	Prepend
0x10	Stat
0x11	SetQ
0x12	AddQ
0x13	ReplaceQ
0x14	DeleteQ
0x15	IncrementQ
0x16	DecrementQ
0x17	QuitQ
0x18	FlushQ
0x19	AppendQ
0x1a	PrependQ
0x1b	Verbosity *
0x1c	Touch *
0x1d	GAT *
0x1e	GATQ *
0x20	SASL list mechs
0x21	SASL Auth
0x22	SASL Step
0x30	RGet
0x31	RSet
0x32	RSetQ
0x33	RAppend
0x34	RAppendQ
0x35	RPrepend
0x36	RPrependQ
0x37	RDelete
0x38	RDeleteQ
0x39	RIncr
0x3a	RIncrQ
0x3b	RDecr
0x3c	RDecrQ
0x3d	Set VBucket *
0x3e	Get VBucket *
0x3f	Del VBucket *
0x40	TAP Connect *
0x41	TAP Mutation *
0x42	TAP Delete *
0x43	TAP Flush *
0x44	TAP Opaque *
0x45	TAP VBucket Set *
0x46	TAP Checkpoint Start *
0x47	TAP Checkpoint End *

レスポンスのStatusは以下の種類のものがあります。

0x0000	No error
0x0001	Key not found
0x0002	Key exists
0x0003	Value too large
0x0004	Invalid arguments
0x0005	Item not stored
0x0006	Incr/Decr on non-numeric value.
0x0007	The vbucket belongs to another server
0x0008	Authentication error
0x0009	Authentication continue
0x0081	Unknown command
0x0082	Out of memory
0x0083	Not supported
0x0084	Internal error
0x0085	Busy
0x0086	Temporary failure

BinaryProtocolRevamped https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped

SASL

Simple Authentication and Security Layerの略。インターネットプロトコルへの認証とデータセキュリティのためのフレームワーク。アプリケーションプロトコルから認証機構を分離し、SASLをサポートするもので利用可能 Memcachedでは、1.4.3以上で--enabled-saslオプションを付与してビルドすることで利用可能です。

以下のような定義済みのSASL機構がある。

  • EXTERNAL
  • ANONYMOUS
  • PLAIN
  • OTP
  • SKEY
  • CRAM-MD5
  • DIGEST-MD5
  • NTLM
  • GSSAPI

Memcached ではバイナリーモードのみで以下のものが提供されています。 サーバが対応している認証機構のリスト化と認証リクエスト、認証継続のコマンドがあります。

  • コマンド
    • List Mechanisms 0x20 None None
    • Start Authentication 0x21 Mechanism Auth Data
    • Authentication Step 0x22 Mechanism Auth Data

エラー文は以下の種類があります。

  • エラー
    • 0x20 認証必須/失敗
    • 0x21 追加の認証必要

SASLAuthProtocol

SASLAuthProtocol · memcached/memcached Wiki · GitHub

memcached development tree. Contribute to memcached/memcached development by creating an account on GitHub.

SASLHowto https://github.com/memcached/memcached/wiki/SASLHowto

Simple Authentication and Security Layer https://ja.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer

Simple Authentication and Security Layer (SASL)

RFC 4422 - Simple Authentication and Security Layer (SASL)

The Simple Authentication and Security Layer (SASL) is a framework for providing authentication and data security services in connection-oriented protocols via…

RFC 2222 - Simple Authentication and Security Layer (SASL)

This document describes a method for adding authentication support to connection-based protocols. [STANDARDS-TRACK]

The Kerberos V5 ("GSSAPI") Simple Authentication and Security Layer (SASL) Mechanism https://tools.ietf.org/html/rfc4752

なお、Memcached 1.5.13以上で--enable-tlsオプションを付与して、TLSの機能を利用することも可能になりました。 https://github.com/memcached/memcached/pull/440

ハッシュ関数

  • jenkins

    • Jenkins ハッシュ関数は、Bob Jenkinsによって設計されたマルチバイトキー向けの非暗号化のハッシュ関数の集合
    • 種類としては、one_at_a_time, lookup2, lookup3などがあり、Memcachedでは1997年に公開されたlookup2が使用されている。
    • hash関数で自由に使って良いとするハッシュ関数を公開している
      • http://burtleburtle.net/bob/hash/doobs.html
      • http://burtleburtle.net/bob/c/lookup2.c
    • Memcachedでの実装はjenkins_hash.c https://github.com/memcached/memcached/blob/master/jenkins_hash.c
  • murmur3

    • 一般的なハッシュベースのルックアップに適した非暗号化ハッシュ関数。2008年にAustin Applebyによって作成。SMHasherのハッシュ関数の中の一つ
    • MurmurHash3 : https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
    • MurmurHash2 : https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp , https://sites.google.com/site/murmurhash/
    • RedisではMurmurHash2、MemcachedではMurmurHash3が採用されている。

実装

アイテム

定義

https://github.com/memcached/memcached/blob/master/memcached.h#L476-L503 Memcached中で扱われるアイテムは以下のように_stritem構造体で定義されています。ハッシュ値が衝突した場合は、チェイン法を使用し、h_next変数に相当しています。

typedef struct _stritem {
    /* Protected by LRU locks */
    struct _stritem *next;
    struct _stritem *prev;
    /* Rest are protected by an item lock */
    struct _stritem *h_next;    /* hash chain next */
    rel_time_t      time;       /* least recent access */
    rel_time_t      exptime;    /* expire time */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

アイテム保存/削除

memcached/assoc.c at master · memcached/memcached · GitHub

memcached development tree. Contribute to memcached/memcached development by creating an account on GitHub.

int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it->h_next = old_hashtable[oldbucket];
        old_hashtable[oldbucket] = it;
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)];
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);
    return 1;
}

void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {
    item **before = _hashitem_before(key, nkey, hv);

    if (*before) {
        item *nxt;
        /* The DTrace probe cannot be triggered as the last instruction
         * due to possible tail-optimization by the compiler
         */
        MEMCACHED_ASSOC_DELETE(key, nkey);
        nxt = (*before)->h_next;
        (*before)->h_next = 0;   /* probably pointless, but whatever. */
        *before = nxt;
        return;
    }
    /* Note:  we never actually get here.  the callers don't delete things
       they can't find. */
    assert(*before != 0);
}

Slab class

定義

https://github.com/memcached/memcached/blob/master/slabs.c#L28-L41 Slab classは以下のようにslabclass_t構造体で定義されています。Slabclass毎にアイテムのリストを持っておりslots変数に相当します。また、slab_listでSlab(Chunkの集合)のリストを保持しています。

typedef struct {
    unsigned int size;      /* sizes of items */
    unsigned int perslab;   /* how many items per slab */

    void *slots;           /* list of item ptrs */
    unsigned int sl_curr;   /* total free items in list */

    unsigned int slabs;     /* how many slabs were allocated for this class */

    void **slab_list;       /* array of slab pointers */
    unsigned int list_size; /* size of prev array */

    size_t requested; /* The number of requested bytes */
} slabclass_t;

Slab初期化

https://github.com/memcached/memcached/blob/master/slabs.c#L153-L232 Slabの初期化は以下のslabs_init関数で行われています。preallocをパラメータで有効にしているかで最初の挙動が分岐しています。事前割当てのオプション(-Lオプション)指定時には初めにスラブのためにメモリーを割当てておきますが、そうでない場合は不足している場合に適宜確保していく形となります。

/**
 * Determines the chunk sizes and initializes the slab class descriptors
 * accordingly.
 */
void slabs_init(const size_t limit, const double factor, const bool prealloc, const uint32_t *slab_sizes) {
    int i = POWER_SMALLEST - 1;
    unsigned int size = sizeof(item) + settings.chunk_size;

    /* Some platforms use runtime transparent hugepages. If for any reason
     * the initial allocation fails, the required settings do not persist
     * for remaining allocations. As such it makes little sense to do slab
     * preallocation. */
    bool __attribute__ ((unused)) do_slab_prealloc = false;

    mem_limit = limit;

    if (prealloc) {
#if defined(__linux__) && defined(MADV_HUGEPAGE)
        mem_base = alloc_large_chunk_linux(mem_limit);
        if (mem_base)
            do_slab_prealloc = true;
#else
        /* Allocate everything in a big chunk with malloc */
        mem_base = malloc(mem_limit);
        do_slab_prealloc = true;
#endif
        if (mem_base != NULL) {
            mem_current = mem_base;
            mem_avail = mem_limit;
        } else {
            fprintf(stderr, "Warning: Failed to allocate requested memory in"
                    " one large chunk.\nWill allocate in smaller chunks\n");
        }
    }

    memset(slabclass, 0, sizeof(slabclass));

    while (++i < MAX_NUMBER_OF_SLAB_CLASSES-1) {
        if (slab_sizes != NULL) {
            if (slab_sizes[i-1] == 0)
                break;
            size = slab_sizes[i-1];
        } else if (size >= settings.slab_chunk_size_max / factor) {
            break;
        }
        /* Make sure items are always n-byte aligned */
        if (size % CHUNK_ALIGN_BYTES)
            size += CHUNK_ALIGN_BYTES - (size % CHUNK_ALIGN_BYTES);

        slabclass[i].size = size;
        slabclass[i].perslab = settings.slab_page_size / slabclass[i].size;
        if (slab_sizes == NULL)
            size *= factor;
        if (settings.verbose > 1) {
            fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                    i, slabclass[i].size, slabclass[i].perslab);
        }
    }

    power_largest = i;
    slabclass[power_largest].size = settings.slab_chunk_size_max;
    slabclass[power_largest].perslab = settings.slab_page_size / settings.slab_chunk_size_max;
    if (settings.verbose > 1) {
        fprintf(stderr, "slab class %3d: chunk size %9u perslab %7u\n",
                i, slabclass[i].size, slabclass[i].perslab);
    }

    /* for the test suite:  faking of how much we've already malloc'd */
    {
        char *t_initial_malloc = getenv("T_MEMD_INITIAL_MALLOC");
        if (t_initial_malloc) {
            mem_malloced = (size_t)atol(t_initial_malloc);
        }

    }

    if (prealloc && do_slab_prealloc) {
        slabs_preallocate(power_largest);
    }
}

コネクション

定義

https://github.com/memcached/memcached/blob/master/memcached.h#L601-L706 コネクション情報はconn構造体として以下のように定義されています。

struct conn {
    int    sfd;
#ifdef TLS
    SSL    *ssl;
    char   *ssl_wbuf;
    bool ssl_enabled;
#endif
    sasl_conn_t *sasl_conn;
    bool sasl_started;
    bool authenticated;
    enum conn_states  state;
    enum bin_substates substate;
    rel_time_t last_cmd_time;
    struct event event;
    short  ev_flags;
    short  which;   /** which events were just triggered */

    char   *rbuf;   /** buffer to read commands into */
    char   *rcurr;  /** but if we parsed some already, this is where we stopped */
    int    rsize;   /** total allocated size of rbuf */
    int    rbytes;  /** how much data, starting from rcur, do we have unparsed */

    char   *wbuf;
    char   *wcurr;
    int    wsize;
    int    wbytes;
    /** which state to go into after finishing current write */
    enum conn_states  write_and_go;
    void   *write_and_free; /** free this memory after finishing writing */

    char   *ritem;  /** when we read in an item's value, it goes here */
    int    rlbytes;

    /* data for the nread state */

    /**
     * item is used to hold an item structure created after reading the command
     * line of set/add/replace commands, but before we finished reading the actual
     * data. The data is read into ITEM_data(item) to avoid extra copying.
     */

    void   *item;     /* for commands set/add/replace  */

    /* data for the swallow state */
    int    sbytes;    /* how many bytes to swallow */

    /* data for the mwrite state */
    struct iovec *iov;
    int    iovsize;   /* number of elements allocated in iov[] */
    int    iovused;   /* number of elements used in iov[] */

    struct msghdr *msglist;
    int    msgsize;   /* number of elements allocated in msglist[] */
    int    msgused;   /* number of elements used in msglist[] */
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */

    item   **ilist;   /* list of items to write out */
    int    isize;
    item   **icurr;
    int    ileft;

    char   **suffixlist;
    int    suffixsize;
    char   **suffixcurr;
    int    suffixleft;
#ifdef EXTSTORE
    int io_wrapleft;
    unsigned int recache_counter;
    io_wrap *io_wraplist; /* linked list of io_wraps */
    bool io_queued; /* FIXME: debugging flag */
#endif
    enum protocol protocol;   /* which protocol this connection speaks */
    enum network_transport transport; /* what transport is used by this connection */

    /* data for UDP clients */
    int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
    struct sockaddr_in6 request_addr; /* udp: Who sent the most recent request */
    socklen_t request_addr_size;
    unsigned char *hdrbuf; /* udp packet headers */
    int    hdrsize;   /* number of headers' worth of space is allocated */

    bool   noreply;   /* True if the reply should not be sent. */
    /* current stats command */
    struct {
        char *buffer;
        size_t size;
        size_t offset;
    } stats;

    /* Binary protocol stuff */
    /* This is where the binary header goes */
    protocol_binary_request_header binary_header;
    uint64_t cas; /* the cas to return */
    short cmd; /* current command being processed */
    int opaque;
    int keylen;
    conn   *next;     /* Used for generating a list of conn structures */
    LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
    ssize_t (*read)(conn  *c, void *buf, size_t count);
    ssize_t (*sendmsg)(conn *c, struct msghdr *msg, int flags);
    ssize_t (*write)(conn *c, void *buf, size_t count);
};

コネクションのキュー

https://github.com/memcached/memcached/blob/master/thread.c#L44-L50 受け付けたリクエストはCQ_ITEM構造体のリストとして以下のように定義されています。

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

CQ_ITEM構造体は以下のように定義されており、対象のアイテムやコネクション情報が格納されています。

typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    enum conn_queue_item_modes mode;
    conn *c;
    void    *ssl;
    CQ_ITEM          *next;
};

新規コネクション受付からWorkerスレッドへの橋渡し

コネクションの初期化

https://github.com/memcached/memcached/blob/master/memcached.c#L6121-L6135 TCPリスナーの場合、server_socket関数でmain baseを引数としてconn_new関数で呼ばれています。

        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base, NULL))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
#ifdef TLS
            listen_conn_add->ssl_enabled = ssl_enabled;
#else
            assert(ssl_enabled == false);
#endif
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }

https://github.com/memcached/memcached/blob/master/memcached.c#L540-L709 conn_new関数で受け付けたリクエストの情報をconn構造体にセットして初期化。

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base, void *ssl) {
    conn *c;

    assert(sfd >= 0 && sfd < max_fds);
    c = conns[sfd];

    if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);
        c->read = NULL;
        c->sendmsg = NULL;
        c->write = NULL;
        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;

        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            conn_free(c);
            STATS_LOCK();
            stats.malloc_fails++;
            STATS_UNLOCK();
            fprintf(stderr, "Failed to allocate buffers for connection\n");
            return NULL;
        }

        STATS_LOCK();
        stats_state.conn_structs++;
        STATS_UNLOCK();

        c->sfd = sfd;
        conns[sfd] = c;
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out.  but why
     * is this done for every command?  presumably for UDP
     * mode.  */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    if (transport == tcp_transport && init_state == conn_new_cmd) {
        if (getpeername(sfd, (struct sockaddr *) &c->request_addr,
                        &c->request_addr_size)) {
            perror("getpeername");
            memset(&c->request_addr, 0, sizeof(c->request_addr));
        }
    }

    if (settings.verbose > 1) {
        if (init_state == conn_listening) {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                prot_text(c->protocol));
        } else if (IS_UDP(transport)) {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        } else if (c->protocol == negotiating_prot) {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                    sfd);
        } else if (c->protocol == ascii_prot) {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        } else if (c->protocol == binary_prot) {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        } else {
            fprintf(stderr, "<%d new unknown (%d) client connection\n",
                sfd, c->protocol);
            assert(false);
        }
    }
#ifdef TLS
    c->ssl = NULL;
    c->ssl_wbuf = NULL;
    c->ssl_enabled = false;
#endif
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;
    c->sasl_started = false;
    c->authenticated = false;
    c->last_cmd_time = current_time; /* initialize for idle kicker */
#ifdef EXTSTORE
    c->io_wraplist = NULL;
    c->io_wrapleft = 0;
#endif

    c->write_and_go = init_state;
    c->write_and_free = 0;
    c->item = 0;

    c->noreply = false;

#ifdef TLS
    if (ssl) {
        c->ssl = (SSL*)ssl;
        c->read = ssl_read;
        c->sendmsg = ssl_sendmsg;
        c->write = ssl_write;
        c->ssl_enabled = true;
        SSL_set_info_callback(c->ssl, ssl_callback);
    } else
#else
    // This must be NULL if TLS is not enabled.
    assert(ssl == NULL);
#endif
    {
        c->read = tcp_read;
        c->sendmsg = tcp_sendmsg;
        c->write = tcp_write;
    }

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats_state.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}

https://github.com/memcached/memcached/blob/master/memcached.c#L148 listen_connはグローバルは静的変数として以下のように宣言されています。

static conn *listen_conn = NULL;

リクエスト受付時

https://github.com/memcached/memcached/blob/master/memcached.c#L5499-L5908 リクエストを受け付けた際には、drive_machine関数中からaccept_new_conns関数が実行され、

} else if (errno == EMFILE) {
    if (settings.verbose > 0)
        fprintf(stderr, "Too many open connections\n");
    accept_new_conns(false);
    stop = true;

memcached/thread.c at master · memcached/memcached · GitHub

memcached development tree. Contribute to memcached/memcached development by creating an account on GitHub.

void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}

https://github.com/memcached/memcached/blob/master/memcached.c#L5306-L5347 先程初期化したコネクションをlistenしています。

void do_accept_new_conns(const bool do_accept) {
    conn *next;

    for (next = listen_conn; next; next = next->next) {
        if (do_accept) {
            update_event(next, EV_READ | EV_PERSIST);
            if (listen(next->sfd, settings.backlog) != 0) {
                perror("listen");
            }
        }
        else {
            update_event(next, 0);
            if (listen(next->sfd, 0) != 0) {
                perror("listen");
            }
        }
    }

    if (do_accept) {
        struct timeval maxconns_exited;
        uint64_t elapsed_us;
        gettimeofday(&maxconns_exited,NULL);
        STATS_LOCK();
        elapsed_us =
            (maxconns_exited.tv_sec - stats.maxconns_entered.tv_sec) * 1000000
            + (maxconns_exited.tv_usec - stats.maxconns_entered.tv_usec);
        stats.time_in_listen_disabled_us += elapsed_us;
        stats_state.accepting_conns = true;
        STATS_UNLOCK();
    } else {
        STATS_LOCK();
        stats_state.accepting_conns = false;
        gettimeofday(&stats.maxconns_entered,NULL);
        stats.listen_disabled_num++;
        STATS_UNLOCK();
        allow_new_conns = false;
        maxconns_handler(-42, 0, 0);
    }
}

https://github.com/memcached/memcached/blob/master/memcached.h#L566-L585 libeventのスレッドは、LIBEVENT_THREAD構造体として以下のように定義されています。この要素中にconn_queue構造体のnew_conn_queue変数が含まれています。

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
#ifdef EXTSTORE
    cache_t *io_cache;          /* IO objects */
    void *storage;              /* data object for storage system */
#endif
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef TLS
    char   *ssl_wbuf;
#endif

} LIBEVENT_THREAD;

https://github.com/memcached/memcached/blob/master/thread.c#L334-L337 リクエストを受け付けた際には、workerスレッドに対して以下の箇所でthread_libevent_process関数を呼び出すように設定しています。

/* Listen for notifications from other threads */
event_set(&me->notify_event, me->notify_receive_fd,
          EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);

https://github.com/memcached/memcached/blob/master/thread.c#L427 thread_libevent_process関数中で、先程のコネクションのキューからコネクションを取り出して、接続を介ししています。

item = cq_pop(me->new_conn_queue);

if (NULL == item) {
    break;
}
switch (item->mode) {
    case queue_new_conn:
        c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport,
                           me->base, item->ssl);

リンク

memcached - a distributed memory object caching system

Memached Wiki https://github.com/memcached/memcached/wiki

My Twitter & RSS

Leave a Reply

Your email address will not be published. Required fields are marked *