Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x/net/http2: Above go1.19, the server cannot send packets to the client #56753

Closed
wanglong001 opened this issue Nov 16, 2022 · 3 comments
Closed
Assignees
Labels
FrozenDueToAge NeedsInvestigation Someone must examine and confirm this is a valid issue and not a duplicate of an existing one.
Milestone

Comments

@wanglong001
Copy link

What version of Go are you using (go version)?

$ go version
go version go1.19 linux/amd64

Does this issue reproduce with the latest release?

Yes

What operating system and processor architecture are you using (go env)?

go env Output
$ go env
GO111MODULE=""
GOARCH="amd64"
GOBIN=""
GOCACHE="/home/ubuntu/.cache/go-build"
GOENV="/home/ubuntu/.config/go/env"
GOEXE=""
GOEXPERIMENT=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="linux"
GOINSECURE=""
GOMODCACHE="/home/ubuntu/.gvm/pkgsets/system/global/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="linux"
GOPATH="/home/ubuntu/.gvm/pkgsets/system/global"
GOPRIVATE=""
GOPROXY="https://proxy.golang.org,direct"
GOROOT="/home/ubuntu/go"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/home/ubuntu/go/pkg/tool/linux_amd64"
GOVCS=""
GOVERSION="go1.19"
GCCGO="gccgo"
GOAMD64="v1"
AR="ar"
CC="gcc"
CXX="g++"
CGO_ENABLED="1"
GOMOD="/home/ubuntu/software/debug_net/go.mod"
GOWORK=""
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -pthread -Wl,--no-gc-sections -fmessage-length=0 -fdebug-prefix-map=/tmp/go-build1322243267=/tmp/go-build -gno-record-gcc-switches"

What did you do?

image
image

From the packet capture point of view, the server does not send data packets to the client

server.go

package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"strings"
	"time"

	"github.com/felixge/tcpkeepalive"
	httputil "github.com/qiniupd/qiniu-go-sdk/x/httputil.v1"
	"golang.org/x/net/http2"
)

type B struct {
}

func (b *B) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if strings.Contains(req.RequestURI, "long") {
		fw := FlushWriter{W: resp}
		fw.Write([]byte("long"))
		time.Sleep(2 * time.Second)
		fw.Write([]byte("long end"))
		return
	}
}

type A struct {
}

func (a *A) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	fmt.Println("url", req.URL.String())
	conn, err := UpgradeServerTCP(resp, req)
	if err != nil {
		fmt.Printf(" UpgradeServerTCP: %v", err)
		return
	}
	fmt.Println(resp)
	go func() {
		server := &http2.Server{NewWriteScheduler: func() http2.WriteScheduler { return http2.NewPriorityWriteScheduler(nil) }}
		server.ServeConn(conn, &http2.ServeConnOpts{
			Handler: &B{},
		})
	}()
}

type FlushWriter struct {
	W io.Writer
}

func (fw FlushWriter) Write(p []byte) (n int, err error) {
	n, err = fw.W.Write(p)
	if f, ok := fw.W.(http.Flusher); ok {
		f.Flush()
	}
	return
}

type hijackedTcpConn struct {
	net.Conn
	cancelf context.CancelFunc
}

func (p *hijackedTcpConn) Read(b []byte) (int, error) {
	n, err := p.Conn.Read(b)
	if err != nil {
		p.cancelf()
	}
	return n, err
}

func (p *hijackedTcpConn) Write(b []byte) (int, error) {
	n, err := p.Conn.Write(b)
	if err != nil {
		p.cancelf()
	}
	return n, err
}

func UpgradeServerTCP(w http.ResponseWriter, req *http.Request) (c net.Conn, err error) {
	if !IsUpgradeTcp(req.Header) {
		httputil.ReplyErr(w, 400, "http upgrade needed")
		return nil, fmt.Errorf("http upgrade needed")
	}
	hijacker, ok := httputil.GetHijacker(w)
	if !ok {
		httputil.ReplyErr(w, 500, "http upgrade failed")
		return nil, fmt.Errorf("http upgrade failed")
	}
	httputil.Reply(w, 101, nil)
	conn, _, err := hijacker.Hijack()
	if err != nil {
		return nil, err
	}
	if err = keepAlive(conn); err != nil {
		conn.Close()
		return nil, err
	}
	ctx, cancelf := context.WithCancel(req.Context())
	req1 := req.WithContext(ctx)
	*req = *req1
	return &hijackedTcpConn{conn, cancelf}, nil
}

func keepAlive(conn net.Conn) error {
	DefaultKeepAliveIdleTime := 13 * time.Second
	DefaultKeepAliveCount := 8
	DefaultKeepAliveInterval := 5 * time.Second
	return tcpkeepalive.SetKeepAlive(conn, DefaultKeepAliveIdleTime, DefaultKeepAliveCount, DefaultKeepAliveInterval)
}

func IsUpgradeTcp(h http.Header) bool {
	return strings.Contains(strings.ToLower(h.Get("Connection")), "upgrade") && h.Get("Upgrade") != ""
}

func main() {
	err := http.ListenAndServe("127.0.0.1:10001", &A{})
	if err != nil {
		panic(err)
	}
}

client.go

package main

import (
	"bufio"
	"bytes"
	"context"
	"crypto/tls"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"time"

	"github.com/felixge/tcpkeepalive"
	"golang.org/x/net/http2"
)

type readCloserWrap struct {
	io.Reader
	io.Closer
}
type CP struct {
	client *http2.ClientConn
}

func (c *CP) GetClientConn(req *http.Request, addr string) (*http2.ClientConn, error) {
	fmt.Println("getClient: ", c.client)
	return c.client, nil
}

func (c *CP) MarkDead(conn *http2.ClientConn) {
	conn.Close()
}

func UpgradeClientTCP(ctx context.Context, isProxy bool, conn net.Conn, req *http.Request) (c net.Conn, err error) {
	req.Header.Set("Upgrade", "Tcp")
	req.Header.Set("Connection", "Upgrade")
	req.Header.Set("Content-Type", "application/json")

	defer func() {
		if err != nil {
			conn.Close()
		}
	}()
	if isProxy {
		err = req.WriteProxy(conn)
	} else {
		err = req.Write(conn)
	}
	if err != nil {
		return nil, err
	}
	br := bufio.NewReader(conn)
	resp, err := http.ReadResponse(br, req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != 101 {
		return nil, fmt.Errorf("statusCode - %d", resp.StatusCode)
	}
	return &connWithBufReader{conn, br}, nil
}

type connWithBufReader struct {
	net.Conn
	br *bufio.Reader
}

func (p *connWithBufReader) Read(b []byte) (int, error) {
	return p.br.Read(b)
}

type ClientConn struct {
	ClientConfig
}

type ClientConfig struct {
	DialTimeout time.Duration
	Dial        func(network, addr string) (net.Conn, error)

	TLSClientConfig     *tls.Config
	TLSHandshakeTimeout time.Duration
}

func NewClientConn(cfg ClientConfig) *ClientConn {
	if cfg.DialTimeout == 0 {
		cfg.DialTimeout = 10 * time.Second
	}
	if cfg.Dial == nil {
		cfg.Dial = (&net.Dialer{
			Timeout:   cfg.DialTimeout,
			KeepAlive: 30 * time.Second,
		}).Dial
	}
	if cfg.TLSHandshakeTimeout == 0 {
		cfg.TLSHandshakeTimeout = 30 * time.Second
	}
	return &ClientConn{cfg}
}

func (p *ClientConn) DialConn(targetUrl string) (net.Conn, error) {
	ut, err := url.Parse(targetUrl)
	if err != nil {
		return nil, err
	}
	var conn net.Conn
	targetAddr := ut.Host
	if ut.Port() == "" {
		switch ut.Scheme {
		case "http":
			targetAddr += ":80"
		case "https":
			targetAddr += ":443"
		}
	}

	conn, err = p.Dial("tcp", targetAddr)
	if err != nil {
		return nil, &net.OpError{Op: "connect", Net: "tcp", Err: err}
	}
	if err = keepAlive(conn); err != nil {
		conn.Close()
		return nil, err
	}

	return conn, nil
}

func keepAlive(conn net.Conn) error {
	DefaultKeepAliveIdleTime := 13 * time.Second
	DefaultKeepAliveCount := 8
	DefaultKeepAliveInterval := 5 * time.Second
	return tcpkeepalive.SetKeepAlive(conn, DefaultKeepAliveIdleTime, DefaultKeepAliveCount, DefaultKeepAliveInterval)
}

func main() {

	conn, err := NewClientConn(ClientConfig{
		DialTimeout: 10 * time.Second,
	}).DialConn("http://127.0.0.1:10001")

	req, err := http.NewRequest("POST", "/", bytes.NewReader([]byte("{}")))
	if err != nil {
		fmt.Println("NewRequest:", err)
		conn.Close()
		return
	}
	req.Header.Set("Content-Type", "application/json")

	rawConn, err := UpgradeClientTCP(context.Background(), false, conn, req)
	if err != nil {
		fmt.Println(" UpgradeClientTCP:", err)
		return
	}

	pr, _ := io.Pipe()
	mr := io.MultiReader(bytes.NewReader([]byte(`{"a":"a"}`)), pr)

	req, _ = http.NewRequest("POST", "http://127.0.0.1:10001/long", &readCloserWrap{mr, pr})
	req.Header.Set("Content-Type", "application/json")

	cp := &CP{}
	h2t := &http2.Transport{ConnPool: cp, AllowHTTP: true}
	h2c, err := h2t.NewClientConn(rawConn)
	if err != nil {
		fmt.Println(err)
		return
	}
	cp.client = h2c

	cli := &http.Client{
		Transport: h2t,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	resp, err := cli.Do(req)
	if err != nil {
		fmt.Println("err ", err)
		return
	}
	defer resp.Body.Close()

	data, _ := io.ReadAll(resp.Body)
	fmt.Println(string(data))

}

What did you expect to see?

It is normal for versions below go 1.18.8

image
image

What did you see instead?

@joedian joedian added the NeedsInvestigation Someone must examine and confirm this is a valid issue and not a duplicate of an existing one. label Nov 16, 2022
@neild
Copy link
Contributor

neild commented Nov 17, 2022

This code is doing something extremely complicated and unusual involving sending a request on a connection, hijacking the connection, and then starting a new HTTP/2 session on the hijacked connection.

This could use a much more focused reproduction case to understand what might have changed.

@wanglong001
Copy link
Author

wanglong001 commented Nov 17, 2022

The client can work and run regardless of the go version, the main problem is the server (this problem will occur above 1.19)
It is the underlying code that our framework depends on and has not changed
As of go 1.16, and possibly earlier, this kind of code works fine.

and I simplify the code :

server

package main

import (
	"fmt"
	"net"
	"net/http"
	"time"

	"golang.org/x/net/http2"
)

type B struct {
}

func (b *B) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	resp.Write([]byte("long"))
	resp.(http.Flusher).Flush()
	time.Sleep(2 * time.Second)
	resp.Write([]byte("long end"))
	resp.(http.Flusher).Flush()
	return
}

type A struct {
}

func (a *A) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	conn, err := UpgradeServerTCP(resp, req)
	if err != nil {
		return
	}
	fmt.Println(resp)
	go func() {
		server := &http2.Server{}
		server.ServeConn(conn, &http2.ServeConnOpts{
			Handler: &B{},
		})
	}()
}

func UpgradeServerTCP(w http.ResponseWriter, req *http.Request) (c net.Conn, err error) {
	hijacker, ok := w.(http.Hijacker)
	if !ok {
		return nil, fmt.Errorf("http upgrade failed")
	}

	h := w.Header()
	h.Set("Content-Length", "2")
	h.Set("Content-Type", "application/json")
	w.WriteHeader(101)
	w.Write([]byte("{}"))

	conn, _, err := hijacker.Hijack()
	if err != nil {
		return nil, err
	}
	return conn, nil
}

func main() {
	err := http.ListenAndServe("127.0.0.1:10001", &A{})
	if err != nil {
		panic(err)
	}
}

client

package main

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
	"net"
	"net/http"

	"golang.org/x/net/http2"
)

type readCloserWrap struct {
	io.Reader
	io.Closer
}
type CP struct {
	client *http2.ClientConn
}

func (c *CP) GetClientConn(req *http.Request, addr string) (*http2.ClientConn, error) {
	return c.client, nil
}

func (c *CP) MarkDead(conn *http2.ClientConn) {
	conn.Close()
}

func UpgradeClientTCP(ctx context.Context, isProxy bool, conn net.Conn, req *http.Request) (c net.Conn, err error) {
	req.Header.Set("Upgrade", "Tcp")
	req.Header.Set("Connection", "Upgrade")
	req.Header.Set("Content-Type", "application/json")

	defer func() {
		if err != nil {
			conn.Close()
		}
	}()
	err = req.Write(conn)
	if err != nil {
		return nil, err
	}
	br := bufio.NewReader(conn)
	resp, err := http.ReadResponse(br, req)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != 101 {
		return nil, fmt.Errorf("statusCode - %d", resp.StatusCode)
	}
	return &connWithBufReader{conn, br}, nil
}

type connWithBufReader struct {
	net.Conn
	br *bufio.Reader
}

func (p *connWithBufReader) Read(b []byte) (int, error) {
	return p.br.Read(b)
}

func main() {

	conn, _ := (&net.Dialer{}).Dial("tcp", "127.0.0.1:10001")

	req, err := http.NewRequest("POST", "/", bytes.NewReader([]byte("{}")))
	if err != nil {
		conn.Close()
		return
	}
	req.Header.Set("Content-Type", "application/json")

	rawConn, err := UpgradeClientTCP(context.Background(), false, conn, req)
	if err != nil {
		return
	}

	pr, _ := io.Pipe()
	mr := io.MultiReader(bytes.NewReader([]byte(`{"a":"a"}`)), pr)

	req, _ = http.NewRequest("POST", "http://127.0.0.1:10001/long", &readCloserWrap{mr, pr})
	req.Header.Set("Content-Type", "application/json")

	cp := &CP{}
	h2t := &http2.Transport{ConnPool: cp, AllowHTTP: true}
	h2c, err := h2t.NewClientConn(rawConn)
	if err != nil {
		fmt.Println(err)
		return
	}
	cp.client = h2c

	cli := &http.Client{
		Transport: h2t,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	resp, err := cli.Do(req)
	if err != nil {
		fmt.Println("err ", err)
		return
	}
	defer resp.Body.Close()

	data, _ := io.ReadAll(resp.Body)
	fmt.Println(string(data))
}

@seankhliao seankhliao added this to the Unplanned milestone Nov 19, 2022
@wanglong001
Copy link
Author

I know why this happened:

1.18.8 vs 1.19
image

The response.writeHeader of 1.19, when code >= 100 and code <= 900, will send a frame separately, no need to manually wrap a frame on the user code;

Just call writeHeader

h := w.Header()
h.Set("Content-Length", "2")
h.Set("Content-Type", "application/json")
w.WriteHeader(101)
w.Write([]byte("{}"))

And this writing is compatible with before 1.18.8.
My previous code is not good, so I don’t need to write a frame by myself,
otherwise I will send an extra frame, and the frame will block other subsequent frames

[Reassembly error, protocol TCP: New fragment overlaps old data (retransmission?)]

@golang golang locked and limited conversation to collaborators Nov 24, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
FrozenDueToAge NeedsInvestigation Someone must examine and confirm this is a valid issue and not a duplicate of an existing one.
Projects
None yet
Development

No branches or pull requests

5 participants