Run Format

Text file src/pkg/runtime/chan.c

     1	// Copyright 2009 The Go Authors. All rights reserved.
     2	// Use of this source code is governed by a BSD-style
     3	// license that can be found in the LICENSE file.
     4	
     5	#include "runtime.h"
     6	#include "arch_GOARCH.h"
     7	#include "type.h"
     8	#include "race.h"
     9	#include "malloc.h"
    10	#include "../../cmd/ld/textflag.h"
    11	
    12	#define	MAXALIGN	8
    13	#define	NOSELGEN	1
    14	
    15	typedef	struct	WaitQ	WaitQ;
    16	typedef	struct	SudoG	SudoG;
    17	typedef	struct	Select	Select;
    18	typedef	struct	Scase	Scase;
    19	
    20	struct	SudoG
    21	{
    22		G*	g;		// g and selgen constitute
    23		uint32	selgen;		// a weak pointer to g
    24		SudoG*	link;
    25		int64	releasetime;
    26		byte*	elem;		// data element
    27	};
    28	
    29	struct	WaitQ
    30	{
    31		SudoG*	first;
    32		SudoG*	last;
    33	};
    34	
    35	// The garbage collector is assuming that Hchan can only contain pointers into the stack
    36	// and cannot contain pointers into the heap.
    37	struct	Hchan
    38	{
    39		uintgo	qcount;			// total data in the q
    40		uintgo	dataqsiz;		// size of the circular q
    41		uint16	elemsize;
    42		uint16	pad;			// ensures proper alignment of the buffer that follows Hchan in memory
    43		bool	closed;
    44		Alg*	elemalg;		// interface for element type
    45		uintgo	sendx;			// send index
    46		uintgo	recvx;			// receive index
    47		WaitQ	recvq;			// list of recv waiters
    48		WaitQ	sendq;			// list of send waiters
    49		Lock;
    50	};
    51	
    52	uint32 runtime·Hchansize = sizeof(Hchan);
    53	
    54	// Buffer follows Hchan immediately in memory.
    55	// chanbuf(c, i) is pointer to the i'th slot in the buffer.
    56	#define chanbuf(c, i) ((byte*)((c)+1)+(uintptr)(c)->elemsize*(i))
    57	
    58	enum
    59	{
    60		debug = 0,
    61	
    62		// Scase.kind
    63		CaseRecv,
    64		CaseSend,
    65		CaseDefault,
    66	};
    67	
    68	struct	Scase
    69	{
    70		SudoG	sg;			// must be first member (cast to Scase)
    71		Hchan*	chan;			// chan
    72		byte*	pc;			// return pc
    73		uint16	kind;
    74		uint16	so;			// vararg of selected bool
    75		bool*	receivedp;		// pointer to received bool (recv2)
    76	};
    77	
    78	struct	Select
    79	{
    80		uint16	tcase;			// total count of scase[]
    81		uint16	ncase;			// currently filled scase[]
    82		uint16*	pollorder;		// case poll order
    83		Hchan**	lockorder;		// channel lock order
    84		Scase	scase[1];		// one per case (in order of appearance)
    85	};
    86	
    87	static	void	dequeueg(WaitQ*);
    88	static	SudoG*	dequeue(WaitQ*);
    89	static	void	enqueue(WaitQ*, SudoG*);
    90	static	void	destroychan(Hchan*);
    91	static	void	racesync(Hchan*, SudoG*);
    92	
    93	Hchan*
    94	runtime·makechan_c(ChanType *t, int64 hint)
    95	{
    96		Hchan *c;
    97		Type *elem;
    98	
    99		elem = t->elem;
   100	
   101		// compiler checks this but be safe.
   102		if(elem->size >= (1<<16))
   103			runtime·throw("makechan: invalid channel element type");
   104		if((sizeof(*c)%MAXALIGN) != 0 || elem->align > MAXALIGN)
   105			runtime·throw("makechan: bad alignment");
   106	
   107		if(hint < 0 || (intgo)hint != hint || (elem->size > 0 && hint > MaxMem / elem->size))
   108			runtime·panicstring("makechan: size out of range");
   109	
   110		// allocate memory in one call
   111		c = (Hchan*)runtime·mallocgc(sizeof(*c) + hint*elem->size, (uintptr)t | TypeInfo_Chan, 0);
   112		c->elemsize = elem->size;
   113		c->elemalg = elem->alg;
   114		c->dataqsiz = hint;
   115	
   116		if(debug)
   117			runtime·printf("makechan: chan=%p; elemsize=%D; elemalg=%p; dataqsiz=%D\n",
   118				c, (int64)elem->size, elem->alg, (int64)c->dataqsiz);
   119	
   120		return c;
   121	}
   122	
   123	// For reflect
   124	//	func makechan(typ *ChanType, size uint64) (chan)
   125	void
   126	reflect·makechan(ChanType *t, uint64 size, Hchan *c)
   127	{
   128		c = runtime·makechan_c(t, size);
   129		FLUSH(&c);
   130	}
   131	
   132	// makechan(t *ChanType, hint int64) (hchan *chan any);
   133	void
   134	runtime·makechan(ChanType *t, int64 hint, Hchan *ret)
   135	{
   136		ret = runtime·makechan_c(t, hint);
   137		FLUSH(&ret);
   138	}
   139	
   140	/*
   141	 * generic single channel send/recv
   142	 * if the bool pointer is nil,
   143	 * then the full exchange will
   144	 * occur. if pres is not nil,
   145	 * then the protocol will not
   146	 * sleep but return if it could
   147	 * not complete.
   148	 *
   149	 * sleep can wake up with g->param == nil
   150	 * when a channel involved in the sleep has
   151	 * been closed.  it is easiest to loop and re-run
   152	 * the operation; we'll see that it's now closed.
   153	 */
   154	void
   155	runtime·chansend(ChanType *t, Hchan *c, byte *ep, bool *pres, void *pc)
   156	{
   157		SudoG *sg;
   158		SudoG mysg;
   159		G* gp;
   160		int64 t0;
   161	
   162		if(c == nil) {
   163			USED(t);
   164			if(pres != nil) {
   165				*pres = false;
   166				return;
   167			}
   168			runtime·park(nil, nil, "chan send (nil chan)");
   169			return;  // not reached
   170		}
   171	
   172		if(debug) {
   173			runtime·printf("chansend: chan=%p; elem=", c);
   174			c->elemalg->print(c->elemsize, ep);
   175			runtime·prints("\n");
   176		}
   177	
   178		t0 = 0;
   179		mysg.releasetime = 0;
   180		if(runtime·blockprofilerate > 0) {
   181			t0 = runtime·cputicks();
   182			mysg.releasetime = -1;
   183		}
   184	
   185		runtime·lock(c);
   186		if(raceenabled)
   187			runtime·racereadpc(c, pc, runtime·chansend);
   188		if(c->closed)
   189			goto closed;
   190	
   191		if(c->dataqsiz > 0)
   192			goto asynch;
   193	
   194		sg = dequeue(&c->recvq);
   195		if(sg != nil) {
   196			if(raceenabled)
   197				racesync(c, sg);
   198			runtime·unlock(c);
   199	
   200			gp = sg->g;
   201			gp->param = sg;
   202			if(sg->elem != nil)
   203				c->elemalg->copy(c->elemsize, sg->elem, ep);
   204			if(sg->releasetime)
   205				sg->releasetime = runtime·cputicks();
   206			runtime·ready(gp);
   207	
   208			if(pres != nil)
   209				*pres = true;
   210			return;
   211		}
   212	
   213		if(pres != nil) {
   214			runtime·unlock(c);
   215			*pres = false;
   216			return;
   217		}
   218	
   219		mysg.elem = ep;
   220		mysg.g = g;
   221		mysg.selgen = NOSELGEN;
   222		g->param = nil;
   223		enqueue(&c->sendq, &mysg);
   224		runtime·park(runtime·unlock, c, "chan send");
   225	
   226		if(g->param == nil) {
   227			runtime·lock(c);
   228			if(!c->closed)
   229				runtime·throw("chansend: spurious wakeup");
   230			goto closed;
   231		}
   232	
   233		if(mysg.releasetime > 0)
   234			runtime·blockevent(mysg.releasetime - t0, 2);
   235	
   236		return;
   237	
   238	asynch:
   239		if(c->closed)
   240			goto closed;
   241	
   242		if(c->qcount >= c->dataqsiz) {
   243			if(pres != nil) {
   244				runtime·unlock(c);
   245				*pres = false;
   246				return;
   247			}
   248			mysg.g = g;
   249			mysg.elem = nil;
   250			mysg.selgen = NOSELGEN;
   251			enqueue(&c->sendq, &mysg);
   252			runtime·park(runtime·unlock, c, "chan send");
   253	
   254			runtime·lock(c);
   255			goto asynch;
   256		}
   257	
   258		if(raceenabled)
   259			runtime·racerelease(chanbuf(c, c->sendx));
   260	
   261		c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), ep);
   262		if(++c->sendx == c->dataqsiz)
   263			c->sendx = 0;
   264		c->qcount++;
   265	
   266		sg = dequeue(&c->recvq);
   267		if(sg != nil) {
   268			gp = sg->g;
   269			runtime·unlock(c);
   270			if(sg->releasetime)
   271				sg->releasetime = runtime·cputicks();
   272			runtime·ready(gp);
   273		} else
   274			runtime·unlock(c);
   275		if(pres != nil)
   276			*pres = true;
   277		if(mysg.releasetime > 0)
   278			runtime·blockevent(mysg.releasetime - t0, 2);
   279		return;
   280	
   281	closed:
   282		runtime·unlock(c);
   283		runtime·panicstring("send on closed channel");
   284	}
   285	
   286	
   287	void
   288	runtime·chanrecv(ChanType *t, Hchan* c, byte *ep, bool *selected, bool *received)
   289	{
   290		SudoG *sg;
   291		SudoG mysg;
   292		G *gp;
   293		int64 t0;
   294	
   295		if(debug)
   296			runtime·printf("chanrecv: chan=%p\n", c);
   297	
   298		if(c == nil) {
   299			USED(t);
   300			if(selected != nil) {
   301				*selected = false;
   302				return;
   303			}
   304			runtime·park(nil, nil, "chan receive (nil chan)");
   305			return;  // not reached
   306		}
   307	
   308		t0 = 0;
   309		mysg.releasetime = 0;
   310		if(runtime·blockprofilerate > 0) {
   311			t0 = runtime·cputicks();
   312			mysg.releasetime = -1;
   313		}
   314	
   315		runtime·lock(c);
   316		if(c->dataqsiz > 0)
   317			goto asynch;
   318	
   319		if(c->closed)
   320			goto closed;
   321	
   322		sg = dequeue(&c->sendq);
   323		if(sg != nil) {
   324			if(raceenabled)
   325				racesync(c, sg);
   326			runtime·unlock(c);
   327	
   328			if(ep != nil)
   329				c->elemalg->copy(c->elemsize, ep, sg->elem);
   330			gp = sg->g;
   331			gp->param = sg;
   332			if(sg->releasetime)
   333				sg->releasetime = runtime·cputicks();
   334			runtime·ready(gp);
   335	
   336			if(selected != nil)
   337				*selected = true;
   338			if(received != nil)
   339				*received = true;
   340			return;
   341		}
   342	
   343		if(selected != nil) {
   344			runtime·unlock(c);
   345			*selected = false;
   346			return;
   347		}
   348	
   349		mysg.elem = ep;
   350		mysg.g = g;
   351		mysg.selgen = NOSELGEN;
   352		g->param = nil;
   353		enqueue(&c->recvq, &mysg);
   354		runtime·park(runtime·unlock, c, "chan receive");
   355	
   356		if(g->param == nil) {
   357			runtime·lock(c);
   358			if(!c->closed)
   359				runtime·throw("chanrecv: spurious wakeup");
   360			goto closed;
   361		}
   362	
   363		if(received != nil)
   364			*received = true;
   365		if(mysg.releasetime > 0)
   366			runtime·blockevent(mysg.releasetime - t0, 2);
   367		return;
   368	
   369	asynch:
   370		if(c->qcount <= 0) {
   371			if(c->closed)
   372				goto closed;
   373	
   374			if(selected != nil) {
   375				runtime·unlock(c);
   376				*selected = false;
   377				if(received != nil)
   378					*received = false;
   379				return;
   380			}
   381			mysg.g = g;
   382			mysg.elem = nil;
   383			mysg.selgen = NOSELGEN;
   384			enqueue(&c->recvq, &mysg);
   385			runtime·park(runtime·unlock, c, "chan receive");
   386	
   387			runtime·lock(c);
   388			goto asynch;
   389		}
   390	
   391		if(raceenabled)
   392			runtime·raceacquire(chanbuf(c, c->recvx));
   393	
   394		if(ep != nil)
   395			c->elemalg->copy(c->elemsize, ep, chanbuf(c, c->recvx));
   396		c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
   397		if(++c->recvx == c->dataqsiz)
   398			c->recvx = 0;
   399		c->qcount--;
   400	
   401		sg = dequeue(&c->sendq);
   402		if(sg != nil) {
   403			gp = sg->g;
   404			runtime·unlock(c);
   405			if(sg->releasetime)
   406				sg->releasetime = runtime·cputicks();
   407			runtime·ready(gp);
   408		} else
   409			runtime·unlock(c);
   410	
   411		if(selected != nil)
   412			*selected = true;
   413		if(received != nil)
   414			*received = true;
   415		if(mysg.releasetime > 0)
   416			runtime·blockevent(mysg.releasetime - t0, 2);
   417		return;
   418	
   419	closed:
   420		if(ep != nil)
   421			c->elemalg->copy(c->elemsize, ep, nil);
   422		if(selected != nil)
   423			*selected = true;
   424		if(received != nil)
   425			*received = false;
   426		if(raceenabled)
   427			runtime·raceacquire(c);
   428		runtime·unlock(c);
   429		if(mysg.releasetime > 0)
   430			runtime·blockevent(mysg.releasetime - t0, 2);
   431	}
   432	
   433	// chansend1(hchan *chan any, elem any);
   434	#pragma textflag NOSPLIT
   435	void
   436	runtime·chansend1(ChanType *t, Hchan* c, ...)
   437	{
   438		runtime·chansend(t, c, (byte*)(&c+1), nil, runtime·getcallerpc(&t));
   439	}
   440	
   441	// chanrecv1(hchan *chan any) (elem any);
   442	#pragma textflag NOSPLIT
   443	void
   444	runtime·chanrecv1(ChanType *t, Hchan* c, ...)
   445	{
   446		runtime·chanrecv(t, c, (byte*)(&c+1), nil, nil);
   447	}
   448	
   449	// chanrecv2(hchan *chan any) (elem any, received bool);
   450	#pragma textflag NOSPLIT
   451	void
   452	runtime·chanrecv2(ChanType *t, Hchan* c, ...)
   453	{
   454		byte *ae, *ap;
   455	
   456		ae = (byte*)(&c+1);
   457		ap = ae + t->elem->size;
   458		runtime·chanrecv(t, c, ae, nil, ap);
   459	}
   460	
   461	// func selectnbsend(c chan any, elem any) bool
   462	//
   463	// compiler implements
   464	//
   465	//	select {
   466	//	case c <- v:
   467	//		... foo
   468	//	default:
   469	//		... bar
   470	//	}
   471	//
   472	// as
   473	//
   474	//	if selectnbsend(c, v) {
   475	//		... foo
   476	//	} else {
   477	//		... bar
   478	//	}
   479	//
   480	#pragma textflag NOSPLIT
   481	void
   482	runtime·selectnbsend(ChanType *t, Hchan *c, ...)
   483	{
   484		byte *ae, *ap;
   485	
   486		ae = (byte*)(&c + 1);
   487		ap = ae + ROUND(t->elem->size, Structrnd);
   488		runtime·chansend(t, c, ae, ap, runtime·getcallerpc(&t));
   489	}
   490	
   491	// func selectnbrecv(elem *any, c chan any) bool
   492	//
   493	// compiler implements
   494	//
   495	//	select {
   496	//	case v = <-c:
   497	//		... foo
   498	//	default:
   499	//		... bar
   500	//	}
   501	//
   502	// as
   503	//
   504	//	if selectnbrecv(&v, c) {
   505	//		... foo
   506	//	} else {
   507	//		... bar
   508	//	}
   509	//
   510	#pragma textflag NOSPLIT
   511	void
   512	runtime·selectnbrecv(ChanType *t, byte *v, Hchan *c, bool selected)
   513	{
   514		runtime·chanrecv(t, c, v, &selected, nil);
   515	}
   516	
   517	// func selectnbrecv2(elem *any, ok *bool, c chan any) bool
   518	//
   519	// compiler implements
   520	//
   521	//	select {
   522	//	case v, ok = <-c:
   523	//		... foo
   524	//	default:
   525	//		... bar
   526	//	}
   527	//
   528	// as
   529	//
   530	//	if c != nil && selectnbrecv2(&v, &ok, c) {
   531	//		... foo
   532	//	} else {
   533	//		... bar
   534	//	}
   535	//
   536	#pragma textflag NOSPLIT
   537	void
   538	runtime·selectnbrecv2(ChanType *t, byte *v, bool *received, Hchan *c, bool selected)
   539	{
   540		runtime·chanrecv(t, c, v, &selected, received);
   541	}
   542	
   543	// For reflect:
   544	//	func chansend(c chan, val iword, nb bool) (selected bool)
   545	// where an iword is the same word an interface value would use:
   546	// the actual data if it fits, or else a pointer to the data.
   547	//
   548	// The "uintptr selected" is really "bool selected" but saying
   549	// uintptr gets us the right alignment for the output parameter block.
   550	#pragma textflag NOSPLIT
   551	void
   552	reflect·chansend(ChanType *t, Hchan *c, uintptr val, bool nb, uintptr selected)
   553	{
   554		bool *sp;
   555		byte *vp;
   556	
   557		if(nb) {
   558			selected = false;
   559			sp = (bool*)&selected;
   560		} else {
   561			*(bool*)&selected = true;
   562			FLUSH(&selected);
   563			sp = nil;
   564		}
   565		if(t->elem->size <= sizeof(val))
   566			vp = (byte*)&val;
   567		else
   568			vp = (byte*)val;
   569		runtime·chansend(t, c, vp, sp, runtime·getcallerpc(&t));
   570	}
   571	
   572	// For reflect:
   573	//	func chanrecv(c chan, nb bool) (val iword, selected, received bool)
   574	// where an iword is the same word an interface value would use:
   575	// the actual data if it fits, or else a pointer to the data.
   576	void
   577	reflect·chanrecv(ChanType *t, Hchan *c, bool nb, uintptr val, bool selected, bool received)
   578	{
   579		byte *vp;
   580		bool *sp;
   581	
   582		if(nb) {
   583			selected = false;
   584			sp = &selected;
   585		} else {
   586			selected = true;
   587			FLUSH(&selected);
   588			sp = nil;
   589		}
   590		received = false;
   591		FLUSH(&received);
   592		if(t->elem->size <= sizeof(val)) {
   593			val = 0;
   594			vp = (byte*)&val;
   595		} else {
   596			vp = runtime·mal(t->elem->size);
   597			val = (uintptr)vp;
   598			FLUSH(&val);
   599		}
   600		runtime·chanrecv(t, c, vp, sp, &received);
   601	}
   602	
   603	static void newselect(int32, Select**);
   604	
   605	// newselect(size uint32) (sel *byte);
   606	#pragma textflag NOSPLIT
   607	void
   608	runtime·newselect(int32 size, ...)
   609	{
   610		int32 o;
   611		Select **selp;
   612	
   613		o = ROUND(sizeof(size), Structrnd);
   614		selp = (Select**)((byte*)&size + o);
   615		newselect(size, selp);
   616	}
   617	
   618	static void
   619	newselect(int32 size, Select **selp)
   620	{
   621		int32 n;
   622		Select *sel;
   623	
   624		n = 0;
   625		if(size > 1)
   626			n = size-1;
   627	
   628		// allocate all the memory we need in a single allocation
   629		// start with Select with size cases
   630		// then lockorder with size entries
   631		// then pollorder with size entries
   632		sel = runtime·mal(sizeof(*sel) +
   633			n*sizeof(sel->scase[0]) +
   634			size*sizeof(sel->lockorder[0]) +
   635			size*sizeof(sel->pollorder[0]));
   636	
   637		sel->tcase = size;
   638		sel->ncase = 0;
   639		sel->lockorder = (void*)(sel->scase + size);
   640		sel->pollorder = (void*)(sel->lockorder + size);
   641		*selp = sel;
   642	
   643		if(debug)
   644			runtime·printf("newselect s=%p size=%d\n", sel, size);
   645	}
   646	
   647	// cut in half to give stack a chance to split
   648	static void selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so);
   649	
   650	// selectsend(sel *byte, hchan *chan any, elem *any) (selected bool);
   651	#pragma textflag NOSPLIT
   652	void
   653	runtime·selectsend(Select *sel, Hchan *c, void *elem, bool selected)
   654	{
   655		selected = false;
   656		FLUSH(&selected);
   657	
   658		// nil cases do not compete
   659		if(c == nil)
   660			return;
   661	
   662		selectsend(sel, c, runtime·getcallerpc(&sel), elem, (byte*)&selected - (byte*)&sel);
   663	}
   664	
   665	static void
   666	selectsend(Select *sel, Hchan *c, void *pc, void *elem, int32 so)
   667	{
   668		int32 i;
   669		Scase *cas;
   670	
   671		i = sel->ncase;
   672		if(i >= sel->tcase)
   673			runtime·throw("selectsend: too many cases");
   674		sel->ncase = i+1;
   675		cas = &sel->scase[i];
   676	
   677		cas->pc = pc;
   678		cas->chan = c;
   679		cas->so = so;
   680		cas->kind = CaseSend;
   681		cas->sg.elem = elem;
   682	
   683		if(debug)
   684			runtime·printf("selectsend s=%p pc=%p chan=%p so=%d\n",
   685				sel, cas->pc, cas->chan, cas->so);
   686	}
   687	
   688	// cut in half to give stack a chance to split
   689	static void selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool*, int32 so);
   690	
   691	// selectrecv(sel *byte, hchan *chan any, elem *any) (selected bool);
   692	#pragma textflag NOSPLIT
   693	void
   694	runtime·selectrecv(Select *sel, Hchan *c, void *elem, bool selected)
   695	{
   696		selected = false;
   697		FLUSH(&selected);
   698	
   699		// nil cases do not compete
   700		if(c == nil)
   701			return;
   702	
   703		selectrecv(sel, c, runtime·getcallerpc(&sel), elem, nil, (byte*)&selected - (byte*)&sel);
   704	}
   705	
   706	// selectrecv2(sel *byte, hchan *chan any, elem *any, received *bool) (selected bool);
   707	#pragma textflag NOSPLIT
   708	void
   709	runtime·selectrecv2(Select *sel, Hchan *c, void *elem, bool *received, bool selected)
   710	{
   711		selected = false;
   712		FLUSH(&selected);
   713	
   714		// nil cases do not compete
   715		if(c == nil)
   716			return;
   717	
   718		selectrecv(sel, c, runtime·getcallerpc(&sel), elem, received, (byte*)&selected - (byte*)&sel);
   719	}
   720	
   721	static void
   722	selectrecv(Select *sel, Hchan *c, void *pc, void *elem, bool *received, int32 so)
   723	{
   724		int32 i;
   725		Scase *cas;
   726	
   727		i = sel->ncase;
   728		if(i >= sel->tcase)
   729			runtime·throw("selectrecv: too many cases");
   730		sel->ncase = i+1;
   731		cas = &sel->scase[i];
   732		cas->pc = pc;
   733		cas->chan = c;
   734	
   735		cas->so = so;
   736		cas->kind = CaseRecv;
   737		cas->sg.elem = elem;
   738		cas->receivedp = received;
   739	
   740		if(debug)
   741			runtime·printf("selectrecv s=%p pc=%p chan=%p so=%d\n",
   742				sel, cas->pc, cas->chan, cas->so);
   743	}
   744	
   745	// cut in half to give stack a chance to split
   746	static void selectdefault(Select*, void*, int32);
   747	
   748	// selectdefault(sel *byte) (selected bool);
   749	#pragma textflag NOSPLIT
   750	void
   751	runtime·selectdefault(Select *sel, bool selected)
   752	{
   753		selected = false;
   754		FLUSH(&selected);
   755	
   756		selectdefault(sel, runtime·getcallerpc(&sel), (byte*)&selected - (byte*)&sel);
   757	}
   758	
   759	static void
   760	selectdefault(Select *sel, void *callerpc, int32 so)
   761	{
   762		int32 i;
   763		Scase *cas;
   764	
   765		i = sel->ncase;
   766		if(i >= sel->tcase)
   767			runtime·throw("selectdefault: too many cases");
   768		sel->ncase = i+1;
   769		cas = &sel->scase[i];
   770		cas->pc = callerpc;
   771		cas->chan = nil;
   772	
   773		cas->so = so;
   774		cas->kind = CaseDefault;
   775	
   776		if(debug)
   777			runtime·printf("selectdefault s=%p pc=%p so=%d\n",
   778				sel, cas->pc, cas->so);
   779	}
   780	
   781	static void
   782	sellock(Select *sel)
   783	{
   784		uint32 i;
   785		Hchan *c, *c0;
   786	
   787		c = nil;
   788		for(i=0; i<sel->ncase; i++) {
   789			c0 = sel->lockorder[i];
   790			if(c0 && c0 != c) {
   791				c = sel->lockorder[i];
   792				runtime·lock(c);
   793			}
   794		}
   795	}
   796	
   797	static void
   798	selunlock(Select *sel)
   799	{
   800		int32 i, n, r;
   801		Hchan *c;
   802	
   803		// We must be very careful here to not touch sel after we have unlocked
   804		// the last lock, because sel can be freed right after the last unlock.
   805		// Consider the following situation.
   806		// First M calls runtime·park() in runtime·selectgo() passing the sel.
   807		// Once runtime·park() has unlocked the last lock, another M makes
   808		// the G that calls select runnable again and schedules it for execution.
   809		// When the G runs on another M, it locks all the locks and frees sel.
   810		// Now if the first M touches sel, it will access freed memory.
   811		n = (int32)sel->ncase;
   812		r = 0;
   813		// skip the default case
   814		if(n>0 && sel->lockorder[0] == nil)
   815			r = 1;
   816		for(i = n-1; i >= r; i--) {
   817			c = sel->lockorder[i];
   818			if(i>0 && sel->lockorder[i-1] == c)
   819				continue;  // will unlock it on the next iteration
   820			runtime·unlock(c);
   821		}
   822	}
   823	
   824	void
   825	runtime·block(void)
   826	{
   827		runtime·park(nil, nil, "select (no cases)");	// forever
   828	}
   829	
   830	static void* selectgo(Select**);
   831	
   832	// selectgo(sel *byte);
   833	//
   834	// overwrites return pc on stack to signal which case of the select
   835	// to run, so cannot appear at the top of a split stack.
   836	#pragma textflag NOSPLIT
   837	void
   838	runtime·selectgo(Select *sel)
   839	{
   840		runtime·setcallerpc(&sel, selectgo(&sel));
   841	}
   842	
   843	static void*
   844	selectgo(Select **selp)
   845	{
   846		Select *sel;
   847		uint32 o, i, j, k;
   848		int64 t0;
   849		Scase *cas, *dfl;
   850		Hchan *c;
   851		SudoG *sg;
   852		G *gp;
   853		byte *as;
   854		void *pc;
   855	
   856		sel = *selp;
   857	
   858		if(debug)
   859			runtime·printf("select: sel=%p\n", sel);
   860	
   861		t0 = 0;
   862		if(runtime·blockprofilerate > 0) {
   863			t0 = runtime·cputicks();
   864			for(i=0; i<sel->ncase; i++)
   865				sel->scase[i].sg.releasetime = -1;
   866		}
   867	
   868		// The compiler rewrites selects that statically have
   869		// only 0 or 1 cases plus default into simpler constructs.
   870		// The only way we can end up with such small sel->ncase
   871		// values here is for a larger select in which most channels
   872		// have been nilled out.  The general code handles those
   873		// cases correctly, and they are rare enough not to bother
   874		// optimizing (and needing to test).
   875	
   876		// generate permuted order
   877		for(i=0; i<sel->ncase; i++)
   878			sel->pollorder[i] = i;
   879		for(i=1; i<sel->ncase; i++) {
   880			o = sel->pollorder[i];
   881			j = runtime·fastrand1()%(i+1);
   882			sel->pollorder[i] = sel->pollorder[j];
   883			sel->pollorder[j] = o;
   884		}
   885	
   886		// sort the cases by Hchan address to get the locking order.
   887		// simple heap sort, to guarantee n log n time and constant stack footprint.
   888		for(i=0; i<sel->ncase; i++) {
   889			j = i;
   890			c = sel->scase[j].chan;
   891			while(j > 0 && sel->lockorder[k=(j-1)/2] < c) {
   892				sel->lockorder[j] = sel->lockorder[k];
   893				j = k;
   894			}
   895			sel->lockorder[j] = c;
   896		}
   897		for(i=sel->ncase; i-->0; ) {
   898			c = sel->lockorder[i];
   899			sel->lockorder[i] = sel->lockorder[0];
   900			j = 0;
   901			for(;;) {
   902				k = j*2+1;
   903				if(k >= i)
   904					break;
   905				if(k+1 < i && sel->lockorder[k] < sel->lockorder[k+1])
   906					k++;
   907				if(c < sel->lockorder[k]) {
   908					sel->lockorder[j] = sel->lockorder[k];
   909					j = k;
   910					continue;
   911				}
   912				break;
   913			}
   914			sel->lockorder[j] = c;
   915		}
   916		/*
   917		for(i=0; i+1<sel->ncase; i++)
   918			if(sel->lockorder[i] > sel->lockorder[i+1]) {
   919				runtime·printf("i=%d %p %p\n", i, sel->lockorder[i], sel->lockorder[i+1]);
   920				runtime·throw("select: broken sort");
   921			}
   922		*/
   923		sellock(sel);
   924	
   925	loop:
   926		// pass 1 - look for something already waiting
   927		dfl = nil;
   928		for(i=0; i<sel->ncase; i++) {
   929			o = sel->pollorder[i];
   930			cas = &sel->scase[o];
   931			c = cas->chan;
   932	
   933			switch(cas->kind) {
   934			case CaseRecv:
   935				if(c->dataqsiz > 0) {
   936					if(c->qcount > 0)
   937						goto asyncrecv;
   938				} else {
   939					sg = dequeue(&c->sendq);
   940					if(sg != nil)
   941						goto syncrecv;
   942				}
   943				if(c->closed)
   944					goto rclose;
   945				break;
   946	
   947			case CaseSend:
   948				if(raceenabled)
   949					runtime·racereadpc(c, cas->pc, runtime·chansend);
   950				if(c->closed)
   951					goto sclose;
   952				if(c->dataqsiz > 0) {
   953					if(c->qcount < c->dataqsiz)
   954						goto asyncsend;
   955				} else {
   956					sg = dequeue(&c->recvq);
   957					if(sg != nil)
   958						goto syncsend;
   959				}
   960				break;
   961	
   962			case CaseDefault:
   963				dfl = cas;
   964				break;
   965			}
   966		}
   967	
   968		if(dfl != nil) {
   969			selunlock(sel);
   970			cas = dfl;
   971			goto retc;
   972		}
   973	
   974	
   975		// pass 2 - enqueue on all chans
   976		for(i=0; i<sel->ncase; i++) {
   977			o = sel->pollorder[i];
   978			cas = &sel->scase[o];
   979			c = cas->chan;
   980			sg = &cas->sg;
   981			sg->g = g;
   982			sg->selgen = g->selgen;
   983	
   984			switch(cas->kind) {
   985			case CaseRecv:
   986				enqueue(&c->recvq, sg);
   987				break;
   988	
   989			case CaseSend:
   990				enqueue(&c->sendq, sg);
   991				break;
   992			}
   993		}
   994	
   995		g->param = nil;
   996		runtime·park((void(*)(Lock*))selunlock, (Lock*)sel, "select");
   997	
   998		sellock(sel);
   999		sg = g->param;
  1000	
  1001		// pass 3 - dequeue from unsuccessful chans
  1002		// otherwise they stack up on quiet channels
  1003		for(i=0; i<sel->ncase; i++) {
  1004			cas = &sel->scase[i];
  1005			if(cas != (Scase*)sg) {
  1006				c = cas->chan;
  1007				if(cas->kind == CaseSend)
  1008					dequeueg(&c->sendq);
  1009				else
  1010					dequeueg(&c->recvq);
  1011			}
  1012		}
  1013	
  1014		if(sg == nil)
  1015			goto loop;
  1016	
  1017		cas = (Scase*)sg;
  1018		c = cas->chan;
  1019	
  1020		if(c->dataqsiz > 0)
  1021			runtime·throw("selectgo: shouldn't happen");
  1022	
  1023		if(debug)
  1024			runtime·printf("wait-return: sel=%p c=%p cas=%p kind=%d\n",
  1025				sel, c, cas, cas->kind);
  1026	
  1027		if(cas->kind == CaseRecv) {
  1028			if(cas->receivedp != nil)
  1029				*cas->receivedp = true;
  1030		}
  1031	
  1032		selunlock(sel);
  1033		goto retc;
  1034	
  1035	asyncrecv:
  1036		// can receive from buffer
  1037		if(raceenabled)
  1038			runtime·raceacquire(chanbuf(c, c->recvx));
  1039		if(cas->receivedp != nil)
  1040			*cas->receivedp = true;
  1041		if(cas->sg.elem != nil)
  1042			c->elemalg->copy(c->elemsize, cas->sg.elem, chanbuf(c, c->recvx));
  1043		c->elemalg->copy(c->elemsize, chanbuf(c, c->recvx), nil);
  1044		if(++c->recvx == c->dataqsiz)
  1045			c->recvx = 0;
  1046		c->qcount--;
  1047		sg = dequeue(&c->sendq);
  1048		if(sg != nil) {
  1049			gp = sg->g;
  1050			selunlock(sel);
  1051			if(sg->releasetime)
  1052				sg->releasetime = runtime·cputicks();
  1053			runtime·ready(gp);
  1054		} else {
  1055			selunlock(sel);
  1056		}
  1057		goto retc;
  1058	
  1059	asyncsend:
  1060		// can send to buffer
  1061		if(raceenabled)
  1062			runtime·racerelease(chanbuf(c, c->sendx));
  1063		c->elemalg->copy(c->elemsize, chanbuf(c, c->sendx), cas->sg.elem);
  1064		if(++c->sendx == c->dataqsiz)
  1065			c->sendx = 0;
  1066		c->qcount++;
  1067		sg = dequeue(&c->recvq);
  1068		if(sg != nil) {
  1069			gp = sg->g;
  1070			selunlock(sel);
  1071			if(sg->releasetime)
  1072				sg->releasetime = runtime·cputicks();
  1073			runtime·ready(gp);
  1074		} else {
  1075			selunlock(sel);
  1076		}
  1077		goto retc;
  1078	
  1079	syncrecv:
  1080		// can receive from sleeping sender (sg)
  1081		if(raceenabled)
  1082			racesync(c, sg);
  1083		selunlock(sel);
  1084		if(debug)
  1085			runtime·printf("syncrecv: sel=%p c=%p o=%d\n", sel, c, o);
  1086		if(cas->receivedp != nil)
  1087			*cas->receivedp = true;
  1088		if(cas->sg.elem != nil)
  1089			c->elemalg->copy(c->elemsize, cas->sg.elem, sg->elem);
  1090		gp = sg->g;
  1091		gp->param = sg;
  1092		if(sg->releasetime)
  1093			sg->releasetime = runtime·cputicks();
  1094		runtime·ready(gp);
  1095		goto retc;
  1096	
  1097	rclose:
  1098		// read at end of closed channel
  1099		selunlock(sel);
  1100		if(cas->receivedp != nil)
  1101			*cas->receivedp = false;
  1102		if(cas->sg.elem != nil)
  1103			c->elemalg->copy(c->elemsize, cas->sg.elem, nil);
  1104		if(raceenabled)
  1105			runtime·raceacquire(c);
  1106		goto retc;
  1107	
  1108	syncsend:
  1109		// can send to sleeping receiver (sg)
  1110		if(raceenabled)
  1111			racesync(c, sg);
  1112		selunlock(sel);
  1113		if(debug)
  1114			runtime·printf("syncsend: sel=%p c=%p o=%d\n", sel, c, o);
  1115		if(sg->elem != nil)
  1116			c->elemalg->copy(c->elemsize, sg->elem, cas->sg.elem);
  1117		gp = sg->g;
  1118		gp->param = sg;
  1119		if(sg->releasetime)
  1120			sg->releasetime = runtime·cputicks();
  1121		runtime·ready(gp);
  1122	
  1123	retc:
  1124		// return pc corresponding to chosen case.
  1125		// Set boolean passed during select creation
  1126		// (at offset selp + cas->so) to true.
  1127		// If cas->so == 0, this is a reflect-driven select and we
  1128		// don't need to update the boolean.
  1129		pc = cas->pc;
  1130		if(cas->so > 0) {
  1131			as = (byte*)selp + cas->so;
  1132			*as = true;
  1133		}
  1134		if(cas->sg.releasetime > 0)
  1135			runtime·blockevent(cas->sg.releasetime - t0, 2);
  1136		runtime·free(sel);
  1137		return pc;
  1138	
  1139	sclose:
  1140		// send on closed channel
  1141		selunlock(sel);
  1142		runtime·panicstring("send on closed channel");
  1143		return nil;  // not reached
  1144	}
  1145	
  1146	// This struct must match ../reflect/value.go:/runtimeSelect.
  1147	typedef struct runtimeSelect runtimeSelect;
  1148	struct runtimeSelect
  1149	{
  1150		uintptr dir;
  1151		ChanType *typ;
  1152		Hchan *ch;
  1153		uintptr val;
  1154	};
  1155	
  1156	// This enum must match ../reflect/value.go:/SelectDir.
  1157	enum SelectDir {
  1158		SelectSend = 1,
  1159		SelectRecv,
  1160		SelectDefault,
  1161	};
  1162	
  1163	// func rselect(cases []runtimeSelect) (chosen int, word uintptr, recvOK bool)
  1164	void
  1165	reflect·rselect(Slice cases, intgo chosen, uintptr word, bool recvOK)
  1166	{
  1167		int32 i;
  1168		Select *sel;
  1169		runtimeSelect* rcase, *rc;
  1170		void *elem;
  1171		void *recvptr;
  1172		uintptr maxsize;
  1173	
  1174		chosen = -1;
  1175		word = 0;
  1176		recvOK = false;
  1177	
  1178		maxsize = 0;
  1179		rcase = (runtimeSelect*)cases.array;
  1180		for(i=0; i<cases.len; i++) {
  1181			rc = &rcase[i];
  1182			if(rc->dir == SelectRecv && rc->ch != nil && maxsize < rc->typ->elem->size)
  1183				maxsize = rc->typ->elem->size;
  1184		}
  1185	
  1186		recvptr = nil;
  1187		if(maxsize > sizeof(void*))
  1188			recvptr = runtime·mal(maxsize);
  1189	
  1190		newselect(cases.len, &sel);
  1191		for(i=0; i<cases.len; i++) {
  1192			rc = &rcase[i];
  1193			switch(rc->dir) {
  1194			case SelectDefault:
  1195				selectdefault(sel, (void*)i, 0);
  1196				break;
  1197			case SelectSend:
  1198				if(rc->ch == nil)
  1199					break;
  1200				if(rc->typ->elem->size > sizeof(void*))
  1201					elem = (void*)rc->val;
  1202				else
  1203					elem = (void*)&rc->val;
  1204				selectsend(sel, rc->ch, (void*)i, elem, 0);
  1205				break;
  1206			case SelectRecv:
  1207				if(rc->ch == nil)
  1208					break;
  1209				if(rc->typ->elem->size > sizeof(void*))
  1210					elem = recvptr;
  1211				else
  1212					elem = &word;
  1213				selectrecv(sel, rc->ch, (void*)i, elem, &recvOK, 0);
  1214				break;
  1215			}
  1216		}
  1217	
  1218		chosen = (intgo)(uintptr)selectgo(&sel);
  1219		if(rcase[chosen].dir == SelectRecv && rcase[chosen].typ->elem->size > sizeof(void*))
  1220			word = (uintptr)recvptr;
  1221	
  1222		FLUSH(&chosen);
  1223		FLUSH(&word);
  1224		FLUSH(&recvOK);
  1225	}
  1226	
  1227	static void closechan(Hchan *c, void *pc);
  1228	
  1229	// closechan(sel *byte);
  1230	#pragma textflag NOSPLIT
  1231	void
  1232	runtime·closechan(Hchan *c)
  1233	{
  1234		closechan(c, runtime·getcallerpc(&c));
  1235	}
  1236	
  1237	// For reflect
  1238	//	func chanclose(c chan)
  1239	#pragma textflag NOSPLIT
  1240	void
  1241	reflect·chanclose(Hchan *c)
  1242	{
  1243		closechan(c, runtime·getcallerpc(&c));
  1244	}
  1245	
  1246	static void
  1247	closechan(Hchan *c, void *pc)
  1248	{
  1249		SudoG *sg;
  1250		G* gp;
  1251	
  1252		if(c == nil)
  1253			runtime·panicstring("close of nil channel");
  1254	
  1255		runtime·lock(c);
  1256		if(c->closed) {
  1257			runtime·unlock(c);
  1258			runtime·panicstring("close of closed channel");
  1259		}
  1260	
  1261		if(raceenabled) {
  1262			runtime·racewritepc(c, pc, runtime·closechan);
  1263			runtime·racerelease(c);
  1264		}
  1265	
  1266		c->closed = true;
  1267	
  1268		// release all readers
  1269		for(;;) {
  1270			sg = dequeue(&c->recvq);
  1271			if(sg == nil)
  1272				break;
  1273			gp = sg->g;
  1274			gp->param = nil;
  1275			if(sg->releasetime)
  1276				sg->releasetime = runtime·cputicks();
  1277			runtime·ready(gp);
  1278		}
  1279	
  1280		// release all writers
  1281		for(;;) {
  1282			sg = dequeue(&c->sendq);
  1283			if(sg == nil)
  1284				break;
  1285			gp = sg->g;
  1286			gp->param = nil;
  1287			if(sg->releasetime)
  1288				sg->releasetime = runtime·cputicks();
  1289			runtime·ready(gp);
  1290		}
  1291	
  1292		runtime·unlock(c);
  1293	}
  1294	
  1295	// For reflect
  1296	//	func chanlen(c chan) (len int)
  1297	void
  1298	reflect·chanlen(Hchan *c, intgo len)
  1299	{
  1300		if(c == nil)
  1301			len = 0;
  1302		else
  1303			len = c->qcount;
  1304		FLUSH(&len);
  1305	}
  1306	
  1307	// For reflect
  1308	//	func chancap(c chan) int
  1309	void
  1310	reflect·chancap(Hchan *c, intgo cap)
  1311	{
  1312		if(c == nil)
  1313			cap = 0;
  1314		else
  1315			cap = c->dataqsiz;
  1316		FLUSH(&cap);
  1317	}
  1318	
  1319	static SudoG*
  1320	dequeue(WaitQ *q)
  1321	{
  1322		SudoG *sgp;
  1323	
  1324	loop:
  1325		sgp = q->first;
  1326		if(sgp == nil)
  1327			return nil;
  1328		q->first = sgp->link;
  1329	
  1330		// if sgp is stale, ignore it
  1331		if(sgp->selgen != NOSELGEN &&
  1332			(sgp->selgen != sgp->g->selgen ||
  1333			!runtime·cas(&sgp->g->selgen, sgp->selgen, sgp->selgen + 2))) {
  1334			//prints("INVALID PSEUDOG POINTER\n");
  1335			goto loop;
  1336		}
  1337	
  1338		return sgp;
  1339	}
  1340	
  1341	static void
  1342	dequeueg(WaitQ *q)
  1343	{
  1344		SudoG **l, *sgp, *prevsgp;
  1345	
  1346		prevsgp = nil;
  1347		for(l=&q->first; (sgp=*l) != nil; l=&sgp->link, prevsgp=sgp) {
  1348			if(sgp->g == g) {
  1349				*l = sgp->link;
  1350				if(q->last == sgp)
  1351					q->last = prevsgp;
  1352				break;
  1353			}
  1354		}
  1355	}
  1356	
  1357	static void
  1358	enqueue(WaitQ *q, SudoG *sgp)
  1359	{
  1360		sgp->link = nil;
  1361		if(q->first == nil) {
  1362			q->first = sgp;
  1363			q->last = sgp;
  1364			return;
  1365		}
  1366		q->last->link = sgp;
  1367		q->last = sgp;
  1368	}
  1369	
  1370	static void
  1371	racesync(Hchan *c, SudoG *sg)
  1372	{
  1373		runtime·racerelease(chanbuf(c, 0));
  1374		runtime·raceacquireg(sg->g, chanbuf(c, 0));
  1375		runtime·racereleaseg(sg->g, chanbuf(c, 0));
  1376		runtime·raceacquire(chanbuf(c, 0));
  1377	}

View as plain text