1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package sql
17
18 import (
19 "context"
20 "database/sql/driver"
21 "errors"
22 "fmt"
23 "io"
24 "reflect"
25 "runtime"
26 "sort"
27 "strconv"
28 "sync"
29 "sync/atomic"
30 "time"
31 )
32
// Registry of database drivers, filled by Register and consulted by Drivers
// and Open.
var (
	// driversMu guards drivers.
	driversMu sync.RWMutex
	// drivers maps a registered driver name to its implementation.
	drivers = make(map[string]driver.Driver)
)
37
38
// nowFunc returns the current time. The pool code calls it instead of time.Now
// directly; presumably so tests can substitute a fake clock (TODO confirm).
var nowFunc = time.Now
40
41
42
43
44 func Register(name string, driver driver.Driver) {
45 driversMu.Lock()
46 defer driversMu.Unlock()
47 if driver == nil {
48 panic("sql: Register driver is nil")
49 }
50 if _, dup := drivers[name]; dup {
51 panic("sql: Register called twice for driver " + name)
52 }
53 drivers[name] = driver
54 }
55
56 func unregisterAllDrivers() {
57 driversMu.Lock()
58 defer driversMu.Unlock()
59
60 drivers = make(map[string]driver.Driver)
61 }
62
63
64 func Drivers() []string {
65 driversMu.RLock()
66 defer driversMu.RUnlock()
67 list := make([]string, 0, len(drivers))
68 for name := range drivers {
69 list = append(list, name)
70 }
71 sort.Strings(list)
72 return list
73 }
74
75
76
77
78
79
80
// NamedArg pairs a parameter name with the value bound to it.
//
// The unexported, zero-size _Named_Fields_Required field means code outside
// this package cannot build a NamedArg with an unkeyed composite literal; use
// keyed fields or the Named helper instead.
type NamedArg struct {
	_Named_Fields_Required struct{}

	// Name is the parameter name.
	Name string

	// Value is the value bound to the parameter.
	Value interface{}
}

// Named returns a NamedArg whose Name and Value are set to the given
// arguments.
func Named(name string, value interface{}) NamedArg {
	var arg NamedArg
	arg.Name = name
	arg.Value = value
	return arg
}
117
118
// IsolationLevel is a transaction isolation level, carried in TxOptions.
type IsolationLevel int

// The isolation levels known to this package. LevelDefault is the zero value.
const (
	LevelDefault IsolationLevel = iota
	LevelReadUncommitted
	LevelReadCommitted
	LevelWriteCommitted
	LevelRepeatableRead
	LevelSnapshot
	LevelSerializable
	LevelLinearizable
)

// isolationLevelNames holds the display name of every known level, indexed by
// the level's numeric value.
var isolationLevelNames = [...]string{
	LevelDefault:         "Default",
	LevelReadUncommitted: "Read Uncommitted",
	LevelReadCommitted:   "Read Committed",
	LevelWriteCommitted:  "Write Committed",
	LevelRepeatableRead:  "Repeatable Read",
	LevelSnapshot:        "Snapshot",
	LevelSerializable:    "Serializable",
	LevelLinearizable:    "Linearizable",
}

// String returns the name of the isolation level. Values outside the known
// range are rendered as "IsolationLevel(N)".
func (i IsolationLevel) String() string {
	if i < LevelDefault || i > LevelLinearizable {
		return "IsolationLevel(" + strconv.Itoa(int(i)) + ")"
	}
	return isolationLevelNames[i]
}

// Compile-time check that IsolationLevel implements fmt.Stringer.
var _ fmt.Stringer = LevelDefault
161
162
// TxOptions holds the transaction options accepted by DB.BeginTx.
type TxOptions struct {
	// Isolation is the requested isolation level. The zero value is
	// LevelDefault.
	Isolation IsolationLevel
	// ReadOnly requests a read-only transaction.
	// NOTE(review): how the flag is honored depends on the driver; the code
	// that consumes it is not part of this chunk.
	ReadOnly bool
}
169
170
171
172
// RawBytes is a named byte-slice type.
// NOTE(review): presumably a Scan destination that references driver-owned
// memory; the code that gives it special treatment is outside this chunk.
type RawBytes []byte
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188 type NullString struct {
189 String string
190 Valid bool
191 }
192
193
194 func (ns *NullString) Scan(value interface{}) error {
195 if value == nil {
196 ns.String, ns.Valid = "", false
197 return nil
198 }
199 ns.Valid = true
200 return convertAssign(&ns.String, value)
201 }
202
203
204 func (ns NullString) Value() (driver.Value, error) {
205 if !ns.Valid {
206 return nil, nil
207 }
208 return ns.String, nil
209 }
210
211
212
213
214 type NullInt64 struct {
215 Int64 int64
216 Valid bool
217 }
218
219
220 func (n *NullInt64) Scan(value interface{}) error {
221 if value == nil {
222 n.Int64, n.Valid = 0, false
223 return nil
224 }
225 n.Valid = true
226 return convertAssign(&n.Int64, value)
227 }
228
229
230 func (n NullInt64) Value() (driver.Value, error) {
231 if !n.Valid {
232 return nil, nil
233 }
234 return n.Int64, nil
235 }
236
237
238
239
240 type NullInt32 struct {
241 Int32 int32
242 Valid bool
243 }
244
245
246 func (n *NullInt32) Scan(value interface{}) error {
247 if value == nil {
248 n.Int32, n.Valid = 0, false
249 return nil
250 }
251 n.Valid = true
252 return convertAssign(&n.Int32, value)
253 }
254
255
256 func (n NullInt32) Value() (driver.Value, error) {
257 if !n.Valid {
258 return nil, nil
259 }
260 return int64(n.Int32), nil
261 }
262
263
264
265
266 type NullFloat64 struct {
267 Float64 float64
268 Valid bool
269 }
270
271
272 func (n *NullFloat64) Scan(value interface{}) error {
273 if value == nil {
274 n.Float64, n.Valid = 0, false
275 return nil
276 }
277 n.Valid = true
278 return convertAssign(&n.Float64, value)
279 }
280
281
282 func (n NullFloat64) Value() (driver.Value, error) {
283 if !n.Valid {
284 return nil, nil
285 }
286 return n.Float64, nil
287 }
288
289
290
291
292 type NullBool struct {
293 Bool bool
294 Valid bool
295 }
296
297
298 func (n *NullBool) Scan(value interface{}) error {
299 if value == nil {
300 n.Bool, n.Valid = false, false
301 return nil
302 }
303 n.Valid = true
304 return convertAssign(&n.Bool, value)
305 }
306
307
308 func (n NullBool) Value() (driver.Value, error) {
309 if !n.Valid {
310 return nil, nil
311 }
312 return n.Bool, nil
313 }
314
315
316
317
318 type NullTime struct {
319 Time time.Time
320 Valid bool
321 }
322
323
324 func (n *NullTime) Scan(value interface{}) error {
325 if value == nil {
326 n.Time, n.Valid = time.Time{}, false
327 return nil
328 }
329 n.Valid = true
330 return convertAssign(&n.Time, value)
331 }
332
333
334 func (n NullTime) Value() (driver.Value, error) {
335 if !n.Valid {
336 return nil, nil
337 }
338 return n.Time, nil
339 }
340
341
// Scanner is implemented by types that can load themselves from a database
// value. The Null* types in this file implement it with pointer receivers.
type Scanner interface {
	// Scan assigns src to the receiver and reports any conversion error.
	// The Null* implementations in this file treat a nil src as NULL.
	// NOTE(review): the full set of dynamic types src may hold is defined by
	// the driver layer, which is not visible in this chunk.
	Scan(src interface{}) error
}
363
364
365
366
367
368
369
370
371
// Out describes an output parameter.
// NOTE(review): presumably used to retrieve OUTPUT values from stored
// procedures; the code that consumes it is outside this chunk.
//
// The unexported, zero-size _Named_Fields_Required field means code outside
// this package cannot build an Out with an unkeyed composite literal.
type Out struct {
	_Named_Fields_Required struct{}

	// Dest is the destination for the output value.
	// NOTE(review): presumably must be a pointer — confirm against the
	// argument-conversion code.
	Dest interface{}

	// In marks the parameter as an input as well.
	// NOTE(review): presumably selects INOUT semantics — confirm.
	In bool
}
384
385
386
387
// ErrNoRows is the sentinel error for a result set that contains no rows.
// NOTE(review): the code that returns it (presumably Row.Scan) is outside
// this chunk.
var ErrNoRows = errors.New("sql: no rows in result set")
389
390
391
392
393
394
395
396
397
398
399
400
401
// DB is a database handle that manages a pool of driver connections.
// It is created by OpenDB (or Open) and released with Close.
type DB struct {
	// waitDuration is the total time callers have spent blocked waiting for a
	// connection. It is only accessed through sync/atomic and is reported as
	// a time.Duration by Stats.
	waitDuration int64

	// connector creates new driver connections.
	connector driver.Connector

	// numClosed counts connections that have been finally closed. It is
	// incremented atomically in driverConn.finalClose.
	numClosed uint64

	// mu guards the fields below; throughout this file they are read and
	// written with mu held.
	mu sync.Mutex

	// freeConn holds idle connections available for reuse.
	freeConn []*driverConn

	// connRequests holds callers blocked in conn waiting for a connection,
	// keyed by the id returned from nextRequestKeyLocked.
	connRequests map[uint64]chan connRequest

	// nextRequest is the next key to use in connRequests.
	nextRequest uint64

	// numOpen counts connections that are open or about to be opened: it is
	// incremented before Connect is called and decremented on failure or on
	// final close.
	numOpen int

	// openerCh carries one signal per connection that connectionOpener
	// should open; maybeOpenNewConnections is the sender.
	openerCh chan struct{}

	// closed is set by Close.
	closed bool

	// dep maps each finalCloser to the set of things still depending on it.
	dep map[finalCloser]depSet

	// lastPut records the stack of the last putConn per connection. It is
	// only written when debugGetPut is true.
	lastPut map[*driverConn]string

	// maxIdleCount is the idle-connection limit: zero means
	// defaultMaxIdleConns, negative means no idle connections are kept.
	maxIdleCount int

	// maxOpen is the open-connection limit; <= 0 means unlimited.
	maxOpen int

	// maxLifetime is the maximum age of a connection; <= 0 disables it.
	maxLifetime time.Duration

	// maxIdleTime is the maximum time a connection may sit idle; <= 0
	// disables it.
	maxIdleTime time.Duration

	// cleanerCh wakes connectionCleaner; it is nil while no cleaner runs.
	cleanerCh chan struct{}

	// waitCount is the number of conn calls that had to wait.
	waitCount int64

	// maxIdleClosed counts connections closed because of the idle limit.
	maxIdleClosed int64

	// maxIdleTimeClosed counts connections closed because of maxIdleTime.
	maxIdleTimeClosed int64

	// maxLifetimeClosed counts connections closed because of maxLifetime.
	maxLifetimeClosed int64

	// stop cancels the context handed to connectionOpener.
	stop func()
}
439
440
// connReuseStrategy tells DB.conn whether it may hand out an idle connection.
type connReuseStrategy uint8

const (
	// alwaysNewConn makes conn skip the idle list, so the caller never gets a
	// cached idle connection.
	alwaysNewConn connReuseStrategy = iota

	// cachedOrNewConn lets conn return an idle connection when one is
	// available; otherwise conn waits for, or opens, a connection.
	cachedOrNewConn
)
451
452
453
454
455
// driverConn wraps a driver.Conn with a mutex and the bookkeeping the pool
// needs. The embedded Mutex is held around calls into ci throughout this
// file.
type driverConn struct {
	// db is the pool that owns this connection.
	db *DB

	// createdAt is when the connection was opened; expired compares against
	// it.
	createdAt time.Time

	// The embedded Mutex guards the fields from here down to openStmt.
	sync.Mutex

	// ci is the underlying driver connection; finalClose sets it to nil.
	ci driver.Conn

	// needReset records that the session should be reset before reuse.
	needReset bool

	// closed is set once Close or closeDBLocked has been called.
	closed bool

	// finalClosed is set in finalClose, just before ci.Close is called.
	finalClosed bool

	// openStmt tracks statements prepared on this connection so finalClose
	// can close them.
	openStmt map[*driverStmt]bool

	// The fields below are accessed with db.mu held.

	// inUse is true while the connection is handed out to a caller.
	inUse bool

	// returnedAt is when the connection was created or last returned.
	returnedAt time.Time

	// onPut holds functions that putConn runs, with db.mu held, the next time
	// the connection is returned.
	onPut []func()

	// dbmuClosed is set by Close while db.mu is held.
	dbmuClosed bool
}
473
// releaseConn returns dc to its pool via putConn. The final argument asks
// putConn to mark the session as needing a reset before reuse.
func (dc *driverConn) releaseConn(err error) {
	dc.db.putConn(dc, err, true)
}
477
478 func (dc *driverConn) removeOpenStmt(ds *driverStmt) {
479 dc.Lock()
480 defer dc.Unlock()
481 delete(dc.openStmt, ds)
482 }
483
484 func (dc *driverConn) expired(timeout time.Duration) bool {
485 if timeout <= 0 {
486 return false
487 }
488 return dc.createdAt.Add(timeout).Before(nowFunc())
489 }
490
491
492
493 func (dc *driverConn) resetSession(ctx context.Context) error {
494 dc.Lock()
495 defer dc.Unlock()
496
497 if !dc.needReset {
498 return nil
499 }
500 if cr, ok := dc.ci.(driver.SessionResetter); ok {
501 return cr.ResetSession(ctx)
502 }
503 return nil
504 }
505
506
507
508 func (dc *driverConn) validateConnection(needsReset bool) bool {
509 dc.Lock()
510 defer dc.Unlock()
511
512 if needsReset {
513 dc.needReset = true
514 }
515 if cv, ok := dc.ci.(driver.Validator); ok {
516 return cv.IsValid()
517 }
518 return true
519 }
520
521
522
523 func (dc *driverConn) prepareLocked(ctx context.Context, cg stmtConnGrabber, query string) (*driverStmt, error) {
524 si, err := ctxDriverPrepare(ctx, dc.ci, query)
525 if err != nil {
526 return nil, err
527 }
528 ds := &driverStmt{Locker: dc, si: si}
529
530
531 if cg != nil {
532 return ds, nil
533 }
534
535
536
537
538
539 if dc.openStmt == nil {
540 dc.openStmt = make(map[*driverStmt]bool)
541 }
542 dc.openStmt[ds] = true
543 return ds, nil
544 }
545
546
547 func (dc *driverConn) closeDBLocked() func() error {
548 dc.Lock()
549 defer dc.Unlock()
550 if dc.closed {
551 return func() error { return errors.New("sql: duplicate driverConn close") }
552 }
553 dc.closed = true
554 return dc.db.removeDepLocked(dc, dc)
555 }
556
// Close marks dc as closed and removes the connection's dependency on itself.
// The underlying driver connection is only closed (via finalClose) once no
// other dependencies remain. A second call returns a duplicate-close error.
//
// The dc lock and db.mu are taken one after the other, never nested here.
func (dc *driverConn) Close() error {
	dc.Lock()
	if dc.closed {
		dc.Unlock()
		return errors.New("sql: duplicate driverConn close")
	}
	dc.closed = true
	dc.Unlock() // released before taking db.mu

	// Drop the self-dependency under db.mu, then run the resulting function
	// (finalClose or a no-op) after db.mu has been released.
	dc.db.mu.Lock()
	dc.dbmuClosed = true
	fn := dc.db.removeDepLocked(dc, dc)
	dc.db.mu.Unlock()
	return fn()
}
573
// finalClose closes every statement still tracked on dc, closes the driver
// connection, and updates the pool accounting. It returns the error from the
// driver connection's Close.
func (dc *driverConn) finalClose() error {
	var err error

	// Snapshot the open statements under the dc lock, then close them after
	// releasing it: driverStmt.Close takes the same lock, because statements
	// created in this file use dc as their Locker.
	var openStmt []*driverStmt
	withLock(dc, func() {
		openStmt = make([]*driverStmt, 0, len(dc.openStmt))
		for ds := range dc.openStmt {
			openStmt = append(openStmt, ds)
		}
		dc.openStmt = nil
	})
	for _, ds := range openStmt {
		ds.Close()
	}
	withLock(dc, func() {
		dc.finalClosed = true
		err = dc.ci.Close()
		dc.ci = nil
	})

	// One fewer open connection: give waiting requests a chance to trigger a
	// replacement.
	dc.db.mu.Lock()
	dc.db.numOpen--
	dc.db.maybeOpenNewConnections()
	dc.db.mu.Unlock()

	atomic.AddUint64(&dc.db.numClosed, 1)
	return err
}
604
605
606
607
// driverStmt pairs a driver.Stmt with the lock that must be held while using
// it. Elsewhere in this file the Locker is the driverConn the statement was
// prepared on.
type driverStmt struct {
	sync.Locker // guards the fields below and calls into si
	si          driver.Stmt
	closed      bool
	closeErr    error // result of the first Close call
}

// Close closes the underlying statement exactly once. Every call, including
// repeated ones, returns the error produced by that first close.
func (ds *driverStmt) Close() error {
	ds.Lock()
	defer ds.Unlock()

	if !ds.closed {
		ds.closed = true
		ds.closeErr = ds.si.Close()
	}
	return ds.closeErr
}
627
628
// depSet is a set of dependents; keys are the dependents and values are
// always true (see addDepLocked).
type depSet map[interface{}]bool
630
631
632
// finalCloser is implemented by values whose dependencies are tracked in
// DB.dep, and which are closed once nothing depends on them any more.
type finalCloser interface {
	// finalClose is returned by removeDepLocked, for the caller to invoke,
	// when the last dependency on the value is removed.
	finalClose() error
}
638
639
640
// addDep records that dep depends on x, so x is not finally closed until the
// dependency is removed again. It acquires db.mu; use addDepLocked when the
// lock is already held.
func (db *DB) addDep(x finalCloser, dep interface{}) {
	db.mu.Lock()
	defer db.mu.Unlock()
	db.addDepLocked(x, dep)
}
646
647 func (db *DB) addDepLocked(x finalCloser, dep interface{}) {
648 if db.dep == nil {
649 db.dep = make(map[finalCloser]depSet)
650 }
651 xdep := db.dep[x]
652 if xdep == nil {
653 xdep = make(depSet)
654 db.dep[x] = xdep
655 }
656 xdep[dep] = true
657 }
658
659
660
661
662
// removeDep removes dep's dependency on x. If that was the last dependency,
// x.finalClose is called and its error returned; otherwise the result is nil.
// The follow-up function runs after db.mu has been released.
func (db *DB) removeDep(x finalCloser, dep interface{}) error {
	db.mu.Lock()
	fn := db.removeDepLocked(x, dep)
	db.mu.Unlock()
	return fn()
}
669
670 func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
671
672 xdep, ok := db.dep[x]
673 if !ok {
674 panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
675 }
676
677 l0 := len(xdep)
678 delete(xdep, dep)
679
680 switch len(xdep) {
681 case l0:
682
683 panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
684 case 0:
685
686 delete(db.dep, x)
687 return x.finalClose
688 default:
689
690 return func() error { return nil }
691 }
692 }
693
694
695
696
697
698
// connectionRequestQueueSize is the buffer size of DB.openerCh (see OpenDB),
// i.e. how many pending open-connection signals can be queued before a send
// on that channel blocks.
var connectionRequestQueueSize = 1000000
700
// dsnConnector adapts a driver.Driver plus a data source name to the
// connector shape OpenDB expects. Open uses it for drivers that do not
// implement driver.DriverContext.
type dsnConnector struct {
	dsn    string
	driver driver.Driver
}

// Connect opens a new connection by passing the stored DSN to the driver's
// Open method. The context is ignored.
func (c dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
	drv, dsn := c.driver, c.dsn
	return drv.Open(dsn)
}

// Driver returns the wrapped driver.
func (c dsnConnector) Driver() driver.Driver {
	return c.driver
}
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730 func OpenDB(c driver.Connector) *DB {
731 ctx, cancel := context.WithCancel(context.Background())
732 db := &DB{
733 connector: c,
734 openerCh: make(chan struct{}, connectionRequestQueueSize),
735 lastPut: make(map[*driverConn]string),
736 connRequests: make(map[uint64]chan connRequest),
737 stop: cancel,
738 }
739
740 go db.connectionOpener(ctx)
741
742 return db
743 }
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762 func Open(driverName, dataSourceName string) (*DB, error) {
763 driversMu.RLock()
764 driveri, ok := drivers[driverName]
765 driversMu.RUnlock()
766 if !ok {
767 return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
768 }
769
770 if driverCtx, ok := driveri.(driver.DriverContext); ok {
771 connector, err := driverCtx.OpenConnector(dataSourceName)
772 if err != nil {
773 return nil, err
774 }
775 return OpenDB(connector), nil
776 }
777
778 return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
779 }
780
781 func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
782 var err error
783 if pinger, ok := dc.ci.(driver.Pinger); ok {
784 withLock(dc, func() {
785 err = pinger.Ping(ctx)
786 })
787 }
788 release(err)
789 return err
790 }
791
792
793
794 func (db *DB) PingContext(ctx context.Context) error {
795 var dc *driverConn
796 var err error
797
798 for i := 0; i < maxBadConnRetries; i++ {
799 dc, err = db.conn(ctx, cachedOrNewConn)
800 if err != driver.ErrBadConn {
801 break
802 }
803 }
804 if err == driver.ErrBadConn {
805 dc, err = db.conn(ctx, alwaysNewConn)
806 }
807 if err != nil {
808 return err
809 }
810
811 return db.pingDC(ctx, dc, dc.releaseConn)
812 }
813
814
815
// Ping is PingContext with context.Background().
func (db *DB) Ping() error {
	return db.PingContext(context.Background())
}
819
820
821
822
823
824
825
// Close closes the database: it closes all idle connections, wakes every
// caller waiting for a connection, and stops the background opener.
//
// Close is idempotent; calls after the first return nil. When several idle
// connections fail to close, the error of the last failing one is returned.
func (db *DB) Close() error {
	db.mu.Lock()
	if db.closed { // already closed
		db.mu.Unlock()
		return nil
	}
	if db.cleanerCh != nil {
		// Closing the channel wakes connectionCleaner, which exits once it
		// sees db.closed.
		close(db.cleanerCh)
	}
	var err error
	// Collect the close functions under the lock; run them after unlocking.
	fns := make([]func() error, 0, len(db.freeConn))
	for _, dc := range db.freeConn {
		fns = append(fns, dc.closeDBLocked())
	}
	db.freeConn = nil
	db.closed = true
	// A closed request channel makes the waiter in conn return errDBClosed.
	for _, req := range db.connRequests {
		close(req)
	}
	db.mu.Unlock()
	for _, fn := range fns {
		err1 := fn()
		if err1 != nil {
			err = err1
		}
	}
	db.stop() // cancels the connectionOpener context
	return err
}
855
// defaultMaxIdleConns is the idle-connection limit used when DB.maxIdleCount
// is zero, i.e. has not been configured.
const defaultMaxIdleConns = 2
857
858 func (db *DB) maxIdleConnsLocked() int {
859 n := db.maxIdleCount
860 switch {
861 case n == 0:
862
863 return defaultMaxIdleConns
864 case n < 0:
865 return 0
866 default:
867 return n
868 }
869 }
870
871 func (db *DB) shortestIdleTimeLocked() time.Duration {
872 if db.maxIdleTime <= 0 {
873 return db.maxLifetime
874 }
875 if db.maxLifetime <= 0 {
876 return db.maxIdleTime
877 }
878
879 min := db.maxIdleTime
880 if min > db.maxLifetime {
881 min = db.maxLifetime
882 }
883 return min
884 }
885
886
887
888
889
890
891
892
893
894
895
896 func (db *DB) SetMaxIdleConns(n int) {
897 db.mu.Lock()
898 if n > 0 {
899 db.maxIdleCount = n
900 } else {
901
902 db.maxIdleCount = -1
903 }
904
905 if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
906 db.maxIdleCount = db.maxOpen
907 }
908 var closing []*driverConn
909 idleCount := len(db.freeConn)
910 maxIdle := db.maxIdleConnsLocked()
911 if idleCount > maxIdle {
912 closing = db.freeConn[maxIdle:]
913 db.freeConn = db.freeConn[:maxIdle]
914 }
915 db.maxIdleClosed += int64(len(closing))
916 db.mu.Unlock()
917 for _, c := range closing {
918 c.Close()
919 }
920 }
921
922
923
924
925
926
927
928
929
930 func (db *DB) SetMaxOpenConns(n int) {
931 db.mu.Lock()
932 db.maxOpen = n
933 if n < 0 {
934 db.maxOpen = 0
935 }
936 syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
937 db.mu.Unlock()
938 if syncMaxIdle {
939 db.SetMaxIdleConns(n)
940 }
941 }
942
943
944
945
946
947
948 func (db *DB) SetConnMaxLifetime(d time.Duration) {
949 if d < 0 {
950 d = 0
951 }
952 db.mu.Lock()
953
954 if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
955 select {
956 case db.cleanerCh <- struct{}{}:
957 default:
958 }
959 }
960 db.maxLifetime = d
961 db.startCleanerLocked()
962 db.mu.Unlock()
963 }
964
965
966
967
968
969
970 func (db *DB) SetConnMaxIdleTime(d time.Duration) {
971 if d < 0 {
972 d = 0
973 }
974 db.mu.Lock()
975 defer db.mu.Unlock()
976
977
978 if d > 0 && d < db.maxIdleTime && db.cleanerCh != nil {
979 select {
980 case db.cleanerCh <- struct{}{}:
981 default:
982 }
983 }
984 db.maxIdleTime = d
985 db.startCleanerLocked()
986 }
987
988
989 func (db *DB) startCleanerLocked() {
990 if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
991 db.cleanerCh = make(chan struct{}, 1)
992 go db.connectionCleaner(db.shortestIdleTimeLocked())
993 }
994 }
995
996 func (db *DB) connectionCleaner(d time.Duration) {
997 const minInterval = time.Second
998
999 if d < minInterval {
1000 d = minInterval
1001 }
1002 t := time.NewTimer(d)
1003
1004 for {
1005 select {
1006 case <-t.C:
1007 case <-db.cleanerCh:
1008 }
1009
1010 db.mu.Lock()
1011
1012 d = db.shortestIdleTimeLocked()
1013 if db.closed || db.numOpen == 0 || d <= 0 {
1014 db.cleanerCh = nil
1015 db.mu.Unlock()
1016 return
1017 }
1018
1019 closing := db.connectionCleanerRunLocked()
1020 db.mu.Unlock()
1021 for _, c := range closing {
1022 c.Close()
1023 }
1024
1025 if d < minInterval {
1026 d = minInterval
1027 }
1028 t.Reset(d)
1029 }
1030 }
1031
1032 func (db *DB) connectionCleanerRunLocked() (closing []*driverConn) {
1033 if db.maxLifetime > 0 {
1034 expiredSince := nowFunc().Add(-db.maxLifetime)
1035 for i := 0; i < len(db.freeConn); i++ {
1036 c := db.freeConn[i]
1037 if c.createdAt.Before(expiredSince) {
1038 closing = append(closing, c)
1039 last := len(db.freeConn) - 1
1040 db.freeConn[i] = db.freeConn[last]
1041 db.freeConn[last] = nil
1042 db.freeConn = db.freeConn[:last]
1043 i--
1044 }
1045 }
1046 db.maxLifetimeClosed += int64(len(closing))
1047 }
1048
1049 if db.maxIdleTime > 0 {
1050 expiredSince := nowFunc().Add(-db.maxIdleTime)
1051 var expiredCount int64
1052 for i := 0; i < len(db.freeConn); i++ {
1053 c := db.freeConn[i]
1054 if db.maxIdleTime > 0 && c.returnedAt.Before(expiredSince) {
1055 closing = append(closing, c)
1056 expiredCount++
1057 last := len(db.freeConn) - 1
1058 db.freeConn[i] = db.freeConn[last]
1059 db.freeConn[last] = nil
1060 db.freeConn = db.freeConn[:last]
1061 i--
1062 }
1063 }
1064 db.maxIdleTimeClosed += expiredCount
1065 }
1066 return
1067 }
1068
1069
// DBStats is a snapshot of connection-pool statistics, as returned by
// DB.Stats.
type DBStats struct {
	// MaxOpenConnections is the configured open-connection limit.
	MaxOpenConnections int

	// Pool status.

	// OpenConnections is the number of connections counted as open, including
	// those in use and idle.
	OpenConnections int
	// InUse is OpenConnections minus Idle.
	InUse int
	// Idle is the number of idle connections.
	Idle int

	// Counters.

	// WaitCount is the total number of connections waited for.
	WaitCount int64
	// WaitDuration is the total time spent blocked waiting for a connection.
	WaitDuration time.Duration
	// MaxIdleClosed counts connections closed due to the idle limit.
	MaxIdleClosed int64
	// MaxIdleTimeClosed counts connections closed due to the idle-time limit.
	MaxIdleTimeClosed int64
	// MaxLifetimeClosed counts connections closed due to the lifetime limit.
	MaxLifetimeClosed int64
}
1085
1086
1087 func (db *DB) Stats() DBStats {
1088 wait := atomic.LoadInt64(&db.waitDuration)
1089
1090 db.mu.Lock()
1091 defer db.mu.Unlock()
1092
1093 stats := DBStats{
1094 MaxOpenConnections: db.maxOpen,
1095
1096 Idle: len(db.freeConn),
1097 OpenConnections: db.numOpen,
1098 InUse: db.numOpen - len(db.freeConn),
1099
1100 WaitCount: db.waitCount,
1101 WaitDuration: time.Duration(wait),
1102 MaxIdleClosed: db.maxIdleClosed,
1103 MaxIdleTimeClosed: db.maxIdleTimeClosed,
1104 MaxLifetimeClosed: db.maxLifetimeClosed,
1105 }
1106 return stats
1107 }
1108
1109
1110
1111
// maybeOpenNewConnections asks connectionOpener to open one connection per
// pending request, limited by the remaining headroom under maxOpen when a
// limit is set. Every call site in this file holds db.mu.
func (db *DB) maybeOpenNewConnections() {
	numRequests := len(db.connRequests)
	if db.maxOpen > 0 {
		numCanOpen := db.maxOpen - db.numOpen
		if numRequests > numCanOpen {
			numRequests = numCanOpen
		}
	}
	for numRequests > 0 {
		// Count the connection as open before it exists; openNewConnection
		// decrements again if the open fails or the result is not kept.
		db.numOpen++
		numRequests--
		// NOTE(review): when the DB is closed this returns after the
		// increment above, without sending a signal — confirm the counter
		// does not matter once closed.
		if db.closed {
			return
		}
		// openerCh is buffered (connectionRequestQueueSize), so this send
		// only blocks if that many signals are already queued.
		db.openerCh <- struct{}{}
	}
}
1129
1130
1131 func (db *DB) connectionOpener(ctx context.Context) {
1132 for {
1133 select {
1134 case <-ctx.Done():
1135 return
1136 case <-db.openerCh:
1137 db.openNewConnection(ctx)
1138 }
1139 }
1140 }
1141
1142
// openNewConnection opens one connection on behalf of a pending request and
// hands it, or the connect error, to the pool.
//
// The caller (maybeOpenNewConnections) has already counted the connection in
// db.numOpen, so every path that does not keep the connection decrements it.
func (db *DB) openNewConnection(ctx context.Context) {
	// Connect without holding db.mu.
	ci, err := db.connector.Connect(ctx)
	db.mu.Lock()
	defer db.mu.Unlock()
	if db.closed {
		// The DB was closed in the meantime: discard the new connection.
		if err == nil {
			ci.Close()
		}
		db.numOpen--
		return
	}
	if err != nil {
		db.numOpen--
		// Deliver the error to one waiting request, if any, then try again
		// for the remaining ones.
		db.putConnDBLocked(nil, err)
		db.maybeOpenNewConnections()
		return
	}
	dc := &driverConn{
		db:         db,
		createdAt:  nowFunc(),
		returnedAt: nowFunc(),
		ci:         ci,
	}
	if db.putConnDBLocked(dc, err) {
		// Accepted by a waiter or the idle list: register the
		// self-dependency that driverConn.Close later removes.
		db.addDepLocked(dc, dc)
	} else {
		// Nobody wanted it.
		db.numOpen--
		ci.Close()
	}
}
1176
1177
1178
1179
// connRequest is the value delivered to a caller waiting in DB.conn: either a
// connection, or the error from a failed attempt to open one.
type connRequest struct {
	conn *driverConn
	err  error
}
1184
// errDBClosed is returned by DB.conn when the DB has been closed.
var errDBClosed = errors.New("sql: database is closed")
1186
1187
1188
1189 func (db *DB) nextRequestKeyLocked() uint64 {
1190 next := db.nextRequest
1191 db.nextRequest++
1192 return next
1193 }
1194
1195
// conn returns a connection for the caller's exclusive use. Depending on
// strategy and pool state it hands out an idle connection, waits for one to
// become available, or opens a new one.
//
// It returns errDBClosed if the DB is closed, ctx.Err() if ctx is done, and
// driver.ErrBadConn if the connection it obtained had expired or failed its
// session reset (callers in this file retry on that error).
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Fail fast if the context is already done.
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// Prefer an idle connection when the strategy allows it. The connection
	// at the front of the idle list is taken.
	numFree := len(db.freeConn)
	if strategy == cachedOrNewConn && numFree > 0 {
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		conn.inUse = true
		if conn.expired(lifetime) {
			db.maxLifetimeClosed++
			db.mu.Unlock()
			conn.Close()
			return nil, driver.ErrBadConn
		}
		db.mu.Unlock()

		// Reset the session if the connection was flagged for it.
		if err := conn.resetSession(ctx); err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}

		return conn, nil
	}

	// At the open-connection limit: queue a request and wait for a
	// connection to be handed over by putConnDBLocked.
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// Buffered so the sender never blocks, even if this waiter has
		// already given up.
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := nowFunc()

		// Wait for either the context or a delivered connection.
		select {
		case <-ctx.Done():
			// Withdraw the request so nothing is sent to it from now on.
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			// A connection may have been delivered just before the request
			// was withdrawn; if so, give it back to the pool.
			select {
			default:
			case ret, ok := <-req:
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			// DB.Close closes the request channels.
			if !ok {
				return nil, errDBClosed
			}
			// Only the pooled strategy re-checks expiry on a delivered
			// connection.
			if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
				db.mu.Lock()
				db.maxLifetimeClosed++
				db.mu.Unlock()
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			// A nil connection means the open attempt failed; pass its error
			// on.
			if ret.conn == nil {
				return nil, ret.err
			}

			// Reset the session if the connection was flagged for it.
			if err := ret.conn.resetSession(ctx); err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}

	// Below the limit (or unlimited): open a new connection. It is counted
	// before Connect is called, and Connect runs without db.mu held.
	db.numOpen++
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // undo the count taken above
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:         db,
		createdAt:  nowFunc(),
		returnedAt: nowFunc(),
		ci:         ci,
		inUse:      true,
	}
	// Register the self-dependency that driverConn.Close later removes.
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}
1321
1322
// putConnHook, when non-nil, is called by putConn (with db.mu held) just
// before a healthy connection is handed back to the pool. Nothing in this
// chunk sets it; presumably it is a test hook (TODO confirm).
var putConnHook func(*DB, *driverConn)
1324
1325
1326
1327
1328 func (db *DB) noteUnusedDriverStatement(c *driverConn, ds *driverStmt) {
1329 db.mu.Lock()
1330 defer db.mu.Unlock()
1331 if c.inUse {
1332 c.onPut = append(c.onPut, func() {
1333 ds.Close()
1334 })
1335 } else {
1336 c.Lock()
1337 fc := c.finalClosed
1338 c.Unlock()
1339 if !fc {
1340 ds.Close()
1341 }
1342 }
1343 }
1344
1345
1346
// debugGetPut enables extra bookkeeping in putConn: recording the stack of
// each return in DB.lastPut, and printing it when a connection is returned
// twice.
const debugGetPut = false
1348
1349
1350
// putConn returns dc to the pool after use. err is the error of the last
// operation on the connection; resetSession flags the connection for a
// session reset before its next use.
//
// A connection that is bad (err is driver.ErrBadConn, the driver's validator
// rejects it, or it has exceeded maxLifetime) is closed instead of pooled.
// It panics if dc is not marked as in use.
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	// Let the driver veto reuse, unless the connection is already known bad.
	if err != driver.ErrBadConn {
		if !dc.validateConnection(resetSession) {
			err = driver.ErrBadConn
		}
	}
	db.mu.Lock()
	if !dc.inUse {
		db.mu.Unlock()
		if debugGetPut {
			fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
		}
		panic("sql: connection returned that was never out")
	}

	// Treat an expired connection like a bad one.
	if err != driver.ErrBadConn && dc.expired(db.maxLifetime) {
		db.maxLifetimeClosed++
		err = driver.ErrBadConn
	}
	if debugGetPut {
		db.lastPut[dc] = stack()
	}
	dc.inUse = false
	dc.returnedAt = nowFunc()

	// Run the callbacks queued while the connection was in use (see
	// noteUnusedDriverStatement); db.mu is held while they run.
	for _, fn := range dc.onPut {
		fn()
	}
	dc.onPut = nil

	if err == driver.ErrBadConn {
		// Do not reuse a bad connection. Signal the opener on behalf of
		// pending requests, then close this connection outside the lock.
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		dc.Close()
		return
	}
	if putConnHook != nil {
		putConnHook(db, dc)
	}
	added := db.putConnDBLocked(dc, nil)
	db.mu.Unlock()

	// Neither a waiter nor the idle list took the connection: close it.
	if !added {
		dc.Close()
		return
	}
}
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
// putConnDBLocked hands dc (or, when dc is nil, the error err) to a pending
// connection request if there is one; otherwise it places dc on the idle list
// when err is nil and the idle limit allows it.
//
// It reports whether the connection or error was accepted. On false the
// caller remains responsible for dc. The caller must hold db.mu.
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
	if db.closed {
		return false
	}
	// Over the open-connection limit: refuse, so the caller closes it.
	if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
	if c := len(db.connRequests); c > 0 {
		var req chan connRequest
		var reqKey uint64
		// Pick one pending request; map iteration order is unspecified, so
		// which waiter is served is arbitrary.
		for reqKey, req = range db.connRequests {
			break
		}
		delete(db.connRequests, reqKey) // remove it from the pending set
		if err == nil {
			dc.inUse = true
		}
		// The request channel is created with capacity 1 in conn, so this
		// send does not block.
		req <- connRequest{
			conn: dc,
			err:  err,
		}
		return true
	} else if err == nil && !db.closed {
		if db.maxIdleConnsLocked() > len(db.freeConn) {
			db.freeConn = append(db.freeConn, dc)
			db.startCleanerLocked()
			return true
		}
		// Idle list is full; the caller closes the connection.
		db.maxIdleClosed++
	}
	return false
}
1444
1445
1446
1447
// maxBadConnRetries is how many times an operation is attempted with a pooled
// connection while the result is driver.ErrBadConn, before one final attempt
// is made on a connection that is not taken from the idle list.
const maxBadConnRetries = 2
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458 func (db *DB) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1459 var stmt *Stmt
1460 var err error
1461 for i := 0; i < maxBadConnRetries; i++ {
1462 stmt, err = db.prepare(ctx, query, cachedOrNewConn)
1463 if err != driver.ErrBadConn {
1464 break
1465 }
1466 }
1467 if err == driver.ErrBadConn {
1468 return db.prepare(ctx, query, alwaysNewConn)
1469 }
1470 return stmt, err
1471 }
1472
1473
1474
1475
1476
1477
// Prepare is PrepareContext with context.Background().
func (db *DB) Prepare(query string) (*Stmt, error) {
	return db.PrepareContext(context.Background(), query)
}
1481
// prepare makes a single attempt to prepare query: it obtains a connection
// according to strategy and delegates to prepareDC, which releases the
// connection again through dc.releaseConn.
func (db *DB) prepare(ctx context.Context, query string, strategy connReuseStrategy) (*Stmt, error) {
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}
	return db.prepareDC(ctx, dc, dc.releaseConn, nil, query)
}
1495
1496
1497
1498
// prepareDC prepares query on dc and wraps the result in a Stmt. release is
// always called with the outcome when the function returns.
//
// cg is stored on the Stmt. When cg is nil the statement is owned by the DB:
// the (connection, driver statement) pair is remembered in stmt.css and the
// Stmt is registered as a dependency of itself.
func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
	var ds *driverStmt
	var err error
	// The closure reads err at return time, so release sees the final error.
	defer func() {
		release(err)
	}()
	withLock(dc, func() {
		ds, err = dc.prepareLocked(ctx, cg, query)
	})
	if err != nil {
		return nil, err
	}
	stmt := &Stmt{
		db:    db,
		query: query,
		cg:    cg,
		cgds:  ds,
	}

	// Without a connection grabber the statement belongs to the DB and may
	// later be used on other connections.
	if cg == nil {
		stmt.css = []connStmt{{dc, ds}}
		// Snapshot the closed-connection counter.
		// NOTE(review): presumably compared later to decide when stmt.css
		// needs pruning — that code is outside this chunk.
		stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
		db.addDep(stmt, stmt)
	}
	return stmt, nil
}
1528
1529
1530
1531 func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
1532 var res Result
1533 var err error
1534 for i := 0; i < maxBadConnRetries; i++ {
1535 res, err = db.exec(ctx, query, args, cachedOrNewConn)
1536 if err != driver.ErrBadConn {
1537 break
1538 }
1539 }
1540 if err == driver.ErrBadConn {
1541 return db.exec(ctx, query, args, alwaysNewConn)
1542 }
1543 return res, err
1544 }
1545
1546
1547
// Exec is ExecContext with context.Background().
func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
	return db.ExecContext(context.Background(), query, args...)
}
1551
// exec makes a single attempt to execute query: it obtains a connection
// according to strategy and delegates to execDC, which releases the
// connection again through dc.releaseConn.
func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}
	return db.execDC(ctx, dc, dc.releaseConn, query, args)
}
1559
// execDC executes query on dc and always calls release with the final error
// when it returns.
//
// If the driver connection implements driver.ExecerContext or driver.Execer,
// the query is executed directly. If it does not, or if the direct path
// returns driver.ErrSkip, the query is prepared, executed and closed as a
// statement instead.
func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
	// The closure reads the named result err at return time.
	defer func() {
		release(err)
	}()
	// Prefer the context-aware interface; fall back to the plain one.
	execerCtx, ok := dc.ci.(driver.ExecerContext)
	var execer driver.Execer
	if !ok {
		execer, ok = dc.ci.(driver.Execer)
	}
	if ok {
		var nvdargs []driver.NamedValue
		var resi driver.Result
		// Argument conversion and execution both happen under the dc lock.
		withLock(dc, func() {
			nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
			if err != nil {
				return
			}
			resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
		})
		// ErrSkip means the direct path is not available for this query;
		// fall through to the prepared-statement path below.
		if err != driver.ErrSkip {
			if err != nil {
				return nil, err
			}
			return driverResult{dc, resi}, nil
		}
	}

	var si driver.Stmt
	withLock(dc, func() {
		si, err = ctxDriverPrepare(ctx, dc.ci, query)
	})
	if err != nil {
		return nil, err
	}
	ds := &driverStmt{Locker: dc, si: si}
	// Deferred calls run last-in first-out, so the statement is closed
	// before the connection is released.
	defer ds.Close()
	return resultFromStatement(ctx, dc.ci, ds, args...)
}
1598
1599
1600
1601 func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
1602 var rows *Rows
1603 var err error
1604 for i := 0; i < maxBadConnRetries; i++ {
1605 rows, err = db.query(ctx, query, args, cachedOrNewConn)
1606 if err != driver.ErrBadConn {
1607 break
1608 }
1609 }
1610 if err == driver.ErrBadConn {
1611 return db.query(ctx, query, args, alwaysNewConn)
1612 }
1613 return rows, err
1614 }
1615
1616
1617
// Query is QueryContext with context.Background().
func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
	return db.QueryContext(context.Background(), query, args...)
}
1621
// query makes a single attempt to run query: it obtains a connection
// according to strategy and delegates to queryDC with a nil transaction
// context. The connection is released through dc.releaseConn, either by
// queryDC on failure or via the returned Rows.
func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
	dc, err := db.conn(ctx, strategy)
	if err != nil {
		return nil, err
	}

	return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
1630
1631
1632
1633
1634
// queryDC runs query on dc and returns the resulting Rows.
//
// On failure releaseConn is called with the error before returning. On
// success releaseConn is stored in the returned Rows, which then owns the
// connection. ctx is the query's context; txctx is an additional context
// passed to Rows.initContextClose (callers outside a transaction pass nil).
//
// If the driver connection implements driver.QueryerContext or
// driver.Queryer, the query is run directly. If it does not, or if the direct
// path returns driver.ErrSkip, the query is run through a prepared statement
// that the returned Rows closes.
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
	// Prefer the context-aware interface; fall back to the plain one.
	queryerCtx, ok := dc.ci.(driver.QueryerContext)
	var queryer driver.Queryer
	if !ok {
		queryer, ok = dc.ci.(driver.Queryer)
	}
	if ok {
		var nvdargs []driver.NamedValue
		var rowsi driver.Rows
		var err error
		// Argument conversion and the query both happen under the dc lock.
		withLock(dc, func() {
			nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
			if err != nil {
				return
			}
			rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs)
		})
		// ErrSkip means the direct path is not available for this query;
		// fall through to the prepared-statement path below.
		if err != driver.ErrSkip {
			if err != nil {
				releaseConn(err)
				return nil, err
			}
			// Ownership of dc passes to rows, which releases it through
			// releaseConn.
			rows := &Rows{
				dc:          dc,
				releaseConn: releaseConn,
				rowsi:       rowsi,
			}
			rows.initContextClose(ctx, txctx)
			return rows, nil
		}
	}

	var si driver.Stmt
	var err error
	withLock(dc, func() {
		si, err = ctxDriverPrepare(ctx, dc.ci, query)
	})
	if err != nil {
		releaseConn(err)
		return nil, err
	}

	ds := &driverStmt{Locker: dc, si: si}
	rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...)
	if err != nil {
		// Close the statement before giving the connection back.
		ds.Close()
		releaseConn(err)
		return nil, err
	}

	// Ownership of dc and of the statement passes to rows; closeStmt tells
	// rows which statement to close.
	rows := &Rows{
		dc:          dc,
		releaseConn: releaseConn,
		rowsi:       rowsi,
		closeStmt:   ds,
	}
	rows.initContextClose(ctx, txctx)
	return rows, nil
}
1698
1699
1700
1701
1702
1703
1704
// QueryRowContext runs query via QueryContext and wraps the outcome in a Row.
// The result is never nil: any query error is stored in the Row instead of
// being returned.
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
	rows, err := db.QueryContext(ctx, query, args...)
	return &Row{rows: rows, err: err}
}
1709
1710
1711
1712
1713
1714
1715
// QueryRow is QueryRowContext with context.Background().
func (db *DB) QueryRow(query string, args ...interface{}) *Row {
	return db.QueryRowContext(context.Background(), query, args...)
}
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730 func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1731 var tx *Tx
1732 var err error
1733 for i := 0; i < maxBadConnRetries; i++ {
1734 tx, err = db.begin(ctx, opts, cachedOrNewConn)
1735 if err != driver.ErrBadConn {
1736 break
1737 }
1738 }
1739 if err == driver.ErrBadConn {
1740 return db.begin(ctx, opts, alwaysNewConn)
1741 }
1742 return tx, err
1743 }
1744
1745
1746
// Begin is BeginTx with context.Background() and nil options.
func (db *DB) Begin() (*Tx, error) {
	return db.BeginTx(context.Background(), nil)
}
1750
1751 func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
1752 dc, err := db.conn(ctx, strategy)
1753 if err != nil {
1754 return nil, err
1755 }
1756 return db.beginDC(ctx, dc, dc.releaseConn, opts)
1757 }
1758
1759
1760 func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
1761 var txi driver.Tx
1762 keepConnOnRollback := false
1763 withLock(dc, func() {
1764 _, hasSessionResetter := dc.ci.(driver.SessionResetter)
1765 _, hasConnectionValidator := dc.ci.(driver.Validator)
1766 keepConnOnRollback = hasSessionResetter && hasConnectionValidator
1767 txi, err = ctxDriverBegin(ctx, opts, dc.ci)
1768 })
1769 if err != nil {
1770 release(err)
1771 return nil, err
1772 }
1773
1774
1775
1776 ctx, cancel := context.WithCancel(ctx)
1777 tx = &Tx{
1778 db: db,
1779 dc: dc,
1780 releaseConn: release,
1781 txi: txi,
1782 cancel: cancel,
1783 keepConnOnRollback: keepConnOnRollback,
1784 ctx: ctx,
1785 }
1786 go tx.awaitDone()
1787 return tx, nil
1788 }
1789
1790
1791 func (db *DB) Driver() driver.Driver {
1792 return db.connector.Driver()
1793 }
1794
1795
1796
// ErrConnDone is returned by Conn operations once the Conn has been closed:
// Conn.grabConn reports it when the done flag is set, and Conn.close reports
// it when the Conn was already closed.
var ErrConnDone = errors.New("sql: connection is already closed")
1798
1799
1800
1801
1802
1803
1804
1805
1806 func (db *DB) Conn(ctx context.Context) (*Conn, error) {
1807 var dc *driverConn
1808 var err error
1809 for i := 0; i < maxBadConnRetries; i++ {
1810 dc, err = db.conn(ctx, cachedOrNewConn)
1811 if err != driver.ErrBadConn {
1812 break
1813 }
1814 }
1815 if err == driver.ErrBadConn {
1816 dc, err = db.conn(ctx, alwaysNewConn)
1817 }
1818 if err != nil {
1819 return nil, err
1820 }
1821
1822 conn := &Conn{
1823 db: db,
1824 dc: dc,
1825 }
1826 return conn, nil
1827 }
1828
// releaseConn is the callback handed out together with a grabbed connection.
// Callers invoke it when they are finished, passing the error (possibly nil)
// produced by the operation they ran.
type releaseConn func(error)
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
// Conn pairs a DB with one driverConn obtained from it (see DB.Conn).
type Conn struct {
	// db is the owning DB. close sets it to nil.
	db *DB

	// closemu is read-locked by grabConn for as long as a caller is using
	// dc (the matching RUnlock is in closemuRUnlockCondReleaseConn) and is
	// write-locked by close, so close blocks until outstanding users are
	// finished.
	closemu sync.RWMutex

	// dc is the underlying connection. close hands it back through
	// dc.releaseConn and then sets the field to nil.
	dc *driverConn

	// done is accessed atomically. It is 0 while the Conn is usable and is
	// set to 1 by close; grabConn returns ErrConnDone when it is non-zero.
	done int32
}
1857
1858
1859
1860 func (c *Conn) grabConn(context.Context) (*driverConn, releaseConn, error) {
1861 if atomic.LoadInt32(&c.done) != 0 {
1862 return nil, nil, ErrConnDone
1863 }
1864 c.closemu.RLock()
1865 return c.dc, c.closemuRUnlockCondReleaseConn, nil
1866 }
1867
1868
1869 func (c *Conn) PingContext(ctx context.Context) error {
1870 dc, release, err := c.grabConn(ctx)
1871 if err != nil {
1872 return err
1873 }
1874 return c.db.pingDC(ctx, dc, release)
1875 }
1876
1877
1878
1879 func (c *Conn) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
1880 dc, release, err := c.grabConn(ctx)
1881 if err != nil {
1882 return nil, err
1883 }
1884 return c.db.execDC(ctx, dc, release, query, args)
1885 }
1886
1887
1888
1889 func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
1890 dc, release, err := c.grabConn(ctx)
1891 if err != nil {
1892 return nil, err
1893 }
1894 return c.db.queryDC(ctx, nil, dc, release, query, args)
1895 }
1896
1897
1898
1899
1900
1901
1902
1903 func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
1904 rows, err := c.QueryContext(ctx, query, args...)
1905 return &Row{rows: rows, err: err}
1906 }
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916 func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
1917 dc, release, err := c.grabConn(ctx)
1918 if err != nil {
1919 return nil, err
1920 }
1921 return c.db.prepareDC(ctx, dc, release, c, query)
1922 }
1923
1924
1925
1926
1927
1928
1929 func (c *Conn) Raw(f func(driverConn interface{}) error) (err error) {
1930 var dc *driverConn
1931 var release releaseConn
1932
1933
1934 dc, release, err = c.grabConn(nil)
1935 if err != nil {
1936 return
1937 }
1938 fPanic := true
1939 dc.Mutex.Lock()
1940 defer func() {
1941 dc.Mutex.Unlock()
1942
1943
1944
1945
1946 if fPanic {
1947 err = driver.ErrBadConn
1948 }
1949 release(err)
1950 }()
1951 err = f(dc.ci)
1952 fPanic = false
1953
1954 return
1955 }
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967 func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
1968 dc, release, err := c.grabConn(ctx)
1969 if err != nil {
1970 return nil, err
1971 }
1972 return c.db.beginDC(ctx, dc, release, opts)
1973 }
1974
1975
1976
1977 func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
1978 c.closemu.RUnlock()
1979 if err == driver.ErrBadConn {
1980 c.close(err)
1981 }
1982 }
1983
1984 func (c *Conn) txCtx() context.Context {
1985 return nil
1986 }
1987
1988 func (c *Conn) close(err error) error {
1989 if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
1990 return ErrConnDone
1991 }
1992
1993
1994
1995 c.closemu.Lock()
1996 defer c.closemu.Unlock()
1997
1998 c.dc.releaseConn(err)
1999 c.dc = nil
2000 c.db = nil
2001 return err
2002 }
2003
2004
2005
2006
2007
2008
2009 func (c *Conn) Close() error {
2010 return c.close(nil)
2011 }
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
// Tx is an in-progress transaction bound to a single driverConn. It is
// created by DB.beginDC and finished by Commit or rollback.
type Tx struct {
	// db is the DB the transaction was started from.
	db *DB

	// closemu is read-locked by grabConn while a caller uses the
	// connection. Commit and rollback take and immediately drop the write
	// lock to wait for those callers before talking to the driver.
	closemu sync.RWMutex

	// dc is the connection the transaction runs on and txi the driver's
	// transaction handle. Both are set to nil by close.
	dc  *driverConn
	txi driver.Tx

	// releaseConn is invoked by close, with the final error, to hand the
	// connection back.
	releaseConn func(error)

	// done is accessed atomically. Commit and rollback flip it from 0 to 1
	// with a compare-and-swap, so only one of them ever finishes the
	// transaction.
	done int32

	// keepConnOnRollback is set by beginDC when the driver connection
	// implements both driver.SessionResetter and driver.Validator. When it
	// is false, awaitDone asks rollback to discard the connection.
	keepConnOnRollback bool

	// stmts holds the statements created through PrepareContext and
	// StmtContext; closePrepared closes them. The embedded mutex guards v.
	stmts struct {
		sync.Mutex
		v []*Stmt
	}

	// cancel cancels ctx. It is called by Commit and rollback.
	cancel func()

	// ctx is the transaction's own context, derived in beginDC from the
	// context passed to it. awaitDone blocks on it.
	ctx context.Context
}
2064
2065
2066
2067 func (tx *Tx) awaitDone() {
2068
2069
2070 <-tx.ctx.Done()
2071
2072
2073
2074
2075
2076
2077
2078 discardConnection := !tx.keepConnOnRollback
2079 tx.rollback(discardConnection)
2080 }
2081
2082 func (tx *Tx) isDone() bool {
2083 return atomic.LoadInt32(&tx.done) != 0
2084 }
2085
2086
2087
// ErrTxDone is returned by operations on a transaction that has already been
// committed or rolled back (see Tx.grabConn, Tx.Commit and Tx.rollback).
var ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
2089
2090
2091
2092
2093 func (tx *Tx) close(err error) {
2094 tx.releaseConn(err)
2095 tx.dc = nil
2096 tx.txi = nil
2097 }
2098
2099
2100
// hookTxGrabConn, when non-nil, is called by Tx.grabConn while tx.closemu is
// read-locked and after the done check has passed. Nothing in this part of
// the file assigns it — presumably it is a test hook; confirm before relying
// on it.
var hookTxGrabConn func()
2102
2103 func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
2104 select {
2105 default:
2106 case <-ctx.Done():
2107 return nil, nil, ctx.Err()
2108 }
2109
2110
2111
2112 tx.closemu.RLock()
2113 if tx.isDone() {
2114 tx.closemu.RUnlock()
2115 return nil, nil, ErrTxDone
2116 }
2117 if hookTxGrabConn != nil {
2118 hookTxGrabConn()
2119 }
2120 return tx.dc, tx.closemuRUnlockRelease, nil
2121 }
2122
2123 func (tx *Tx) txCtx() context.Context {
2124 return tx.ctx
2125 }
2126
2127
2128
2129
2130
2131 func (tx *Tx) closemuRUnlockRelease(error) {
2132 tx.closemu.RUnlock()
2133 }
2134
2135
2136 func (tx *Tx) closePrepared() {
2137 tx.stmts.Lock()
2138 defer tx.stmts.Unlock()
2139 for _, stmt := range tx.stmts.v {
2140 stmt.Close()
2141 }
2142 }
2143
2144
// Commit commits the transaction through the driver.
//
// It returns ErrTxDone if the transaction was already committed or rolled
// back, the context's error if the transaction context is done while the
// transaction is still open, and otherwise whatever the driver's Commit
// returns.
func (tx *Tx) Commit() error {
	// Refuse to commit once the transaction context is done. If done is
	// already set the transaction was finished earlier, so ErrTxDone is
	// reported in preference to the context error.
	select {
	default:
	case <-tx.ctx.Done():
		if atomic.LoadInt32(&tx.done) == 1 {
			return ErrTxDone
		}
		return tx.ctx.Err()
	}
	// Claim the right to finish the transaction; whoever loses this race
	// (a concurrent Commit or rollback) gets ErrTxDone.
	if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
		return ErrTxDone
	}

	// Cancel the transaction context, then take and immediately drop the
	// write lock: this blocks until every read lock handed out by grabConn
	// has been released, so no other user is on the connection when the
	// driver is asked to commit.
	tx.cancel()
	tx.closemu.Lock()
	tx.closemu.Unlock()

	var err error
	withLock(tx.dc, func() {
		err = tx.txi.Commit()
	})
	// Prepared statements are closed unless the driver reported a bad
	// connection.
	if err != driver.ErrBadConn {
		tx.closePrepared()
	}
	tx.close(err)
	return err
}
2179
// rollbackHook, when non-nil, is called by Tx.rollback after the done flag
// has been claimed and before the transaction context is cancelled. Nothing
// in this part of the file assigns it — presumably a test hook; confirm.
var rollbackHook func()
2181
2182
2183
// rollback aborts the transaction through the driver.
//
// It returns ErrTxDone if the transaction was already finished. When
// discardConn is true the connection is released with driver.ErrBadConn —
// and that is also the value returned — regardless of what the driver's
// Rollback reported.
func (tx *Tx) rollback(discardConn bool) error {
	// Claim the right to finish the transaction; a concurrent Commit or
	// rollback that already did so wins.
	if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
		return ErrTxDone
	}

	if rollbackHook != nil {
		rollbackHook()
	}

	// Cancel the transaction context, then take and immediately drop the
	// write lock: this blocks until every read lock handed out by grabConn
	// has been released.
	tx.cancel()
	tx.closemu.Lock()
	tx.closemu.Unlock()

	var err error
	withLock(tx.dc, func() {
		err = tx.txi.Rollback()
	})
	// Prepared statements are closed unless the driver reported a bad
	// connection. This is decided on the driver's error, before it may be
	// overridden below.
	if err != driver.ErrBadConn {
		tx.closePrepared()
	}
	if discardConn {
		err = driver.ErrBadConn
	}
	tx.close(err)
	return err
}
2214
2215
2216 func (tx *Tx) Rollback() error {
2217 return tx.rollback(false)
2218 }
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230 func (tx *Tx) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
2231 dc, release, err := tx.grabConn(ctx)
2232 if err != nil {
2233 return nil, err
2234 }
2235
2236 stmt, err := tx.db.prepareDC(ctx, dc, release, tx, query)
2237 if err != nil {
2238 return nil, err
2239 }
2240 tx.stmts.Lock()
2241 tx.stmts.v = append(tx.stmts.v, stmt)
2242 tx.stmts.Unlock()
2243 return stmt, nil
2244 }
2245
2246
2247
2248
2249
2250
2251
2252 func (tx *Tx) Prepare(query string) (*Stmt, error) {
2253 return tx.PrepareContext(context.Background(), query)
2254 }
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
// StmtContext returns a statement bound to this transaction that runs the
// same query as stmt.
//
// Failures are not returned directly: the result is then a Stmt whose
// stickyErr carries the error (grabConn failure, a stmt that belongs to a
// different DB, or a prepare error). The returned statement is recorded in
// tx.stmts so that it is closed when the transaction ends.
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
	dc, release, err := tx.grabConn(ctx)
	if err != nil {
		return &Stmt{stickyErr: err}
	}
	// The connection is only needed while this method runs.
	defer release(nil)

	if tx.db != stmt.db {
		return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
	}
	var si driver.Stmt
	var parentStmt *Stmt
	stmt.mu.Lock()
	if stmt.closed || stmt.cg != nil {
		// stmt is closed, or is itself bound to a Tx or Conn. Its
		// per-connection statements are not consulted in this case: the
		// query text is prepared afresh on the transaction's connection,
		// and the new statement gets no parent.
		stmt.mu.Unlock()
		withLock(dc, func() {
			si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
		})
		if err != nil {
			return &Stmt{stickyErr: err}
		}
	} else {
		stmt.removeClosedStmtLocked()
		// Reuse the driver statement if stmt has already been prepared on
		// the transaction's connection.
		for _, v := range stmt.css {
			if v.dc == dc {
				si = v.ds.si
				break
			}
		}

		stmt.mu.Unlock()

		if si == nil {
			// Not prepared on this connection yet: prepare it now, which
			// also records it in stmt.css.
			var ds *driverStmt
			withLock(dc, func() {
				ds, err = stmt.prepareOnConnLocked(ctx, dc)
			})
			if err != nil {
				return &Stmt{stickyErr: err}
			}
			si = ds.si
		}
		parentStmt = stmt
	}

	txs := &Stmt{
		db: tx.db,
		cg: tx,
		cgds: &driverStmt{
			Locker: dc,
			si:     si,
		},
		parentStmt: parentStmt,
		query:      stmt.query,
	}
	// Register the new statement as a dependency of its parent; Stmt.Close
	// removes that dependency again.
	if parentStmt != nil {
		tx.db.addDep(parentStmt, txs)
	}
	tx.stmts.Lock()
	tx.stmts.v = append(tx.stmts.v, txs)
	tx.stmts.Unlock()
	return txs
}
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355 func (tx *Tx) Stmt(stmt *Stmt) *Stmt {
2356 return tx.StmtContext(context.Background(), stmt)
2357 }
2358
2359
2360
2361 func (tx *Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
2362 dc, release, err := tx.grabConn(ctx)
2363 if err != nil {
2364 return nil, err
2365 }
2366 return tx.db.execDC(ctx, dc, release, query, args)
2367 }
2368
2369
2370
2371 func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
2372 return tx.ExecContext(context.Background(), query, args...)
2373 }
2374
2375
2376 func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
2377 dc, release, err := tx.grabConn(ctx)
2378 if err != nil {
2379 return nil, err
2380 }
2381
2382 return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args)
2383 }
2384
2385
2386 func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
2387 return tx.QueryContext(context.Background(), query, args...)
2388 }
2389
2390
2391
2392
2393
2394
2395
2396 func (tx *Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
2397 rows, err := tx.QueryContext(ctx, query, args...)
2398 return &Row{rows: rows, err: err}
2399 }
2400
2401
2402
2403
2404
2405
2406
2407 func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
2408 return tx.QueryRowContext(context.Background(), query, args...)
2409 }
2410
2411
// connStmt records that a statement has been prepared on one particular
// connection: dc is the connection and ds the driver statement prepared on
// it. Stmt.css is a list of these.
type connStmt struct {
	dc *driverConn
	ds *driverStmt
}
2416
2417
2418
// stmtConnGrabber is what a Stmt uses to reach a dedicated connection. Both
// *Tx and *Conn implement it.
type stmtConnGrabber interface {
	// grabConn returns the connection together with the callback that must
	// be invoked, with the operation's error, once the caller is done
	// with it.
	grabConn(context.Context) (*driverConn, releaseConn, error)

	// txCtx returns the transaction context if there is one: *Tx returns
	// its own context, *Conn returns nil.
	txCtx() context.Context
}
2429
2430 var (
2431 _ stmtConnGrabber = &Tx{}
2432 _ stmtConnGrabber = &Conn{}
2433 )
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
// Stmt is a prepared statement. It is either bound to a Tx or Conn (cg is
// non-nil) or it belongs to a DB and is prepared lazily on whichever
// connection it ends up running on (tracked in css).
type Stmt struct {
	// Set when the statement is created.
	db        *DB    // owning DB
	query     string // the query text that was prepared
	stickyErr error  // if non-nil, connStmt and Close return this error

	// closemu is read-locked by ExecContext and QueryContext for the
	// duration of the call and write-locked by Close.
	closemu sync.RWMutex

	// cg is non-nil when the statement is bound to a Tx or Conn; the
	// connection is then always obtained through cg.grabConn and cgds is
	// the driver statement prepared on that connection.
	cg   stmtConnGrabber
	cgds *driverStmt

	// parentStmt is set by Tx.StmtContext when this statement was derived
	// from an open, unbound statement; Close then removes this statement
	// as a dependency of the parent.
	parentStmt *Stmt

	mu     sync.Mutex // guards closed, css and lastNumClosed
	closed bool

	// css lists the connections this statement has been prepared on,
	// together with the driver statement for each. Only used when cg is
	// nil.
	css []connStmt

	// lastNumClosed is the value of db.numClosed observed the last time
	// removeClosedStmtLocked pruned css.
	lastNumClosed uint64
}
2481
2482
2483
2484 func (s *Stmt) ExecContext(ctx context.Context, args ...interface{}) (Result, error) {
2485 s.closemu.RLock()
2486 defer s.closemu.RUnlock()
2487
2488 var res Result
2489 strategy := cachedOrNewConn
2490 for i := 0; i < maxBadConnRetries+1; i++ {
2491 if i == maxBadConnRetries {
2492 strategy = alwaysNewConn
2493 }
2494 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2495 if err != nil {
2496 if err == driver.ErrBadConn {
2497 continue
2498 }
2499 return nil, err
2500 }
2501
2502 res, err = resultFromStatement(ctx, dc.ci, ds, args...)
2503 releaseConn(err)
2504 if err != driver.ErrBadConn {
2505 return res, err
2506 }
2507 }
2508 return nil, driver.ErrBadConn
2509 }
2510
2511
2512
2513 func (s *Stmt) Exec(args ...interface{}) (Result, error) {
2514 return s.ExecContext(context.Background(), args...)
2515 }
2516
2517 func resultFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (Result, error) {
2518 ds.Lock()
2519 defer ds.Unlock()
2520
2521 dargs, err := driverArgsConnLocked(ci, ds, args)
2522 if err != nil {
2523 return nil, err
2524 }
2525
2526 resi, err := ctxDriverStmtExec(ctx, ds.si, dargs)
2527 if err != nil {
2528 return nil, err
2529 }
2530 return driverResult{ds.Locker, resi}, nil
2531 }
2532
2533
2534
2535
2536
2537 func (s *Stmt) removeClosedStmtLocked() {
2538 t := len(s.css)/2 + 1
2539 if t > 10 {
2540 t = 10
2541 }
2542 dbClosed := atomic.LoadUint64(&s.db.numClosed)
2543 if dbClosed-s.lastNumClosed < uint64(t) {
2544 return
2545 }
2546
2547 s.db.mu.Lock()
2548 for i := 0; i < len(s.css); i++ {
2549 if s.css[i].dc.dbmuClosed {
2550 s.css[i] = s.css[len(s.css)-1]
2551 s.css = s.css[:len(s.css)-1]
2552 i--
2553 }
2554 }
2555 s.db.mu.Unlock()
2556 s.lastNumClosed = dbClosed
2557 }
2558
2559
2560
2561
// connStmt returns a connection, the callback that releases it, and the
// driver statement prepared on that connection.
//
// It fails with s.stickyErr if that is set and with "sql: statement is
// closed" if the statement was closed. For a statement bound to a Tx or
// Conn the connection comes from s.cg.grabConn and the statement is s.cgds.
// Otherwise a connection is taken from the DB using strategy and the query
// is prepared on it unless s.css already holds a statement for it.
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
	if err = s.stickyErr; err != nil {
		return
	}
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		err = errors.New("sql: statement is closed")
		return
	}

	// A statement bound to a Tx or Conn always runs on that binding's
	// connection.
	if s.cg != nil {
		s.mu.Unlock()
		dc, releaseConn, err = s.cg.grabConn(ctx)
		if err != nil {
			return
		}
		return dc, releaseConn, s.cgds, nil
	}

	s.removeClosedStmtLocked()
	s.mu.Unlock()

	dc, err = s.db.conn(ctx, strategy)
	if err != nil {
		return nil, nil, nil, err
	}

	// Reuse the driver statement if this connection has one already.
	s.mu.Lock()
	for _, v := range s.css {
		if v.dc == dc {
			s.mu.Unlock()
			return dc, dc.releaseConn, v.ds, nil
		}
	}
	s.mu.Unlock()

	// No luck; prepare the statement on this connection. A failure gives
	// the connection back together with the error.
	withLock(dc, func() {
		ds, err = s.prepareOnConnLocked(ctx, dc)
	})
	if err != nil {
		dc.releaseConn(err)
		return nil, nil, nil, err
	}

	return dc, dc.releaseConn, ds, nil
}
2612
2613
2614
2615 func (s *Stmt) prepareOnConnLocked(ctx context.Context, dc *driverConn) (*driverStmt, error) {
2616 si, err := dc.prepareLocked(ctx, s.cg, s.query)
2617 if err != nil {
2618 return nil, err
2619 }
2620 cs := connStmt{dc, si}
2621 s.mu.Lock()
2622 s.css = append(s.css, cs)
2623 s.mu.Unlock()
2624 return cs.ds, nil
2625 }
2626
2627
2628
2629 func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error) {
2630 s.closemu.RLock()
2631 defer s.closemu.RUnlock()
2632
2633 var rowsi driver.Rows
2634 strategy := cachedOrNewConn
2635 for i := 0; i < maxBadConnRetries+1; i++ {
2636 if i == maxBadConnRetries {
2637 strategy = alwaysNewConn
2638 }
2639 dc, releaseConn, ds, err := s.connStmt(ctx, strategy)
2640 if err != nil {
2641 if err == driver.ErrBadConn {
2642 continue
2643 }
2644 return nil, err
2645 }
2646
2647 rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...)
2648 if err == nil {
2649
2650
2651 rows := &Rows{
2652 dc: dc,
2653 rowsi: rowsi,
2654
2655 }
2656
2657
2658 s.db.addDep(s, rows)
2659
2660
2661
2662 rows.releaseConn = func(err error) {
2663 releaseConn(err)
2664 s.db.removeDep(s, rows)
2665 }
2666 var txctx context.Context
2667 if s.cg != nil {
2668 txctx = s.cg.txCtx()
2669 }
2670 rows.initContextClose(ctx, txctx)
2671 return rows, nil
2672 }
2673
2674 releaseConn(err)
2675 if err != driver.ErrBadConn {
2676 return nil, err
2677 }
2678 }
2679 return nil, driver.ErrBadConn
2680 }
2681
2682
2683
2684 func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
2685 return s.QueryContext(context.Background(), args...)
2686 }
2687
2688 func rowsiFromStatement(ctx context.Context, ci driver.Conn, ds *driverStmt, args ...interface{}) (driver.Rows, error) {
2689 ds.Lock()
2690 defer ds.Unlock()
2691 dargs, err := driverArgsConnLocked(ci, ds, args)
2692 if err != nil {
2693 return nil, err
2694 }
2695 return ctxDriverStmtQuery(ctx, ds.si, dargs)
2696 }
2697
2698
2699
2700
2701
2702
2703
2704 func (s *Stmt) QueryRowContext(ctx context.Context, args ...interface{}) *Row {
2705 rows, err := s.QueryContext(ctx, args...)
2706 if err != nil {
2707 return &Row{err: err}
2708 }
2709 return &Row{rows: rows}
2710 }
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723 func (s *Stmt) QueryRow(args ...interface{}) *Row {
2724 return s.QueryRowContext(context.Background(), args...)
2725 }
2726
2727
// Close closes the statement.
//
// It returns s.stickyErr if that is set, and nil if the statement was
// already closed. Taking the write lock on closemu makes Close wait for
// running ExecContext and QueryContext calls, which hold the read lock.
func (s *Stmt) Close() error {
	s.closemu.Lock()
	defer s.closemu.Unlock()

	if s.stickyErr != nil {
		return s.stickyErr
	}
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil
	}
	s.closed = true
	// Detach the bound driver statement while s.mu is held; it is closed
	// further down if this statement owns it.
	txds := s.cgds
	s.cgds = nil

	s.mu.Unlock()

	// A statement that is not bound to a Tx or Conn is registered as its
	// own dependency; removing that is what releases it.
	if s.cg == nil {
		return s.db.removeDep(s, s)
	}

	if s.parentStmt != nil {
		// The driver statement came from the parent (see Tx.StmtContext),
		// so it is not closed here; only the dependency on the parent is
		// dropped.
		return s.db.removeDep(s.parentStmt, s)
	}
	return txds.Close()
}
2757
2758 func (s *Stmt) finalClose() error {
2759 s.mu.Lock()
2760 defer s.mu.Unlock()
2761 if s.css != nil {
2762 for _, v := range s.css {
2763 s.db.noteUnusedDriverStatement(v.dc, v.ds)
2764 v.dc.removeOpenStmt(v.ds)
2765 }
2766 s.css = nil
2767 }
2768 return nil
2769 }
2770
2771
2772
// Rows is the result of a query. Next advances it row by row and Scan reads
// the current row.
type Rows struct {
	dc          *driverConn // connection the rows are read from; locked around driver calls
	releaseConn func(error) // called once by close, with the error from closing the driver rows
	rowsi       driver.Rows // the driver's row iterator
	cancel      func()      // set by initContextClose; called by close when non-nil
	closeStmt   *driverStmt // if non-nil, a driver statement that close closes after the rows

	// closemu is read-locked by the methods that use the rows and
	// write-locked by close, so close waits for calls in progress.
	// It guards closed and lasterr.
	closemu sync.RWMutex
	closed  bool
	lasterr error // error from the last driver Next / NextResultSet call, or the one passed to close

	// lastcols holds the column values of the current row. It is allocated
	// by Next, reset to nil by NextResultSet, and read by Scan.
	lastcols []driver.Value
}
2793
2794
2795
2796 func (rs *Rows) lasterrOrErrLocked(err error) error {
2797 if rs.lasterr != nil && rs.lasterr != io.EOF {
2798 return rs.lasterr
2799 }
2800 return err
2801 }
2802
2803
2804
// bypassRowsAwaitDone, when true, makes Rows.initContextClose return without
// starting the awaitDone goroutine. It is false by default and nothing in
// this part of the file sets it — presumably a test knob; confirm.
var bypassRowsAwaitDone = false
2806
2807 func (rs *Rows) initContextClose(ctx, txctx context.Context) {
2808 if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) {
2809 return
2810 }
2811 if bypassRowsAwaitDone {
2812 return
2813 }
2814 ctx, rs.cancel = context.WithCancel(ctx)
2815 go rs.awaitDone(ctx, txctx)
2816 }
2817
2818
2819
2820
2821
2822 func (rs *Rows) awaitDone(ctx, txctx context.Context) {
2823 var txctxDone <-chan struct{}
2824 if txctx != nil {
2825 txctxDone = txctx.Done()
2826 }
2827 select {
2828 case <-ctx.Done():
2829 case <-txctxDone:
2830 }
2831 rs.close(ctx.Err())
2832 }
2833
2834
2835
2836
2837
2838
2839
2840 func (rs *Rows) Next() bool {
2841 var doClose, ok bool
2842 withLock(rs.closemu.RLocker(), func() {
2843 doClose, ok = rs.nextLocked()
2844 })
2845 if doClose {
2846 rs.Close()
2847 }
2848 return ok
2849 }
2850
2851 func (rs *Rows) nextLocked() (doClose, ok bool) {
2852 if rs.closed {
2853 return false, false
2854 }
2855
2856
2857
2858 rs.dc.Lock()
2859 defer rs.dc.Unlock()
2860
2861 if rs.lastcols == nil {
2862 rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns()))
2863 }
2864
2865 rs.lasterr = rs.rowsi.Next(rs.lastcols)
2866 if rs.lasterr != nil {
2867
2868 if rs.lasterr != io.EOF {
2869 return true, false
2870 }
2871 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
2872 if !ok {
2873 return true, false
2874 }
2875
2876
2877
2878 if !nextResultSet.HasNextResultSet() {
2879 doClose = true
2880 }
2881 return doClose, false
2882 }
2883 return false, true
2884 }
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894 func (rs *Rows) NextResultSet() bool {
2895 var doClose bool
2896 defer func() {
2897 if doClose {
2898 rs.Close()
2899 }
2900 }()
2901 rs.closemu.RLock()
2902 defer rs.closemu.RUnlock()
2903
2904 if rs.closed {
2905 return false
2906 }
2907
2908 rs.lastcols = nil
2909 nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet)
2910 if !ok {
2911 doClose = true
2912 return false
2913 }
2914
2915
2916
2917 rs.dc.Lock()
2918 defer rs.dc.Unlock()
2919
2920 rs.lasterr = nextResultSet.NextResultSet()
2921 if rs.lasterr != nil {
2922 doClose = true
2923 return false
2924 }
2925 return true
2926 }
2927
2928
2929
2930 func (rs *Rows) Err() error {
2931 rs.closemu.RLock()
2932 defer rs.closemu.RUnlock()
2933 return rs.lasterrOrErrLocked(nil)
2934 }
2935
// Errors reported by Rows.Columns, Rows.ColumnTypes and Rows.Scan.
var (
	// errRowsClosed is used when the Rows have already been closed.
	errRowsClosed = errors.New("sql: Rows are closed")
	// errNoRows is used when the Rows have no underlying driver rows.
	errNoRows = errors.New("sql: no Rows available")
)
2938
2939
2940
2941 func (rs *Rows) Columns() ([]string, error) {
2942 rs.closemu.RLock()
2943 defer rs.closemu.RUnlock()
2944 if rs.closed {
2945 return nil, rs.lasterrOrErrLocked(errRowsClosed)
2946 }
2947 if rs.rowsi == nil {
2948 return nil, rs.lasterrOrErrLocked(errNoRows)
2949 }
2950 rs.dc.Lock()
2951 defer rs.dc.Unlock()
2952
2953 return rs.rowsi.Columns(), nil
2954 }
2955
2956
2957
2958 func (rs *Rows) ColumnTypes() ([]*ColumnType, error) {
2959 rs.closemu.RLock()
2960 defer rs.closemu.RUnlock()
2961 if rs.closed {
2962 return nil, rs.lasterrOrErrLocked(errRowsClosed)
2963 }
2964 if rs.rowsi == nil {
2965 return nil, rs.lasterrOrErrLocked(errNoRows)
2966 }
2967 rs.dc.Lock()
2968 defer rs.dc.Unlock()
2969
2970 return rowsColumnInfoSetupConnLocked(rs.rowsi), nil
2971 }
2972
2973
// ColumnType describes one column of a result set. Values are filled in by
// rowsColumnInfoSetupConnLocked from the optional interfaces the driver
// rows implement; the has* flags record whether the driver supplied the
// corresponding property.
type ColumnType struct {
	name string

	hasNullable       bool
	hasLength         bool
	hasPrecisionScale bool

	nullable     bool
	length       int64
	databaseType string
	precision    int64
	scale        int64
	scanType     reflect.Type
}

// Name returns the column's name.
func (ct *ColumnType) Name() string {
	return ct.name
}

// Length returns the column's length. ok is false when no length was
// supplied for the column.
func (ct *ColumnType) Length() (length int64, ok bool) {
	return ct.length, ct.hasLength
}

// DecimalSize returns the column's precision and scale. ok is false when
// they were not supplied for the column.
func (ct *ColumnType) DecimalSize() (precision, scale int64, ok bool) {
	return ct.precision, ct.scale, ct.hasPrecisionScale
}

// ScanType returns the Go type recorded for scanning values of this column.
func (ct *ColumnType) ScanType() reflect.Type {
	return ct.scanType
}

// Nullable reports whether the column may be null. ok is false when
// nullability was not supplied for the column.
func (ct *ColumnType) Nullable() (nullable, ok bool) {
	return ct.nullable, ct.hasNullable
}

// DatabaseTypeName returns the database type name recorded for the column,
// or the empty string when none was supplied.
func (ct *ColumnType) DatabaseTypeName() string {
	return ct.databaseType
}
3031
3032 func rowsColumnInfoSetupConnLocked(rowsi driver.Rows) []*ColumnType {
3033 names := rowsi.Columns()
3034
3035 list := make([]*ColumnType, len(names))
3036 for i := range list {
3037 ci := &ColumnType{
3038 name: names[i],
3039 }
3040 list[i] = ci
3041
3042 if prop, ok := rowsi.(driver.RowsColumnTypeScanType); ok {
3043 ci.scanType = prop.ColumnTypeScanType(i)
3044 } else {
3045 ci.scanType = reflect.TypeOf(new(interface{})).Elem()
3046 }
3047 if prop, ok := rowsi.(driver.RowsColumnTypeDatabaseTypeName); ok {
3048 ci.databaseType = prop.ColumnTypeDatabaseTypeName(i)
3049 }
3050 if prop, ok := rowsi.(driver.RowsColumnTypeLength); ok {
3051 ci.length, ci.hasLength = prop.ColumnTypeLength(i)
3052 }
3053 if prop, ok := rowsi.(driver.RowsColumnTypeNullable); ok {
3054 ci.nullable, ci.hasNullable = prop.ColumnTypeNullable(i)
3055 }
3056 if prop, ok := rowsi.(driver.RowsColumnTypePrecisionScale); ok {
3057 ci.precision, ci.scale, ci.hasPrecisionScale = prop.ColumnTypePrecisionScale(i)
3058 }
3059 }
3060 return list
3061 }
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086
3087
3088
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
3099
3100
3101
3102
3103
3104
3105
3106
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123 func (rs *Rows) Scan(dest ...interface{}) error {
3124 rs.closemu.RLock()
3125
3126 if rs.lasterr != nil && rs.lasterr != io.EOF {
3127 rs.closemu.RUnlock()
3128 return rs.lasterr
3129 }
3130 if rs.closed {
3131 err := rs.lasterrOrErrLocked(errRowsClosed)
3132 rs.closemu.RUnlock()
3133 return err
3134 }
3135 rs.closemu.RUnlock()
3136
3137 if rs.lastcols == nil {
3138 return errors.New("sql: Scan called without calling Next")
3139 }
3140 if len(dest) != len(rs.lastcols) {
3141 return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest))
3142 }
3143 for i, sv := range rs.lastcols {
3144 err := convertAssignRows(dest[i], sv, rs)
3145 if err != nil {
3146 return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err)
3147 }
3148 }
3149 return nil
3150 }
3151
3152
3153
// rowsCloseHook returns the function, if any, that Rows.close should call
// right after closing the driver rows; that function receives the Rows and
// a pointer to the close error. The default returns nil, i.e. no hook.
// Presumably tests replace it — confirm.
var rowsCloseHook = func() func(*Rows, *error) { return nil }
3155
3156
3157
3158
3159
3160 func (rs *Rows) Close() error {
3161 return rs.close(nil)
3162 }
3163
// close closes the Rows. Only the first call does any work; later calls
// return nil.
//
// err is recorded as rs.lasterr if no error has been recorded yet. The
// return value is the error from closing the driver rows (possibly adjusted
// by the rowsCloseHook function); the same value is passed to
// rs.releaseConn.
func (rs *Rows) close(err error) error {
	rs.closemu.Lock()
	defer rs.closemu.Unlock()

	if rs.closed {
		return nil
	}
	rs.closed = true

	if rs.lasterr == nil {
		rs.lasterr = err
	}

	// From here on err holds the result of closing the driver rows.
	withLock(rs.dc, func() {
		err = rs.rowsi.Close()
	})
	if fn := rowsCloseHook(); fn != nil {
		fn(rs, &err)
	}
	// Stop the awaitDone goroutine started by initContextClose, if any.
	if rs.cancel != nil {
		rs.cancel()
	}

	// The error from closing the statement is not reported.
	if rs.closeStmt != nil {
		rs.closeStmt.Close()
	}
	rs.releaseConn(err)
	return err
}
3193
3194
// Row is the result of a QueryRow-style call. It holds either the Rows to
// read the single row from or the error that prevented the query.
type Row struct {
	// Exactly one of err and rows is expected to be set by the
	// constructors in this file.
	err  error // deferred error, returned by Scan and Err
	rows *Rows
}
3200
3201
3202
3203
3204
3205
3206 func (r *Row) Scan(dest ...interface{}) error {
3207 if r.err != nil {
3208 return r.err
3209 }
3210
3211
3212
3213
3214
3215
3216
3217
3218
3219
3220
3221
3222
3223
3224 defer r.rows.Close()
3225 for _, dp := range dest {
3226 if _, ok := dp.(*RawBytes); ok {
3227 return errors.New("sql: RawBytes isn't allowed on Row.Scan")
3228 }
3229 }
3230
3231 if !r.rows.Next() {
3232 if err := r.rows.Err(); err != nil {
3233 return err
3234 }
3235 return ErrNoRows
3236 }
3237 err := r.rows.Scan(dest...)
3238 if err != nil {
3239 return err
3240 }
3241
3242 return r.rows.Close()
3243 }
3244
3245
3246
3247
3248
3249 func (r *Row) Err() error {
3250 return r.err
3251 }
3252
3253
// Result is what the Exec family of methods returns. The implementation in
// this file, driverResult, forwards both methods to the driver's
// driver.Result.
type Result interface {
	// LastInsertId returns the identifier reported by the driver for the
	// executed statement. Its exact meaning is driver-defined — presumably
	// an auto-generated key; confirm against the driver's documentation.
	LastInsertId() (int64, error)

	// RowsAffected returns the number of rows the driver reports as
	// affected by the executed statement.
	RowsAffected() (int64, error)
}
3267
// driverResult implements Result on top of a driver.Result. Every call is
// made while holding the embedded Locker.
type driverResult struct {
	sync.Locker // held for the duration of each call on resi
	resi        driver.Result
}

// LastInsertId forwards to the driver result under the lock.
func (r driverResult) LastInsertId() (int64, error) {
	r.Lock()
	defer r.Unlock()
	id, err := r.resi.LastInsertId()
	return id, err
}

// RowsAffected forwards to the driver result under the lock.
func (r driverResult) RowsAffected() (int64, error) {
	r.Lock()
	defer r.Unlock()
	n, err := r.resi.RowsAffected()
	return n, err
}
3284
// stack returns the calling goroutine's stack trace as text, truncated to
// at most 2 KiB.
func stack() string {
	var buf [2 << 10]byte
	n := runtime.Stack(buf[:], false)
	return string(buf[:n])
}
3289
3290
// withLock runs fn while holding lk. The lock is released through a defer,
// so it is dropped even if fn panics.
func withLock(lk sync.Locker, fn func()) {
	lk.Lock()
	defer lk.Unlock() // in case fn panics
	fn()
}
3296
View as plain text