Run Format

Text file src/pkg/runtime/chan.c

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	#include "runtime.h"
     6	#include "arch_GOARCH.h"
     7	#include "type.h"
     8	#include "race.h"
     9	#include "malloc.h"
    10	
    11	#define	MAXALIGN	7
    12	#define	NOSELGEN	1
    13	
    14	static	int32	debug	= 0;
    15	
    16	typedef	struct	WaitQ	WaitQ;
    17	typedef	struct	SudoG	SudoG;
    18	typedef	struct	Select	Select;
    19	typedef	struct	Scase	Scase;
    20	
    21	struct	SudoG
    22	{
    23		G*	g;		// g and selgen constitute
    24		uint32	selgen;		// a weak pointer to g
    25		SudoG*	link;
    26		int64	releasetime;
    27		byte*	elem;		// data element
    28	};
    29	
    30	struct	WaitQ
    31	{
    32		SudoG*	first;
    33		SudoG*	last;
    34	};
    35	
    36	// The garbage collector is assuming that Hchan can only contain pointers into the stack
    37	// and cannot contain pointers into the heap.
    38	struct	Hchan
    39	{
    40		uintgo	qcount;			// total data in the q
    41		uintgo	dataqsiz;		// size of the circular q
    42		uint16	elemsize;
    43		bool	closed;
    44		uint8	elemalign;
    45		Alg*	elemalg;		// interface for element type
    46		uintgo	sendx;			// send index
    47		uintgo	recvx;			// receive index
    48		WaitQ	recvq;			// list of recv waiters
    49		WaitQ	sendq;			// list of send waiters
    50		Lock;
    51	};
    52	
    53	uint32 runtime·Hchansize = sizeof(Hchan);
    54	
    55	// Buffer follows Hchan immediately in memory.
    56	// chanbuf(c, i) is pointer to the i'th slot in the buffer.
    57	#define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
    58	
    59	enum
    60	{
    61		// Scase.kind
    62		CaseRecv,
    63		CaseSend,
    64		CaseDefault,
    65	};
    66	
    67	struct	Scase
    68	{
    69		SudoG	sg;			// must be first member (cast to Scase)
    70		Hchan*	chan;			// chan
    71		byte*	pc;			// return pc
    72		uint16	kind;
    73		uint16	so;			// vararg of selected bool
    74		bool*	receivedp;		// pointer to received bool (recv2)
    75	};
    76	
    77	struct	Select
    78	{
    79		uint16	tcase;			// total count of scase[]
    80		uint16	ncase;			// currently filled scase[]
    81		uint16*	pollorder;		// case poll order
    82		Hchan**	lockorder;		// channel lock order
    83		Scase	scase[1];		// one per case (in order of appearance)
    84	};
    85	
    86	static	void	dequeueg(WaitQ*);
    87	static	SudoG*	dequeue(WaitQ*);
    88	static	void	enqueue(WaitQ*, SudoG*);
    89	static	void	destroychan(Hchan*);
    90	static	void	racesync(Hchan*, SudoG*);
    91	
    92	Hchan*
    93	runtime·makechan_c(ChanType *t, int64 hint)
    94	{
    95		Hchan *c;
    96		uintptr n;
    97		Type *elem;
    98	
    99		elem = t->elem;
   100	
   101		// compiler checks this but be safe.
   102		if(elem->size >= (1<<16))
   103			runtime·throw("makechan: invalid channel element type");
   104	
   105		if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > MaxMem / elem->size))
   106			runtime·panicstring("makechan: size out of range");
   107	
   108		// calculate rounded size of Hchan
   109		n = sizeof(*c);
   110		while(n & MAXALIGN)
   111			n++;
   112	
   113		// allocate memory in one call
   114		c = (Hchan*)runtime·mal(n + hint*elem->size);
   115		c->elemsize = elem->size;
   116		c->elemalg = elem->alg;
   117		c->elemalign = elem->align;
   118		c->dataqsiz = hint;
   119		runtime·settype(c, (uintptr)t | TypeInfo_Chan);
   120	
   121		if(debug)
   122			runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; elemalign=%d; dataqsiz=%D\n",
   123				c, (int64)elem->size, elem->alg, elem->align, (int64)c->dataqsiz);
   124	
   125		return c;
   126	}
   127	
   128	// For reflect
   129	//	func makechan(typ *ChanType, size uint64) (chan)
   130	void
   131	reflect·makechan(ChanType *t, uint64 size, Hchan *c)
   132	{
   133		c = runtime·makechan_c(t, size);
   134		FLUSH(&c);
   135	}
   136	
   137	// makechan(t *ChanType, hint int64) (hchan *chan any);
   138	void
   139	runtime·makechan(ChanType *t, int64 hint, Hchan *ret)
   140	{
   141		ret = runtime·makechan_c(t, hint);
   142		FLUSH(&ret);
   143	}
   144	
   145	/*
   146	 * generic single channel send/recv
   147	 * if the bool pointer is nil,
   148	 * then the full exchange will
   149	 * occur. if pres is not nil,
   150	 * then the protocol will not
   151	 * sleep but return if it could
   152	 * not complete.
   153	 *
   154	 * sleep can wake up with g->param == nil
   155	 * when a channel involved in the sleep has
   156	 * been closed.  it is easiest to loop and re-run
   157	 * the operation; we'll see that it's now closed.
   158	 */
   159	void
   160	runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
   161	{
   162		SudoG *sg;
   163		SudoG mysg;
   164		G* gp;
   165		int64 t0;
   166	
   167		if(c == nil) {
   168			USED(t);
   169			if(pres != nil) {
   170				*pres = false;
   171				return;
   172			}
   173			runtime·park(nil, nil, "chan send (nil chan)");
   174			return;  // not reached
   175		}
   176	
   177		if(runtime·gcwaiting)
   178			runtime·gosched();
   179	
   180		if(debug) {
   181			runtime·printf("chansend: chan=%p; elem=", c);
   182			c->elemalg->print(c->elemsize, ep);
   183			runtime·prints("\n");
   184		}
   185	
   186		t0 = 0;
   187		mysg.releasetime = 0;
   188		if(runtime·blockprofilerate > 0) {
   189			t0 = runtime·cputicks();
   190			mysg.releasetime = -1;
   191		}
   192	
   193		runtime·lock(c);
   194		// TODO(dvyukov): add similar instrumentation to select.
   195		if(raceenabled)
   196			runtime·racereadpc(c, pc, runtime·chansend);
   197		if(c->closed)
   198			goto closed;
   199	
   200		if(c->dataqsiz > 0)
   201			goto asynch;
   202	
   203		sg = dequeue(&c->recvq);
   204		if(sg != nil) {
   205			if(raceenabled)
   206				racesync(c, sg);
   207			runtime·unlock(c);
   208	
   209			gp = sg->g;
   210			gp->param = sg;
   211			if(sg->elem != nil)
   212				c->elemalg->copy(c->elemsize, sg->elem, ep);
   213			if(sg->releasetime)
   214				sg->releasetime = runtime·cputicks();
   215			runtime·ready(gp);
   216	
   217			if(pres != nil)
   218				*pres = true;
   219			return;
   220		}
   221	
   222		if(pres != nil) {
   223			runtime·unlock(c);
   224			*pres = false;
   225			return;
   226		}
   227	
   228		mysg.elem = ep;
   229		mysg.g = g;
   230		mysg.selgen = NOSELGEN;
   231		g->param = nil;
   232		enqueue(&c->sendq, &mysg);
   233		runtime·park(runtime·unlock, c, "chan send");
   234	
   235		if(g->param == nil) {
   236			runtime·lock(c);
   237			if(!c->closed)
   238				runtime·throw("chansend: spurious wakeup");
   239			goto closed;
   240		}
   241	
   242		if(mysg.releasetime > 0)
   243			runtime·blockevent(mysg.releasetime - t0, 2);
   244	
   245		return;
   246	
   247	asynch:
   248		if(c->closed)
   249			goto closed;
   250	
   251		if(c->qcount >= c->dataqsiz) {
   252			if(pres != nil) {
   253				runtime·unlock(c);
   254				*pres = false;
   255				return;
   256			}
   257			mysg.g = g;
   258			mysg.elem = nil;
   259			mysg.selgen = NOSELGEN;
   260			enqueue(&c->sendq, &mysg);
   261			runtime·park(runtime·unlock, c, "chan send");
   262	
   263			runtime·lock(c);
   264			goto asynch;
   265		}
   266	
   267		if(raceenabled)
   268			runtime·racerelease(chanbuf(c, c->sendx));
   269	
   270		c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
   271		if(++c->sendx == c->dataqsiz)
   272			c->sendx = 0;
   273		c->qcount++;
   274	
   275		sg = dequeue(&c->recvq);
   276		if(sg != nil) {
   277			gp = sg->g;
   278			runtime·unlock(c);
   279			if(sg->releasetime)
   280				sg->releasetime = runtime·cputicks();
   281			runtime·ready(gp);
   282		} else
   283			runtime·unlock(c);
   284		if(pres != nil)
   285			*pres = true;
   286		if(mysg.releasetime > 0)
   287			runtime·blockevent(mysg.releasetime - t0, 2);
   288		return;
   289	
   290	closed:
   291		runtime·unlock(c);
   292		runtime·panicstring("send on closed channel");
   293	}
   294	
   295	
   296	void
   297	runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
   298	{
   299		SudoG *sg;
   300		SudoG mysg;
   301		G *gp;
   302		int64 t0;
   303	
   304		if(runtime·gcwaiting)
   305			runtime·gosched();
   306	
   307		if(debug)
   308			runtime·printf("chanrecv: chan=%p\n", c);
   309	
   310		if(c == nil) {
   311			USED(t);
   312			if(selected != nil) {
   313				*selected = false;
   314				return;
   315			}
   316			runtime·park(nil, nil, "chan receive (nil chan)");
   317			return;  // not reached
   318		}
   319	
   320		t0 = 0;
   321		mysg.releasetime = 0;
   322		if(runtime·blockprofilerate > 0) {
   323			t0 = runtime·cputicks();
   324			mysg.releasetime = -1;
   325		}
   326	
   327		runtime·lock(c);
   328		if(c->dataqsiz > 0)
   329			goto asynch;
   330	
   331		if(c->closed)
   332			goto closed;
   333	
   334		sg = dequeue(&c->sendq);
   335		if(sg != nil) {
   336			if(raceenabled)
   337				racesync(c, sg);
   338			runtime·unlock(c);
   339	
   340			if(ep != nil)
   341				c->elemalg->copy(c->elemsize, ep, sg->elem);
   342			gp = sg->g;
   343			gp->param = sg;
   344			if(sg->releasetime)
   345				sg->releasetime = runtime·cputicks();
   346			runtime·ready(gp);
   347	
   348			if(selected != nil)
   349				*selected = true;
   350			if(received != nil)
   351				*received = true;
   352			return;
   353		}
   354	
   355		if(selected != nil) {
   356			runtime·unlock(c);
   357			*selected = false;
   358			return;
   359		}
   360	
   361		mysg.elem = ep;
   362		mysg.g = g;
   363		mysg.selgen = NOSELGEN;
   364		g->param = nil;
   365		enqueue(&c->recvq, &mysg);
   366		runtime·park(runtime·unlock, c, "chan receive");
   367	
   368		if(g->param == nil) {
   369			runtime·lock(c);
   370			if(!c->closed)
   371				runtime·throw("chanrecv: spurious wakeup");
   372			goto closed;
   373		}
   374	
   375		if(received != nil)
   376			*received = true;
   377		if(mysg.releasetime > 0)
   378			runtime·blockevent(mysg.releasetime - t0, 2);
   379		return;
   380	
   381	asynch:
   382		if(c->qcount <= 0) {
   383			if(c->closed)
   384				goto closed;
   385	
   386			if(selected != nil) {
   387				runtime·unlock(c);
   388				*selected = false;
   389				if(received != nil)
   390					*received = false;
   391				return;
   392			}
   393			mysg.g = g;
   394			mysg.elem = nil;
   395			mysg.selgen = NOSELGEN;
   396			enqueue(&c->recvq, &mysg);
   397			runtime·park(runtime·unlock, c, "chan receive");
   398	
   399			runtime·lock(c);
   400			goto asynch;
   401		}
   402	
   403		if(raceenabled)
   404			runtime·raceacquire(chanbuf(c, c->recvx));
   405	
   406		if(ep != nil)
   407			c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
   408		c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
   409		if(++c->recvx == c->dataqsiz)
   410			c->recvx = 0;
   411		c->qcount--;
   412	
   413		sg = dequeue(&c->sendq);
   414		if(sg != nil) {
   415			gp = sg->g;
   416			runtime·unlock(c);
   417			if(sg->releasetime)
   418				sg->releasetime = runtime·cputicks();
   419			runtime·ready(gp);
   420		} else
   421			runtime·unlock(c);
   422	
   423		if(selected != nil)
   424			*selected = true;
   425		if(received != nil)
   426			*received = true;
   427		if(mysg.releasetime > 0)
   428			runtime·blockevent(mysg.releasetime - t0, 2);
   429		return;
   430	
   431	closed:
   432		if(ep != nil)
   433			c->elemalg->copy(c->elemsize, ep, nil);
   434		if(selected != nil)
   435			*selected = true;
   436		if(received != nil)
   437			*received = false;
   438		if(raceenabled)
   439			runtime·raceacquire(c);
   440		runtime·unlock(c);
   441		if(mysg.releasetime > 0)
   442			runtime·blockevent(mysg.releasetime - t0, 2);
   443	}
   444	
   445	// chansend1(hchan *chan any, elem any);
   446	#pragma textflag 7
   447	void
   448	runtime·chansend1(ChanType *t, Hchan* c, ...)
   449	{
   450		runtime·chansend(t, c, (byte*)(&c+1), nil, runtime·getcallerpc(&t));
   451	}
   452	
   453	// chanrecv1(hchan *chan any) (elem any);
   454	#pragma textflag 7
   455	void
   456	runtime·chanrecv1(ChanType *t, Hchan* c, ...)
   457	{
   458		runtime·chanrecv(t, c, (byte*)(&c+1), nil, nil);
   459	}
   460	
   461	// chanrecv2(hchan *chan any) (elem any, received bool);
   462	#pragma textflag 7
   463	void
   464	runtime·chanrecv2(ChanType *t, Hchan* c, ...)
   465	{
   466		byte *ae, *ap;
   467	
   468		ae = (byte*)(&c+1);
   469		ap = ae + t->elem->size;
   470		runtime·chanrecv(t, c, ae, nil, ap);
   471	}
   472	
   473	// func selectnbsend(c chan any, elem any) bool
   474	//
   475	// compiler implements
   476	//
   477	//	select {
   478	//	case c <- v:
   479	//		... foo
   480	//	default:
   481	//		... bar
   482	//	}
   483	//
   484	// as
   485	//
   486	//	if selectnbsend(c, v) {
   487	//		... foo
   488	//	} else {
   489	//		... bar
   490	//	}
   491	//
   492	#pragma textflag 7
   493	void
   494	runtime·selectnbsend(ChanType *t, Hchan *c, ...)
   495	{
   496		byte *ae, *ap;
   497	
   498		ae = (byte*)(&c + 1);
   499		ap = ae + ROUND(t->elem->size, Structrnd);
   500		runtime·chansend(t, c, ae, ap, runtime·getcallerpc(&t));
   501	}
   502	
   503	// func selectnbrecv(elem *any, c chan any) bool
   504	//
   505	// compiler implements
   506	//
   507	//	select {
   508	//	case v = <-c:
   509	//		... foo
   510	//	default:
   511	//		... bar
   512	//	}
   513	//
   514	// as
   515	//
   516	//	if selectnbrecv(&v, c) {
   517	//		... foo
   518	//	} else {
   519	//		... bar
   520	//	}
   521	//
   522	#pragma textflag 7
   523	void
   524	runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected)
   525	{
   526		runtime·chanrecv(t, c, v, &selected, nil);
   527	}
   528	
   529	// func selectnbrecv2(elem *any, ok *bool, c chan any) bool
   530	//
   531	// compiler implements
   532	//
   533	//	select {
   534	//	case v, ok = <-c:
   535	//		... foo
   536	//	default:
   537	//		... bar
   538	//	}
   539	//
   540	// as
   541	//
   542	//	if c != nil && selectnbrecv2(&v, &ok, c) {
   543	//		... foo
   544	//	} else {
   545	//		... bar
   546	//	}
   547	//
   548	#pragma textflag 7
   549	void
   550	runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected)
   551	{
   552		runtime·chanrecv(t, c, v, &selected, received);
   553	}
   554	
   555	// For reflect:
   556	//	func chansend(c chan, val iword, nb bool) (selected bool)
   557	// where an iword is the same word an interface value would use:
   558	// the actual data if it fits, or else a pointer to the data.
   559	//
   560	// The "uintptr selected" is really "bool selected" but saying
   561	// uintptr gets us the right alignment for the output parameter block.
   562	#pragma textflag 7
   563	void
   564	reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected)
   565	{
   566		bool *sp;
   567		byte *vp;
   568	
   569		if(nb) {
   570			selected = false;
   571			sp = (bool*)&selected;
   572		} else {
   573			*(bool*)&selected = true;
   574			FLUSH(&selected);
   575			sp = nil;
   576		}
   577		if(t->elem->size <= sizeof(val))
   578			vp = (byte*)&val;
   579		else
   580			vp = (byte*)val;
   581		runtime·chansend(t, c, vp, sp, runtime·getcallerpc(&t));
   582	}
   583	
   584	// For reflect:
   585	//	func chanrecv(c chan, nb bool) (val iword, selected, received bool)
   586	// where an iword is the same word an interface value would use:
   587	// the actual data if it fits, or else a pointer to the data.
   588	void
   589	reflect·chanrecv(ChanType *t, Hchan *c, bool nb, uintptr val, bool selected, bool received)
   590	{
   591		byte *vp;
   592		bool *sp;
   593	
   594		if(nb) {
   595			selected = false;
   596			sp = &selected;
   597		} else {
   598			selected = true;
   599			FLUSH(&selected);
   600			sp = nil;
   601		}
   602		received = false;
   603		FLUSH(&received);
   604		if(t->elem->size <= sizeof(val)) {
   605			val = 0;
   606			vp = (byte*)&val;
   607		} else {
   608			vp = runtime·mal(t->elem->size);
   609			val = (uintptr)vp;
   610			FLUSH(&val);
   611		}
   612		runtime·chanrecv(t, c, vp, sp, &received);
   613	}
   614	
   615	static void newselect(int32, Select**);
   616	
   617	// newselect(size uint32) (sel *byte);
   618	#pragma textflag 7
   619	void
   620	runtime·newselect(int32 size, ...)
   621	{
   622		int32 o;
   623		Select **selp;
   624	
   625		o = ROUND(sizeof(size), Structrnd);
   626		selp = (Select**)((byte*)&size + o);
   627		newselect(size, selp);
   628	}
   629	
   630	static void
   631	newselect(int32 size, Select **selp)
   632	{
   633		int32 n;
   634		Select *sel;
   635	
   636		n = 0;
   637		if(size > 1)
   638			n = size-1;
   639	
   640		// allocate all the memory we need in a single allocation
   641		// start with Select with size cases
   642		// then lockorder with size entries
   643		// then pollorder with size entries
   644		sel = runtime·mal(sizeof(*sel) +
   645			n*sizeof(sel->scase[0]) +
   646			size*sizeof(sel->lockorder[0]) +
   647			size*sizeof(sel->pollorder[0]));
   648	
   649		sel->tcase = size;
   650		sel->ncase = 0;
   651		sel->lockorder = (void*)(sel->scase + size);
   652		sel->pollorder = (void*)(sel->lockorder + size);
   653		*selp = sel;
   654	
   655		if(debug)
   656			runtime·printf("newselect s=%p size=%d\n", sel, size);
   657	}
   658	
   659	// cut in half to give stack a chance to split
   660	static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so);
   661	
   662	// selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
   663	#pragma textflag 7
   664	void
   665	runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
   666	{
   667		selected = false;
   668		FLUSH(&selected);
   669	
   670		// nil cases do not compete
   671		if(c == nil)
   672			return;
   673	
   674		selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel);
   675	}
   676	
   677	static void
   678	selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so)
   679	{
   680		int32 i;
   681		Scase *cas;
   682	
   683		i = sel->ncase;
   684		if(i >= sel->tcase)
   685			runtime·throw("selectsend: too many cases");
   686		sel->ncase = i+1;
   687		cas = &sel->scase[i];
   688	
   689		cas->pc = pc;
   690		cas->chan = c;
   691		cas->so = so;
   692		cas->kind = CaseSend;
   693		cas->sg.elem = elem;
   694	
   695		if(debug)
   696			runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n",
   697				sel, cas->pc, cas->chan, cas->so);
   698	}
   699	
   700	// cut in half to give stack a chance to split
   701	static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32 so);
   702	
   703	// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
   704	#pragma textflag 7
   705	void
   706	runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
   707	{
   708		selected = false;
   709		FLUSH(&selected);
   710	
   711		// nil cases do not compete
   712		if(c == nil)
   713			return;
   714	
   715		selectrecv(sel, c, runtime·getcallerpc(&sel), elem, nil, (byte*)&selected - (byte*)&sel);
   716	}
   717	
   718	// selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
   719	#pragma textflag 7
   720	void
   721	runtime·selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, bool selected)
   722	{
   723		selected = false;
   724		FLUSH(&selected);
   725	
   726		// nil cases do not compete
   727		if(c == nil)
   728			return;
   729	
   730		selectrecv(sel, c, runtime·getcallerpc(&sel), elem, received, (byte*)&selected - (byte*)&sel);
   731	}
   732	
   733	static void
   734	selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so)
   735	{
   736		int32 i;
   737		Scase *cas;
   738	
   739		i = sel->ncase;
   740		if(i >= sel->tcase)
   741			runtime·throw("selectrecv: too many cases");
   742		sel->ncase = i+1;
   743		cas = &sel->scase[i];
   744		cas->pc = pc;
   745		cas->chan = c;
   746	
   747		cas->so = so;
   748		cas->kind = CaseRecv;
   749		cas->sg.elem = elem;
   750		cas->receivedp = received;
   751	
   752		if(debug)
   753			runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n",
   754				sel, cas->pc, cas->chan, cas->so);
   755	}
   756	
   757	// cut in half to give stack a chance to split
   758	static void selectdefault(Select*, void*, int32);
   759	
   760	// selectdefault(sel *byte) (selected bool);
   761	#pragma textflag 7
   762	void
   763	runtime·selectdefault(Select *sel, bool selected)
   764	{
   765		selected = false;
   766		FLUSH(&selected);
   767	
   768		selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel);
   769	}
   770	
   771	static void
   772	selectdefault(Select *sel, void *callerpc, int32 so)
   773	{
   774		int32 i;
   775		Scase *cas;
   776	
   777		i = sel->ncase;
   778		if(i >= sel->tcase)
   779			runtime·throw("selectdefault: too many cases");
   780		sel->ncase = i+1;
   781		cas = &sel->scase[i];
   782		cas->pc = callerpc;
   783		cas->chan = nil;
   784	
   785		cas->so = so;
   786		cas->kind = CaseDefault;
   787	
   788		if(debug)
   789			runtime·printf("selectdefault s=%p pc=%p so=%d\n",
   790				sel, cas->pc, cas->so);
   791	}
   792	
   793	static void
   794	sellock(Select *sel)
   795	{
   796		uint32 i;
   797		Hchan *c, *c0;
   798	
   799		c = nil;
   800		for(i=0; i<sel->ncase; i++) {
   801			c0 = sel->lockorder[i];
   802			if(c0 && c0 != c) {
   803				c = sel->lockorder[i];
   804				runtime·lock(c);
   805			}
   806		}
   807	}
   808	
   809	static void
   810	selunlock(Select *sel)
   811	{
   812		int32 i, n, r;
   813		Hchan *c;
   814	
   815		// We must be very careful here to not touch sel after we have unlocked
   816		// the last lock, because sel can be freed right after the last unlock.
   817		// Consider the following situation.
   818		// First M calls runtime·park() in runtime·selectgo() passing the sel.
   819		// Once runtime·park() has unlocked the last lock, another M makes
   820		// the G that calls select runnable again and schedules it for execution.
   821		// When the G runs on another M, it locks all the locks and frees sel.
   822		// Now if the first M touches sel, it will access freed memory.
   823		n = (int32)sel->ncase;
   824		r = 0;
   825		// skip the default case
   826		if(n>0 && sel->lockorder[0] == nil)
   827			r = 1;
   828		for(i = n-1; i >= r; i--) {
   829			c = sel->lockorder[i];
   830			if(i>0 && sel->lockorder[i-1] == c)
   831				continue;  // will unlock it on the next iteration
   832			runtime·unlock(c);
   833		}
   834	}
   835	
   836	void
   837	runtime·block(void)
   838	{
   839		runtime·park(nil, nil, "select (no cases)");	// forever
   840	}
   841	
   842	static void* selectgo(Select**);
   843	
   844	// selectgo(sel *byte);
   845	//
   846	// overwrites return pc on stack to signal which case of the select
   847	// to run, so cannot appear at the top of a split stack.
   848	#pragma textflag 7
   849	void
   850	runtime·selectgo(Select *sel)
   851	{
   852		runtime·setcallerpc(&sel, selectgo(&sel));
   853	}
   854	
   855	static void*
   856	selectgo(Select **selp)
   857	{
   858		Select *sel;
   859		uint32 o, i, j, k;
   860		Scase *cas, *dfl;
   861		Hchan *c;
   862		SudoG *sg;
   863		G *gp;
   864		byte *as;
   865		void *pc;
   866	
   867		sel = *selp;
   868		if(runtime·gcwaiting)
   869			runtime·gosched();
   870	
   871		if(debug)
   872			runtime·printf("select: sel=%p\n", sel);
   873	
   874		// The compiler rewrites selects that statically have
   875		// only 0 or 1 cases plus default into simpler constructs.
   876		// The only way we can end up with such small sel->ncase
   877		// values here is for a larger select in which most channels
   878		// have been nilled out.  The general code handles those
   879		// cases correctly, and they are rare enough not to bother
   880		// optimizing (and needing to test).
   881	
   882		// generate permuted order
   883		for(i=0; i<sel->ncase; i++)
   884			sel->pollorder[i] = i;
   885		for(i=1; i<sel->ncase; i++) {
   886			o = sel->pollorder[i];
   887			j = runtime·fastrand1()%(i+1);
   888			sel->pollorder[i] = sel->pollorder[j];
   889			sel->pollorder[j] = o;
   890		}
   891	
   892		// sort the cases by Hchan address to get the locking order.
   893		// simple heap sort, to guarantee n log n time and constant stack footprint.
   894		for(i=0; i<sel->ncase; i++) {
   895			j = i;
   896			c = sel->scase[j].chan;
   897			while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
   898				sel->lockorder[j] = sel->lockorder[k];
   899				j = k;
   900			}
   901			sel->lockorder[j] = c;
   902		}
   903		for(i=sel->ncase; i-->0; ) {
   904			c = sel->lockorder[i];
   905			sel->lockorder[i] = sel->lockorder[0];
   906			j = 0;
   907			for(;;) {
   908				k = j*2+1;
   909				if(k >= i)
   910					break;
   911				if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
   912					k++;
   913				if(c < sel->lockorder[k]) {
   914					sel->lockorder[j] = sel->lockorder[k];
   915					j = k;
   916					continue;
   917				}
   918				break;
   919			}
   920			sel->lockorder[j] = c;
   921		}
   922		/*
   923		for(i=0; i+1<sel->ncase; i++)
   924			if(sel->lockorder[i] > sel->lockorder[i+1]) {
   925				runtime·printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
   926				runtime·throw("select: broken sort");
   927			}
   928		*/
   929		sellock(sel);
   930	
   931	loop:
   932		// pass 1 - look for something already waiting
   933		dfl = nil;
   934		for(i=0; i<sel->ncase; i++) {
   935			o = sel->pollorder[i];
   936			cas = &sel->scase[o];
   937			c = cas->chan;
   938	
   939			switch(cas->kind) {
   940			case CaseRecv:
   941				if(c->dataqsiz > 0) {
   942					if(c->qcount > 0)
   943						goto asyncrecv;
   944				} else {
   945					sg = dequeue(&c->sendq);
   946					if(sg != nil)
   947						goto syncrecv;
   948				}
   949				if(c->closed)
   950					goto rclose;
   951				break;
   952	
   953			case CaseSend:
   954				if(c->closed)
   955					goto sclose;
   956				if(c->dataqsiz > 0) {
   957					if(c->qcount < c->dataqsiz)
   958						goto asyncsend;
   959				} else {
   960					sg = dequeue(&c->recvq);
   961					if(sg != nil)
   962						goto syncsend;
   963				}
   964				break;
   965	
   966			case CaseDefault:
   967				dfl = cas;
   968				break;
   969			}
   970		}
   971	
   972		if(dfl != nil) {
   973			selunlock(sel);
   974			cas = dfl;
   975			goto retc;
   976		}
   977	
   978	
   979		// pass 2 - enqueue on all chans
   980		for(i=0; i<sel->ncase; i++) {
   981			o = sel->pollorder[i];
   982			cas = &sel->scase[o];
   983			c = cas->chan;
   984			sg = &cas->sg;
   985			sg->g = g;
   986			sg->selgen = g->selgen;
   987	
   988			switch(cas->kind) {
   989			case CaseRecv:
   990				enqueue(&c->recvq, sg);
   991				break;
   992	
   993			case CaseSend:
   994				enqueue(&c->sendq, sg);
   995				break;
   996			}
   997		}
   998	
   999		g->param = nil;
  1000		runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
  1001	
  1002		sellock(sel);
  1003		sg = g->param;
  1004	
  1005		// pass 3 - dequeue from unsuccessful chans
  1006		// otherwise they stack up on quiet channels
  1007		for(i=0; i<sel->ncase; i++) {
  1008			cas = &sel->scase[i];
  1009			if(cas != (Scase*)sg) {
  1010				c = cas->chan;
  1011				if(cas->kind == CaseSend)
  1012					dequeueg(&c->sendq);
  1013				else
  1014					dequeueg(&c->recvq);
  1015			}
  1016		}
  1017	
  1018		if(sg == nil)
  1019			goto loop;
  1020	
  1021		cas = (Scase*)sg;
  1022		c = cas->chan;
  1023	
  1024		if(c->dataqsiz > 0)
  1025			runtime·throw("selectgo: shouldn't happen");
  1026	
  1027		if(debug)
  1028			runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
  1029				sel, c, cas, cas->kind);
  1030	
  1031		if(cas->kind == CaseRecv) {
  1032			if(cas->receivedp != nil)
  1033				*cas->receivedp = true;
  1034		}
  1035	
  1036		selunlock(sel);
  1037		goto retc;
  1038	
  1039	asyncrecv:
  1040		// can receive from buffer
  1041		if(raceenabled)
  1042			runtime·raceacquire(chanbuf(c, c->recvx));
  1043		if(cas->receivedp != nil)
  1044			*cas->receivedp = true;
  1045		if(cas->sg.elem != nil)
  1046			c->elemalg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->recvx));
  1047		c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
  1048		if(++c->recvx == c->dataqsiz)
  1049			c->recvx = 0;
  1050		c->qcount--;
  1051		sg = dequeue(&c->sendq);
  1052		if(sg != nil) {
  1053			gp = sg->g;
  1054			selunlock(sel);
  1055			runtime·ready(gp);
  1056		} else {
  1057			selunlock(sel);
  1058		}
  1059		goto retc;
  1060	
  1061	asyncsend:
  1062		// can send to buffer
  1063		if(raceenabled)
  1064			runtime·racerelease(chanbuf(c, c->sendx));
  1065		c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
  1066		if(++c->sendx == c->dataqsiz)
  1067			c->sendx = 0;
  1068		c->qcount++;
  1069		sg = dequeue(&c->recvq);
  1070		if(sg != nil) {
  1071			gp = sg->g;
  1072			selunlock(sel);
  1073			runtime·ready(gp);
  1074		} else {
  1075			selunlock(sel);
  1076		}
  1077		goto retc;
  1078	
  1079	syncrecv:
  1080		// can receive from sleeping sender (sg)
  1081		if(raceenabled)
  1082			racesync(c, sg);
  1083		selunlock(sel);
  1084		if(debug)
  1085			runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
  1086		if(cas->receivedp != nil)
  1087			*cas->receivedp = true;
  1088		if(cas->sg.elem != nil)
  1089			c->elemalg->copy(c->elemsize, cas->sg.elem, sg->elem);
  1090		gp = sg->g;
  1091		gp->param = sg;
  1092		runtime·ready(gp);
  1093		goto retc;
  1094	
  1095	rclose:
  1096		// read at end of closed channel
  1097		selunlock(sel);
  1098		if(cas->receivedp != nil)
  1099			*cas->receivedp = false;
  1100		if(cas->sg.elem != nil)
  1101			c->elemalg->copy(c->elemsize, cas->sg.elem, nil);
  1102		if(raceenabled)
  1103			runtime·raceacquire(c);
  1104		goto retc;
  1105	
  1106	syncsend:
  1107		// can send to sleeping receiver (sg)
  1108		if(raceenabled)
  1109			racesync(c, sg);
  1110		selunlock(sel);
  1111		if(debug)
  1112			runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
  1113		if(sg->elem != nil)
  1114			c->elemalg->copy(c->elemsize, sg->elem, cas->sg.elem);
  1115		gp = sg->g;
  1116		gp->param = sg;
  1117		runtime·ready(gp);
  1118	
  1119	retc:
  1120		// return pc corresponding to chosen case.
  1121		// Set boolean passed during select creation
  1122		// (at offset selp + cas->so) to true.
  1123		// If cas->so == 0, this is a reflect-driven select and we
  1124		// don't need to update the boolean.
  1125		pc = cas->pc;
  1126		if(cas->so > 0) {
  1127			as = (byte*)selp + cas->so;
  1128			*as = true;
  1129		}
  1130		runtime·free(sel);
  1131		return pc;
  1132	
  1133	sclose:
  1134		// send on closed channel
  1135		selunlock(sel);
  1136		runtime·panicstring("send on closed channel");
  1137		return nil;  // not reached
  1138	}
  1139	
  1140	// This struct must match ../reflect/value.go:/runtimeSelect.
  1141	typedef struct runtimeSelect runtimeSelect;
  1142	struct runtimeSelect
  1143	{
  1144		uintptr dir;
  1145		ChanType *typ;
  1146		Hchan *ch;
  1147		uintptr val;
  1148	};
  1149	
  1150	// This enum must match ../reflect/value.go:/SelectDir.
  1151	enum SelectDir {
  1152		SelectSend = 1,
  1153		SelectRecv,
  1154		SelectDefault,
  1155	};
  1156	
  1157	// func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool)
  1158	void
  1159	reflect·rselect(Slice cases, intgo chosen, uintptr word, bool recvOK)
  1160	{
  1161		int32 i;
  1162		Select *sel;
  1163		runtimeSelect* rcase, *rc;
  1164		void *elem;
  1165		void *recvptr;
  1166		uintptr maxsize;
  1167	
  1168		chosen = -1;
  1169		word = 0;
  1170		recvOK = false;
  1171	
  1172		maxsize = 0;
  1173		rcase = (runtimeSelect*)cases.array;
  1174		for(i=0; i<cases.len; i++) {
  1175			rc = &rcase[i];
  1176			if(rc->dir == SelectRecv && rc->ch != nil && maxsize < rc->typ->elem->size)
  1177				maxsize = rc->typ->elem->size;
  1178		}
  1179	
  1180		recvptr = nil;
  1181		if(maxsize > sizeof(void*))
  1182			recvptr = runtime·mal(maxsize);
  1183	
  1184		newselect(cases.len, &sel);
  1185		for(i=0; i<cases.len; i++) {
  1186			rc = &rcase[i];
  1187			switch(rc->dir) {
  1188			case SelectDefault:
  1189				selectdefault(sel, (void*)i, 0);
  1190				break;
  1191			case SelectSend:
  1192				if(rc->ch == nil)
  1193					break;
  1194				if(rc->typ->elem->size > sizeof(void*))
  1195					elem = (void*)rc->val;
  1196				else
  1197					elem = (void*)&rc->val;
  1198				selectsend(sel, rc->ch, (void*)i, elem, 0);
  1199				break;
  1200			case SelectRecv:
  1201				if(rc->ch == nil)
  1202					break;
  1203				if(rc->typ->elem->size > sizeof(void*))
  1204					elem = recvptr;
  1205				else
  1206					elem = &word;
  1207				selectrecv(sel, rc->ch, (void*)i, elem, &recvOK, 0);
  1208				break;
  1209			}
  1210		}
  1211	
  1212		chosen = (intgo)(uintptr)selectgo(&sel);
  1213		if(rcase[chosen].dir == SelectRecv && rcase[chosen].typ->elem->size > sizeof(void*))
  1214			word = (uintptr)recvptr;
  1215	
  1216		FLUSH(&chosen);
  1217		FLUSH(&word);
  1218		FLUSH(&recvOK);
  1219	}
  1220	
  1221	// closechan(sel *byte);
  1222	#pragma textflag 7
  1223	void
  1224	runtime·closechan(Hchan *c)
  1225	{
  1226		SudoG *sg;
  1227		G* gp;
  1228	
  1229		if(c == nil)
  1230			runtime·panicstring("close of nil channel");
  1231	
  1232		if(runtime·gcwaiting)
  1233			runtime·gosched();
  1234	
  1235		runtime·lock(c);
  1236		if(c->closed) {
  1237			runtime·unlock(c);
  1238			runtime·panicstring("close of closed channel");
  1239		}
  1240	
  1241		if(raceenabled) {
  1242			runtime·racewritepc(c, runtime·getcallerpc(&c), runtime·closechan);
  1243			runtime·racerelease(c);
  1244		}
  1245	
  1246		c->closed = true;
  1247	
  1248		// release all readers
  1249		for(;;) {
  1250			sg = dequeue(&c->recvq);
  1251			if(sg == nil)
  1252				break;
  1253			gp = sg->g;
  1254			gp->param = nil;
  1255			runtime·ready(gp);
  1256		}
  1257	
  1258		// release all writers
  1259		for(;;) {
  1260			sg = dequeue(&c->sendq);
  1261			if(sg == nil)
  1262				break;
  1263			gp = sg->g;
  1264			gp->param = nil;
  1265			runtime·ready(gp);
  1266		}
  1267	
  1268		runtime·unlock(c);
  1269	}
  1270	
  1271	// For reflect
  1272	//	func chanclose(c chan)
  1273	void
  1274	reflect·chanclose(Hchan *c)
  1275	{
  1276		runtime·closechan(c);
  1277	}
  1278	
  1279	// For reflect
  1280	//	func chanlen(c chan) (len int)
  1281	void
  1282	reflect·chanlen(Hchan *c, intgo len)
  1283	{
  1284		if(c == nil)
  1285			len = 0;
  1286		else
  1287			len = c->qcount;
  1288		FLUSH(&len);
  1289	}
  1290	
  1291	// For reflect
  1292	//	func chancap(c chan) int
  1293	void
  1294	reflect·chancap(Hchan *c, intgo cap)
  1295	{
  1296		if(c == nil)
  1297			cap = 0;
  1298		else
  1299			cap = c->dataqsiz;
  1300		FLUSH(&cap);
  1301	}
  1302	
  1303	static SudoG*
  1304	dequeue(WaitQ *q)
  1305	{
  1306		SudoG *sgp;
  1307	
  1308	loop:
  1309		sgp = q->first;
  1310		if(sgp == nil)
  1311			return nil;
  1312		q->first = sgp->link;
  1313	
  1314		// if sgp is stale, ignore it
  1315		if(sgp->selgen != NOSELGEN &&
  1316			(sgp->selgen != sgp->g->selgen ||
  1317			!runtime·cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
  1318			//prints("INVALID PSEUDOG POINTER\n");
  1319			goto loop;
  1320		}
  1321	
  1322		return sgp;
  1323	}
  1324	
  1325	static void
  1326	dequeueg(WaitQ *q)
  1327	{
  1328		SudoG **l, *sgp, *prevsgp;
  1329	
  1330		prevsgp = nil;
  1331		for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
  1332			if(sgp->g == g) {
  1333				*l = sgp->link;
  1334				if(q->last == sgp)
  1335					q->last = prevsgp;
  1336				break;
  1337			}
  1338		}
  1339	}
  1340	
  1341	static void
  1342	enqueue(WaitQ *q, SudoG *sgp)
  1343	{
  1344		sgp->link = nil;
  1345		if(q->first == nil) {
  1346			q->first = sgp;
  1347			q->last = sgp;
  1348			return;
  1349		}
  1350		q->last->link = sgp;
  1351		q->last = sgp;
  1352	}
  1353	
  1354	static void
  1355	racesync(Hchan *c, SudoG *sg)
  1356	{
  1357		runtime·racerelease(chanbuf(c, 0));
  1358		runtime·raceacquireg(sg->g, chanbuf(c, 0));
  1359		runtime·racereleaseg(sg->g, chanbuf(c, 0));
  1360		runtime·raceacquire(chanbuf(c, 0));
  1361	}

View as plain text