Source file
src/runtime/chan.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import (
21 "runtime/internal/atomic"
22 "unsafe"
23 )
24
25 const (
26 maxAlign = 8
27 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
28 debugChan = false
29 )
30
// hchan is the runtime representation of a channel as built by makechan.
// NOTE(review): field order is left untouched on purpose; other code may
// depend on this layout — confirm before rearranging.
31 type hchan struct {
32 qcount uint // number of elements currently held in the buffer
33 dataqsiz uint // buffer capacity in elements (the size passed to makechan)
34 buf unsafe.Pointer // start of the element buffer; makechan sets it to c.raceaddr() when no buffer is needed
35 elemsize uint16 // size in bytes of one element (makechan rejects sizes >= 1<<16)
36 closed uint32 // zero while open; closechan sets it to 1
37 elemtype *_type // element type descriptor, used for typed copies and clears
38 sendx uint // buffer index the next buffered send writes to
39 recvx uint // buffer index the next buffered receive reads from
40 recvq waitq // sudogs of goroutines parked in chanrecv
41 sendq waitq // sudogs of goroutines parked in chansend
42
43 // lock is held by chansend, chanrecv and closechan while they inspect or
44 // modify the fields above (the lock-free fast paths for non-blocking
45 // operations excepted). Every path in this file releases it before
46 // calling goready.
47
48
49 lock mutex
50 }
51
52 type waitq struct {
53 first *sudog
54 last *sudog
55 }
56
57
58 func reflect_makechan(t *chantype, size int) *hchan {
59 return makechan(t, size)
60 }
61
62 func makechan64(t *chantype, size int64) *hchan {
63 if int64(int(size)) != size {
64 panic(plainError("makechan: size out of range"))
65 }
66
67 return makechan(t, int(size))
68 }
69
70 func makechan(t *chantype, size int) *hchan {
71 elem := t.elem
72
73
74 if elem.size >= 1<<16 {
75 throw("makechan: invalid channel element type")
76 }
77 if hchanSize%maxAlign != 0 || elem.align > maxAlign {
78 throw("makechan: bad alignment")
79 }
80
81 if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
82 panic(plainError("makechan: size out of range"))
83 }
84
85
86
87
88
89 var c *hchan
90 switch {
91 case size == 0 || elem.size == 0:
92
93 c = (*hchan)(mallocgc(hchanSize, nil, true))
94
95 c.buf = c.raceaddr()
96 case elem.kind&kindNoPointers != 0:
97
98
99 c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
100 c.buf = add(unsafe.Pointer(c), hchanSize)
101 default:
102
103 c = new(hchan)
104 c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
105 }
106
107 c.elemsize = uint16(elem.size)
108 c.elemtype = elem
109 c.dataqsiz = uint(size)
110
111 if debugChan {
112 print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
113 }
114 return c
115 }
116
117
118 func chanbuf(c *hchan, i uint) unsafe.Pointer {
119 return add(c.buf, uintptr(i)*uintptr(c.elemsize))
120 }
121
122
123
124 func chansend1(c *hchan, elem unsafe.Pointer) {
125 chansend(c, elem, true, getcallerpc())
126 }
127
128
// chansend sends the value at ep on channel c and reports whether the send
// took place.
//
// With block false the call never parks: it returns false when the send
// cannot complete immediately. With block true it parks the calling
// goroutine until a receiver takes the value or the channel is closed.
// Sending on a closed channel panics with "send on closed channel". On a nil
// channel a non-blocking send returns false and a blocking send parks with
// no way to be woken. callerpc is used only for race-detector reporting.
140 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
141 if c == nil {
142 if !block {
143 return false
144 }
145 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
146 throw("unreachable")
147 }
148
149 if debugChan {
150 print("chansend: chan=", c, "\n")
151 }
152
153 if raceenabled {
154 racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
155 }
156
157 // Fast path: fail a non-blocking send without taking the lock when the
158 // channel looks open and not ready for sending, i.e. it is unbuffered
159 // with no queued receiver, or buffered and full. These are plain,
160 // unlocked reads, so they are only a snapshot; when this path does not
161 // return, everything is re-checked under the lock below.
162
163
164
165
166
167
168
169
170
171 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
172 (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
173 return false
174 }
175
176 var t0 int64
177 if blockprofilerate > 0 {
178 t0 = cputicks()
179 }
180
181 lock(&c.lock)
182
183 if c.closed != 0 {
184 unlock(&c.lock)
185 panic(plainError("send on closed channel"))
186 }
187
188 if sg := c.recvq.dequeue(); sg != nil {
189 // A receiver is already parked: hand the value to it directly.
190 // send releases c.lock through the callback before waking it.
191 send(c, sg, ep, func() { unlock(&c.lock) }, 3)
192 return true
193 }
194
195 if c.qcount < c.dataqsiz {
196 // The buffer has room: copy the value into slot sendx and advance.
197 qp := chanbuf(c, c.sendx)
198 if raceenabled {
199 raceacquire(qp)
200 racerelease(qp)
201 }
202 typedmemmove(c.elemtype, qp, ep)
203 c.sendx++
204 if c.sendx == c.dataqsiz {
205 c.sendx = 0
206 }
207 c.qcount++
208 unlock(&c.lock)
209 return true
210 }
211
212 if !block {
213 unlock(&c.lock)
214 return false
215 }
216
217 // Block: queue a sudog describing this send and park until woken.
218 gp := getg()
219 mysg := acquireSudog()
220 mysg.releasetime = 0
221 if t0 != 0 {
222 mysg.releasetime = -1
223 }
224 // A non-zero releasetime makes the waker (send, recv or closechan)
225 // overwrite it with cputicks(), which feeds blockevent below.
226 mysg.elem = ep
227 mysg.waitlink = nil
228 mysg.g = gp
229 mysg.isSelect = false
230 mysg.c = c
231 gp.waiting = mysg
232 gp.param = nil
233 c.sendq.enqueue(mysg)
234 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
235
236 // Woken up. recv leaves gp.param non-nil; closechan leaves it nil.
237 if mysg != gp.waiting {
238 throw("G waiting list is corrupted")
239 }
240 gp.waiting = nil
241 if gp.param == nil {
242 if c.closed == 0 {
243 throw("chansend: spurious wakeup")
244 }
245 panic(plainError("send on closed channel"))
246 }
247 gp.param = nil
248 if mysg.releasetime > 0 {
249 blockevent(mysg.releasetime-t0, 2)
250 }
251 mysg.c = nil
252 releaseSudog(mysg)
253 return true
254 }
255
256
257
258
259
260
261
// send completes a send to the parked receiver sg on channel c and wakes
// that receiver.
//
// If sg.elem is non-nil the value at ep is copied to it with sendDirect.
// unlockf is invoked before the receiver is readied; chansend passes a
// closure that unlocks c.lock, so c is expected to be locked on entry.
// skip is forwarded, plus one, to goready.
262 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
263 if raceenabled {
264 if c.dataqsiz == 0 {
265 racesync(c, sg)
266 } else {
267 // Race detector only: the value is copied directly, but the
268 // transfer is modelled as if it passed through buffer slot recvx,
269 // so the indices are advanced here and nowhere else in send.
270 qp := chanbuf(c, c.recvx)
271 raceacquire(qp)
272 racerelease(qp)
273 raceacquireg(sg.g, qp)
274 racereleaseg(sg.g, qp)
275 c.recvx++
276 if c.recvx == c.dataqsiz {
277 c.recvx = 0
278 }
279 c.sendx = c.recvx
280 }
281 }
282 if sg.elem != nil {
283 sendDirect(c.elemtype, sg, ep)
284 sg.elem = nil
285 }
286 gp := sg.g
287 unlockf()
288 gp.param = unsafe.Pointer(sg) // non-nil param: chanrecv reports received == true on wakeup
289 if sg.releasetime != 0 {
290 sg.releasetime = cputicks()
291 }
292 goready(gp, skip+1)
293 }
294
295
296
297
298
299
300
301
302
303
304
305 func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
306
307
308
309
310
311 dst := sg.elem
312 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
313
314
315 memmove(dst, src, t.size)
316 }
317
318 func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
319
320
321
322 src := sg.elem
323 typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
324 memmove(dst, src, t.size)
325 }
326
// closechan closes channel c and wakes every goroutine parked on it.
//
// It panics with "close of nil channel" when c is nil and with
// "close of closed channel" when c is already closed. Parked goroutines are
// collected into a list while c.lock is held and are only readied after the
// lock has been released.
327 func closechan(c *hchan) {
328 if c == nil {
329 panic(plainError("close of nil channel"))
330 }
331
332 lock(&c.lock)
333 if c.closed != 0 {
334 unlock(&c.lock)
335 panic(plainError("close of closed channel"))
336 }
337
338 if raceenabled {
339 callerpc := getcallerpc()
340 racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
341 racerelease(c.raceaddr())
342 }
343
344 c.closed = 1
345
346 var glist *g // goroutines to wake, chained through schedlink
347
348 // Release all parked receivers: zero their destination, leave param nil.
349 for {
350 sg := c.recvq.dequeue()
351 if sg == nil {
352 break
353 }
354 if sg.elem != nil {
355 typedmemclr(c.elemtype, sg.elem)
356 sg.elem = nil
357 }
358 if sg.releasetime != 0 {
359 sg.releasetime = cputicks()
360 }
361 gp := sg.g
362 gp.param = nil
363 if raceenabled {
364 raceacquireg(gp, c.raceaddr())
365 }
366 gp.schedlink.set(glist)
367 glist = gp
368 }
369
370 // Release all parked senders; chansend panics when it wakes with a nil param.
371 for {
372 sg := c.sendq.dequeue()
373 if sg == nil {
374 break
375 }
376 sg.elem = nil
377 if sg.releasetime != 0 {
378 sg.releasetime = cputicks()
379 }
380 gp := sg.g
381 gp.param = nil
382 if raceenabled {
383 raceacquireg(gp, c.raceaddr())
384 }
385 gp.schedlink.set(glist)
386 glist = gp
387 }
388 unlock(&c.lock)
389
390 // The lock has been dropped; now ready every collected goroutine.
391 for glist != nil {
392 gp := glist
393 glist = glist.schedlink.ptr()
394 gp.schedlink = 0
395 goready(gp, 3)
396 }
397 }
398
399
400
401 func chanrecv1(c *hchan, elem unsafe.Pointer) {
402 chanrecv(c, elem, true)
403 }
404
405
406 func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
407 _, received = chanrecv(c, elem, true)
408 return
409 }
410
411
412
413
414
415
416
// chanrecv receives from channel c, storing the value at ep unless ep is
// nil, in which case the value is dropped.
//
// Results:
//   - (false, false): block is false and nothing could be received.
//   - (true, false): the channel is closed and empty; *ep is zeroed.
//   - (true, true): a value was received.
//
// With block true the calling goroutine parks until a sender delivers a
// value or the channel is closed. On a nil channel a non-blocking receive
// returns (false, false) and a blocking receive parks with no way to be
// woken.
417 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
418
419
420
421 if debugChan {
422 print("chanrecv: chan=", c, "\n")
423 }
424
425 if c == nil {
426 if !block {
427 return
428 }
429 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
430 throw("unreachable")
431 }
432
433 // Fast path: fail a non-blocking receive without taking the lock when
434 // the channel looks empty (unbuffered with no queued sender, or
435 // buffered with qcount == 0) and is then observed to be open. The
436 // emptiness check is evaluated before the closed check.
437 // NOTE(review): qcount and closed are loaded atomically while
438 // sendq.first is a plain read — confirm that this is intended.
439
440
441
442
443
444
445 if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
446 c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
447 atomic.Load(&c.closed) == 0 {
448 return
449 }
450
451 var t0 int64
452 if blockprofilerate > 0 {
453 t0 = cputicks()
454 }
455
456 lock(&c.lock)
457
458 if c.closed != 0 && c.qcount == 0 {
459 if raceenabled {
460 raceacquire(c.raceaddr())
461 }
462 unlock(&c.lock)
463 if ep != nil {
464 typedmemclr(c.elemtype, ep)
465 }
466 return true, false
467 }
468
469 if sg := c.sendq.dequeue(); sg != nil {
470 // A sender is parked. recv either copies straight from it
471 // (unbuffered) or takes the buffer head and stores the sender's
472 // value in the buffer (buffered). recv releases c.lock through
473 // the callback before waking the sender.
474 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
475 return true, true
476 }
477
478 if c.qcount > 0 {
479 // Take the element at recvx from the buffer and clear its slot.
480 qp := chanbuf(c, c.recvx)
481 if raceenabled {
482 raceacquire(qp)
483 racerelease(qp)
484 }
485 if ep != nil {
486 typedmemmove(c.elemtype, ep, qp)
487 }
488 typedmemclr(c.elemtype, qp)
489 c.recvx++
490 if c.recvx == c.dataqsiz {
491 c.recvx = 0
492 }
493 c.qcount--
494 unlock(&c.lock)
495 return true, true
496 }
497
498 if !block {
499 unlock(&c.lock)
500 return false, false
501 }
502
503 // Nothing to receive: queue a sudog for this receive and park.
504 gp := getg()
505 mysg := acquireSudog()
506 mysg.releasetime = 0
507 if t0 != 0 {
508 mysg.releasetime = -1
509 }
510 // A non-zero releasetime makes the waker (send or closechan)
511 // overwrite it with cputicks(), which feeds blockevent below.
512 mysg.elem = ep
513 mysg.waitlink = nil
514 gp.waiting = mysg
515 mysg.g = gp
516 mysg.isSelect = false
517 mysg.c = c
518 gp.param = nil
519 c.recvq.enqueue(mysg)
520 goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
521
522 // Woken up. send leaves gp.param non-nil; closechan leaves it nil.
523 if mysg != gp.waiting {
524 throw("G waiting list is corrupted")
525 }
526 gp.waiting = nil
527 if mysg.releasetime > 0 {
528 blockevent(mysg.releasetime-t0, 2)
529 }
530 closed := gp.param == nil
531 gp.param = nil
532 mysg.c = nil
533 releaseSudog(mysg)
534 return true, !closed
535 }
536
537
538
539
540
541
542
543
544
545
546
547
548
549
// recv completes a receive on channel c against the parked sender sg and
// wakes that sender.
//
// Unbuffered channel: the sender's value is copied straight to ep (when ep
// is non-nil). Buffered channel: the element at recvx is copied to ep (when
// non-nil) and the sender's value is stored into that same slot.
//
// unlockf is invoked before the sender is readied; chanrecv passes a closure
// that unlocks c.lock, so c is expected to be locked on entry. skip is
// forwarded, plus one, to goready.
550 func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
551 if c.dataqsiz == 0 {
552 if raceenabled {
553 racesync(c, sg)
554 }
555 if ep != nil {
556 // Copy the value directly from the sender.
557 recvDirect(c.elemtype, sg, ep)
558 }
559 } else {
560 // chansend only parks a sender after finding no room in the buffer,
561 // so the slot at recvx serves both as the element handed to the
562 // receiver and as the place for the sender's value. qcount is left
563 // unchanged.
564 qp := chanbuf(c, c.recvx)
565 if raceenabled {
566 raceacquire(qp)
567 racerelease(qp)
568 raceacquireg(sg.g, qp)
569 racereleaseg(sg.g, qp)
570 }
571 // Buffer slot -> receiver.
572 if ep != nil {
573 typedmemmove(c.elemtype, ep, qp)
574 }
575 // Sender's value -> buffer slot.
576 typedmemmove(c.elemtype, qp, sg.elem)
577 c.recvx++
578 if c.recvx == c.dataqsiz {
579 c.recvx = 0
580 }
581 c.sendx = c.recvx
582 }
583 sg.elem = nil
584 gp := sg.g
585 unlockf()
586 gp.param = unsafe.Pointer(sg) // non-nil param: chansend treats the wakeup as a completed send
587 if sg.releasetime != 0 {
588 sg.releasetime = cputicks()
589 }
590 goready(gp, skip+1)
591 }
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610 func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
611 return chansend(c, elem, false, getcallerpc())
612 }
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631 func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
632 selected, _ = chanrecv(c, elem, false)
633 return
634 }
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653 func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
654
655 selected, *received = chanrecv(c, elem, false)
656 return
657 }
658
659
660 func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
661 return chansend(c, elem, !nb, getcallerpc())
662 }
663
664
665 func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
666 return chanrecv(c, elem, !nb)
667 }
668
669
670 func reflect_chanlen(c *hchan) int {
671 if c == nil {
672 return 0
673 }
674 return int(c.qcount)
675 }
676
677
678 func reflect_chancap(c *hchan) int {
679 if c == nil {
680 return 0
681 }
682 return int(c.dataqsiz)
683 }
684
685
686 func reflect_chanclose(c *hchan) {
687 closechan(c)
688 }
689
690 func (q *waitq) enqueue(sgp *sudog) {
691 sgp.next = nil
692 x := q.last
693 if x == nil {
694 sgp.prev = nil
695 q.first = sgp
696 q.last = sgp
697 return
698 }
699 sgp.prev = x
700 x.next = sgp
701 q.last = sgp
702 }
703
704 func (q *waitq) dequeue() *sudog {
705 for {
706 sgp := q.first
707 if sgp == nil {
708 return nil
709 }
710 y := sgp.next
711 if y == nil {
712 q.first = nil
713 q.last = nil
714 } else {
715 y.prev = nil
716 q.first = y
717 sgp.next = nil
718 }
719
720
721
722
723
724
725
726
727
728 if sgp.isSelect {
729 if !atomic.Cas(&sgp.g.selectDone, 0, 1) {
730 continue
731 }
732 }
733
734 return sgp
735 }
736 }
737
738 func (c *hchan) raceaddr() unsafe.Pointer {
739
740
741
742
743
744 return unsafe.Pointer(&c.buf)
745 }
746
747 func racesync(c *hchan, sg *sudog) {
748 racerelease(chanbuf(c, 0))
749 raceacquireg(sg.g, chanbuf(c, 0))
750 racereleaseg(sg.g, chanbuf(c, 0))
751 raceacquire(chanbuf(c, 0))
752 }
753
View as plain text