Skip to content

Commit

Permalink
fixes #22510 (#23100)
Browse files Browse the repository at this point in the history
(cherry picked from commit 69d0b73)
  • Loading branch information
Araq authored and narimiran committed Jun 6, 2024
1 parent 7db4376 commit d6bc869
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 27 deletions.
51 changes: 24 additions & 27 deletions lib/system/alloc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ type
freeList: ptr FreeCell
free: int # how many bytes remain
acc: int # accumulator for small object allocation
when defined(gcDestructors):
sharedFreeList: ptr FreeCell # make no attempt at avoiding false sharing for now for this object field
data {.align: MemAlign.}: UncheckedArray[byte] # start of usable memory

BigChunk = object of BaseChunk # not necessarily > PageSize!
Expand All @@ -109,7 +107,9 @@ type
MemRegion = object
when not defined(gcDestructors):
minLargeObj, maxLargeObj: int
freeSmallChunks: array[0..max(1,SmallChunkSize div MemAlign-1), PSmallChunk]
freeSmallChunks: array[0..max(1, SmallChunkSize div MemAlign-1), PSmallChunk]
when defined(gcDestructors):
sharedFreeLists: array[0..max(1, SmallChunkSize div MemAlign-1), ptr FreeCell]
flBitmap: uint32
slBitmap: array[RealFli, uint32]
matrix: array[RealFli, array[MaxSli, PBigChunk]]
Expand Down Expand Up @@ -775,8 +775,10 @@ when defined(gcDestructors):
sysAssert c.next == nil, "c.next pointer must be nil"
atomicPrepend a.sharedFreeListBigChunks, c

proc addToSharedFreeList(c: PSmallChunk; f: ptr FreeCell) {.inline.} =
atomicPrepend c.sharedFreeList, f
proc addToSharedFreeList(c: PSmallChunk; f: ptr FreeCell; size: int) {.inline.} =
atomicPrepend c.owner.sharedFreeLists[size], f

const MaxSteps = 20

proc compensateCounters(a: var MemRegion; c: PSmallChunk; size: int) =
# rawDealloc did NOT do the usual:
Expand All @@ -786,30 +788,26 @@ when defined(gcDestructors):
# we split the list in order to achieve bounded response times.
var it = c.freeList
var x = 0
var maxIters = 20 # make it time-bounded
while it != nil:
if maxIters == 0:
let rest = it.next.loada
if rest != nil:
it.next.storea nil
addToSharedFreeList(c, rest)
break
inc x, size
it = it.next.loada
dec maxIters
inc(c.free, x)
let chunk = cast[PSmallChunk](pageAddr(it))
inc(chunk.free, x)
it = it.next
dec(a.occ, x)

proc freeDeferredObjects(a: var MemRegion; root: PBigChunk) =
var it = root
var maxIters = 20 # make it time-bounded
var maxIters = MaxSteps # make it time-bounded
while true:
let rest = it.next.loada
it.next.storea nil
deallocBigChunk(a, cast[PBigChunk](it))
if maxIters == 0:
let rest = it.next.loada
it.next.storea nil
addToSharedFreeListBigChunks(a, rest)
if rest != nil:
addToSharedFreeListBigChunks(a, rest)
sysAssert a.sharedFreeListBigChunks != nil, "re-enqueing failed"
break
it = it.next.loada
it = rest
dec maxIters
if it == nil: break

Expand All @@ -833,8 +831,6 @@ proc rawAlloc(a: var MemRegion, requestedSize: int): pointer =
sysAssert c.size == PageSize, "rawAlloc 3"
c.size = size
c.acc = size
when defined(gcDestructors):
c.sharedFreeList = nil
c.free = SmallChunkSize - smallChunkOverhead() - size
sysAssert c.owner == addr(a), "rawAlloc: No owner set!"
c.next = nil
Expand All @@ -851,10 +847,11 @@ proc rawAlloc(a: var MemRegion, requestedSize: int): pointer =
when defined(gcDestructors):
if c.freeList == nil:
when hasThreadSupport:
c.freeList = atomicExchangeN(addr c.sharedFreeList, nil, ATOMIC_RELAXED)
# Steal the entire list from `sharedFreeList`:
c.freeList = atomicExchangeN(addr a.sharedFreeLists[s], nil, ATOMIC_RELAXED)
else:
c.freeList = c.sharedFreeList
c.sharedFreeList = nil
c.freeList = a.sharedFreeLists[s]
a.sharedFreeLists[s] = nil
compensateCounters(a, c, size)
if c.freeList == nil:
sysAssert(c.acc + smallChunkOverhead() + size <= SmallChunkSize,
Expand Down Expand Up @@ -921,7 +918,7 @@ proc rawDealloc(a: var MemRegion, p: pointer) =
if isSmallChunk(c):
# `p` is within a small chunk:
var c = cast[PSmallChunk](c)
var s = c.size
let s = c.size
# ^ We might access thread foreign storage here.
# The other thread cannot possibly free this block as it's still alive.
var f = cast[ptr FreeCell](p)
Expand Down Expand Up @@ -955,7 +952,7 @@ proc rawDealloc(a: var MemRegion, p: pointer) =
freeBigChunk(a, cast[PBigChunk](c))
else:
when defined(gcDestructors):
addToSharedFreeList(c, f)
addToSharedFreeList(c, f, s div MemAlign)
sysAssert(((cast[int](p) and PageMask) - smallChunkOverhead()) %%
s == 0, "rawDealloc 2")
else:
Expand Down
54 changes: 54 additions & 0 deletions tests/alloc/tmembug.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
discard """
joinable: false
"""

import std / [atomics, strutils, sequtils]

type
BackendMessage* = object
field*: seq[int]

var
chan1: Channel[BackendMessage]
chan2: Channel[BackendMessage]

chan1.open()
chan2.open()

proc routeMessage*(msg: BackendMessage) =
discard chan2.trySend(msg)

var
recv: Thread[void]
stopToken: Atomic[bool]

proc recvMsg() =
while not stopToken.load(moRelaxed):
let resp = chan1.tryRecv()
if resp.dataAvailable:
routeMessage(resp.msg)
echo "child consumes ", formatSize getOccupiedMem()

createThread[void](recv, recvMsg)

const MESSAGE_COUNT = 100

proc main() =
let msg: BackendMessage = BackendMessage(field: (0..500).toSeq())
for j in 0..0: #100:
echo "New iteration"

for _ in 1..MESSAGE_COUNT:
chan1.send(msg)
echo "After sending"

var counter = 0
while counter < MESSAGE_COUNT:
let resp = recv(chan2)
counter.inc
echo "After receiving ", formatSize getOccupiedMem()

stopToken.store true, moRelaxed
joinThreads(recv)

main()
58 changes: 58 additions & 0 deletions tests/alloc/tmembug2.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
discard """
disabled: "true"
"""

import std / [atomics, strutils, sequtils, isolation]

import threading / channels

type
BackendMessage* = object
field*: seq[int]

const MESSAGE_COUNT = 100

var
chan1 = newChan[BackendMessage](MESSAGE_COUNT*2)
chan2 = newChan[BackendMessage](MESSAGE_COUNT*2)

#chan1.open()
#chan2.open()

proc routeMessage*(msg: BackendMessage) =
var m = isolate(msg)
discard chan2.trySend(m)

var
thr: Thread[void]
stopToken: Atomic[bool]

proc recvMsg() =
while not stopToken.load(moRelaxed):
var resp: BackendMessage
if chan1.tryRecv(resp):
#if resp.dataAvailable:
routeMessage(resp)
echo "child consumes ", formatSize getOccupiedMem()

createThread[void](thr, recvMsg)

proc main() =
let msg: BackendMessage = BackendMessage(field: (0..5).toSeq())
for j in 0..100:
echo "New iteration"

for _ in 1..MESSAGE_COUNT:
chan1.send(msg)
echo "After sending"

var counter = 0
while counter < MESSAGE_COUNT:
let resp = recv(chan2)
counter.inc
echo "After receiving ", formatSize getOccupiedMem()

stopToken.store true, moRelaxed
joinThreads(thr)

main()
51 changes: 51 additions & 0 deletions tests/threads/tmembug.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

import std / [atomics, strutils, sequtils]

type
BackendMessage* = object
field*: seq[int]

var
chan1: Channel[BackendMessage]
chan2: Channel[BackendMessage]

chan1.open()
chan2.open()

proc routeMessage*(msg: BackendMessage) =
discard chan2.trySend(msg)

var
recv: Thread[void]
stopToken: Atomic[bool]

proc recvMsg() =
while not stopToken.load(moRelaxed):
let resp = chan1.tryRecv()
if resp.dataAvailable:
routeMessage(resp.msg)
echo "child consumes ", formatSize getOccupiedMem()

createThread[void](recv, recvMsg)

const MESSAGE_COUNT = 100

proc main() =
let msg: BackendMessage = BackendMessage(field: (0..500).toSeq())
for j in 0..0: #100:
echo "New iteration"

for _ in 1..MESSAGE_COUNT:
chan1.send(msg)
echo "After sending"

var counter = 0
while counter < MESSAGE_COUNT:
let resp = recv(chan2)
counter.inc
echo "After receiving ", formatSize getOccupiedMem()

stopToken.store true, moRelaxed
joinThreads(recv)

main()

0 comments on commit d6bc869

Please sign in to comment.