Skip to content

Commit

Permalink
Treat accessing_next as a RW spinlock
Browse files Browse the repository at this point in the history
* There is a tricky race condition among iteration, insertion, and
deletion, and RW spinlock is inevitable to avoid it.
  • Loading branch information
greensky00 committed Aug 22, 2017
1 parent fcaae7f commit 9908849
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 36 deletions.
124 changes: 104 additions & 20 deletions src/skiplist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* OTHER DEALINGS IN THE SOFTWARE.
*/

#include <sched.h>
#include <stdlib.h>

#include "skiplist.h"
Expand Down Expand Up @@ -143,8 +144,8 @@ void skiplist_init_node(skiplist_node *node)
ATM_STORE(node->is_fully_linked, bool_false);
ATM_STORE(node->being_modified, bool_false);
ATM_STORE(node->removed, bool_false);
ATM_STORE(node->accessing_next, bool_false);

node->accessing_next = 0;
node->top_layer = 0;
node->ref_count = 0;
}
Expand Down Expand Up @@ -208,33 +209,101 @@ static inline bool _sl_valid_node(skiplist_node *node) {
return !removed && is_fully_linked;
}

static inline void _sl_read_lock_an(skiplist_node* node) {
for(;;) {
// Wait for active writer to release the lock
uint32_t accessing_next = 0;
ATM_LOAD(node->accessing_next, accessing_next);
while (accessing_next & 0xfff00000) {
sched_yield();
ATM_LOAD(node->accessing_next, accessing_next);
}

ATM_FETCH_ADD(node->accessing_next, 0x1);
ATM_LOAD(node->accessing_next, accessing_next);
if ((accessing_next & 0xfff00000) == 0) {
return;
}

ATM_FETCH_SUB(node->accessing_next, 0x1);
}
}

static inline void _sl_read_unlock_an(skiplist_node* node) {
ATM_FETCH_SUB(node->accessing_next, 0x1);
}

static inline void _sl_write_lock_an(skiplist_node* node) {
for(;;) {
// Wait for active writer to release the lock
uint32_t accessing_next = 0;
ATM_LOAD(node->accessing_next, accessing_next);
while (accessing_next & 0xfff00000) {
sched_yield();
ATM_LOAD(node->accessing_next, accessing_next);
}

ATM_FETCH_ADD(node->accessing_next, 0x100000);
ATM_LOAD(node->accessing_next, accessing_next);
if((accessing_next & 0xfff00000) == 0x100000) {
// Wait until there's no more readers
while (accessing_next & 0x000fffff) {
sched_yield();
ATM_LOAD(node->accessing_next, accessing_next);
}
return;
}

ATM_FETCH_SUB(node->accessing_next, 0x100000);
}
}

static inline void _sl_write_unlock_an(skiplist_node* node) {
ATM_FETCH_SUB(node->accessing_next, 0x100000);
}

// Note: it increases the `ref_count` of returned node.
// Caller is responsible to decrease it.
static inline skiplist_node* _sl_next(skiplist_raw *slist,
skiplist_node *cur_node,
int layer)
{
skiplist_node *next_node = NULL;
bool bool_true = true;
bool bool_false = false;

// Turn on `accessing_next`:
// now `cur_node` is not removable from skiplist,
// which means that `cur_node->next` will be consistent
// until clearing `accessing_next`.
ATM_STORE(cur_node->accessing_next, bool_true);
_sl_read_lock_an(cur_node);
ATM_LOAD(cur_node->next[layer], next_node);
// Increase ref count of `next_node`:
// now `next_node` is not destroyable.

// << Remaining issue >>
// 1) initially: A -> B
// 2) T1: call _sl_next(A):
// A.accessing_next := true;
// next_node := B;
// ----- context switch happens here -----
// 3) T2: insert C:
// A -> C -> B
// 4) T2: and then erase B, and free B.
// A -> C B(freed)
// ----- context switch back again -----
// 5) T1: try to do something with B,
// but crash happens.
//
// ... maybe resolved using RW spinlock (Aug 21, 2017).

if (next_node) ATM_FETCH_ADD(next_node->ref_count, 1);
ATM_STORE(cur_node->accessing_next, bool_false);
_sl_read_unlock_an(cur_node);

while ( next_node && !_sl_valid_node(next_node) ) {
skiplist_node* temp = next_node;
ATM_STORE(temp->accessing_next, bool_true);
_sl_read_lock_an(temp);
ATM_LOAD(temp->next[layer], next_node);
if (next_node) ATM_FETCH_ADD(next_node->ref_count, 1);
ATM_STORE(temp->accessing_next, bool_false);
_sl_read_unlock_an(temp);
ATM_FETCH_SUB(temp->ref_count, 1);
}
return next_node;
Expand Down Expand Up @@ -285,11 +354,11 @@ void skiplist_insert(skiplist_raw *slist,
skiplist_node *node)
{
int top_layer = _sl_decide_top_layer(slist);
bool bool_true = true, bool_false = false;
bool bool_true = true;

// init node before insertion
_sl_node_init(node, top_layer);
ATM_STORE(node->accessing_next, bool_true);
_sl_write_lock_an(node);

skiplist_node* prevs[SKIPLIST_MAX_LAYER];
skiplist_node* nexts[SKIPLIST_MAX_LAYER];
Expand Down Expand Up @@ -380,14 +449,17 @@ void skiplist_insert(skiplist_raw *slist,
// bottom layer => insertion succeeded
// change prev/next nodes' prev/next pointers from 0 ~ top_layer
for (layer = 0; layer <= top_layer; ++layer) {
// `accessing_next` works as a spin-lock.
_sl_write_lock_an(prevs[layer]);
ATM_STORE(prevs[layer]->next[layer], node);
_sl_write_unlock_an(prevs[layer]);
}

// now this node is fully linked
ATM_STORE(node->is_fully_linked, bool_true);

// allow removing next nodes
ATM_STORE(node->accessing_next, bool_false);
_sl_write_unlock_an(node);

// modification is done for all layers
_sl_clr_flags(prevs, 0, top_layer);
Expand Down Expand Up @@ -594,21 +666,14 @@ int skiplist_erase_node(skiplist_raw *slist,
// bottom layer => removal succeeded.
// change prev nodes' next pointer from 0 ~ top_layer
for (cur_layer = 0; cur_layer <= top_layer; ++cur_layer) {
_sl_write_lock_an(prevs[cur_layer]);
ATM_STORE(prevs[cur_layer]->next[cur_layer], nexts[cur_layer]);
_sl_write_unlock_an(prevs[cur_layer]);
}

// now this node is unlinked
ATM_STORE(node->is_fully_linked, bool_false);

// do not return until all links are fully unreachable.
bool accessing_next = false;
do {
for (int i=0; i<=top_layer; ++i) {
ATM_LOAD(prevs[i]->accessing_next, accessing_next);
if (accessing_next) break;
}
} while (accessing_next);

// modification is done for all layers
_sl_clr_flags(prevs, 0, top_layer);
ATM_FETCH_SUB(cur_node->ref_count, 1);
Expand All @@ -617,6 +682,18 @@ int skiplist_erase_node(skiplist_raw *slist,
return 0;
}

int skiplist_erase_node_enforce(skiplist_raw *slist,
skiplist_node *node)
{
int ret = 0;
do {
ret = skiplist_erase_node(slist, node);
// if ret == -2, other thread is accessing the same node
// at the same time. try again.
} while (ret == -2);
return ret;
}

int skiplist_erase(skiplist_raw *slist,
skiplist_node *query)
{
Expand All @@ -637,6 +714,9 @@ int skiplist_erase(skiplist_raw *slist,
return ret;
}


#include <assert.h>

int skiplist_is_safe_to_free(skiplist_node* node) {
if (node->accessing_next) return 0;
if (node->being_modified) return 0;
Expand All @@ -648,8 +728,12 @@ int skiplist_is_safe_to_free(skiplist_node* node) {
return 1;
}

// Decrease reference count.
void skiplist_grab_node(skiplist_node* node) {
ATM_FETCH_ADD(node->ref_count, 1);
}

void skiplist_release_node(skiplist_node* node) {
assert(node->ref_count);
ATM_FETCH_SUB(node->ref_count, 1);
}

Expand Down
16 changes: 6 additions & 10 deletions src/skiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ struct _skiplist_node;
typedef std::atomic<_skiplist_node*> atm_node_ptr;
typedef std::atomic<bool> atm_bool;
typedef std::atomic<uint16_t> atm_uint16_t;
typedef std::atomic<uint32_t> atm_uint32_t;
#else
typedef struct _skiplist_node* atm_node_ptr;
typedef uint8_t atm_bool;
typedef uint16_t atm_uint16_t;
typedef uint32_t atm_uint32_t;
#endif

#ifdef __cplusplus
Expand All @@ -55,9 +57,9 @@ typedef struct _skiplist_node {
atm_bool is_fully_linked;
atm_bool being_modified;
atm_bool removed;
atm_bool accessing_next;
uint8_t top_layer; // 0: bottom
atm_uint16_t ref_count;
atm_uint32_t accessing_next;
} skiplist_node;

// *a < *b : return neg
Expand Down Expand Up @@ -87,15 +89,12 @@ typedef struct {

void skiplist_init(skiplist_raw* slist,
skiplist_cmp_t* cmp_func);

void skiplist_free(skiplist_raw* slist);

void skiplist_init_node(skiplist_node* node);

void skiplist_free_node(skiplist_node* node);

skiplist_raw_config skiplist_get_default_config();

skiplist_raw_config skiplist_get_config(skiplist_raw* slist);

void skiplist_set_config(skiplist_raw* slist,
Expand All @@ -106,31 +105,28 @@ void skiplist_insert(skiplist_raw* slist,

skiplist_node* skiplist_find(skiplist_raw* slist,
skiplist_node* query);

skiplist_node* skiplist_find_smaller_or_equal(skiplist_raw* slist,
skiplist_node* query);

skiplist_node* skiplist_find_greater_or_equal(skiplist_raw* slist,
skiplist_node* query);

int skiplist_erase_node(skiplist_raw* slist,
skiplist_node* node);

int skiplist_erase_node_enforce(skiplist_raw *slist,
skiplist_node *node);
int skiplist_erase(skiplist_raw* slist,
skiplist_node* query);

int skiplist_is_safe_to_free(skiplist_node* node);

void skiplist_grab_node(skiplist_node* node);
void skiplist_release_node(skiplist_node* node);

skiplist_node* skiplist_next(skiplist_raw* slist,
skiplist_node* node);

skiplist_node* skiplist_prev(skiplist_raw* slist,
skiplist_node* node);

skiplist_node* skiplist_begin(skiplist_raw* slist);

skiplist_node* skiplist_end(skiplist_raw* slist);

#ifdef __cplusplus
Expand Down
9 changes: 3 additions & 6 deletions tests/mt_itr_write_erase_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,14 @@ int itr_write_erase() {
skiplist_insert(&slist, &node[ii]->snode);
}
for (int ii=0; ii<num; ii+=1) {
if (node[ii]->snode.ref_count)
printf("%d %d\n", ii, node[ii]->snode.ref_count);
CHK_EQ(0, node[ii]->snode.ref_count);
}

for (int ii=0; ii<num; ii+=2) {
skiplist_erase_node(&slist, &node[ii]->snode);
}
for (int ii=0; ii<num; ii+=1) {
if (node[ii]->snode.ref_count)
printf("%d %d\n", ii, node[ii]->snode.ref_count);
CHK_EQ(0, node[ii]->snode.ref_count);
}

args.slist = &slist;
Expand All @@ -155,8 +153,7 @@ int itr_write_erase() {
writer.join();

for (int ii=0; ii<num; ii+=1) {
if (node[ii]->snode.ref_count)
printf("%d %d\n", ii, node[ii]->snode.ref_count);
CHK_EQ(0, node[ii]->snode.ref_count);
}

skiplist_node* cursor = skiplist_begin(&slist);
Expand Down

0 comments on commit 9908849

Please sign in to comment.