Text file src/pkg/runtime/chan.c
1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 #include "runtime.h"
6 #include "arch_GOARCH.h"
7 #include "type.h"
8 #include "race.h"
9 #include "malloc.h"
10
11 #define MAXALIGN 7
12 #define NOSELGEN 1
13
14 static int32 debug = 0;
15
16 typedef struct WaitQ WaitQ;
17 typedef struct SudoG SudoG;
18 typedef struct Select Select;
19 typedef struct Scase Scase;
20
21 struct SudoG
22 {
23 G* g; // g and selgen constitute
24 uint32 selgen; // a weak pointer to g
25 SudoG* link;
26 int64 releasetime;
27 byte* elem; // data element
28 };
29
30 struct WaitQ
31 {
32 SudoG* first;
33 SudoG* last;
34 };
35
36 // The garbage collector is assuming that Hchan can only contain pointers into the stack
37 // and cannot contain pointers into the heap.
38 struct Hchan
39 {
40 uintgo qcount; // total data in the q
41 uintgo dataqsiz; // size of the circular q
42 uint16 elemsize;
43 bool closed;
44 uint8 elemalign;
45 Alg* elemalg; // interface for element type
46 uintgo sendx; // send index
47 uintgo recvx; // receive index
48 WaitQ recvq; // list of recv waiters
49 WaitQ sendq; // list of send waiters
50 Lock;
51 };
52
53 uint32 runtime·Hchansize = sizeof(Hchan);
54
55 // Buffer follows Hchan immediately in memory.
56 // chanbuf(c, i) is pointer to the i'th slot in the buffer.
57 #define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
58
59 enum
60 {
61 // Scase.kind
62 CaseRecv,
63 CaseSend,
64 CaseDefault,
65 };
66
67 struct Scase
68 {
69 SudoG sg; // must be first member (cast to Scase)
70 Hchan* chan; // chan
71 byte* pc; // return pc
72 uint16 kind;
73 uint16 so; // vararg of selected bool
74 bool* receivedp; // pointer to received bool (recv2)
75 };
76
77 struct Select
78 {
79 uint16 tcase; // total count of scase[]
80 uint16 ncase; // currently filled scase[]
81 uint16* pollorder; // case poll order
82 Hchan** lockorder; // channel lock order
83 Scase scase[1]; // one per case (in order of appearance)
84 };
85
86 static void dequeueg(WaitQ*);
87 static SudoG* dequeue(WaitQ*);
88 static void enqueue(WaitQ*, SudoG*);
89 static void destroychan(Hchan*);
90 static void racesync(Hchan*, SudoG*);
91
92 Hchan*
93 runtime·makechan_c(ChanType *t, int64 hint)
94 {
95 Hchan *c;
96 uintptr n;
97 Type *elem;
98
99 elem = t->elem;
100
101 // compiler checks this but be safe.
102 if(elem->size >= (1<<16))
103 runtime·throw("makechan: invalid channel element type");
104
105 if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > MaxMem / elem->size))
106 runtime·panicstring("makechan: size out of range");
107
108 // calculate rounded size of Hchan
109 n = sizeof(*c);
110 while(n & MAXALIGN)
111 n++;
112
113 // allocate memory in one call
114 c = (Hchan*)runtime·mal(n + hint*elem->size);
115 c->elemsize = elem->size;
116 c->elemalg = elem->alg;
117 c->elemalign = elem->align;
118 c->dataqsiz = hint;
119 runtime·settype(c, (uintptr)t | TypeInfo_Chan);
120
121 if(debug)
122 runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; elemalign=%d; dataqsiz=%D\n",
123 c, (int64)elem->size, elem->alg, elem->align, (int64)c->dataqsiz);
124
125 return c;
126 }
127
128 // For reflect
129 // func makechan(typ *ChanType, size uint64) (chan)
130 void
131 reflect·makechan(ChanType *t, uint64 size, Hchan *c)
132 {
133 c = runtime·makechan_c(t, size);
134 FLUSH(&c);
135 }
136
137 // makechan(t *ChanType, hint int64) (hchan *chan any);
138 void
139 runtime·makechan(ChanType *t, int64 hint, Hchan *ret)
140 {
141 ret = runtime·makechan_c(t, hint);
142 FLUSH(&ret);
143 }
144
145 /*
146 * generic single channel send/recv
147 * if the bool pointer is nil,
148 * then the full exchange will
149 * occur. if pres is not nil,
150 * then the protocol will not
151 * sleep but return if it could
152 * not complete.
153 *
154 * sleep can wake up with g->param == nil
155 * when a channel involved in the sleep has
156 * been closed. it is easiest to loop and re-run
157 * the operation; we'll see that it's now closed.
158 */
159 void
160 runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
161 {
162 SudoG *sg;
163 SudoG mysg;
164 G* gp;
165 int64 t0;
166
167 if(c == nil) {
168 USED(t);
169 if(pres != nil) {
170 *pres = false;
171 return;
172 }
173 runtime·park(nil, nil, "chan send (nil chan)");
174 return; // not reached
175 }
176
177 if(runtime·gcwaiting)
178 runtime·gosched();
179
180 if(debug) {
181 runtime·printf("chansend: chan=%p; elem=", c);
182 c->elemalg->print(c->elemsize, ep);
183 runtime·prints("\n");
184 }
185
186 t0 = 0;
187 mysg.releasetime = 0;
188 if(runtime·blockprofilerate > 0) {
189 t0 = runtime·cputicks();
190 mysg.releasetime = -1;
191 }
192
193 runtime·lock(c);
194 // TODO(dvyukov): add similar instrumentation to select.
195 if(raceenabled)
196 runtime·racereadpc(c, pc, runtime·chansend);
197 if(c->closed)
198 goto closed;
199
200 if(c->dataqsiz > 0)
201 goto asynch;
202
203 sg = dequeue(&c->recvq);
204 if(sg != nil) {
205 if(raceenabled)
206 racesync(c, sg);
207 runtime·unlock(c);
208
209 gp = sg->g;
210 gp->param = sg;
211 if(sg->elem != nil)
212 c->elemalg->copy(c->elemsize, sg->elem, ep);
213 if(sg->releasetime)
214 sg->releasetime = runtime·cputicks();
215 runtime·ready(gp);
216
217 if(pres != nil)
218 *pres = true;
219 return;
220 }
221
222 if(pres != nil) {
223 runtime·unlock(c);
224 *pres = false;
225 return;
226 }
227
228 mysg.elem = ep;
229 mysg.g = g;
230 mysg.selgen = NOSELGEN;
231 g->param = nil;
232 enqueue(&c->sendq, &mysg);
233 runtime·park(runtime·unlock, c, "chan send");
234
235 if(g->param == nil) {
236 runtime·lock(c);
237 if(!c->closed)
238 runtime·throw("chansend: spurious wakeup");
239 goto closed;
240 }
241
242 if(mysg.releasetime > 0)
243 runtime·blockevent(mysg.releasetime - t0, 2);
244
245 return;
246
247 asynch:
248 if(c->closed)
249 goto closed;
250
251 if(c->qcount >= c->dataqsiz) {
252 if(pres != nil) {
253 runtime·unlock(c);
254 *pres = false;
255 return;
256 }
257 mysg.g = g;
258 mysg.elem = nil;
259 mysg.selgen = NOSELGEN;
260 enqueue(&c->sendq, &mysg);
261 runtime·park(runtime·unlock, c, "chan send");
262
263 runtime·lock(c);
264 goto asynch;
265 }
266
267 if(raceenabled)
268 runtime·racerelease(chanbuf(c, c->sendx));
269
270 c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
271 if(++c->sendx == c->dataqsiz)
272 c->sendx = 0;
273 c->qcount++;
274
275 sg = dequeue(&c->recvq);
276 if(sg != nil) {
277 gp = sg->g;
278 runtime·unlock(c);
279 if(sg->releasetime)
280 sg->releasetime = runtime·cputicks();
281 runtime·ready(gp);
282 } else
283 runtime·unlock(c);
284 if(pres != nil)
285 *pres = true;
286 if(mysg.releasetime > 0)
287 runtime·blockevent(mysg.releasetime - t0, 2);
288 return;
289
290 closed:
291 runtime·unlock(c);
292 runtime·panicstring("send on closed channel");
293 }
294
295
296 void
297 runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
298 {
299 SudoG *sg;
300 SudoG mysg;
301 G *gp;
302 int64 t0;
303
304 if(runtime·gcwaiting)
305 runtime·gosched();
306
307 if(debug)
308 runtime·printf("chanrecv: chan=%p\n", c);
309
310 if(c == nil) {
311 USED(t);
312 if(selected != nil) {
313 *selected = false;
314 return;
315 }
316 runtime·park(nil, nil, "chan receive (nil chan)");
317 return; // not reached
318 }
319
320 t0 = 0;
321 mysg.releasetime = 0;
322 if(runtime·blockprofilerate > 0) {
323 t0 = runtime·cputicks();
324 mysg.releasetime = -1;
325 }
326
327 runtime·lock(c);
328 if(c->dataqsiz > 0)
329 goto asynch;
330
331 if(c->closed)
332 goto closed;
333
334 sg = dequeue(&c->sendq);
335 if(sg != nil) {
336 if(raceenabled)
337 racesync(c, sg);
338 runtime·unlock(c);
339
340 if(ep != nil)
341 c->elemalg->copy(c->elemsize, ep, sg->elem);
342 gp = sg->g;
343 gp->param = sg;
344 if(sg->releasetime)
345 sg->releasetime = runtime·cputicks();
346 runtime·ready(gp);
347
348 if(selected != nil)
349 *selected = true;
350 if(received != nil)
351 *received = true;
352 return;
353 }
354
355 if(selected != nil) {
356 runtime·unlock(c);
357 *selected = false;
358 return;
359 }
360
361 mysg.elem = ep;
362 mysg.g = g;
363 mysg.selgen = NOSELGEN;
364 g->param = nil;
365 enqueue(&c->recvq, &mysg);
366 runtime·park(runtime·unlock, c, "chan receive");
367
368 if(g->param == nil) {
369 runtime·lock(c);
370 if(!c->closed)
371 runtime·throw("chanrecv: spurious wakeup");
372 goto closed;
373 }
374
375 if(received != nil)
376 *received = true;
377 if(mysg.releasetime > 0)
378 runtime·blockevent(mysg.releasetime - t0, 2);
379 return;
380
381 asynch:
382 if(c->qcount <= 0) {
383 if(c->closed)
384 goto closed;
385
386 if(selected != nil) {
387 runtime·unlock(c);
388 *selected = false;
389 if(received != nil)
390 *received = false;
391 return;
392 }
393 mysg.g = g;
394 mysg.elem = nil;
395 mysg.selgen = NOSELGEN;
396 enqueue(&c->recvq, &mysg);
397 runtime·park(runtime·unlock, c, "chan receive");
398
399 runtime·lock(c);
400 goto asynch;
401 }
402
403 if(raceenabled)
404 runtime·raceacquire(chanbuf(c, c->recvx));
405
406 if(ep != nil)
407 c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
408 c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
409 if(++c->recvx == c->dataqsiz)
410 c->recvx = 0;
411 c->qcount--;
412
413 sg = dequeue(&c->sendq);
414 if(sg != nil) {
415 gp = sg->g;
416 runtime·unlock(c);
417 if(sg->releasetime)
418 sg->releasetime = runtime·cputicks();
419 runtime·ready(gp);
420 } else
421 runtime·unlock(c);
422
423 if(selected != nil)
424 *selected = true;
425 if(received != nil)
426 *received = true;
427 if(mysg.releasetime > 0)
428 runtime·blockevent(mysg.releasetime - t0, 2);
429 return;
430
431 closed:
432 if(ep != nil)
433 c->elemalg->copy(c->elemsize, ep, nil);
434 if(selected != nil)
435 *selected = true;
436 if(received != nil)
437 *received = false;
438 if(raceenabled)
439 runtime·raceacquire(c);
440 runtime·unlock(c);
441 if(mysg.releasetime > 0)
442 runtime·blockevent(mysg.releasetime - t0, 2);
443 }
444
445 // chansend1(hchan *chan any, elem any);
446 #pragma textflag 7
447 void
448 runtime·chansend1(ChanType *t, Hchan* c, ...)
449 {
450 runtime·chansend(t, c, (byte*)(&c+1), nil, runtime·getcallerpc(&t));
451 }
452
453 // chanrecv1(hchan *chan any) (elem any);
454 #pragma textflag 7
455 void
456 runtime·chanrecv1(ChanType *t, Hchan* c, ...)
457 {
458 runtime·chanrecv(t, c, (byte*)(&c+1), nil, nil);
459 }
460
461 // chanrecv2(hchan *chan any) (elem any, received bool);
462 #pragma textflag 7
463 void
464 runtime·chanrecv2(ChanType *t, Hchan* c, ...)
465 {
466 byte *ae, *ap;
467
468 ae = (byte*)(&c+1);
469 ap = ae + t->elem->size;
470 runtime·chanrecv(t, c, ae, nil, ap);
471 }
472
473 // func selectnbsend(c chan any, elem any) bool
474 //
475 // compiler implements
476 //
477 // select {
478 // case c <- v:
479 // ... foo
480 // default:
481 // ... bar
482 // }
483 //
484 // as
485 //
486 // if selectnbsend(c, v) {
487 // ... foo
488 // } else {
489 // ... bar
490 // }
491 //
492 #pragma textflag 7
493 void
494 runtime·selectnbsend(ChanType *t, Hchan *c, ...)
495 {
496 byte *ae, *ap;
497
498 ae = (byte*)(&c + 1);
499 ap = ae + ROUND(t->elem->size, Structrnd);
500 runtime·chansend(t, c, ae, ap, runtime·getcallerpc(&t));
501 }
502
503 // func selectnbrecv(elem *any, c chan any) bool
504 //
505 // compiler implements
506 //
507 // select {
508 // case v = <-c:
509 // ... foo
510 // default:
511 // ... bar
512 // }
513 //
514 // as
515 //
516 // if selectnbrecv(&v, c) {
517 // ... foo
518 // } else {
519 // ... bar
520 // }
521 //
522 #pragma textflag 7
523 void
524 runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected)
525 {
526 runtime·chanrecv(t, c, v, &selected, nil);
527 }
528
529 // func selectnbrecv2(elem *any, ok *bool, c chan any) bool
530 //
531 // compiler implements
532 //
533 // select {
534 // case v, ok = <-c:
535 // ... foo
536 // default:
537 // ... bar
538 // }
539 //
540 // as
541 //
542 // if c != nil && selectnbrecv2(&v, &ok, c) {
543 // ... foo
544 // } else {
545 // ... bar
546 // }
547 //
548 #pragma textflag 7
549 void
550 runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected)
551 {
552 runtime·chanrecv(t, c, v, &selected, received);
553 }
554
555 // For reflect:
556 // func chansend(c chan, val iword, nb bool) (selected bool)
557 // where an iword is the same word an interface value would use:
558 // the actual data if it fits, or else a pointer to the data.
559 //
560 // The "uintptr selected" is really "bool selected" but saying
561 // uintptr gets us the right alignment for the output parameter block.
562 #pragma textflag 7
563 void
564 reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected)
565 {
566 bool *sp;
567 byte *vp;
568
569 if(nb) {
570 selected = false;
571 sp = (bool*)&selected;
572 } else {
573 *(bool*)&selected = true;
574 FLUSH(&selected);
575 sp = nil;
576 }
577 if(t->elem->size <= sizeof(val))
578 vp = (byte*)&val;
579 else
580 vp = (byte*)val;
581 runtime·chansend(t, c, vp, sp, runtime·getcallerpc(&t));
582 }
583
584 // For reflect:
585 // func chanrecv(c chan, nb bool) (val iword, selected, received bool)
586 // where an iword is the same word an interface value would use:
587 // the actual data if it fits, or else a pointer to the data.
588 void
589 reflect·chanrecv(ChanType *t, Hchan *c, bool nb, uintptr val, bool selected, bool received)
590 {
591 byte *vp;
592 bool *sp;
593
594 if(nb) {
595 selected = false;
596 sp = &selected;
597 } else {
598 selected = true;
599 FLUSH(&selected);
600 sp = nil;
601 }
602 received = false;
603 FLUSH(&received);
604 if(t->elem->size <= sizeof(val)) {
605 val = 0;
606 vp = (byte*)&val;
607 } else {
608 vp = runtime·mal(t->elem->size);
609 val = (uintptr)vp;
610 FLUSH(&val);
611 }
612 runtime·chanrecv(t, c, vp, sp, &received);
613 }
614
615 static void newselect(int32, Select**);
616
617 // newselect(size uint32) (sel *byte);
618 #pragma textflag 7
619 void
620 runtime·newselect(int32 size, ...)
621 {
622 int32 o;
623 Select **selp;
624
625 o = ROUND(sizeof(size), Structrnd);
626 selp = (Select**)((byte*)&size + o);
627 newselect(size, selp);
628 }
629
630 static void
631 newselect(int32 size, Select **selp)
632 {
633 int32 n;
634 Select *sel;
635
636 n = 0;
637 if(size > 1)
638 n = size-1;
639
640 // allocate all the memory we need in a single allocation
641 // start with Select with size cases
642 // then lockorder with size entries
643 // then pollorder with size entries
644 sel = runtime·mal(sizeof(*sel) +
645 n*sizeof(sel->scase[0]) +
646 size*sizeof(sel->lockorder[0]) +
647 size*sizeof(sel->pollorder[0]));
648
649 sel->tcase = size;
650 sel->ncase = 0;
651 sel->lockorder = (void*)(sel->scase + size);
652 sel->pollorder = (void*)(sel->lockorder + size);
653 *selp = sel;
654
655 if(debug)
656 runtime·printf("newselect s=%p size=%d\n", sel, size);
657 }
658
659 // cut in half to give stack a chance to split
660 static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so);
661
662 // selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
663 #pragma textflag 7
664 void
665 runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
666 {
667 selected = false;
668 FLUSH(&selected);
669
670 // nil cases do not compete
671 if(c == nil)
672 return;
673
674 selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel);
675 }
676
677 static void
678 selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so)
679 {
680 int32 i;
681 Scase *cas;
682
683 i = sel->ncase;
684 if(i >= sel->tcase)
685 runtime·throw("selectsend: too many cases");
686 sel->ncase = i+1;
687 cas = &sel->scase[i];
688
689 cas->pc = pc;
690 cas->chan = c;
691 cas->so = so;
692 cas->kind = CaseSend;
693 cas->sg.elem = elem;
694
695 if(debug)
696 runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n",
697 sel, cas->pc, cas->chan, cas->so);
698 }
699
700 // cut in half to give stack a chance to split
701 static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32 so);
702
703 // selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
704 #pragma textflag 7
705 void
706 runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
707 {
708 selected = false;
709 FLUSH(&selected);
710
711 // nil cases do not compete
712 if(c == nil)
713 return;
714
715 selectrecv(sel, c, runtime·getcallerpc(&sel), elem, nil, (byte*)&selected - (byte*)&sel);
716 }
717
718 // selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
719 #pragma textflag 7
720 void
721 runtime·selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, bool selected)
722 {
723 selected = false;
724 FLUSH(&selected);
725
726 // nil cases do not compete
727 if(c == nil)
728 return;
729
730 selectrecv(sel, c, runtime·getcallerpc(&sel), elem, received, (byte*)&selected - (byte*)&sel);
731 }
732
733 static void
734 selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so)
735 {
736 int32 i;
737 Scase *cas;
738
739 i = sel->ncase;
740 if(i >= sel->tcase)
741 runtime·throw("selectrecv: too many cases");
742 sel->ncase = i+1;
743 cas = &sel->scase[i];
744 cas->pc = pc;
745 cas->chan = c;
746
747 cas->so = so;
748 cas->kind = CaseRecv;
749 cas->sg.elem = elem;
750 cas->receivedp = received;
751
752 if(debug)
753 runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n",
754 sel, cas->pc, cas->chan, cas->so);
755 }
756
757 // cut in half to give stack a chance to split
758 static void selectdefault(Select*, void*, int32);
759
760 // selectdefault(sel *byte) (selected bool);
761 #pragma textflag 7
762 void
763 runtime·selectdefault(Select *sel, bool selected)
764 {
765 selected = false;
766 FLUSH(&selected);
767
768 selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel);
769 }
770
771 static void
772 selectdefault(Select *sel, void *callerpc, int32 so)
773 {
774 int32 i;
775 Scase *cas;
776
777 i = sel->ncase;
778 if(i >= sel->tcase)
779 runtime·throw("selectdefault: too many cases");
780 sel->ncase = i+1;
781 cas = &sel->scase[i];
782 cas->pc = callerpc;
783 cas->chan = nil;
784
785 cas->so = so;
786 cas->kind = CaseDefault;
787
788 if(debug)
789 runtime·printf("selectdefault s=%p pc=%p so=%d\n",
790 sel, cas->pc, cas->so);
791 }
792
793 static void
794 sellock(Select *sel)
795 {
796 uint32 i;
797 Hchan *c, *c0;
798
799 c = nil;
800 for(i=0; i<sel->ncase; i++) {
801 c0 = sel->lockorder[i];
802 if(c0 && c0 != c) {
803 c = sel->lockorder[i];
804 runtime·lock(c);
805 }
806 }
807 }
808
809 static void
810 selunlock(Select *sel)
811 {
812 int32 i, n, r;
813 Hchan *c;
814
815 // We must be very careful here to not touch sel after we have unlocked
816 // the last lock, because sel can be freed right after the last unlock.
817 // Consider the following situation.
818 // First M calls runtime·park() in runtime·selectgo() passing the sel.
819 // Once runtime·park() has unlocked the last lock, another M makes
820 // the G that calls select runnable again and schedules it for execution.
821 // When the G runs on another M, it locks all the locks and frees sel.
822 // Now if the first M touches sel, it will access freed memory.
823 n = (int32)sel->ncase;
824 r = 0;
825 // skip the default case
826 if(n>0 && sel->lockorder[0] == nil)
827 r = 1;
828 for(i = n-1; i >= r; i--) {
829 c = sel->lockorder[i];
830 if(i>0 && sel->lockorder[i-1] == c)
831 continue; // will unlock it on the next iteration
832 runtime·unlock(c);
833 }
834 }
835
836 void
837 runtime·block(void)
838 {
839 runtime·park(nil, nil, "select (no cases)"); // forever
840 }
841
842 static void* selectgo(Select**);
843
844 // selectgo(sel *byte);
845 //
846 // overwrites return pc on stack to signal which case of the select
847 // to run, so cannot appear at the top of a split stack.
848 #pragma textflag 7
849 void
850 runtime·selectgo(Select *sel)
851 {
852 runtime·setcallerpc(&sel, selectgo(&sel));
853 }
854
855 static void*
856 selectgo(Select **selp)
857 {
858 Select *sel;
859 uint32 o, i, j, k;
860 Scase *cas, *dfl;
861 Hchan *c;
862 SudoG *sg;
863 G *gp;
864 byte *as;
865 void *pc;
866
867 sel = *selp;
868 if(runtime·gcwaiting)
869 runtime·gosched();
870
871 if(debug)
872 runtime·printf("select: sel=%p\n", sel);
873
874 // The compiler rewrites selects that statically have
875 // only 0 or 1 cases plus default into simpler constructs.
876 // The only way we can end up with such small sel->ncase
877 // values here is for a larger select in which most channels
878 // have been nilled out. The general code handles those
879 // cases correctly, and they are rare enough not to bother
880 // optimizing (and needing to test).
881
882 // generate permuted order
883 for(i=0; i<sel->ncase; i++)
884 sel->pollorder[i] = i;
885 for(i=1; i<sel->ncase; i++) {
886 o = sel->pollorder[i];
887 j = runtime·fastrand1()%(i+1);
888 sel->pollorder[i] = sel->pollorder[j];
889 sel->pollorder[j] = o;
890 }
891
892 // sort the cases by Hchan address to get the locking order.
893 // simple heap sort, to guarantee n log n time and constant stack footprint.
894 for(i=0; i<sel->ncase; i++) {
895 j = i;
896 c = sel->scase[j].chan;
897 while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
898 sel->lockorder[j] = sel->lockorder[k];
899 j = k;
900 }
901 sel->lockorder[j] = c;
902 }
903 for(i=sel->ncase; i-->0; ) {
904 c = sel->lockorder[i];
905 sel->lockorder[i] = sel->lockorder[0];
906 j = 0;
907 for(;;) {
908 k = j*2+1;
909 if(k >= i)
910 break;
911 if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
912 k++;
913 if(c < sel->lockorder[k]) {
914 sel->lockorder[j] = sel->lockorder[k];
915 j = k;
916 continue;
917 }
918 break;
919 }
920 sel->lockorder[j] = c;
921 }
922 /*
923 for(i=0; i+1<sel->ncase; i++)
924 if(sel->lockorder[i] > sel->lockorder[i+1]) {
925 runtime·printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
926 runtime·throw("select: broken sort");
927 }
928 */
929 sellock(sel);
930
931 loop:
932 // pass 1 - look for something already waiting
933 dfl = nil;
934 for(i=0; i<sel->ncase; i++) {
935 o = sel->pollorder[i];
936 cas = &sel->scase[o];
937 c = cas->chan;
938
939 switch(cas->kind) {
940 case CaseRecv:
941 if(c->dataqsiz > 0) {
942 if(c->qcount > 0)
943 goto asyncrecv;
944 } else {
945 sg = dequeue(&c->sendq);
946 if(sg != nil)
947 goto syncrecv;
948 }
949 if(c->closed)
950 goto rclose;
951 break;
952
953 case CaseSend:
954 if(c->closed)
955 goto sclose;
956 if(c->dataqsiz > 0) {
957 if(c->qcount < c->dataqsiz)
958 goto asyncsend;
959 } else {
960 sg = dequeue(&c->recvq);
961 if(sg != nil)
962 goto syncsend;
963 }
964 break;
965
966 case CaseDefault:
967 dfl = cas;
968 break;
969 }
970 }
971
972 if(dfl != nil) {
973 selunlock(sel);
974 cas = dfl;
975 goto retc;
976 }
977
978
979 // pass 2 - enqueue on all chans
980 for(i=0; i<sel->ncase; i++) {
981 o = sel->pollorder[i];
982 cas = &sel->scase[o];
983 c = cas->chan;
984 sg = &cas->sg;
985 sg->g = g;
986 sg->selgen = g->selgen;
987
988 switch(cas->kind) {
989 case CaseRecv:
990 enqueue(&c->recvq, sg);
991 break;
992
993 case CaseSend:
994 enqueue(&c->sendq, sg);
995 break;
996 }
997 }
998
999 g->param = nil;
1000 runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
1001
1002 sellock(sel);
1003 sg = g->param;
1004
1005 // pass 3 - dequeue from unsuccessful chans
1006 // otherwise they stack up on quiet channels
1007 for(i=0; i<sel->ncase; i++) {
1008 cas = &sel->scase[i];
1009 if(cas != (Scase*)sg) {
1010 c = cas->chan;
1011 if(cas->kind == CaseSend)
1012 dequeueg(&c->sendq);
1013 else
1014 dequeueg(&c->recvq);
1015 }
1016 }
1017
1018 if(sg == nil)
1019 goto loop;
1020
1021 cas = (Scase*)sg;
1022 c = cas->chan;
1023
1024 if(c->dataqsiz > 0)
1025 runtime·throw("selectgo: shouldn't happen");
1026
1027 if(debug)
1028 runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
1029 sel, c, cas, cas->kind);
1030
1031 if(cas->kind == CaseRecv) {
1032 if(cas->receivedp != nil)
1033 *cas->receivedp = true;
1034 }
1035
1036 selunlock(sel);
1037 goto retc;
1038
1039 asyncrecv:
1040 // can receive from buffer
1041 if(raceenabled)
1042 runtime·raceacquire(chanbuf(c, c->recvx));
1043 if(cas->receivedp != nil)
1044 *cas->receivedp = true;
1045 if(cas->sg.elem != nil)
1046 c->elemalg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->recvx));
1047 c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
1048 if(++c->recvx == c->dataqsiz)
1049 c->recvx = 0;
1050 c->qcount--;
1051 sg = dequeue(&c->sendq);
1052 if(sg != nil) {
1053 gp = sg->g;
1054 selunlock(sel);
1055 runtime·ready(gp);
1056 } else {
1057 selunlock(sel);
1058 }
1059 goto retc;
1060
1061 asyncsend:
1062 // can send to buffer
1063 if(raceenabled)
1064 runtime·racerelease(chanbuf(c, c->sendx));
1065 c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
1066 if(++c->sendx == c->dataqsiz)
1067 c->sendx = 0;
1068 c->qcount++;
1069 sg = dequeue(&c->recvq);
1070 if(sg != nil) {
1071 gp = sg->g;
1072 selunlock(sel);
1073 runtime·ready(gp);
1074 } else {
1075 selunlock(sel);
1076 }
1077 goto retc;
1078
1079 syncrecv:
1080 // can receive from sleeping sender (sg)
1081 if(raceenabled)
1082 racesync(c, sg);
1083 selunlock(sel);
1084 if(debug)
1085 runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
1086 if(cas->receivedp != nil)
1087 *cas->receivedp = true;
1088 if(cas->sg.elem != nil)
1089 c->elemalg->copy(c->elemsize, cas->sg.elem, sg->elem);
1090 gp = sg->g;
1091 gp->param = sg;
1092 runtime·ready(gp);
1093 goto retc;
1094
1095 rclose:
1096 // read at end of closed channel
1097 selunlock(sel);
1098 if(cas->receivedp != nil)
1099 *cas->receivedp = false;
1100 if(cas->sg.elem != nil)
1101 c->elemalg->copy(c->elemsize, cas->sg.elem, nil);
1102 if(raceenabled)
1103 runtime·raceacquire(c);
1104 goto retc;
1105
1106 syncsend:
1107 // can send to sleeping receiver (sg)
1108 if(raceenabled)
1109 racesync(c, sg);
1110 selunlock(sel);
1111 if(debug)
1112 runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
1113 if(sg->elem != nil)
1114 c->elemalg->copy(c->elemsize, sg->elem, cas->sg.elem);
1115 gp = sg->g;
1116 gp->param = sg;
1117 runtime·ready(gp);
1118
1119 retc:
1120 // return pc corresponding to chosen case.
1121 // Set boolean passed during select creation
1122 // (at offset selp + cas->so) to true.
1123 // If cas->so == 0, this is a reflect-driven select and we
1124 // don't need to update the boolean.
1125 pc = cas->pc;
1126 if(cas->so > 0) {
1127 as = (byte*)selp + cas->so;
1128 *as = true;
1129 }
1130 runtime·free(sel);
1131 return pc;
1132
1133 sclose:
1134 // send on closed channel
1135 selunlock(sel);
1136 runtime·panicstring("send on closed channel");
1137 return nil; // not reached
1138 }
1139
1140 // This struct must match ../reflect/value.go:/runtimeSelect.
1141 typedef struct runtimeSelect runtimeSelect;
1142 struct runtimeSelect
1143 {
1144 uintptr dir;
1145 ChanType *typ;
1146 Hchan *ch;
1147 uintptr val;
1148 };
1149
1150 // This enum must match ../reflect/value.go:/SelectDir.
1151 enum SelectDir {
1152 SelectSend = 1,
1153 SelectRecv,
1154 SelectDefault,
1155 };
1156
1157 // func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool)
1158 void
1159 reflect·rselect(Slice cases, intgo chosen, uintptr word, bool recvOK)
1160 {
1161 int32 i;
1162 Select *sel;
1163 runtimeSelect* rcase, *rc;
1164 void *elem;
1165 void *recvptr;
1166 uintptr maxsize;
1167
1168 chosen = -1;
1169 word = 0;
1170 recvOK = false;
1171
1172 maxsize = 0;
1173 rcase = (runtimeSelect*)cases.array;
1174 for(i=0; i<cases.len; i++) {
1175 rc = &rcase[i];
1176 if(rc->dir == SelectRecv && rc->ch != nil && maxsize < rc->typ->elem->size)
1177 maxsize = rc->typ->elem->size;
1178 }
1179
1180 recvptr = nil;
1181 if(maxsize > sizeof(void*))
1182 recvptr = runtime·mal(maxsize);
1183
1184 newselect(cases.len, &sel);
1185 for(i=0; i<cases.len; i++) {
1186 rc = &rcase[i];
1187 switch(rc->dir) {
1188 case SelectDefault:
1189 selectdefault(sel, (void*)i, 0);
1190 break;
1191 case SelectSend:
1192 if(rc->ch == nil)
1193 break;
1194 if(rc->typ->elem->size > sizeof(void*))
1195 elem = (void*)rc->val;
1196 else
1197 elem = (void*)&rc->val;
1198 selectsend(sel, rc->ch, (void*)i, elem, 0);
1199 break;
1200 case SelectRecv:
1201 if(rc->ch == nil)
1202 break;
1203 if(rc->typ->elem->size > sizeof(void*))
1204 elem = recvptr;
1205 else
1206 elem = &word;
1207 selectrecv(sel, rc->ch, (void*)i, elem, &recvOK, 0);
1208 break;
1209 }
1210 }
1211
1212 chosen = (intgo)(uintptr)selectgo(&sel);
1213 if(rcase[chosen].dir == SelectRecv && rcase[chosen].typ->elem->size > sizeof(void*))
1214 word = (uintptr)recvptr;
1215
1216 FLUSH(&chosen);
1217 FLUSH(&word);
1218 FLUSH(&recvOK);
1219 }
1220
1221 // closechan(sel *byte);
1222 #pragma textflag 7
1223 void
1224 runtime·closechan(Hchan *c)
1225 {
1226 SudoG *sg;
1227 G* gp;
1228
1229 if(c == nil)
1230 runtime·panicstring("close of nil channel");
1231
1232 if(runtime·gcwaiting)
1233 runtime·gosched();
1234
1235 runtime·lock(c);
1236 if(c->closed) {
1237 runtime·unlock(c);
1238 runtime·panicstring("close of closed channel");
1239 }
1240
1241 if(raceenabled) {
1242 runtime·racewritepc(c, runtime·getcallerpc(&c), runtime·closechan);
1243 runtime·racerelease(c);
1244 }
1245
1246 c->closed = true;
1247
1248 // release all readers
1249 for(;;) {
1250 sg = dequeue(&c->recvq);
1251 if(sg == nil)
1252 break;
1253 gp = sg->g;
1254 gp->param = nil;
1255 runtime·ready(gp);
1256 }
1257
1258 // release all writers
1259 for(;;) {
1260 sg = dequeue(&c->sendq);
1261 if(sg == nil)
1262 break;
1263 gp = sg->g;
1264 gp->param = nil;
1265 runtime·ready(gp);
1266 }
1267
1268 runtime·unlock(c);
1269 }
1270
1271 // For reflect
1272 // func chanclose(c chan)
1273 void
1274 reflect·chanclose(Hchan *c)
1275 {
1276 runtime·closechan(c);
1277 }
1278
1279 // For reflect
1280 // func chanlen(c chan) (len int)
1281 void
1282 reflect·chanlen(Hchan *c, intgo len)
1283 {
1284 if(c == nil)
1285 len = 0;
1286 else
1287 len = c->qcount;
1288 FLUSH(&len);
1289 }
1290
1291 // For reflect
1292 // func chancap(c chan) int
1293 void
1294 reflect·chancap(Hchan *c, intgo cap)
1295 {
1296 if(c == nil)
1297 cap = 0;
1298 else
1299 cap = c->dataqsiz;
1300 FLUSH(&cap);
1301 }
1302
1303 static SudoG*
1304 dequeue(WaitQ *q)
1305 {
1306 SudoG *sgp;
1307
1308 loop:
1309 sgp = q->first;
1310 if(sgp == nil)
1311 return nil;
1312 q->first = sgp->link;
1313
1314 // if sgp is stale, ignore it
1315 if(sgp->selgen != NOSELGEN &&
1316 (sgp->selgen != sgp->g->selgen ||
1317 !runtime·cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
1318 //prints("INVALID PSEUDOG POINTER\n");
1319 goto loop;
1320 }
1321
1322 return sgp;
1323 }
1324
1325 static void
1326 dequeueg(WaitQ *q)
1327 {
1328 SudoG **l, *sgp, *prevsgp;
1329
1330 prevsgp = nil;
1331 for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
1332 if(sgp->g == g) {
1333 *l = sgp->link;
1334 if(q->last == sgp)
1335 q->last = prevsgp;
1336 break;
1337 }
1338 }
1339 }
1340
1341 static void
1342 enqueue(WaitQ *q, SudoG *sgp)
1343 {
1344 sgp->link = nil;
1345 if(q->first == nil) {
1346 q->first = sgp;
1347 q->last = sgp;
1348 return;
1349 }
1350 q->last->link = sgp;
1351 q->last = sgp;
1352 }
1353
1354 static void
1355 racesync(Hchan *c, SudoG *sg)
1356 {
1357 runtime·racerelease(chanbuf(c, 0));
1358 runtime·raceacquireg(sg->g, chanbuf(c, 0));
1359 runtime·racereleaseg(sg->g, chanbuf(c, 0));
1360 runtime·raceacquire(chanbuf(c, 0));
1361 }
View as plain text