DEV Community

holmeshe
holmeshe

Posted on • Originally published at holmeshe.me on

Understanding The Memcached Source Code - LRU II

II, III) is the core module of the cache system, which largely determines how efficient the bottleneck resource, memory, can be utilized. The other 3 parts, namely,

II - this article) for entry expiration; and an

event driven model (not complete) based on libevent; and the

consistent harsh (not complete) for data distribution,

are built around it.

More often than not, the LRU algorithm is combined with a hash map , and is referred to as an LRU cache.

In a LRU-cache , the hash map enables fast accessing of cached objects; and LRU avoids the cache to grow infinitely by marking expired, or so called, least recently used objects. This time we examine the memcached ‘s implementation of hash map.

Overview (textbook overlapped, skip)

Hash map is basically a fixed-sized array that indexes value_s with integers hashed from _key_s. In hash map an array entry is referred to as a _bucket. If the hash value exceeds the number of buckets (i.e., array size), it rolls over using ‘mod’ (%). Collision occurs when more than two _key_s result in the same hash value or different hash value s roll over to the same bucket, then a *linked list is formed on the bucket in a collision.

Collision slows down lookups speed for the sequential access of linked list, hence it is required to increase the bucket number, and to rehash entries using the new bucket number before the performance goes too bad. This process will be discussed soon.

Module initialization

The first relevant method is

hash_init

which simply determines the hash algorithm type.

int hash_init(enum hashfunc_type type) {
    switch(type) {
        case JENKINS_HASH:
            hash = jenkins_hash;
            settings.hash_algorithm = "jenkins";
            break;
        case MURMUR3_HASH:
            hash = MurmurHash3_x86_32;
            settings.hash_algorithm = "murmur3";
            break;
        default:
            return -1;
    }
    return 0;
}
Enter fullscreen mode Exit fullscreen mode
hash_init@hash.c

This method is called from here as one of the init steps before the logic enters the main event loop.

...
...
    if (hash_init(hash_type) != 0) {
        fprintf(stderr, "Failed to initialize hash_algorithm!\n");
        exit(EX_USAGE);
    }
...
Enter fullscreen mode Exit fullscreen mode
main@memcached.c:5849

The parameter hash_type is set to MURMUR3_HASH by the mentioned command-line argument modern.

...
            case MODERN:
                /* Modernized defaults. Need to add equivalent no_* flags
                 * before making truly default. */
...
                hash_type = MURMUR3_HASH;
...
                break;
...
Enter fullscreen mode Exit fullscreen mode
main@memcached.c:5849

The second method

assoc_init

allocates the fixed sized array mentioned in the beginning.

void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        hashpower = hashtable_init;
    }
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
...// scr: stat
}
Enter fullscreen mode Exit fullscreen mode
hash_init@hash.c

This method is called in a similar location as hash_init as part of the system bootstrap process.

...
    assoc_init(settings.hashpower_init);
...
Enter fullscreen mode Exit fullscreen mode
main@memcached.c:5976

And the actual initial size is determined by the command-line argument hashpower.

...
case HASHPOWER_INIT:
    if (subopts_value == NULL) {
        fprintf(stderr, "Missing numeric argument for hashpower\n");
        return 1;
    }
    settings.hashpower_init = atoi(subopts_value);
    if (settings.hashpower_init < 12) {
        fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
            settings.hashpower_init);
        return 1;
    } else if (settings.hashpower_init > 64) {
        fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
            "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
            settings.hashpower_init);
        return 1;
    }
    break;
...
Enter fullscreen mode Exit fullscreen mode
main@memcached.c:5677

As said before, the array can be replaced with a newly allocated larger one if the performance drops due to excessive collision. Next we discuss the process of

Scale up & entry migration

In memcached , the threshold is 1.5, meaning, if the item s number exceeds 1.5 * bucket number , the mentioned expanding process starts.

Expand hash map

...
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand();
    }
...
Enter fullscreen mode Exit fullscreen mode
assoc_insert@assoc.c:173

The assoc_start_expand simply set a flag (i.e., do_run_maintenance_thread), and send a signal to awake a maintenance thread that does the actual job.

static void assoc_start_expand(void) {
    if (started_expanding)
        return;

    started_expanding = true;
    pthread_cond_signal(&maintenance_cond);
}
Enter fullscreen mode Exit fullscreen mode
assoc_insert@assoc.c:173

Maintenance thread main loop

static void *assoc_maintenance_thread(void *arg) {

  mutex_lock(&maintenance_lock);
  while (do_run_maintenance_thread/* scr: the flag*/) {
    int ii = 0;

    /* There is only one expansion thread, so no need to global lock. */
    for (ii = 0; ii < hash_bulk_move && expanding; ++ii) { // scr: ----> 2)
      item *it, *next;
      int bucket;
      void *item_lock = NULL;

      /* bucket = hv & hashmask(hashpower) =>the bucket of hash table
       * is the lowest N bits of the hv, and the bucket of item_locks is
       *  also the lowest M bits of hv, and N is greater than M.
       *  So we can process expanding with only one item_lock. cool! */
      if ((item_lock = item_trylock(expand_bucket))) { // scr: --------> 3)
        for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
          next = it->h_next; // scr: ----------------------------------> 4)
          bucket = hash(ITEM_key(it), it->nkey) & hashmask(hashpower);
          it->h_next = primary_hashtable[bucket];
          primary_hashtable[bucket] = it;
        }

        old_hashtable[expand_bucket] = NULL; // scr: ----------------> 4.1)

        expand_bucket++; // scr: --------------------------------------> 5)
        if (expand_bucket == hashsize(hashpower - 1)) { // scr: -------> 6)
          expanding = false;
          free(old_hashtable);
... // scr: ---------------------------------------------------> stat & log
        }
      } else {
        usleep(10*1000); // scr: ------------------------------------> 3.1)
      }


      if (item_lock) { // scr: --------------------------------------> 3.2)
        item_trylock_unlock(item_lock);
        item_lock = NULL;
      }
    }

    if (!expanding) {
      /* We are done expanding.. just wait for next invocation */
      started_expanding = false;
      pthread_cond_wait(&maintenance_cond, &maintenance_lock); // scr: > 0)
      /* assoc_expand() swaps out the hash table entirely, so we need
       * all threads to not hold any references related to the hash
       * table while this happens.
       * This is instead of a more complex, possibly slower algorithm to
       * allow dynamic hash table expansion without causing significant
       * wait times.
       */
      pause_threads(PAUSE_ALL_THREADS);
      assoc_expand(); // scr: -----------------------------------------> 1)
      pause_threads(RESUME_ALL_THREADS);
    }
  }
  return NULL;
}
Enter fullscreen mode Exit fullscreen mode
assoc_maintenance_thread@assoc.c

0) This is where the thread waits up from sleep and start working, and goes to sleep when there is nothing to be done.

1) assoc_expand allocates the resource for the new hash map which is meant to replace the old one initialized from here.

/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
    old_hashtable = primary_hashtable;

    primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
    if (primary_hashtable) {
... // scr: log
        hashpower++;
        expanding = true;
        expand_bucket = 0;
... // scr: stat
    } else {
        primary_hashtable = old_hashtable;
        /* Bad news, but we can keep running. */
    }
}
Enter fullscreen mode Exit fullscreen mode
assoc_expand@assoc.c

2) Only migrate a certain number of item s in one batch. hash_bulk_move avoids the thread hanging around too long when stop_assoc_maintenance_thread is called. In contrast to the discussed assoc_start_expand, stop_assoc_maintenance_thread reset the flag do_run_maintenance_thread and send the signal to wake up the thread to exit.

#define DEFAULT_HASH_BULK_MOVE 1
int hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
Enter fullscreen mode Exit fullscreen mode
assoc.c:207
...
    char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
    if (env != NULL) {
        hash_bulk_move = atoi(env);
        if (hash_bulk_move == 0) {
            hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
        }
    }
...
Enter fullscreen mode Exit fullscreen mode
start_assoc_maintenance_thread@assoc.c:281
void stop_assoc_maintenance_thread() {
    mutex_lock(&maintenance_lock);
    do_run_maintenance_thread = 0;
    pthread_cond_signal(&maintenance_cond);
    mutex_unlock(&maintenance_lock);

    /* Wait for the maintenance thread to stop */
    pthread_join(maintenance_tid, NULL);
}
Enter fullscreen mode Exit fullscreen mode
stop_assoc_maintenance_thread@assoc.c

3) (The “item lock” actually works on the whole bucket hence I will call it bucket lock instead) Use low priority item_trylock (i.e., pthread_mutex_trylock) to access the bucket lock; 3.1) sleep for 10 sec when the the item is not available; and 3.2) release the lock using item_trylock_unlock when the migration (of this bucket) completes.

void *item_trylock(uint32_t hv) {
    pthread_mutex_t *lock = &item_locks[hv & hashmask(item_lock_hashpower)];
    if (pthread_mutex_trylock(lock) == 0) {
        return lock;
    }
    return NULL;
}
Enter fullscreen mode Exit fullscreen mode
item_trylock@thread.c
void item_trylock_unlock(void *lock) {
    mutex_unlock((pthread_mutex_t *) lock);
}
Enter fullscreen mode Exit fullscreen mode
item_trylock@thread.c

4) Rehash all the item s in the bucket, and migrate them to the new hash map.

5) Move on to the next bucket.

6) Last bucket reached -> go to 0)

Maintenance thread start

int start_assoc_maintenance_thread() {
    int ret;
    char *env = getenv("MEMCACHED_HASH_BULK_MOVE");
    if (env != NULL) {
        hash_bulk_move = atoi(env);
        if (hash_bulk_move == 0) {
            hash_bulk_move = DEFAULT_HASH_BULK_MOVE;
        }
    }
    pthread_mutex_init(&maintenance_lock, NULL);
    if ((ret = pthread_create(&maintenance_tid, NULL,
                              assoc_maintenance_thread, NULL)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        return -1;
    }
    return 0;
}
Enter fullscreen mode Exit fullscreen mode
start_assoc_maintenance_thread@assoc.c

Similar to initialization methods, it is called during system bootstrap.

...
if (start_assoc_maintenance_thread() == -1) {
    exit(EXIT_FAILURE);
}
...
Enter fullscreen mode Exit fullscreen mode
main@memcached.c:5992

Maintenance thread stop

This method is called in system shutdown process, hence it is opposite in logic to start_assoc_maintenance_thread. Nevertheless, the operations of this method are opposite that of assoc_start_expand mechanism wise.

...
    stop_assoc_maintenance_thread();
...
Enter fullscreen mode Exit fullscreen mode
main@memcached.c:6098
void stop_assoc_maintenance_thread() {
    mutex_lock(&maintenance_lock);
    do_run_maintenance_thread = 0;
    pthread_cond_signal(&maintenance_cond);
    mutex_unlock(&maintenance_lock);

    /* Wait for the maintenance thread to stop */
    pthread_join(maintenance_tid, NULL);
}
Enter fullscreen mode Exit fullscreen mode
stop_assoc_maintenance_thread@assoc.c

As said before, the expanding & migration process discussed here has an impact on the logic of all hash map related operations. In the next section, we look at these operations.

CRUD

N.b., assoc_delete has been discussed in the previous post; and in a key-value system update and insert are essentially the same, thus, this section will discuss the operations of C (create) and R (read) only.

assoc_insert

int assoc_insert(item *it, const uint32_t hv) {
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it->h_next = old_hashtable[oldbucket]; // scr: -------------------> 1)
        old_hashtable[oldbucket] = it;
    } else {
        it->h_next = primary_hashtable[hv & hashmask(hashpower)]; // scr: > 2)
        primary_hashtable[hv & hashmask(hashpower)] = it;
    }

    pthread_mutex_lock(&hash_items_counter_lock);
    hash_items++; // scr: ------------------------------------------------> 3)
    if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
        assoc_start_expand();
    }
    pthread_mutex_unlock(&hash_items_counter_lock);

    MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey, hash_items);
    return 1;
}
Enter fullscreen mode Exit fullscreen mode
assoc_insert@assoc.c

1) If expanding process is undergoing and the hash key associated bucket has not been migrated, insert the item to old_hashtable. Note that here we use the old bucket number (i.e., hashmask(hashpower - 1))) to calculate the hash index.

2) Otherwise, insert the item to primary_hashtable directly.

3) Increase the global variable hash_items (number of item s). If it exceeds the threshold after the item is added, start expanding & migration process. Note that this is also the preamble of the previous section.

...
static unsigned int hash_items = 0;
...
Enter fullscreen mode Exit fullscreen mode
assoc.c:51

assoc_find

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {
    item *it;
    unsigned int oldbucket;

    if (expanding &&
        (oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
    {
        it = old_hashtable[oldbucket]; // scr: ---------------------------> 1)
    } else {
        it = primary_hashtable[hv & hashmask(hashpower)]; // scr: --------> 2)
    }

    item *ret = NULL;
    int depth = 0;
    while (it) { // scr: -------------------------------------------------> 3)
        if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {
            ret = it;
            break;
        }
        it = it->h_next;
        ++depth;
    }
    MEMCACHED_ASSOC_FIND(key, nkey, depth);
    return ret;
}
Enter fullscreen mode Exit fullscreen mode
assoc_find@assoc.c

1) Similar to that of assoc_insert, this step locates the bucket from old_hashtable when the key is yet to be rehashed.

2) Use primary_hashtable directly otherwise.

3) Go through the linked list and compare the key (instead of the hash index) directly to lookup the item in the case of Collision.

One thing worth noting is that assoc_find is very similar to _hashitem_before which has been discussed in the previous post. The difference here is, _hashitem_before returns the address of the next member of the element before the found one (pos = &(*pos)->h_next;), which is required when removing entries from a singly linked list; whilst this method returns the element found directly (ret = it;).

Top comments (0)