// Copyright 2019 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package sync import ( "sync/atomic" "unsafe" ) // poolDequeue is a lock-free fixed-size single-producer, // multi-consumer queue. The single producer can both push and pop // from the head, and consumers can pop from the tail. // // It has the added feature that it nils out unused slots to avoid // unnecessary retention of objects. This is important for sync.Pool, // but not typically a property considered in the literature. type poolDequeue struct { // headTail packs together a 32-bit head index and a 32-bit // tail index. Both are indexes into vals modulo len(vals)-1. // // tail = index of oldest data in queue // head = index of next slot to fill // // Slots in the range [tail, head) are owned by consumers. // A consumer continues to own a slot outside this range until // it nils the slot, at which point ownership passes to the // producer. // // The head index is stored in the most-significant bits so // that we can atomically add to it and the overflow is // harmless. headTail atomic.Uint64 // vals is a ring buffer of interface{} values stored in this // dequeue. The size of this must be a power of 2. // // vals[i].typ is nil if the slot is empty and non-nil // otherwise. A slot is still in use until *both* the tail // index has moved beyond it and typ has been set to nil. This // is set to nil atomically by the consumer and read // atomically by the producer. vals []eface } type eface struct { typ, val unsafe.Pointer } const dequeueBits = 32 // dequeueLimit is the maximum size of a poolDequeue. // // This must be at most (1<> dequeueBits) & mask) tail = uint32(ptrs & mask) return } func (d *poolDequeue) pack(head, tail uint32) uint64 { const mask = 1<= dequeueLimit { // Can't make it any bigger. newSize = dequeueLimit } d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) } func (c *poolChain) popHead() (any, bool) { d := c.head for d != nil { if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. d = loadPoolChainElt(&d.prev) } return nil, false } func (c *poolChain) popTail() (any, bool) { d := loadPoolChainElt(&c.tail) if d == nil { return nil, false } for { // It's important that we load the next pointer // *before* popping the tail. In general, d may be // transiently empty, but if next is non-nil before // the pop and the pop fails, then d is permanently // empty, which is the only condition under which it's // safe to drop d from the chain. d2 := loadPoolChainElt(&d.next) if val, ok := d.popTail(); ok { return val, ok } if d2 == nil { // This is the only dequeue. It's empty right // now, but could be pushed to in the future. return nil, false } // The tail of the chain has been drained, so move on // to the next dequeue. Try to drop it from the chain // so the next pop doesn't have to look at the empty // dequeue again. if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { // We won the race. Clear the prev pointer so // the garbage collector can collect the empty // dequeue and so popHead doesn't back up // further than necessary. storePoolChainElt(&d2.prev, nil) } d = d2 } }