src/runtime/netpoll.go
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build aix darwin dragonfly freebsd js,wasm linux netbsd openbsd solaris windows

package runtime

import (
	"runtime/internal/atomic"
	"unsafe"
)

// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
//     Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
//     Arm edge-triggered notifications for fd. The pd argument is to pass
//     back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
//     Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
//     Poll the network. If delta < 0, block indefinitely. If delta == 0,
//     poll without blocking. If delta > 0, block for at most delta nanoseconds.
//     Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
//     Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
//     Reports whether fd is a file descriptor used by the poller.

// Error codes returned by runtime_pollReset and runtime_pollWait.
// These must match the values in internal/poll/fd_poll_runtime.go.
const (
	pollNoError        = 0 // no error
	pollErrClosing     = 1 // descriptor is closed
	pollErrTimeout     = 2 // I/O timeout
	pollErrNotPollable = 3 // general error polling descriptor
)
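
// For orientation, a hedged sketch of the consumer side: internal/poll
// (fd_poll_runtime.go) maps these codes back to Go errors roughly as below.
// The helper names are internal/poll's and may differ between releases:
//
//	func convertErr(res int, isFile bool) error {
//		switch res {
//		case pollNoError:
//			return nil
//		case pollErrClosing:
//			return errClosing(isFile)
//		case pollErrTimeout:
//			return ErrDeadlineExceeded
//		case pollErrNotPollable:
//			return ErrNotPollable
//		}
//		println("unreachable: ", res)
//		panic("unreachable")
//	}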

// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
//           a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
//          the goroutine commits to park by changing the state to G pointer,
//          or, alternatively, concurrent io notification changes the state to pdReady,
//          or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
//             io notification or timeout/close changes the state to pdReady or nil respectively
//             and unparks the goroutine.
// nil - none of the above.
const (
	pdReady uintptr = 1
	pdWait  uintptr = 2
)
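
// A compact, illustrative view of the resulting state machine (derived from
// the code below, not part of the original file):
//
//	0       --CAS(0, pdWait)-----------> pdWait   (netpollblock: about to park)
//	pdWait  --CAS(pdWait, G)-----------> G        (netpollblockcommit: parked)
//	pdWait  --netpollunblock(ioready)--> pdReady  (notification raced with park)
//	G       --netpollunblock-----------> pdReady or 0, and the G is unparked
//	pdReady --netpollblock/pollReset---> 0        (notification consumed)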

const pollBlockSize = 4 * 1024

// Network poller descriptor.
//
// No heap pointers.
//
//go:notinheap
type pollDesc struct {
	link *pollDesc // in pollcache, protected by pollcache.lock

	// The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
	// This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
	// pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
	// proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
	// in a lock-free way by all operations.
	// NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
	// that will blow up when GC starts moving objects.
	lock    mutex // protects the following fields
	fd      uintptr
	closing bool
	everr   bool      // marks event scanning error happened
	user    uint32    // user settable cookie
	rseq    uintptr   // protects from stale read timers
	rg      uintptr   // pdReady, pdWait, G waiting for read or nil
	rt      timer     // read deadline timer (set if rt.f != nil)
	rd      int64     // read deadline
	wseq    uintptr   // protects from stale write timers
	wg      uintptr   // pdReady, pdWait, G waiting for write or nil
	wt      timer     // write deadline timer
	wd      int64     // write deadline
	self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

type pollCache struct {
	lock  mutex
	first *pollDesc
	// PollDesc objects must be type-stable,
	// because we can get ready notification from epoll/kqueue
	// after the descriptor is closed/reused.
	// Stale notifications are detected using seq variable,
	// seq is incremented when deadlines are changed or descriptor is reused.
}

var (
	netpollInitLock mutex
	netpollInited   uint32

	pollcache      pollCache
	netpollWaiters uint32
)

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

func netpollinited() bool {
	return atomic.Load(&netpollInited) != 0
}

// poll_runtime_isPollServerDescriptor returns true if fd is a
// descriptor being used by netpoll.
//go:linkname poll_runtime_isPollServerDescriptor internal/poll.runtime_isPollServerDescriptor
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
	return netpollIsPollDescriptor(fd)
}

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	pd.closing = false
	pd.everr = false
	pd.rseq++
	pd.rg = 0
	pd.rd = 0
	pd.wseq++
	pd.wg = 0
	pd.wd = 0
	pd.self = pd
	unlock(&pd.lock)

	errno := netpollopen(fd, pd)
	return pd, int(errno)
}
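
// Caller-side sketch (paraphrased from memory of internal/poll's
// fd_poll_runtime.go; details vary by release). Each os-level FD wires
// itself to the poller through the linknamed functions above:
//
//	func (pd *pollDesc) init(fd *FD) error {
//		serverInit.Do(runtime_pollServerInit)
//		ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
//		if errno != 0 {
//			return errnoErr(syscall.Errno(errno))
//		}
//		pd.runtimeCtx = ctx
//		return nil
//	}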

//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	if pd.wg != 0 && pd.wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	if pd.rg != 0 && pd.rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}

func (c *pollCache) free(pd *pollDesc) {
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

// poll_runtime_pollReset, which is internal/poll.runtime_pollReset,
// prepares a descriptor for polling in mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollReset internal/poll.runtime_pollReset
func poll_runtime_pollReset(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	if mode == 'r' {
		pd.rg = 0
	} else if mode == 'w' {
		pd.wg = 0
	}
	return pollNoError
}

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout was reset.
		// Pretend it has not happened and retry.
	}
	return pollNoError
}
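
// And the matching wait path in internal/poll (again a hedged sketch, not
// verbatim): a Read that would block parks here until netpollready fires:
//
//	func (pd *pollDesc) wait(mode int, isFile bool) error {
//		if pd.runtimeCtx == 0 {
//			return errors.New("waiting for unsupported file type")
//		}
//		res := runtime_pollWait(pd.runtimeCtx, mode)
//		return convertErr(res, isFile)
//	}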

//go:linkname poll_runtime_pollWaitCanceled internal/poll.runtime_pollWaitCanceled
func poll_runtime_pollWaitCanceled(pd *pollDesc, mode int) {
	// This function is used only on windows after a failed attempt to cancel
	// a pending async IO operation. Wait for ioready, ignore closing or timeouts.
	for !netpollblock(pd, int32(mode), true) {
	}
}

//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
	lock(&pd.lock)
	if pd.closing {
		unlock(&pd.lock)
		return
	}
	rd0, wd0 := pd.rd, pd.wd
	combo0 := rd0 > 0 && rd0 == wd0
	if d > 0 {
		d += nanotime()
		if d <= 0 {
			// If the user has a deadline in the future, but the delay calculation
			// overflows, then set the deadline to the maximum possible value.
			d = 1<<63 - 1
		}
	}
	if mode == 'r' || mode == 'r'+'w' {
		pd.rd = d
	}
	if mode == 'w' || mode == 'r'+'w' {
		pd.wd = d
	}
	combo := pd.rd > 0 && pd.rd == pd.wd
	rtf := netpollReadDeadline
	if combo {
		rtf = netpollDeadline
	}
	if pd.rt.f == nil {
		if pd.rd > 0 {
			pd.rt.f = rtf
			// Copy current seq into the timer arg.
			// Timer func will check the seq against current descriptor seq,
			// if they differ the descriptor was reused or timers were reset.
			pd.rt.arg = pd.makeArg()
			pd.rt.seq = pd.rseq
			resettimer(&pd.rt, pd.rd)
		}
	} else if pd.rd != rd0 || combo != combo0 {
		pd.rseq++ // invalidate current timers
		if pd.rd > 0 {
			modtimer(&pd.rt, pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
		} else {
			deltimer(&pd.rt)
			pd.rt.f = nil
		}
	}
	if pd.wt.f == nil {
		if pd.wd > 0 && !combo {
			pd.wt.f = netpollWriteDeadline
			pd.wt.arg = pd.makeArg()
			pd.wt.seq = pd.wseq
			resettimer(&pd.wt, pd.wd)
		}
	} else if pd.wd != wd0 || combo != combo0 {
		pd.wseq++ // invalidate current timers
		if pd.wd > 0 && !combo {
			modtimer(&pd.wt, pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
		} else {
			deltimer(&pd.wt)
			pd.wt.f = nil
		}
	}
	// If we set the new deadline in the past, unblock currently pending IO if any.
	var rg, wg *g
	if pd.rd < 0 || pd.wd < 0 {
		atomic.StorepNoWB(noescape(unsafe.Pointer(&wg)), nil) // full memory barrier between stores to rd/wd and load of rg/wg in netpollunblock
		if pd.rd < 0 {
			rg = netpollunblock(pd, 'r', false)
		}
		if pd.wd < 0 {
			wg = netpollunblock(pd, 'w', false)
		}
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}
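
// Deadline plumbing from user code down to here (hedged sketch of
// internal/poll's setDeadlineImpl; mode is 'r', 'w', or 'r'+'w'):
//
//	func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
//		var d int64
//		if !t.IsZero() {
//			d = int64(time.Until(t))
//			if d == 0 {
//				d = -1 // past deadline, distinct from "no deadline" (0)
//			}
//		}
//		if err := fd.incref(); err != nil {
//			return err
//		}
//		defer fd.decref()
//		if fd.pd.runtimeCtx == 0 {
//			return ErrNoDeadline
//		}
//		runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
//		return nil
//	}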

//go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
func poll_runtime_pollUnblock(pd *pollDesc) {
	lock(&pd.lock)
	if pd.closing {
		throw("runtime: unblock on closing polldesc")
	}
	pd.closing = true
	pd.rseq++
	pd.wseq++
	var rg, wg *g
	atomic.StorepNoWB(noescape(unsafe.Pointer(&rg)), nil) // full memory barrier between store to closing and read of rg/wg in netpollunblock
	rg = netpollunblock(pd, 'r', false)
	wg = netpollunblock(pd, 'w', false)
	if pd.rt.f != nil {
		deltimer(&pd.rt)
		pd.rt.f = nil
	}
	if pd.wt.f != nil {
		deltimer(&pd.wt)
		pd.wt.f = nil
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 3)
	}
	if wg != nil {
		netpollgoready(wg, 3)
	}
}

// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
		rg = netpollunblock(pd, 'r', true)
	}
	if mode == 'w' || mode == 'r'+'w' {
		wg = netpollunblock(pd, 'w', true)
	}
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
}
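
// For context, a simplified sketch of one platform half (modeled on
// netpoll_epoll.go; the constants and helpers are the Linux-specific ones,
// and the real code also handles netpollBreak wakeups, EINTR, and everr):
//
//	func netpoll(delay int64) gList {
//		var events [128]epollevent
//		n := epollwait(epfd, &events[0], int32(len(events)), waitms)
//		var toRun gList
//		for i := int32(0); i < n; i++ {
//			ev := &events[i]
//			var mode int32
//			if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
//				mode += 'r'
//			}
//			if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
//				mode += 'w'
//			}
//			if mode != 0 {
//				pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
//				netpollready(&toRun, pd, mode)
//			}
//		}
//		return toRun
//	}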

func netpollcheckerr(pd *pollDesc, mode int32) int {
	if pd.closing {
		return pollErrClosing
	}
	if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
		return pollErrTimeout
	}
	// Report an event scanning error only on a read event.
	// An error on a write event will be captured in a subsequent
	// write call that is able to report a more specific error.
	if mode == 'r' && pd.everr {
		return pollErrNotPollable
	}
	return pollNoError
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		// Bump the count of goroutines waiting for the poller.
		// The scheduler uses this to decide whether to block
		// waiting for the poller if there is nothing else to do.
		atomic.Xadd(&netpollWaiters, 1)
	}
	return r
}

func netpollgoready(gp *g, traceskip int) {
	atomic.Xadd(&netpollWaiters, -1)
	goready(gp, traceskip+1)
}

// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	// set the gpp semaphore to pdWait
	for {
		old := *gpp
		if old == pdReady {
			*gpp = 0
			return true
		}
		if old != 0 {
			throw("runtime: double wait")
		}
		if atomic.Casuintptr(gpp, 0, pdWait) {
			break
		}
	}

	// need to recheck error states after setting gpp to pdWait
	// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
	// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
	if waitio || netpollcheckerr(pd, mode) == 0 {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	// be careful to not lose concurrent pdReady notification
	old := atomic.Xchguintptr(gpp, 0)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
	return old == pdReady
}
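
// One interleaving the pdWait protocol guards against (illustrative trace,
// not from the original file): readiness arriving between the CAS to pdWait
// and the park commit aborts the park instead of losing the wakeup:
//
//	waiter (netpollblock)             poller (netpollunblock)
//	---------------------             -----------------------
//	CAS(gpp, 0, pdWait) ok
//	                                  CAS(gpp, pdWait, pdReady) ok
//	gopark -> netpollblockcommit:
//	  CAS(gpp, pdWait, G) fails,
//	  so the goroutine never sleeps
//	Xchg(gpp, 0) observes pdReady
//	return true (IO is ready)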

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}

	for {
		old := *gpp
		if old == pdReady {
			return nil
		}
		if old == 0 && !ioready {
			// Only set pdReady for ioready. runtime_pollWait
			// will check for timeout/cancel before waiting.
			return nil
		}
		var new uintptr
		if ioready {
			new = pdReady
		}
		if atomic.Casuintptr(gpp, old, new) {
			if old == pdWait {
				old = 0
			}
			return (*g)(unsafe.Pointer(old))
		}
	}
}

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
	lock(&pd.lock)
	// Seq arg is seq when the timer was set.
	// If it's stale, ignore the timer event.
	currentSeq := pd.rseq
	if !read {
		currentSeq = pd.wseq
	}
	if seq != currentSeq {
		// The descriptor was reused or timers were reset.
		unlock(&pd.lock)
		return
	}
	var rg *g
	if read {
		if pd.rd <= 0 || pd.rt.f == nil {
			throw("runtime: inconsistent read deadline")
		}
		pd.rd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil) // full memory barrier between store to rd and load of rg in netpollunblock
		rg = netpollunblock(pd, 'r', false)
	}
	var wg *g
	if write {
		if pd.wd <= 0 || pd.wt.f == nil && !read {
			throw("runtime: inconsistent write deadline")
		}
		pd.wd = -1
		atomic.StorepNoWB(unsafe.Pointer(&pd.wt.f), nil) // full memory barrier between store to wd and load of wg in netpollunblock
		wg = netpollunblock(pd, 'w', false)
	}
	unlock(&pd.lock)
	if rg != nil {
		netpollgoready(rg, 0)
	}
	if wg != nil {
		netpollgoready(wg, 0)
	}
}

func netpollDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg interface{}, seq uintptr) {
	netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
	if c.first == nil {
		const pdSize = unsafe.Sizeof(pollDesc{})
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
		// Must be in non-GC memory because can be referenced
		// only from epoll/kqueue internals.
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	lockInit(&pd.lock, lockRankPollDesc)
	unlock(&c.lock)
	return pd
}
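
// Back-of-the-envelope (not in the original file): on 64-bit platforms
// unsafe.Sizeof(pollDesc{}) comes to a couple hundred bytes, so each 4KB
// persistentalloc block above yields on the order of 15-20 descriptors.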

// makeArg converts pd to an interface{}.
// makeArg does not do any allocation. Normally, such
// a conversion requires an allocation because pointers to
// go:notinheap types (which pollDesc is) must be stored
// in interfaces indirectly. See issue 42076.
func (pd *pollDesc) makeArg() (i interface{}) {
	x := (*eface)(unsafe.Pointer(&i))
	x._type = pdType
	x.data = unsafe.Pointer(&pd.self)
	return
}
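
// Why this works (commentary added here, not in the original file): for a
// go:notinheap type, *pollDesc is stored in interfaces indirectly, so the
// eface's data word must point at a *pollDesc rather than be one. &pd.self
// supplies that stable slot, and pd.self == pd (set in pollOpen), so:
//
//	arg := pd.makeArg()    // eface{_type: pdType, data: &pd.self}
//	pd2 := arg.(*pollDesc) // loads *(&pd.self), i.e. pd again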

var (
	pdEface interface{} = (*pollDesc)(nil)
	pdType  *_type      = efaceOf(&pdEface)._type
)