Source file
src/runtime/select.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9 import (
10 "runtime/internal/atomic"
11 "unsafe"
12 )
13
14 const debugSelect = false
15
16
17
18
19 const (
20 caseNil = iota
21 caseRecv
22 caseSend
23 caseDefault
24 )
25
26
27
28
29 type scase struct {
30 c *hchan
31 elem unsafe.Pointer
32 kind uint16
33 pc uintptr
34 releasetime int64
35 }
36
37 var (
38 chansendpc = funcPC(chansend)
39 chanrecvpc = funcPC(chanrecv)
40 )
41
42 func selectsetpc(cas *scase) {
43 cas.pc = getcallerpc()
44 }
45
46 func sellock(scases []scase, lockorder []uint16) {
47 var c *hchan
48 for _, o := range lockorder {
49 c0 := scases[o].c
50 if c0 != nil && c0 != c {
51 c = c0
52 lock(&c.lock)
53 }
54 }
55 }
56
57 func selunlock(scases []scase, lockorder []uint16) {
58
59
60
61
62
63
64
65
66 for i := len(scases) - 1; i >= 0; i-- {
67 c := scases[lockorder[i]].c
68 if c == nil {
69 break
70 }
71 if i > 0 && c == scases[lockorder[i-1]].c {
72 continue
73 }
74 unlock(&c.lock)
75 }
76 }
77
78 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
79
80
81
82
83
84 gp.activeStackChans = true
85
86
87
88 atomic.Store8(&gp.parkingOnChan, 0)
89
90
91
92
93
94
95
96
97
98
99 var lastc *hchan
100 for sg := gp.waiting; sg != nil; sg = sg.waitlink {
101 if sg.c != lastc && lastc != nil {
102
103
104
105
106
107
108 unlock(&lastc.lock)
109 }
110 lastc = sg.c
111 }
112 if lastc != nil {
113 unlock(&lastc.lock)
114 }
115 return true
116 }
117
118 func block() {
119 gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1)
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133 func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
134 if debugSelect {
135 print("select: cas0=", cas0, "\n")
136 }
137
138
139
140 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
141 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
142
143 scases := cas1[:ncases:ncases]
144 pollorder := order1[:ncases:ncases]
145 lockorder := order1[ncases:][:ncases:ncases]
146
147
148
149 for i := range scases {
150 cas := &scases[i]
151 if cas.c == nil && cas.kind != caseDefault {
152 *cas = scase{}
153 }
154 }
155
156 var t0 int64
157 if blockprofilerate > 0 {
158 t0 = cputicks()
159 for i := 0; i < ncases; i++ {
160 scases[i].releasetime = -1
161 }
162 }
163
164
165
166
167
168
169
170
171
172
173 for i := 1; i < ncases; i++ {
174 j := fastrandn(uint32(i + 1))
175 pollorder[i] = pollorder[j]
176 pollorder[j] = uint16(i)
177 }
178
179
180
181 for i := 0; i < ncases; i++ {
182 j := i
183
184 c := scases[pollorder[i]].c
185 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
186 k := (j - 1) / 2
187 lockorder[j] = lockorder[k]
188 j = k
189 }
190 lockorder[j] = pollorder[i]
191 }
192 for i := ncases - 1; i >= 0; i-- {
193 o := lockorder[i]
194 c := scases[o].c
195 lockorder[i] = lockorder[0]
196 j := 0
197 for {
198 k := j*2 + 1
199 if k >= i {
200 break
201 }
202 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
203 k++
204 }
205 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
206 lockorder[j] = lockorder[k]
207 j = k
208 continue
209 }
210 break
211 }
212 lockorder[j] = o
213 }
214
215 if debugSelect {
216 for i := 0; i+1 < ncases; i++ {
217 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
218 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
219 throw("select: broken sort")
220 }
221 }
222 }
223
224
225 sellock(scases, lockorder)
226
227 var (
228 gp *g
229 sg *sudog
230 c *hchan
231 k *scase
232 sglist *sudog
233 sgnext *sudog
234 qp unsafe.Pointer
235 nextp **sudog
236 )
237
238 loop:
239
240 var dfli int
241 var dfl *scase
242 var casi int
243 var cas *scase
244 var recvOK bool
245 for i := 0; i < ncases; i++ {
246 casi = int(pollorder[i])
247 cas = &scases[casi]
248 c = cas.c
249
250 switch cas.kind {
251 case caseNil:
252 continue
253
254 case caseRecv:
255 sg = c.sendq.dequeue()
256 if sg != nil {
257 goto recv
258 }
259 if c.qcount > 0 {
260 goto bufrecv
261 }
262 if c.closed != 0 {
263 goto rclose
264 }
265
266 case caseSend:
267 if raceenabled {
268 racereadpc(c.raceaddr(), cas.pc, chansendpc)
269 }
270 if c.closed != 0 {
271 goto sclose
272 }
273 sg = c.recvq.dequeue()
274 if sg != nil {
275 goto send
276 }
277 if c.qcount < c.dataqsiz {
278 goto bufsend
279 }
280
281 case caseDefault:
282 dfli = casi
283 dfl = cas
284 }
285 }
286
287 if dfl != nil {
288 selunlock(scases, lockorder)
289 casi = dfli
290 cas = dfl
291 goto retc
292 }
293
294
295 gp = getg()
296 if gp.waiting != nil {
297 throw("gp.waiting != nil")
298 }
299 nextp = &gp.waiting
300 for _, casei := range lockorder {
301 casi = int(casei)
302 cas = &scases[casi]
303 if cas.kind == caseNil {
304 continue
305 }
306 c = cas.c
307 sg := acquireSudog()
308 sg.g = gp
309 sg.isSelect = true
310
311
312 sg.elem = cas.elem
313 sg.releasetime = 0
314 if t0 != 0 {
315 sg.releasetime = -1
316 }
317 sg.c = c
318
319 *nextp = sg
320 nextp = &sg.waitlink
321
322 switch cas.kind {
323 case caseRecv:
324 c.recvq.enqueue(sg)
325
326 case caseSend:
327 c.sendq.enqueue(sg)
328 }
329 }
330
331
332 gp.param = nil
333
334
335
336
337 atomic.Store8(&gp.parkingOnChan, 1)
338 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
339 gp.activeStackChans = false
340
341 sellock(scases, lockorder)
342
343 gp.selectDone = 0
344 sg = (*sudog)(gp.param)
345 gp.param = nil
346
347
348
349
350
351 casi = -1
352 cas = nil
353 sglist = gp.waiting
354
355 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
356 sg1.isSelect = false
357 sg1.elem = nil
358 sg1.c = nil
359 }
360 gp.waiting = nil
361
362 for _, casei := range lockorder {
363 k = &scases[casei]
364 if k.kind == caseNil {
365 continue
366 }
367 if sglist.releasetime > 0 {
368 k.releasetime = sglist.releasetime
369 }
370 if sg == sglist {
371
372 casi = int(casei)
373 cas = k
374 } else {
375 c = k.c
376 if k.kind == caseSend {
377 c.sendq.dequeueSudoG(sglist)
378 } else {
379 c.recvq.dequeueSudoG(sglist)
380 }
381 }
382 sgnext = sglist.waitlink
383 sglist.waitlink = nil
384 releaseSudog(sglist)
385 sglist = sgnext
386 }
387
388 if cas == nil {
389
390
391
392
393
394
395
396
397
398 goto loop
399 }
400
401 c = cas.c
402
403 if debugSelect {
404 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
405 }
406
407 if cas.kind == caseRecv {
408 recvOK = true
409 }
410
411 if raceenabled {
412 if cas.kind == caseRecv && cas.elem != nil {
413 raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
414 } else if cas.kind == caseSend {
415 raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
416 }
417 }
418 if msanenabled {
419 if cas.kind == caseRecv && cas.elem != nil {
420 msanwrite(cas.elem, c.elemtype.size)
421 } else if cas.kind == caseSend {
422 msanread(cas.elem, c.elemtype.size)
423 }
424 }
425
426 selunlock(scases, lockorder)
427 goto retc
428
429 bufrecv:
430
431 if raceenabled {
432 if cas.elem != nil {
433 raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
434 }
435 raceacquire(chanbuf(c, c.recvx))
436 racerelease(chanbuf(c, c.recvx))
437 }
438 if msanenabled && cas.elem != nil {
439 msanwrite(cas.elem, c.elemtype.size)
440 }
441 recvOK = true
442 qp = chanbuf(c, c.recvx)
443 if cas.elem != nil {
444 typedmemmove(c.elemtype, cas.elem, qp)
445 }
446 typedmemclr(c.elemtype, qp)
447 c.recvx++
448 if c.recvx == c.dataqsiz {
449 c.recvx = 0
450 }
451 c.qcount--
452 selunlock(scases, lockorder)
453 goto retc
454
455 bufsend:
456
457 if raceenabled {
458 raceacquire(chanbuf(c, c.sendx))
459 racerelease(chanbuf(c, c.sendx))
460 raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
461 }
462 if msanenabled {
463 msanread(cas.elem, c.elemtype.size)
464 }
465 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
466 c.sendx++
467 if c.sendx == c.dataqsiz {
468 c.sendx = 0
469 }
470 c.qcount++
471 selunlock(scases, lockorder)
472 goto retc
473
474 recv:
475
476 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
477 if debugSelect {
478 print("syncrecv: cas0=", cas0, " c=", c, "\n")
479 }
480 recvOK = true
481 goto retc
482
483 rclose:
484
485 selunlock(scases, lockorder)
486 recvOK = false
487 if cas.elem != nil {
488 typedmemclr(c.elemtype, cas.elem)
489 }
490 if raceenabled {
491 raceacquire(c.raceaddr())
492 }
493 goto retc
494
495 send:
496
497 if raceenabled {
498 raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
499 }
500 if msanenabled {
501 msanread(cas.elem, c.elemtype.size)
502 }
503 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
504 if debugSelect {
505 print("syncsend: cas0=", cas0, " c=", c, "\n")
506 }
507 goto retc
508
509 retc:
510 if cas.releasetime > 0 {
511 blockevent(cas.releasetime-t0, 1)
512 }
513 return casi, recvOK
514
515 sclose:
516
517 selunlock(scases, lockorder)
518 panic(plainError("send on closed channel"))
519 }
520
521 func (c *hchan) sortkey() uintptr {
522 return uintptr(unsafe.Pointer(c))
523 }
524
525
526
527 type runtimeSelect struct {
528 dir selectDir
529 typ unsafe.Pointer
530 ch *hchan
531 val unsafe.Pointer
532 }
533
534
535 type selectDir int
536
537 const (
538 _ selectDir = iota
539 selectSend
540 selectRecv
541 selectDefault
542 )
543
544
545 func reflect_rselect(cases []runtimeSelect) (int, bool) {
546 if len(cases) == 0 {
547 block()
548 }
549 sel := make([]scase, len(cases))
550 order := make([]uint16, 2*len(cases))
551 for i := range cases {
552 rc := &cases[i]
553 switch rc.dir {
554 case selectDefault:
555 sel[i] = scase{kind: caseDefault}
556 case selectSend:
557 sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
558 case selectRecv:
559 sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
560 }
561 if raceenabled || msanenabled {
562 selectsetpc(&sel[i])
563 }
564 }
565
566 return selectgo(&sel[0], &order[0], len(cases))
567 }
568
569 func (q *waitq) dequeueSudoG(sgp *sudog) {
570 x := sgp.prev
571 y := sgp.next
572 if x != nil {
573 if y != nil {
574
575 x.next = y
576 y.prev = x
577 sgp.next = nil
578 sgp.prev = nil
579 return
580 }
581
582 x.next = nil
583 q.last = x
584 sgp.prev = nil
585 return
586 }
587 if y != nil {
588
589 y.prev = nil
590 q.first = y
591 sgp.next = nil
592 return
593 }
594
595
596
597 if q.first == sgp {
598 q.first = nil
599 q.last = nil
600 }
601 }
602
View as plain text