Skip to content

Commit

Permalink
Stack race conditions on concurrent push/pop fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
arnecls committed Mar 17, 2017
1 parent 0924205 commit 9b74fce
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 60 deletions.
102 changes: 74 additions & 28 deletions tsync/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,67 +15,113 @@
package tsync

import (
"github.com/trivago/tgo/tmath"
"sync/atomic"
)

// TODO: BROKEN! Needs rewrite

// Stack implements a simple, growing, lockfree stack.
// The main idea is to use the sign bit of the head index as a mutex.
// If the index is negative, the stack is locked so we need to spin.
// If the index is non-negative the stack is unlocked and we can write or read.
type Stack struct {
data []interface{}
growBy int
top *int32
head *int32
head *uint32
spin Spinner
}

const unlockMask = uint32(0x7FFFFFFF)
const lockMask = uint32(0x80000000)

// NewStack creates a new stack with the given initial size.
// Keep in mind that size is also used to grow the stack, so a value > 1 is
// advisable.
// The given size will also be used as grow size.
// SpinPriorityMedium is used to initialize the spinner.
func NewStack(size int) Stack {
s := Stack{
data: make([]interface{}, size),
growBy: size,
top: new(int32),
head: new(int32),
return NewStackWithSpinnerAndGrowSize(size, size, NewSpinner(SpinPriorityMedium))
}

// NewStackWithGrowSize allows to pass a custom grow size to the stack.
// SpinPriorityMedium is used to initialize the spinner.
func NewStackWithGrowSize(size, grow int) Stack {
return NewStackWithSpinnerAndGrowSize(size, grow, NewSpinner(SpinPriorityMedium))
}

// NewStackWithSpinner allows to pass a custom spinner to the stack.
// The given size will also be used as grow size.
func NewStackWithSpinner(size int, spin Spinner) Stack {
return NewStackWithSpinnerAndGrowSize(size, size, spin)
}

// NewStackWithSpinnerAndGrowSize allows to fully configure the new stack.
func NewStackWithSpinnerAndGrowSize(size, grow int, spin Spinner) Stack {
return Stack{
data: make([]interface{}, tmath.MinI(size, 1)),
growBy: tmath.MinI(grow, 1),
head: new(uint32),
spin: spin,
}
*s.top--
*s.head--
return s
}

// Len returns the number of elements on the stack.
// Please note that this value can be highly unreliable in multithreaded
// environments as this is only a snapshot of the state at calltime.
func (s *Stack) Len() int {
return int(atomic.LoadUint32(s.head) & unlockMask)
}

// Pop retrieves the topmost element from the stack.
// A LimitError is returned when the stack is empty.
func (s *Stack) Pop() (interface{}, error) {
spin := s.spin
for {
top := atomic.LoadInt32(s.top)
if top < 0 {
head := atomic.LoadUint32(s.head)
unlockedHead := head & unlockMask
lockedHead := head | lockMask

// Always work with unlocked head as head might be locked
if unlockedHead == 0 {
return nil, LimitError{"Stack is empty"}
}

if atomic.CompareAndSwapInt32(s.head, top, top-1) {
data := s.data[top]
atomic.AddInt32(s.top, -1)
return data, nil
if atomic.CompareAndSwapUint32(s.head, unlockedHead, lockedHead) {
data := s.data[unlockedHead-1] // copy data
atomic.StoreUint32(s.head, unlockedHead-1) // unlock
return data, nil // ### return ###
}

spin.Yield()
}
}

// Push adds an element to the top of the stack.
// If the stack is full it is growed by its initial size and the element is added.
func (s *Stack) Push(v interface{}) {
// When the stack's capacity is reached the storage grows as defined during
// construction. If the stack reaches 2^31 elements it is considered full
// and will return an LimitError.
func (s *Stack) Push(v interface{}) error {
spin := s.spin
for {
top := atomic.LoadInt32(s.top)
if atomic.CompareAndSwapInt32(s.head, top, top+1) {
if top+1 == int32(len(s.data)) {
head := atomic.LoadUint32(s.head)
unlockedHead := head & unlockMask
lockedHead := head | lockMask

// Always work with unlocked head as head might be locked
if unlockedHead == unlockMask {
return LimitError{"Stack is full"}
}

if atomic.CompareAndSwapUint32(s.head, unlockedHead, lockedHead) {
if unlockedHead == uint32(len(s.data)) {
// Grow stack
old := s.data
s.data = make([]interface{}, len(s.data)+s.growBy)
copy(s.data, old)
}

s.data[top+1] = v
atomic.AddInt32(s.top, 1)
return
s.data[unlockedHead] = v // write to new head
atomic.StoreUint32(s.head, unlockedHead+1) // unlock
return nil // ### return ###
}

spin.Yield()
}
}
192 changes: 160 additions & 32 deletions tsync/stack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package tsync
import (
"github.com/trivago/tgo/ttesting"
"runtime"
"sync"
"sort"
"sync/atomic"
"testing"
)

Expand Down Expand Up @@ -46,48 +47,175 @@ func TestStackFunctionality(t *testing.T) {
expect.Nil(v)
}

func TestStackConcurrency(t *testing.T) {
func TestStackConcurrentPush(t *testing.T) {
expect := ttesting.NewExpect(t)

numRoutines := 10
numWrites := 100

s := NewStack(1)
start := sync.WaitGroup{}
end := sync.WaitGroup{}
start.Add(1)

go func() {
end.Add(1)
defer end.Done()
start.Wait()

for i := 0; i < 10000; i++ {
s.Push(i)
runtime.Gosched()
}
}()
ready := WaitGroup{}
start := WaitGroup{}
finished := WaitGroup{}
start.Inc()
ready.Add(numRoutines)
finished.Add(numRoutines)

pool := new(int32)

// Start reader threads
for r := 0; r < 10; r++ {
for i := 0; i < numRoutines; i++ {
go func() {
end.Add(1)
defer end.Done()
defer finished.Done()
ready.Done()
start.Wait()

lastValue := -1
errCount := 0
for i := 0; i < 1000; i++ {
v, err := s.Pop()
if err == nil {
expect.Greater(v.(int), lastValue)
lastValue = v.(int)
} else {
errCount++
}
for i := 0; i < numWrites; i++ {
num := int(atomic.AddInt32(pool, 1) - 1)
s.Push(num)
runtime.Gosched()
}
}()
}
ready.Wait()
start.Done()
finished.Wait()

expect.Equal(numRoutines*numWrites, s.Len())

numbers := make([]int, 0, numRoutines*numWrites)
for _, num := range s.data {
numbers = append(numbers, num.(int))
}

sort.Ints(numbers)
expected := 0
for _, num := range numbers {
if !expect.Equal(expected, num) {
expected = num
}
expected++
}
}

expect.Less(errCount, 1000)
func TestStackConcurrentPop(t *testing.T) {
expect := ttesting.NewExpect(t)

numRoutines := 10
numReads := 100
totalItems := numRoutines * numReads

numbers := make([]int, totalItems)
writeIdx := new(int32)

s := NewStack(numRoutines * numReads)
for i := 0; i < totalItems; i++ {
s.Push(i)
}

ready := WaitGroup{}
start := WaitGroup{}
finished := WaitGroup{}
start.Inc()
ready.Add(numRoutines)
finished.Add(numRoutines)

for i := 0; i < numRoutines; i++ {
go func() {
defer finished.Done()
ready.Done()
start.Wait()

for i := 0; i < numReads; i++ {
idx := atomic.AddInt32(writeIdx, 1) - 1
num, err := s.Pop()
expect.NoError(err)
numbers[idx] = num.(int)
runtime.Gosched()
}
}()
}
ready.Wait()
start.Done()
finished.Wait()

sort.Ints(numbers)
expected := 0
for _, num := range numbers {
if !expect.Equal(expected, num) {
expected = num
}
expected++
}
}

func TestStackConcurrentPushPop(t *testing.T) {
expect := ttesting.NewExpect(t)

numRoutines := 10
numWrites := 100
numReads := 100
totalItems := numRoutines * numWrites

numbers := make([]int, totalItems)
numIdx := new(int32)

s := NewStack(1)
ready := WaitGroup{}
start := WaitGroup{}
finished := WaitGroup{}
start.Inc()
ready.Add(numRoutines * 2)
finished.Add(numRoutines * 2)

pool := new(int32)

for i := 0; i < numRoutines; i++ {
go func() {
defer finished.Done()
ready.Done()
start.Wait()

for i := 0; i < numWrites; i++ {
num := int(atomic.AddInt32(pool, 1) - 1)
s.Push(num)
runtime.Gosched()
}
}()

go func() {
defer finished.Done()
ready.Done()
start.Wait()

for i := 0; i < numReads; i++ {
num, err := s.Pop()
if err == nil { // Some pops will fail
idx := atomic.AddInt32(numIdx, 1) - 1
numbers[idx] = num.(int)
}
runtime.Gosched()
}
}()
}
ready.Wait()
start.Done()
end.Wait()
finished.Wait()

expect.Equal(len(numbers)-int(*numIdx), s.Len())

for int(*numIdx) < len(numbers) {
num, err := s.Pop()
expect.NoError(err)
numbers[*numIdx] = num.(int)
*numIdx++
}

sort.Ints(numbers)
expected := 0
for _, num := range numbers {
if !expect.Equal(expected, num) {
expected = num
}
expected++
}
}

0 comments on commit 9b74fce

Please sign in to comment.