
proposal: sync: support for sharded values #18802

Open
aclements opened this issue Jan 26, 2017 · 96 comments

@aclements
Member

Per-CPU sharded values are a useful and common way to reduce contention on shared write-mostly values. However, this technique is currently difficult or impossible to use in Go (though there have been attempts, such as @jonhoo's https://github.com/jonhoo/drwmutex and @bcmills' https://go-review.googlesource.com/#/c/35676/).

We propose providing an API for creating and working with sharded values. Sharding would be encapsulated in a type, say sync.Sharded, that would have Get() interface{}, Put(interface{}), and Do(func(interface{})) methods. Get and Put would always have to be paired to make Do possible. (This is actually the same API that was proposed in #8281 (comment) and rejected, but perhaps we have a better understanding of the issues now.) This idea came out of off-and-on discussions between at least @rsc, @hyangah, @RLH, @bcmills, @Sajmani, and myself.

This is a counter-proposal to various proposals to expose the current thread/P ID as a way to implement sharded values (#8281, #18590). These have been turned down as exposing low-level implementation details, tying Go to an API that may be inappropriate or difficult to support in the future, being difficult to use correctly (since the ID may change at any time), being difficult to specify, and as being broadly susceptible to abuse.

There are several dimensions to the design of such an API.

Get and Put can be blocking or non-blocking:

  • With non-blocking Get and Put, sync.Sharded behaves like a collection. Get returns immediately with the current shard's value or nil if the shard is empty. Put stores a value for the current shard if the shard's slot is empty (which may be different from where Get was called, but would often be the same). If the shard's slot is not empty, Put could either put to some overflow list (in which case the state is potentially unbounded), or run some user-provided combiner (which would bound the state).

  • With blocking Get and Put, sync.Sharded behaves more like a lock. Get returns and locks the current shard's value, blocking further Gets from that shard. Put sets the shard's value and unlocks it. In this case, Put has to know which shard the value came from, so Get can either return a put function (though that would require allocating a closure) or some opaque value that must be passed to Put that internally identifies the shard.

  • It would also be possible to combine these behaviors by using an overflow list with a bounded size. Specifying 0 would yield lock-like behavior, while specifying a larger value would give some slack where Get and Put remain non-blocking without allowing the state to become completely unbounded.

Do could be consistent or inconsistent:

  • If it's consistent, then it passes the callback a snapshot at a single instant. I can think of two ways to do this: block until all outstanding values are Put and also block further Gets until the Do can complete; or use the "current" value of each shard even if it's checked out. The latter requires that shard values be immutable, but it makes Do non-blocking.

  • If it's inconsistent, then it can wait on each shard independently. This is faster and doesn't affect Get and Put, but the caller can only get a rough idea of the combined value. This is fine for uses like approximate statistics counters.

It may be that we can't make this decision at the API level and have to provide both forms of Do.

I think this is a good base API, but I can think of a few reasonable extensions:

  • Provide Peek and CompareAndSwap. If a user of the API can be written in terms of these, then Do would always be able to get an immediate consistent snapshot.

  • Provide a Value operation that uses the user-provided combiner (if we go down that API route) to get the combined value of the sync.Sharded.
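
For concreteness, a rough sketch of the API surface described above (the non-blocking variant with an optional user-provided combiner); the names and signatures here are illustrative assumptions, not a settled design:

type Sharded struct {
    // Combiner, if non-nil, is used by Put to merge a new value into a shard
    // whose slot is already occupied, which bounds the total state.
    Combiner func(old, v interface{}) interface{}

    // unexported fields
}

// Get returns the current shard's value, or nil if the shard is empty.
func (s *Sharded) Get() interface{}

// Put stores v in the current shard (which may differ from the shard Get was
// called on), combining it with any existing value via Combiner.
func (s *Sharded) Put(v interface{})

// Do calls f with each shard's value.
func (s *Sharded) Do(f func(v interface{}))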

@aclements
Member Author

aclements commented Jan 26, 2017

My own inclination is towards the non-blocking API with a bounded overflow list. A blocking API seems antithetical to the goal of reducing contention and may lead to performance anomalies if a goroutine or OS thread is descheduled while it has a shard checked out, and a non-blocking API with a required combiner may prevent certain use cases (e.g., large structures, or uses that never read the whole sharded value). The bounded-overflow approach also devolves to the blocking API if the bound is 0.

@ianlancetaylor
Contributor

The proposal as written is rather abstract. I think it would help to examine the specific use cases that people have for such a thing.

For example, it's clear that one use case is collecting metrics. Presumably the idea is that you have some sort of server, and it wants to log various metrics for each request. The metrics only need to be accumulated when they are reported, and reporting happens much less often than collection. Using a lock;update;unlock sequence will lead to lock contention. But (let's say) we need the metrics to be accurate. So the idea of sharding for this case is a lock;update;unlock sequence with a sharded lock, and an accumulate step that does lock;collect;zero;unlock for each sharded metric. That gives us the values we need while minimizing lock contention.

One way to implement this use case is for the sync.Sharded to require a combiner method as you describe. Conceptually, then:

func (s *Sharded) Get() interface{} {
    s.LockCurrentShard()
    r := s.CurrentShardValue()
    s.SetCurrentShardValue(nil)
    s.UnlockCurrentShard()
    return r
}

func (s *Sharded) Put(v interface{}) {
    s.LockCurrentShard()
    defer s.UnlockCurrentShard()
    c := s.CurrentShardValue()
    if c == nil {
        s.SetCurrentShardValue(v)
    } else {
        m := s.Combine(c, v) // Combine function defined by user.
        s.SetCurrentShardValue(m)
    }
}

For typical metrics the Do method does not need to be consistent. However, it's not hard to have a consistent Do as long as the function passed to Do does not use the sync.Sharded value itself.

With this outline, we see that there is no need for sync.Sharded to maintain an overflow list. Any case that wants to use an overflow list will do so in the Combine function. Obviously the Combine function must not use the sync.Sharded value, as that may lead to deadlock, but otherwise it can do whatever it likes.

What other uses are there for sync.Sharded, and what sorts of implementation do they suggest?

@bcmills
Contributor

bcmills commented Jan 26, 2017

I had been considering a somewhat narrower API containing only Get and one of {Range, Do, ForEach}, bounding the number of distinct values to the number of threads in the program. The calling code would provide a func() interface{} at construction time to use when Get is called on a thread without an existing value.

The semantics would be similar to the non-blocking proposal: Get returns the current shard's value (but does not guarantee exclusiveness), and Range iterates over all existing values.

Because of the lack of exclusiveness, application code would still have to use atomic and/or sync to manipulate the values, but if the value is uncontended and usually owned by the same core's cache, the overhead of that application-side synchronization would be relatively small (compared to locking overhead for non-sharded values).

That approach has a few advantages over the alternatives in the current proposal.

  1. There is no "overflow list" to manage. The number of values is strictly bounded to the number of threads, and the value for a given thread cannot accidentally migrate away or be dropped.
  2. Application code using atomic values (as for the stats-counter use case in sync: per-cpu storage #8281) would not have to deal with lock-ordering (as it would with a blocking Get and Put).
  3. There is no possibility of deadlock (or overallocation of values) due to a missing Put in application code. This is perhaps less significant if we can make trivial defer usage less expensive (runtime: defer is slow #14939) and/or add a vet check (along the lines of lostcancel), but it seems simpler to avoid the problem in the first place.

It has one disadvantage that I'm aware of:

  1. Application code must include its own synchronization code.

Are there other tradeoffs for or against the narrower Get/Range API?
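
For concreteness, a minimal sketch of that narrower API (the constructor and names here are assumptions):

// NewSharded returns a Sharded whose Get calls newValue the first time it is
// called on a thread that has no value yet.
func NewSharded(newValue func() interface{}) *Sharded

// Get returns the current shard's value. It does not grant exclusive access;
// callers must still synchronize (e.g. via sync/atomic) when mutating it.
func (s *Sharded) Get() interface{}

// Range calls f on every existing value.
func (s *Sharded) Range(f func(v interface{}))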

@bcmills
Contributor

bcmills commented Jan 26, 2017

[Using the "current" value of each shard even if it's checked out] requires that shard values be immutable, but it makes Do non-blocking.

It doesn't even require immutability: "externally synchronized" and/or "atomic" would suffice, although "externally synchronized" carries the risk of lock-ordering issues.

@bcmills
Contributor

bcmills commented Jan 26, 2017

One way to implement [consistent counting] is for the sync.Sharded to require a combiner method as you describe.

Anything that reduces values seems tricky to get right: you'd have to ensure that Do iterates in an order such that Combine cannot move a value that Do has already iterated over into one that it has yet to encounter (and vice-versa), otherwise you risk under- or double-counting that value.

I don't immediately see how to provide that property for Do in the general case without reintroducing a cross-thread contention point in Put, but it may be possible.

@ianlancetaylor
Contributor

For a consistent Do, first lock all the shards, then run the function on each value, then unlock all the shards. For an inconsistent Do, it doesn't matter.
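
A minimal sketch of that lock-all-shards approach, assuming each shard pairs a mutex with its value (the internal layout is an assumption):

func (s *Sharded) doConsistent(f func(interface{})) {
    for i := range s.shards {
        s.shards[i].mu.Lock() // blocks Get and Put on every shard
    }
    for i := range s.shards {
        f(s.shards[i].value)
    }
    for i := range s.shards {
        s.shards[i].mu.Unlock()
    }
}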

@bcmills
Contributor

bcmills commented Jan 26, 2017

For a consistent Do, first lock all the shards, then run the function on each value, then unlock all the shards.

That essentially makes Do a stop-the-world operation: it not only blocks all of those threads until Do completes, but also invalidates the cache lines containing the per-shard locks in each of the local CPU caches.

Ideally, Do should produce much less interference in the steady state: it should only acquire/invalidate locks that are not in the fast path of Get and Put. If the values are read using atomic, that doesn't need to invalidate any cache lines at all: the core processing Do might need to wait to receive an up-to-date value, but since there is no write to the cross-core data the existing cached value doesn't need to be invalidated.

I guess that means I'm in favor of an inconsistent Do, provided that we don't discover a very compelling use-case for making it consistent.

@funny-falcon
Contributor

For some usages there should be strict knowledge of the bound on the number of allocated "values", i.e. the number of allocated values should not change. And preferably, the values should be allocated at a predictable time, for example at container (Sharded) creation. For that kind of usage, an interface with Put is not useful.

Probably, it should be a separate container:

// NewFixSharded preallocates all values by calling the alloc function, and returns a new FixSharded.
// FixSharded never changes its size, i.e. it never allocates a new value after construction.
func NewFixSharded(alloc func() interface{}) *FixSharded
// NewFixShardedN preallocates exactly n values by calling the alloc function, and returns a new FixSharded.
func NewFixShardedN(n int, alloc func() interface{}) *FixSharded
func (a *FixSharded) Get() interface{}

If the size never changes, there is no need for Do or ForEach or locks.
Application code must include its own synchronization code.

Rationale: GOMAXPROCS changes rarely (almost never), so dynamic allocation is excessive.

I could be mistaken about GOMAXPROCS being constant.

@ianlancetaylor
Contributor

@bcmills Well, as I said earlier, I think we need to look at specific use cases. For the specific use case I was discussing, I assert that the cost of a consistent Do is irrelevant, because it is run very infrequently.

What specific use case do you have in mind?

@bcmills
Contributor

bcmills commented Jan 26, 2017

@ianlancetaylor I'm specifically thinking about counting (as in #8281) and CPU-local caching (e.g. buffering unconditional stores to a shared map, a potential optimization avenue for #18177).

@funny-falcon
Contributor

I'm thinking about stat-collectors and high-performance RPC.

@ianlancetaylor
Contributor

@bcmills For counting, it seems to me you would use an inconsistent Do. If you need to avoid inconsistency while still using an inconsistent Do, have the combiner store the additional counts elsewhere and not modify the previously stored value. Presumably the combiner is only called in rare cases, so the speed of that rare case is not too important. You could even mitigate that cost by stacking sync.Sharded values.

I don't actually see how to write a consistent Do that does not disturb the fast path of Get and Put at all.

One approach for buffering stores to a shared map would be a Do that removes all the values, replacing them with nil. Come to think of it, that would work for counters also. But it does interfere with the fast path.

@funny-falcon Can you expand on what you mean by "high-performance RPC"? I don't see why you need a global distributed value for RPC.

@balasanjay
Contributor

Perhaps stating the obvious, but one slightly tricky thing is when to GC stale per-thread values when GOMAXPROCS is decreased.

For some use-cases (e.g. distributed mutexes), they will presumably have a reference keeping the stale values alive.

For others (e.g. counters), you'd need to keep the value around until it's been accumulated.

Also, in the pony category: if I want a distributed int64 counter, it should have sufficient padding to avoid false sharing, but if I allocate multiple such counters, they could be instantiated within the padding, so to speak. I think this could maybe be built in user space on top of a more low-level API, but if it's possible for the API to provide it directly, that'd be great.
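
For reference, the usual way to get that padding by hand today is something like the following, assuming 64-byte cache lines (an API-provided version could size and pack this automatically):

type paddedCounter struct {
    n int64
    _ [56]byte // pad the struct out to a full cache line
}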

@funny-falcon
Contributor

@ianlancetaylor I maintain a connector to an in-memory transactional database capable of serving more than 1M requests per second.

To be able to send requests at that rate, and to scale smoothly with CPU cores, I have to shard the connector's internal data structures. (I also need to build a custom hash table and custom timers, but sharding is the basis of the improvement.) Without sharding there is too much lock contention.

If there were shard-to-CPU alignment (even a non-strict one), it would help further reduce lock contention and improve CPU-cache utilization.

As I understand it, most users don't change GOMAXPROCS on the fly, so I'd prefer a fixed number of preallocated shards, because then I can easily map responses from the server back to a shard.

I still think a simple low-level "ProcHint" API (as proposed in #18590 (comment)) would be sufficient. But if you want the API to look "higher level", then I'd be satisfied with FixSharded.

@funny-falcon
Contributor

Link to improved ProcHint api proposal: #18590 (comment)

@funny-falcon
Contributor

Excuse me for going a bit off topic:
How often is GOMAXPROCS changed at runtime in production workloads? What patterns of change exist?

@rsc
Contributor

rsc commented Jan 27, 2017

Programs might change GOMAXPROCS in response to getting more or less of a machine as co-tenancy changes.

@Sajmani
Contributor

Sajmani commented Jan 27, 2017

I'll document some concrete use case examples:

  1. Scalar counters incremented in Go server request handlers
func serveRequest(...) {
  requestCounter.Add(1)
}
func serveCounter(w responseWriter, ...) {
  w.Print(requestCounter.Count())
}

I believe with @aclements 's API we would implement this as:

func (c *counter) Add(n int) {
  if v, ok := c.shards.Get().(int); ok {
    v += n
    c.shards.Put(v)
  } else {
    c.shards.Put(n)
  }
}
func (c *counter) Count() (v int) {
  c.shards.Do(func(shard interface{}) {
    v += shard.(int)
  })
  return v
}
  2. Non-scalar (map) metrics in Go server request handlers
func serveRequest(...) {
  user := userFromRequest(req)
  userRequestCounter.Add(1, user)
}
func serveCounter(w responseWriter, ...) {
  w.Print(userRequestCounter.Map())
}

I believe with @aclements 's API we would implement this as:

func (c *mapCounter) Add(n int, key string) {
  if m, ok := c.shards.Get().(map[string]int); ok {
    m[key] += n
    c.shards.Put(m)
  } else {
    c.shards.Put(map[string]int{key: n})
  }
}
func (c *mapCounter) Map() map[string]int {
  m := make(map[string]int)
  c.shards.Do(func(shard interface{}) {
    for key, count := range shard.(map[string]int) {
      m[key] += count
    }
  })
  return m
}

@aclements does that all look right?

@bradfitz
Contributor

we would implement this as

  if v, ok := c.shards.Get().(int); ok {
    v += n
    c.shards.Put(v)
  } else {
    c.shards.Put(n)
  }

... allocating an integer for every increment? (ints into interfaces cause an allocation)

@bcmills
Contributor

bcmills commented Jan 27, 2017

And my experience with sync.Pool is that calling Put with the type-asserted value tends to introduce an extra allocation too (for the interface{} value).

So we'd probably actually want to write it as:

p := c.shards.Get()
if p == nil {
  p = new(int)
}
*(p.(*int)) += n
c.shards.Put(p)

(Or else we'll want to fix the extra allocation for the Put call through some compiler optimization.)

@jonhoo

jonhoo commented Jan 27, 2017

I wonder if this could also be used to build a better multi-core RWLock similar to my drwmutex. From the proposal thus far, it sounds like it might be tricky to implement something like "as a writer, take all locks, and disallow new locks to be added while you hold those locks".

@bcmills
Contributor

bcmills commented Jan 27, 2017

@jonhoo

it sounds like it might be tricky to implement something like "as a writer, take all locks, and disallow new locks to be added while you hold those locks".

Tricky but possible, I think. You can add a sync.Mutex to be acquired in the exclusive path and when adding new locks, but you have to be careful about lock-ordering.

The harder part is that if you want to satisfy the existing sync.RWMutex API you have to handle the possibility that the RUnlock call occurs on a different thread from the RLock. One thing you could do is keep a locked bool on each of the sharded values and add a slow-path fallback for the case where your thread's read-lock isn't the one you locked.

A sketch with the blocking version of Get and Put:

type readerLock struct {
  locked bool
  sync.Mutex
}

func (m *RWMutex) RLock() {
  i := m.readers.Get()
  l, _ := i.(*readerLock)
  if l != nil && !l.locked {
    l.Lock()
    return
  }
  if l != nil {
    m.readers.Put(i)  // Put this one back and allocate a new one.
  }

  l = &readerLock{locked: true}
  l.Lock()
  m.add.Lock()
  m.readers.Put(l)
  m.add.Unlock()
}

func (m *RWMutex) RUnlock() {
  i := m.readers.Get()
  l, _ := i.(*readerLock)
  if l != nil && l.locked {
    l.Unlock()
    return
  }
  unlocked := false
  m.readers.Do(func(i interface{}) {
    if unlocked {
      return
    }
    l := i.(*readerLock)
    if l.locked {
      l.Unlock()
      unlocked = true
    }
  })
}

@funny-falcon
Contributor

funny-falcon commented Jan 28, 2017

The technique used by @valyala to improve timers at https://go-review.googlesource.com/#/c/34784/ shows exactly why runtime.ProcHint() (or Pid()) is useful for high-performance multi-core programming.

I agree with @valyala that ProcMaxHint is most likely not needed. It is enough to have a fixed number of "shards".

So, even if you decide to stick with sync.Sharded, please also add sync.FixSharded with preallocated "values".

@bcmills
Contributor

bcmills commented Jan 28, 2017

That approach looks like it would be just as easy to implement (and perhaps with less cross-core contention) using Sharded.

Note that there's nothing stopping a user of the proposed Sharded API from using a fixed set of values with it. FixSharded is redundant.

@funny-falcon
Contributor

@bcmills, well, yes: if the allocator function returns pointers to preallocated values, then it behaves like FixSharded. You are right.

@robpike
Contributor

robpike commented Jan 30, 2017

Please don't call this sharding, which is either an obscure English term or a term of art for distributed computing. Neither fits. It's a per-CPU thing, so call it something like perCPU or percpu.

@Sajmani
Contributor

Sajmani commented Jan 30, 2017 via email

@mknyszek
Contributor

mknyszek commented Jan 10, 2024

Here's a possible API in the vein of what @aclements suggested.

// ShardedValue is a pool of values that all represent a small piece of a single
// conceptual value. These values must implement Shard on themselves.
//
// The purpose of a ShardedValue is to enable the creation of scalable data structures
// that may be updated in shards that are local to the goroutine without any
// additional synchronization. In practice, shards are bound to lower-level
// scheduling resources, such as OS threads and CPUs, for efficiency.
//
// The zero value is ready for use.
type ShardedValue[T Shard[T]] struct {
    // NewShard is a function that produces new shards of type T.
    NewShard func() T

    // unexported fields
}

// Update acquires a shard and passes it to the provided function.
// The function returns the new shard value to return to the pool.
// Update is safe to call from multiple goroutines.
// Callers are encouraged to keep the provided function as short
// as possible, and are discouraged from blocking within them.
func (s *ShardedValue[T]) Update(func(value T) T)

// Value snapshots all values in the pool and returns the result of merging them all into
// a single value. This single value is guaranteed to represent a consistent snapshot of
// merging all outstanding shards at some point in time between when the call is made
// and when it returns. This single value is immediately added back into the pool as a
// single shard before being returned.
//
// Note: Value will block while there are any outstanding Update functions executing.
// The implementation of Value expects this function to be called infrequently relative
// to calls to Update or Merge.
func (s *ShardedValue[T]) Value() T

// Drain snapshots all values in the pool and returns the result of merging them all into
// a single value. This single value is guaranteed to represent a consistent snapshot of
// merging all outstanding shards at some point in time between when the call is made
// and when it returns. Unlike Value, this single value is not added back to the pool.
//
// Note: Drain will block while there are any outstanding Update functions executing.
// The implementation of Drain expects this function to be called infrequently relative
// to calls to Update or Merge.
func (s *ShardedValue[T]) Drain() T

// Shard is an interface implemented by types whose values can be merged
// with values of the same type.
type Shard[T any] interface {
    Merge(other T) T
}

Overall, I like this kind of API personally. It encapsulates the way we've made things scalable in the runtime in the past, but in a less ad-hoc way. I have a few examples of what you could implement with this: a scalable counter, a scalable cache, and scalable log writing. The last one is a little rough and this message is long enough as it is, so I'll save the last two for later if others find any interest in this particular API. Here's the counter for a simple example of how to use it:

type Counter struct {
    sp sync.ShardedValue[counterInt]
}

func (c *Counter) Add(value int) {
    c.sp.Update(func(v counterInt) counterInt {
        return v+value
    })
}

func (c *Counter) Value() int {
    return int(c.sp.Value())
}

type counterInt int

func (a counterInt) Merge(b counterInt) counterInt {
    return a+b
}

Drain may not be strictly necessary. That is a last-minute addition motivated mainly by caching use-cases: it gives you a way to flush every cache globally. Value is insufficient for this.
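
As a rough sketch of that cache-flush use case against this API (the cache type, its methods, and the nil-map handling below are assumptions):

type shardCache map[string]string

func (c shardCache) Merge(other shardCache) shardCache {
    if c == nil {
        return other
    }
    for k, v := range other {
        c[k] = v
    }
    return c
}

type Cache struct {
    sp sync.ShardedValue[shardCache]
}

func (c *Cache) Put(k, v string) {
    c.sp.Update(func(m shardCache) shardCache {
        if m == nil {
            m = make(shardCache)
        }
        m[k] = v
        return m
    })
}

// Flush removes and returns the combined contents of every shard.
func (c *Cache) Flush() map[string]string {
    return c.sp.Drain()
}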

Austin pointed out to me that one big question with this API is: what are the semantics of Value in relation to Update? We enumerated 3 possible options:

  • Read/write lock semantics. In this case, Update is seen as kind of a read-lock, and Value as a write-lock. If Value is executing, then all Update calls block on it. If any Update is executing, then Value blocks on it. Lastly, Update calls don't block on each other. This ensures that the value seen via Update is not only a consistent snapshot, but also never stale. This is the easiest one to understand and use, but probably the least performant. It probably has the broadest applicability.
  • Ragged semantics. In this case neither Update nor Value block. Instead, Value is a best-effort merging of all the sharded values. It may not represent a valid snapshot and it can be constructed of arbitrarily stale shards. While this sounds very loose, it's definitely sufficient for many use-cases and the most performant.
  • Stale snapshot (RCU?) semantics. In this case, Update never blocks for any reason. Instead, Value tells all Update calls to write somewhere else and waits for quiescence: it observes that each P/thread/goroutine/whatever is not in Update at some point in time (though not necessarily simultaneously). At this point it can be sure the old values Update was looking at are only read from, not written to, so it can safely merge them and construct a snapshot. This is how the memory metrics for runtime/metrics work, and is similar to how the new execution tracer flushes buffers on all threads. This ensures the snapshot is always a valid snapshot of all the sharded values at some point in time, but it can be arbitrarily stale (usually not very stale in practice). This is a compromise between the first two.

The API I wrote about picks the third option, but I think the other two are also very reasonable. I think it'll come down to exactly what use-cases we want to enable.

@qiulaidongfeng
Copy link
Contributor

This API means that using ShardedValue also requires defining a custom type to implement the Merge method.
And some uses, such as #24479, only need to store the data separately! I don't want to merge data because it's not necessary.

@aclements
Copy link
Member Author

aclements commented Jan 10, 2024

This API means that using ShardedValue also requires defining a custom type to implement the Merge method.

It would be a minor change to instead have a Merge func(a, b T) T callback function in ShardedValue and eliminate the Shard constraint. I'm not sure if a nil value should mean to just drop values, or to never merge. With the particular way @mknyszek defined Value and Drain, "never merge" isn't really an option, but if we defined those differently, I think I would prefer it to mean "never merge" because it's always possible to provide a trivial merge function that drops values.

@Merovius
Copy link
Contributor

Given that this is about performance: it seems to me that, generally, a method is easier to devirtualize and potentially inline than a function field. Not that a virtual call isn't still cheaper than a mutex, but since this is about building a primitive that people can't build themselves (and since it seems relatively easy to have a wrapper do the opposite, using a function field to implement a method), maybe the base case should be the more efficient one?

And FWIW as another alternative, you could also make the type parameter unconstrained and do a type-assertion to make the Merge method optional.

@mknyszek
Contributor

IMO, the API is more flexible and less error-prone if the runtime understands how to do the merge itself.

Specifically, what happens if you want to call Update during an Update callback? Without a merge, it's unclear what to do in the second Update, since the current shard is already checked out by the first. It would probably have to panic. With a Merge method however, the semantics are clear and the API doesn't need to panic: the runtime will simply merge the result of the two Update calls. This reentrancy seems unnecessary at face value, but it can end up simplifying this kind of code significantly.

This has come from experience with the runtime, for example with the execution tracer (at least as a simplifying future path) and with debuglog, and it is not just for convenience.

Also, given that this is intended to be an expert API, adding a trivial merge method in cases where the answer is "I don't care" doesn't really seem like a big deal to me. It's 3 additional lines of code to add a merge method that just drops one side of the merge.
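
For concreteness, that trivial "don't care" merge might look like this (shard is a placeholder type):

func (a shard) Merge(b shard) shard {
    return a // arbitrarily keep one side and drop the other
}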

@bcmills
Contributor

bcmills commented Jan 10, 2024

I worry about the check-in/check-out model being potentially too fragile — what happens if the user-supplied Update function ends up blocking on something, or otherwise running long enough that it would need to be preempted?

@aclements
Member Author

I don't think preemption is an issue. The only effect of an Update function running for a long time is that calls to Value may have to block, depending on the synchronization model we want to provide. The same would be true of any model that can either combine or enumerate all of the shards, which I think is fundamental to the idea of a sharded value. It's true that if it gets preempted, it may cause Value to block for longer. We could always have special logic in the scheduler to make this less likely (there's not much we can do if it just plain blocks).

@aclements
Member Author

Read/write lock semantics. In this case, Update is seen as kind of a read-lock, and Value as a write-lock. If Value is executing, then all Update calls block on it. If any Update is executing, then Value blocks on it. Lastly, Update calls don't block on each other. This ensures that the value seen via Update is not only a consistent snapshot, but also never stale. This is the easiest one to understand and use, but probably the least performant. It probably has the broadest applicability.

I agree this is the easiest to understand. I think you meant "This ensures that the value seen via Value is not only a consistent snapshot, but also never stale." Though I'm not sure it's meaningful to say the value is "never stale". As soon as Value returns, the value could be stale. There is a meaningful happens-before relation here though: read/write lock semantics mean that Update and Value participate in the happens-before graph. With ragged semantics (below), Value doesn't have a happens-before relationship. I think RCU semantics do imply a happens-before, but I'd have to think harder to be positive.

Ragged semantics. In this case neither Update nor Value block. Instead, Value is a best-effort merging of all the sharded values. It may not represent a valid snapshot and it can be constructed of arbitrarily stale shards. While this sounds very loose, it's definitely sufficient for many use-cases and the most performant.
Stale snapshot (RCU?) semantics. In this case, Update never blocks for any reason. Instead, Value tells all Update calls to write somewhere else and waits for a quiescence: a moment in time where no Update calls are executing. At this point it can be sure the old values Update was looking at are only read from, not written to, so it can safely merge them and construct a snapshot. This is how the memory metrics for runtime/metrics work, and is similar to how the new execution tracer flushes buffers on all threads. This ensures the snapshot is always a valid snapshot of all the sharded values at some point in time, but it can be arbitrarily stale (usually not very stale in practice). This is a compromise between the first two.

"a moment in time where no Update calls are executing" isn't strictly true. Value would need to wait for any currently running Update calls to complete, but does not need to wait for calls that start after Value's linearization point.

I believe both ragged and RCU semantics require that the Update callback either 1. doesn't modify its argument in place or 2. synchronizes with Merge. 1 is easy if T is a primitive, but if T is a pointer type, then it would be on the callback to implement the "copy" part of RCU. If the user doesn't correctly implement copying, then the code could be racy. This is potentially pretty subtle, but then again this is an "expert" API.

@mknyszek
Contributor

Though I'm not sure it's meaningful to say the value is "never stale". As soon as Value returns, the value could be stale.

Ah yes, oops. You're totally right. "Less stale." :P

"a moment in time where no Update calls are executing" isn't strictly true. Value would need to wait for any currently running Update calls to complete, but does not need to wait for calls that start after Value's linearization point.

'Doh. Yes, that's what I intended to write (and how all the RCU-ish things in the runtime work) but clearly not what I wrote. Value just needs to observe that every P/thread/goroutine/whatever is not in Update at some point in time.

I believe both ragged and RCU semantics require that the Update callback either 1. doesn't modify its argument in place or 2. synchronizes with Merge. 1 is easy if T is a primitive, but if T is a pointer type, then it would be on the callback to implement the "copy" part of RCU. If the user doesn't correctly implement copying, then the code could be racy. This is potentially pretty subtle, but then again this is an "expert" API.

You're right, that's definitely a pitfall here with those two.

@bcmills
Contributor

bcmills commented Jan 10, 2024

The only effect of an Update function running for a long time is that calls to Value may have to block

Does that pose lock-ordering problems? We already have two different kinds of synchronization that users have to consider for lock-ordering (channels and mutexes), so it seems like that would add a third kind of lock to reason about.

@aclements
Member Author

Does that pose lock-ordering problems?

I don't think it's any different than calling an API that uses blocking synchronization.

@balasanjay
Contributor

Is it possible to implement a distributed read-write mutex using this proposed API?

(E.g. RLock() on the distributed mutex would acquire a read-lock on any shard, but Lock() on the distributed mutex would effectively acquire a write-lock on every shard)

@qiulaidongfeng
Contributor

Is it possible to implement a distributed read-write mutex using this proposed API?

(E.g. RLock() on the distributed mutex would acquire a read-lock on any shard, but Lock() on the distributed mutex would effectively acquire a write-lock on every shard)

This is not supported by #18802 (comment).
It is supported by https://go-review.googlesource.com/c/go/+/552515.

@qiulaidongfeng
Contributor

Summarizing the status of the proposal:

The problem here is to provide an API that helps programs with heavy contention across multiple cores, such as contended counters or a sharded read/write lock.

If the goal is only to place the data in different CPU cache lines, my https://go-review.googlesource.com/c/go/+/552515 can solve this issue.
@aclements says that synchronization can be eliminated using the check-out/check-in model.
I agree that there are opportunities to eliminate synchronization, but such an API has the following drawbacks:

  1. It can only be used for types that can be merged. If the API stores data in slices, as in sync: eliminate global Mutex in Pool operations #24479, there is no need to merge values; merging creates unnecessary slice allocation and memory-copy overhead.
  2. This API cannot be used in scenarios where both a single shard and all shards need to be acquired, such as sharded read/write locks, because there is no safe way to merge two read/write locks into one.

I think that with the check-out/check-in model, the API could look like the following: it supports sharding values of any type, and it can hand out either one shard or all shards.
For example:
For example

// Shard stores a batch of data in different CPU cache lines.
type Shard[T any] struct{}

func NewShard[T any]() *Shard[T]

// Update gets one piece of data.
// It ensures that this data is not given to other callers until the function returns.
// The pointer must not be retained after the call ends.
func (*Shard[T]) Update(f func(*T))

// Range gets all the data.
// The pointer must not be retained after the call ends.
// If Update is called at the same time, Range first visits the data not currently
// given to an Update caller and waits for the Update call to end, repeating the
// process until all the data has been given to the Range caller exactly once.
func (*Shard[T]) Range(f func(*T))

This API deliberately lets callers get pointers because the benefits outweigh the disadvantages.
This facilitates sharded read/write locks, because you can't copy a lock.
If a value were returned instead of a pointer, then for a slice type it would make no essential difference, because it is not safe to keep the slice value or pointer after the call ends; there is always the possibility of a data race.

Whether the zero value of this API is usable depends on the implementation, and if the zero value is usable, the New function can be omitted.
This API can be used without merging data, because if the implementation keeps the number of shards small, merging is not necessary.
For example, to store data in a map in each P, the number of shards can be GOMAXPROCS(0).

@mknyszek
Contributor

Is it possible to implement a distributed read-write mutex using this proposed API?

(E.g. RLock() on the distributed mutex would acquire a read-lock on any shard, but Lock() on the distributed mutex would effectively acquire a write-lock on every shard)

Can you describe in more detail what you mean? What are the exact semantics of what you have in mind? Do you have any further reading to share? Thanks.

This would be really useful, since I think what this proposal's precise semantics will come down to is use-cases. (There's also a question of whether or not this is a use-case we want to support, or if perhaps this would be better served by some other API, but I'll reserve any judgement like that until I understand it better.)

I agree that there are opportunities to eliminate synchronization, but such API have the following drawbacks

  1. It can only be used for types that can be merged. If the API stores data in slices, as in sync: eliminate global Mutex in Pool operations #24479, there is no need to merge values; merging creates unnecessary slice allocation and memory-copy overhead.

I get what you're saying: there's unnecessary overhead in accumulating all the values. I think this is motivating for having a way to iterate over all outstanding shards (like your proposed Range), which the runtime would have to do anyway. I'm personally generally supportive of this, and perhaps Value and Drain are not strictly necessary.

However, I'd still argue for requiring a Merge method. I feel strongly that there needs to be a way to drop values for both implementation flexibility and for defining the semantics of corner cases more simply (after all, GOMAXPROCS can change, but there's also the reentrancy semantics I mentioned earlier). Merge gives you a way to do that but also lets you express things like counters more naturally. In terms of the overhead, if you only used Range and never Value, then calls to Merge would be very rare, so the additional overhead of copying or additional allocations would be very small.

  1. This API cannot be used in scenarios where both a single shard and all shards need to be acquired, such as sharded read/write locks, because there is no safe way to merge two read/write locks into one.

Again, I'm not really sure what the distributed read/write lock means exactly, but the T in my proposal could easily be something like a *sync.Mutex. There's no way to "merge" locks, true, but the Merge method could safely drop one of them (again, this is only a guess; I'm not sure I understand this use-case yet).

This API deliberately lets callers get Pointers because the benefits outweigh the disadvantages. This facilitates shard read/write locks. Because you can't copy the lock. If a value is returned instead of a pointer, if the type is a slice, it actually makes no essential difference. Because it is not safe to save a slice value or pointer after the call ends. There is always the possibility of data competition.

This seems overly restrictive. Requiring the API to pass a *T makes the API's usage awkward for reference types like slices and maps. A *[]byte is also likely to force an allocation of the slice header. And like I mentioned above, you can always pass a pointer type as the type argument to the API I proposed and it becomes equivalent to the pointer arguments you proposed.

@qiulaidongfeng
Contributor

Distributed read-write mutex: see https://github.com/jonhoo/drwmutex for an implementation.
Why is this needed? See #17973: as long as the RWMutex lives on a single CPU cache line, performance under contention from a large number of CPU cores will not be very good, because different CPU cores compete for atomic access to the same block of memory.

If the number of shards is not too large, such as with https://go-review.googlesource.com/c/go/+/552515 where the number can be set, or when the number of shards in other APIs stays within the number of Ps, it is worthwhile to support this use case, because on many-core machines it helps when reads greatly outnumber writes.

Note that if you have two RWMutexes with different numbers of readers holding read locks, I can't think of any way to merge them safely, and sync provides an API that obviously can't be merged without unsafe.

If you need to merge, I think that's a sign of too many shards, which is a sign that the implementation needs to be improved.
GOMAXPROCS can be changed, but if it is changed to an integer that is far beyond the number of CPU cores, such cases should cause the caller to rethink that practice, because the CPU can execute only a limited number of instructions at the same time.
If GOMAXPROCS is changed to a smaller number, then even if the number of shards is large, this can be handled by placing the extra shards in a per-P or global linked list.

I think whether the final API gives the caller T or *T is just a means to an end, as long as it solves the problem.

@qiulaidongfeng
Contributor

I think the discussion of whether there is a Merge method actually implies this question:

What is the number of shards?

I recommend that the number of shards stay within GOMAXPROCS, because then the number of shards will not be too large and there is no need to merge.
Please comment if you have different opinions.
But note that shards have to be allocated on the heap, New and Merge have a cost, and there are considerations of CPU cache hit ratio and the impact of more objects on GC; differing opinions should weigh the benefits and losses.

@balasanjay
Contributor

Can you describe in more detail what you mean? What are the exact semantics of what you have in mind? Do you have any further reading to share? Thanks.

https://github.com/jonhoo/drwmutex does communicate the general idea. But to summarize: if you have a read-mostly, but contended use-case, and are willing to pay the additional memory cost, you can replace a single RWMutex with N independent instances. Readers will RLock and RUnlock one specific instance, Writers will Lock all of them sequentially. This will ~eliminate contention for read-only use-cases, at the cost of more memory and an O(P) write operation.
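
A minimal, statically sized sketch of that pattern (the padding size and the way a reader picks its slot i are assumptions; the dynamic-growth case below is what makes this harder):

type paddedRWMutex struct {
    sync.RWMutex
    _ [64]byte // reduce false sharing between shards
}

type DRWMutex []paddedRWMutex

func NewDRWMutex(n int) DRWMutex { return make(DRWMutex, n) }

// Reader returns the shard a reader should RLock/RUnlock; callers would pick i
// using something like the per-CPU index discussed in this issue.
func (m DRWMutex) Reader(i int) *sync.RWMutex { return &m[i%len(m)].RWMutex }

// Lock acquires the write lock on every shard, in order.
func (m DRWMutex) Lock() {
    for i := range m {
        m[i].Lock()
    }
}

// Unlock releases the write lock on every shard.
func (m DRWMutex) Unlock() {
    for i := range m {
        m[i].Unlock()
    }
}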

The approach also works if the number of shards is dynamically growing, but is a bit more complicated. When a shard is first allocated, the first reader must notice that its assigned lock is previously unused and actually perform effectively a Lock()/Unlock()/RLock() sequence to ensure that there isn't currently a writer who wasn't aware of this particular shard. I don't really see how to make it mergeable, while still being built on top of the standard RWMutex for each shard.

Some more use-cases that come to mind:

  • sharded pRNGs (obviously not seedable, but efficiently usable from multiple goroutines). I think its a bit awkward, because you need to call Update() with a closure that will capture the result of the wrapped RNG and then return it once Update returns. Seems doable, though. Merge would presumably just drop one of the RNGs.
  • sharded ID allocation. Similar to the counter use-case, but the idea is I want to hand out unique ids with high throughput and minimal contention. Instead of a single global counter, you can have per-core caches which will amortize the contention by asking for N ids from the global counter, and then handing out those N ids to requestors. N could be a constant, or a good implementation might vary it adaptively based on time-since-last-refresh. Similar kind of awkwardness with Update() but doable. Merge would either merge the available ID ranges, or just drop one of them.
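
A rough sketch of the sharded ID-allocation idea in the last bullet, written against the proposed ShardedValue API (idBlock, idBatch, and the global counter are assumptions for illustration):

type idBlock struct {
    next, limit uint64
}

// Merge keeps whichever block still has IDs left and drops the other;
// the dropped IDs are simply never handed out.
func (a idBlock) Merge(b idBlock) idBlock {
    if a.next < a.limit {
        return a
    }
    return b
}

const idBatch = 1024 // how many IDs to reserve from the shared counter at once

type IDAllocator struct {
    global atomic.Uint64
    shards sync.ShardedValue[idBlock]
}

func (g *IDAllocator) Next() uint64 {
    var id uint64
    g.shards.Update(func(b idBlock) idBlock {
        if b.next == b.limit { // refill from the shared counter (rare)
            b.limit = g.global.Add(idBatch)
            b.next = b.limit - idBatch
        }
        id = b.next
        b.next++
        return b
    })
    return id
}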

For a performance-oriented API, Value as defined seems relatively heavyweight to me, and seems to violate one of the cardinal rules of multicore performance: don't turn reads into writes. E.g. I'd imagine that a medium-sized server would have thousands of counters for measuring metrics, and a scrape interval of say 15s. In that case, every 15 seconds, you have a significant moment of churn where all the counters are reorganized into a single shard. But ~all of that is unnecessary, we just need to get an aggregate sum (or similar for histograms), without necessarily consolidating the shards. So I definitely agree that Range or something like it would be much more preferable.

@mknyszek
Contributor

distributed read-write mutex : https://github.com/jonhoo/drwmutex for this implementation .

Thanks, that's helpful.

What is the number of Shards?

Right, I suppose what I never really stated is that I think a dynamically-sized pool is the right choice.

Ps in the runtime have so many different types of caches because it's scalable and effective as a performance tool. But because they're tied to Ps, which are very fixed resources, there is a ton of complexity around making sure you have a P, what to do when you don't have one, and trying to prevent resources from getting "stuck" to Ps that don't run. A dynamic pool would help with all those things, because shards can be created and destroyed at will, though most of the time they would stay exactly as they are, still retaining the vast majority of the scalability benefit.

I admit, this is a bit abstract, and how well that translates into regular Go code remains to be seen. I have a gut feeling that offering a more powerful and flexible choice (dynamic pool management) is going to be the right choice in the long term without sacrificing expressivity for the immediate use-cases.

On that note, https://github.com/jonhoo/drwmutex actually made me realize something, and I think there's actually a generalization we can make to allow the effective implementation of statically sized pools where it makes sense. (See further down.)

https://github.com/jonhoo/drwmutex does communicate the general idea. But to summarize: ...

Thanks, that makes sense. I agree that is an awkward fit to the API I proposed.

The approach also works if the number of shards is dynamically growing, but is a bit more complicated.

Right. If you tried to use the API for this use-case very directly, it's going to force you to do the much more complicated thing. I sympathize with that. However (and this is the compromise I mentioned above) I think you can always implement a statically-sized shard pool using the dynamic pool by using the dynamic pool to manage indices into the static pool.

The core problem with implementing the distributed read-write mutex today is trying to associate a read-lock with a P/thread/whatever. When I look at https://github.com/jonhoo/drwmutex, the main thing it seems to struggle with is finding an index that has some affinity to the current scheduling resource.

What sync.ShardedValue gives you, however, is exactly that association. So you could have a sync.ShardedValue[int] (really something like type myIndex int) that provides you with a P/thread/whatever-local index into a static set of RWMutexes. There's a little bit of fiddling necessary to come up with a good indexing scheme, but at a baseline you can have N (say, N=GOMAXPROCS at some point in time) indexes and distribute them with NewShard, returning the indexes back to a free list if they get dropped by Merge. (There's a question as to what to do if NewShard creates one too many indices, but DRWMutex can assume this is rare or transient.) The important part though is that the index has a strong affinity to a scheduling resource.

Some more use-cases that come to mind:

Thanks, I think those are good use-cases and something that should definitely be considered in the future as part of a corpus of possible use-cases that designs here are tested against. The ID allocation example made me go "oh, duh"; there are at least two of those in the runtime.

For a performance-oriented API, Value as defined seems relatively heavyweight to me, and seems to violate one of the cardinal rules of multicore performance: don't turn reads into writes.

I think it depends a lot on the semantics of Value. I agree that the way I proposed it, where the pool is actually drained, is probably too much.

@qiulaidongfeng
Contributor

Right, I suppose what I never really stated is that I think a dynamically-sized pool is the right choice ...

Is there a maximum dynamic size?
Making Update non-blocking, I think, just shifts the overhead from time spent waiting in Update to the execution time of New and Merge.

@qiulaidongfeng
Contributor

qiulaidongfeng commented Jan 12, 2024

Right, I suppose what I never really stated is that I think a dynamically-sized pool is the right choice ...

Suppose the use case is an HTTP server,
with a sync.ShardedValue[int64] used as a counter to record the cumulative number of requests.
Then, because of a DDoS attack, the server suddenly receives millions of requests, which are processed in millions of goroutines.
With a dynamically-sized pool, is it possible that the number of shards becomes large? Say, 10,000?

@mknyszek
Copy link
Contributor

Is there a maximum dynamic size?

Not one that would be documented, but I think in any reasonable implementation the hard upper-bound would be the number of threads in the program (so long as the application didn't recursively call Update within the callback[1]). This might happen if every goroutine blocks in a syscall in the Update callback that cannot be deduplicated with netpoll. I think in general my advice would be "try to avoid that," but I also don't think blocking in the Update callback should be disallowed, necessarily.

In practice, however, it would stay quite close to GOMAXPROCS, though, if it's being used actively.

[1] You could create an unbounded number of shards if you called Update recursively, but the reentrancy is really just supposed to allow for one or two levels deep at most. If you can guarantee that it only goes one level deep, then the worst case maximum would be 2x the number of threads.

@qiulaidongfeng
Contributor

Is there a maximum dynamic size?

Not one that would be documented, but I think in any reasonable implementation the hard upper-bound would be the number of threads in the program (so long as the application didn't recursively call Update within the callback[1]). This might happen if every goroutine blocks in a syscall in the Update callback that cannot be deduplicated with netpoll. I think in general my advice would be "try to avoid that," but I also don't think blocking in the Update callback should be disallowed, necessarily.

In practice, however, it would stay quite close to GOMAXPROCS, though, if it's being used actively.

[1] You could create an unbounded number of shards if you called Update recursively, but the reentrancy is really just supposed to allow for one or two levels deep at most. If you can guarantee that it only goes one level deep, then the worst case maximum would be 2x the number of threads.

It seems that this means the Merge method could be opt-in, because the usage scenarios of sync.ShardedValue often do not require recursive calls to Update in the callback.
For usage scenarios such as a log buffer, the cost of merging slices, in addition to the execution time of make+copy, may also include runtime.memmove or other assembly functions (see #59902 and #64417), which are currently not preempted by the Go scheduler. I suspect that in some cases, merging during a DDoS attack may cause STW time to be too long or serious tail latency.

@qiulaidongfeng
Contributor

Summarizing my point of view, I will list the areas where there are differing opinions on this proposal, to help people who read it understand its progress more easily.

  1. The Value and Drain methods are not necessary. They are more convenient for counters, but as discussed above, in other scenarios they exclude hard-to-merge types such as *sync.RWMutex, reducing the usefulness of the API. They may also make it easier to write programs with long STW or serious tail latency. Callers who need such methods should implement them themselves.
  2. The problem here is to provide an API that helps programs with heavy contention across multiple cores. The API only needs to provide methods to get a single shard and all shards. Whether counters need a separate API is a question for specific use cases; if ShardedValue is provided, it should be as generic as possible.
  3. The important information for deciding whether the Merge method is required is the maximum number of shards and the type of data. My view is that it should not be required, or should be opt-in, because the information I have and my use cases have not shown that a Merge method is necessary.

By the way, with so much discussion, it seems that all we need to do is come up with a sufficiently universal API to solve the problem, and this proposal can reach consensus and be accepted.

@mknyszek
Contributor

mknyszek commented Jan 17, 2024

It seems that this means the Merge method could be opt-in, because the usage scenarios of sync.ShardedValue often do not require recursive calls to Update in the callback. For usage scenarios such as a log buffer,

The Merge method really goes beyond recursive calls; that's just one example. It's really about giving the implementation and user-level code flexibility.

What happens when a goroutine gets preempted in the Update function? We don't want to pin the P or thread the entire time it's in there, which means that another goroutine might run on the same P later and call back into Update. What happens then? If the runtime is able to Merge shards, it can just generate new shards on the fly and we can be confident everything will be accounted for later. Otherwise calls into Update may need to block or spin or something, all of which is going to be complex and difficult to make really efficient. A rare slow path to create a new shard is going to be much simpler to reason about, and the Update function never has to block or spin (depending on the semantics of Range). IMO this also makes the API simpler to use, because you don't have to be quite so careful about keeping your Update callback small. It's certainly better if you do, but the only downside in most cases is slightly (and transiently) more memory use.

Lastly, IMO, making the Merge method opt-in seems like the most complicated choice. Two different sets of semantics and limitations in the same data structure seems like it'll make the API confusing.

the cost of merging slices, in addition to the execution time of make+copy, may also include runtime.memmove or other assembly functions (see #59902 and #64417), which are currently not preempted by the Go scheduler. I suspect that in some cases, merging during a DDoS attack may cause STW time to be too long or serious tail latency?

I don't really know what you mean by this. In the log buffer scenario, I'm imagining sharding fixed-size buffers. Merging them would not involve actually concatenating them; that seems like it would be too expensive regardless.

To be totally clear, Merge does not mean that you have to actually merge the two values in some way. It's purely conceptual, giving you the choice as to how to handle the fact that the implementation will only let you put back one of two values into the pool. In the log scenario I'm imagining you would flush a buffer to a global list and keep the other, for example.
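
A rough sketch of that log-buffer Merge (the buffer type and the global flush list are assumptions): rather than concatenating, one side is flushed and the other is kept as the surviving shard.

type logBuf struct {
    entries []string
}

var (
    flushMu sync.Mutex
    flushed [][]string // hypothetical global flush list
)

func (a logBuf) Merge(b logBuf) logBuf {
    if len(b.entries) > 0 {
        flushMu.Lock()
        flushed = append(flushed, b.entries)
        flushMu.Unlock()
    }
    return a
}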

@qiulaidongfeng
Contributor

qiulaidongfeng commented Jan 17, 2024

To be totally clear, Merge does not mean that you have to actually merge the two values in some way. It's purely conceptual, giving you the choice as to how to handle the fact that the implementation will only let you put back one of two values into the pool. In the log scenario I'm imagining you would flush a buffer to a global list and keep the other, for example.

Thank you for your reply. This is very useful information. Maybe our use cases are different, so our perspectives are different.
My use cases are #24479, counters, and the https://github.com/jonhoo/drwmutex read-write lock.
I can do without the Merge method.
Even in #18802 (comment), for my counter cases, a fixed number of shards equal to GOMAXPROCS(0) gives a huge performance boost.
Maybe my use case is better suited to a fixed number of shards, and yours is better suited to a dynamic number of shards.
I had expected the API to meet the needs of both cases, so I suggested that the Merge method be considered as an opt-in design. If that makes the implementation too complicated, I may not have thought it through enough.

@qiulaidongfeng
Contributor

See #51317 (comment): I found a new use case for sync.ShardedValue; it needs the dynamic pool to manage indices into a static pool.
For third-party implementations that support multi-goroutine use of an arena, this proposal can optimize some hot paths that require atomic addition operations.
For example:
https://gitee.com/qiulaidongfeng/arena/blob/master/buf.go#L44

On this path, all goroutines currently compete for atomic addition operations on the same memory. Combined with other hot paths, this results in performance dropping from about 11ns with a single goroutine to about 32ns at 16 Ps.
With sync.ShardedValue in place, I believe third-party arena implementations could perform roughly the same in both single-goroutine and multi-goroutine scenarios.

@qiulaidongfeng
Contributor

qiulaidongfeng commented Jan 29, 2024

Considering https://github.com/jonhoo/drwmutex:
is it possible to provide a sync.ShardedValue of static size, or to provide a generic API by using the dynamic pool to manage indices into a static pool? For example, on amd64:

type Int int

// Merge keeps one index and drops the other; the dropped index is simply unused.
func (a Int) Merge(b Int) Int {
    return a
}

type value[T any] struct {
    v    T
    lock sync.Mutex // Ensure the check-out/check-in model.
    _    [64]byte   // Separate values into different cache lines.
}

// StaticShard is equivalent to a ShardedValue with a static size.
type StaticShard[T any] struct {
    index ShardedValue[Int]
    pool  []value[T]
    count atomic.Int64
}

func NewStaticShard[T any](size uint) *StaticShard[T] {
    if size == 0 {
        panic("size == 0")
    }
    ret := new(StaticShard[T])
    ret.pool = make([]value[T], size)
    return ret
}

// idx returns a shard index with affinity to the current scheduling resource.
// Indices are stored 1-based in the dynamic pool so that zero means "unassigned".
func (s *StaticShard[T]) idx() int {
    var index int
    s.index.Update(func(i Int) Int {
        if i != 0 {
            index = int(i) - 1
            return i
        }
        index = int(s.count.Add(1)) % len(s.pool)
        return Int(index + 1)
    })
    return index
}

func (s *StaticShard[T]) Update(f func(v T) T) {
    ref := &s.pool[s.idx()]
    ref.lock.Lock()
    defer ref.lock.Unlock()
    ref.v = f(ref.v)
}

func (s *StaticShard[T]) Range(f func(v T) T) {
    for i := range s.pool {
        s.pool[i].lock.Lock()
    }
    defer func() {
        for i := range s.pool {
            s.pool[i].lock.Unlock()
        }
    }()
    for i := range s.pool {
        s.pool[i].v = f(s.pool[i].v)
    }
}
