
Proposal: io: add a buffered pipe #28790

Closed
mvdan opened this issue Nov 14, 2018 · 28 comments

Comments

@mvdan (Member) commented Nov 14, 2018

Summary

Add a way to easily create a buffered io.Pipe. Currently, everyone has to write their own from scratch. This leads to dozens of separate implementations in the wild, and likely many correctness issues and hidden bugs.

Description

io.Pipe is synchronous, as per https://golang.org/pkg/io/#Pipe:

Reads and Writes on the pipe are matched one to one except when multiple Reads are needed to consume a single Write. That is, each Write to the PipeWriter blocks until it has satisfied one or more Reads from the PipeReader that fully consume the written data. The data is copied directly from the Write to the corresponding Read (or Reads); there is no internal buffering.

This is fine for most use cases. However, a buffered pipe can still be useful or necessary in many situations. Below is a list of third-party implementations I was able to find after a ten-minute search:

Note the first two, which live in the standard library itself. They're also interesting, because net's requires deadlines, and http's supports any underlying pipe buffer implementation.

The main point of the proposal is that there are dozens of well-used buffered pipe implementations out there. This is particularly worrying, because writing a correct one isn't easy. It's very likely that some of the implementations out there have subtle bugs, such as misusing sync.Cond or being racy when many reads and writes happen concurrently.

I'm raising this proposal precisely because I had written my own buggy buffered pipe, which made go test -race find data races about 5% of the time. I "fixed" that by rewriting my tests to work around the lack of buffering, and using io.Pipe instead: mvdan/sh@5ffc4d9

Granted, my code being incorrect doesn't mean everyone's code is too. But it does help illustrate the point: writing a buffered pipe seems easy, and the lack of an implementation in the standard library seems to encourage people to write their own, likely buggy, implementations.

This has been discussed in golang-dev before, although no common implementation was found or proposed: https://groups.google.com/forum/#!topic/golang-dev/k0bSal8eDyE

There's also a proposal to make net.Pipe asynchronous in Go2; #24205. That proposal is somewhat related to this one, since net.Pipe currently uses io.Pipe under the hood. This could however be changed in Go2.

Proposed API

The simplest change would be to add something like:

// BufferedPipe creates an asynchronous in-memory pipe, internally buffering up
// to n bytes.
func BufferedPipe(size int) (*PipeReader, *PipeWriter)

Ideally PipeReader and PipeWriter would have been interface types from the start. Within Go1, we can work around that by having the PipeWriter struct simply embed the actual pipe implementation, be it synchronous or asynchronous.

Another possible path in Go1 is to go the http2 route, allowing any underlying pipe buffer implementation. I personally think this would be a bit too niche for the io package, but perhaps @bradfitz has opinions on that. Aside from http2, all buffered pipe packages seem to build on the current io.Pipe semantics.

If the net package needs read and write deadlines, they could always be added to the reader and writer returned by the constructors. This should be a backwards compatible change, and could be discussed as a separate proposal.

Finally, if we were to consider this as a Go2 change, the cleaned up version of my first proposed change would be:

type PipeReader interface {
	io.Reader
	io.Closer
	CloseWithError(err error) error
}

type PipeWriter interface {
	io.Writer
	io.Closer
	CloseWithError(err error) error
}

func Pipe() (PipeReader, PipeWriter)
func BufferedPipe(size int) (PipeReader, PipeWriter)

We could even go one step further and mirror make(chan T, size), only having func Pipe(size int) where Pipe(0) would return a synchronous/unbuffered pipe.

The net package could either mirror these proposed Go2 changes, or break with the connection to the io package entirely.

/cc @bcmills @dsnet @bradfitz @rogpeppe (and @neelance for net_fake.go, and @djs55 for vpnkit's loopbackconn.go)

@mvdan mvdan added the Proposal label Nov 14, 2018
@mvdan mvdan added this to the Proposal milestone Nov 14, 2018
@robpike (Contributor) commented Nov 14, 2018

I must be missing something. What's lacking from using bufio.NewReader on the read side and bufio.NewWriter on the write side?

@mvdan (Member, Author) commented Nov 14, 2018

I imagine that would behave differently; for example, a write is not guaranteed to unblock a pending read if the writer forgot to flush the bufio.Writer. This appears to be mentioned in vpnkit's implementation.

@DeedleFake commented Nov 14, 2018

I don't think that's necessary, actually. The implementation of (*bufio.Writer).Write() is

func (b *Writer) Write(p []byte) (nn int, err error) {
	for len(p) > b.Available() && b.err == nil {
		var n int
		if b.Buffered() == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			n, b.err = b.wr.Write(p)
		} else {
			n = copy(b.buf[b.n:], p)
			b.n += n
			b.Flush()
		}
		nn += n
		p = p[n:]
	}
	if b.err != nil {
		return nn, b.err
	}
	n := copy(b.buf[b.n:], p)
	b.n += n
	nn += n
	return nn, nil
}

It flushes automatically whenever a write is larger than the buffer or the buffer has filled up, and there's a bufio.NewWriterSize() that lets you pick the buffer size. A quick look through the code makes it look like the buffer never gets resized, either. The only potentially awkward thing seems to be the

	// Is it already a Writer?
	b, ok := w.(*Writer)
	if ok && len(b.buf) >= size {
		return b
	}

from bufio.NewWriterSize(), which will result in a buffer of a larger size than requested if a writer with a bigger size is passed to it.

Maybe this is primarily a documentation issue. bufio.Writer doesn't seem to mention in its documentation when the flushes happen automatically, just that you should manually flush after the last write to make sure that the buffer gets emptied at the end.

@mvdan (Member, Author) commented Nov 14, 2018

It flushes automatically whenever a write is larger than the buffer or the buffer has filled up

Yes, a bufio.Writer will eventually flush on its own, but note that that's not my point. If the bufio.Writer buffer size is large enough, a write to this pipe wrapped by a bufio.Writer may not unblock a smaller read from the pipe.

An io.Pipe with a bufio.Reader and bufio.Writer is just semantically different from my proposal, besides being doubly buffered. Another way to put it: if they were equivalent, why do none of the existing implementations simply do that?

@bcmills (Contributor) commented Nov 14, 2018

a buffered pipe can still be useful or necessary in many situations.

“Buffered pipe” seems like a very vague concept — I'd like to understand the use-cases better.

How many of the existing implementations you found could this proposal replace, and how much (if any) overhead would it introduce relative to what they're using today? What properties do those implementations share in common, and what variations do they require?

@DeedleFake commented Nov 14, 2018

I wrote up a huge post, complete with playground example, read through the proposal again, and realized that I was completely misunderstanding this. You're asking for a pipe that's not just buffered, but asynchronous. You want the writing side to be able to just write something into a buffer and not have to worry about whether or not a reading side ever sees it, while a reading side would block until something was available, much like a buffered channel.

I'm still not exactly sure why just using both a bufio.Writer and a bufio.Reader on each end won't work, like this, but now I'm a lot less sure that it's not quite what you have in mind, either.

Another way to put this - if they were equivalent, why are none of the existing implementations simply doing that?

Not sure, but it's possible that it simply doesn't cover specific edge cases for some of them, which really isn't the job of the standard library, per se. For example, the golang/crypto implementation you linked to uses an actual network connection over the loopback interface; if all they wanted was buffering, that kind of seems like overkill. It's also possible that some just didn't think of it. I've seen popular repositories with very odd design choices, such as the Go implementation of Mustache, which basically re-implements strings.Reader to parse something out of a string instead of just using strings.Reader or, better, just taking an io.Reader in the first place.

@mvdan (Member, Author) commented Nov 14, 2018

“Buffered pipe” seems like a very vague concept

Can you think of a name that would be more specific, whatever its meaning may be?

How many of the existing implementations you found could this proposal replace, and how much (if any) overhead would it introduce relative to what they're using today? What properties do those implementations share in common, and what variations do they require?

From what I saw, most of the implementations are just a reimplementation of io.Pipe with an added internal buffer. That is, a write to the pipe won't block if there's space for it in the buffer, and a read from the pipe won't block if there are any bytes in the buffer. Otherwise, the buffered pipe behaves like our current synchronous pipe.

The only notable outliers were what I covered in the original post: http2 using a custom buffer implementation, and net requiring read and write deadlines. I didn't dig deep into all the third-party implementations of buffered pipes, but perhaps the authors can chime in - this is why I pinged some of them.

You're asking for a pipe that's not just buffered, but asynchronous. [...] much like a buffered channel.

Correct. This is why I called it "buffered pipe", to mirror "buffered channel".

@mvdan (Member, Author) commented Nov 14, 2018

I also just realised I forgot to mention the parallel with buffered channels in the original post. That's entirely my bad.

@bcmills (Contributor) commented Nov 14, 2018

To take the analogy with buffered channels a bit further, in my experience there are only three generally-useful buffer sizes for channels: 0 (synchronous messages), 1 (mutually-excluded data), and “an upper bound on the total number of elements sent” (asynchronous results).

(A program that is micro-optimized for cache locality might use buffers of other sizes, but the code that actually uses those buffers pretty much always ends up equivalent to what you'd write for a buffer size of 0 or 1.)

By analogy, the useful buffer sizes for an io.Pipe would be “synchronous”, “exactly one Write at a time”, and “unbounded”. None of those corresponds to something I could construct with a BufferedPipe(size int) function.

@mvdan (Member, Author) commented Nov 14, 2018

I think the "buffered channel" analogy only goes as far as the name and the high-level idea. It breaks quickly, because a buffered channel always passes messages intact, while an io.Pipe may break chunks of writes into smaller pieces. For example, one pipe write of 100 bytes can be unblocked by four reads of 20 bytes.

Similarly, a buffered pipe could mean that two pipe writes of 50 bytes end up as a single pipe read of 100 bytes, joining the two sent chunks of bytes.

We can agree that io.Pipe is "synchronous". I don't think "mutually excluded data" maps to a pipe, because we're not talking about messages that remain as themselves - chunks of bytes may be split or joined, as explained above.

As for “an upper bound on the total number of elements sent”, I think the equivalent in a buffered pipe would be "the maximum size of one message sent on the pipe", where one message could consist of many writes. In practice, programs tend to know the maximum message size, so they can use that knowledge to guarantee that they can write a message without blocking. (thanks to @rogpeppe for helping word this paragraph clearly)

As for practical examples - net_fake uses newBufferedPipe(65536), which I assume comes from the maximum size of a TCP packet.

@rogpeppe (Contributor) commented

By analogy, the useful buffer sizes for an io.Pipe would be “synchronous”, “exactly one Write at a time”, and “unbounded”. None of those corresponds to something I could construct with a BufferedPipe(size int) function.

FWIW you can get the "exactly one Write at a time" semantic with something like this: https://play.golang.org/p/YQ14XqARLV6

Messages don't have to be written in a single write though.

@mvdan (Member, Author) commented Nov 14, 2018

"the maximum size of one message sent on the pipe"

Another way to think about it would be that BufferedPipe(N*S) could mean "allow writing up to N messages of size S without blocking".

@bcmills (Contributor) commented Nov 14, 2018

In practice, programs tend to know the maximum message size, so they can use that knowledge to guarantee that they can write a message without blocking.

If you know that the write won't block, why use a fixed buffer size at all? (That is, why should the pipe block when it reaches N bytes rather than expanding the buffer?)

For that matter, why use an io.Pipe rather than a chan string or chan []byte? (If you already have an upper bound on the message size, why is it important to break up that message into multiple writes?)

(If it's important to satisfy the io.Reader or io.Writer interface on one end or the other, it's easy enough to adapt a channel for that purpose: see my sketch on this thread.)

@mvdan (Member, Author) commented Nov 14, 2018

why use a fixed buffer size at all?

If you mean, as opposed to having the buffer grow as needed - I'd personally prefer to be in control of how much memory a program can use. There's no way to create a buffered channel without a fixed size, for example.

For that matter, why use an io.Pipe rather than a chan string or chan []byte?

Like you say further below, satisfying io.Reader and io.Writer is often useful.

it's easy enough to adapt a channel for that purpose

I wouldn't be so sure; I imagine that a full implementation with Read and Write would take significantly more code, especially when one considers supporting concurrent calls. I imagine that io.BufferedPipe would carry that guarantee from io.Pipe.

io.Pipe also has extra bits of useful API, such as being able to close either end of the pipe. It's certainly possible to adapt a channel for this, but I'd say it's very close to just implementing a buffered pipe from scratch, which is what plenty of users are already doing.

@bcmills (Contributor) commented Nov 14, 2018

I imagine that a full implementation with Read and Write would take significantly more code, especially when one considers supporting concurrent calls.

It seems pretty trivial to me: concurrent calls just need a sync.Mutex on the Read side. What am I missing?
(https://play.golang.org/p/Y59967-bfQF)

It's certainly possible to adapt a channel for this, but I'd say it's very close to just implementing a buffered pipe from scratch, which is what plenty of users are already doing.

That may be true, but it doesn't contradict my point about buffer sizes: even if it is useful to provide a standard buffered io.Pipe, why should it buffer a specific number of bytes, rather than unlimited data or a specific number of Write calls? (That's at least part of the information I would hope to glean from looking at the existing implementations you intend to replace.)

@mikioh (Contributor) commented Nov 15, 2018

The net package could either mirror these proposed Go2 changes, or break with the connection to the io package entirely.

Just FYI: as described in #28650, the existing net_fake.go is broken. Networking code does rely on queueing by default, but only to support reliable data transfer, and it carries a lot of complicated control logic as a cost. Without accounting for that control logic, future buffered pipe users might be trapped by #28650 or similar issues.

For adapting the net package, I feel it might be better to look for a cheap way of building a queueing layer between the io and net packages, in a separate issue.

@as (Contributor) commented Nov 15, 2018

@DeedleFake

I'm still not exactly sure why just using both a bufio.Writer and a bufio.Reader on each end won't work, like this, but now I'm a lot less sure that it's not quite what you have in mind, either.

It actually does work: https://play.golang.org/p/iGeuSFtebI-

The linked program in the prior example had a subtle bug: the pipe closed before the buffer flushed.

		bw := bufio.NewWriterSize(w, 64)
		defer bw.Flush()
		defer w.Close()

Because the pipe has no internal buffering, calling w.Close ends the relationship with the pipe. However, the pipe has already copied 64 bytes into bw. The consumer digests 64 bytes and receives the EOF from bw after the fact.

This is WAD (working as designed).

Edit: Checking the error message on the buffer close method (in the original example) will yield a write on closed pipe error.

@mvdan (Member, Author) commented Nov 16, 2018

It seems pretty trivial to me: concurrent calls just need a sync.Mutex on the Read side. What am I missing?

Let me answer that question with another question :) If writing a correct buffered pipe is trivial, why do the implementations out there vary so wildly? In other words, it doesn't look like there's a well-understood canonical way to write a buffered pipe.

Perhaps the solution to that will be an example or piece of documentation somewhere; I honestly don't know. This proposal was more about starting a conversation than adding an API.

why should it buffer a specific number of bytes, rather than unlimited data or a specific number of Write calls?

That's a fair point. I haven't done a deep dive into the implementations and use cases in the wild, so I don't have an answer right now. From what I've seen so far, buffering up to a number of bytes seems to be the most common need.

Without considering the control, future buffered pipe users might be trapped in #28650 or similar issues.

Thanks for chiming in - it does look like net_fake.go isn't a good example use case here, then.

@bcmills (Contributor) commented Nov 16, 2018

If writing a correct buffered pipe is trivial, why do the implementations out there vary so wildly?

I was hoping you could answer from having looked at those implementations already. 🙂

I have a few hypotheses you could check.

  1. Perhaps the non-trivial implementations are a performance optimization (e.g. to reduce allocations, to improve cache locality, or to reduce context-switches).
  2. Perhaps the non-trivial implementations are then copied to a network socket or other protocol with a fixed MTU, and the pipe needs to buffer writes up to that MTU.
  3. Perhaps the “fixed-capacity buffered pipe” pattern is carried over from languages that lack goroutines and channels, and needs to be rethought in the context of Go (much like the “worker pool” pattern).
  4. Perhaps a few implementations needed fixed buffering due to (1) or (2), and the remainder copied those without questioning the reason for the buffer.

I took a quick glance at the links and saw a lot of sync.Cond usage, so I'm inclined to suspect (3), but I'd be interested to see a deeper analysis.

@VictoriaRaymond commented

Chiming in as V2Ray is listed in the thread.

As @bcmills suggests, the buffered pipe in V2Ray is tailored to V2Ray's use case. It is more or less a [][2k]byte with some Mutex fixtures. The buffered pipe is designed to minimize byte copying, as [2k]byte buffers are widely used in the project. The pipe itself is hardly shareable with other projects, unless they use [2k]byte as well.

The [][2k]byte could be replaced with chan [2k]byte. But in our case, each pipe is guaranteed to have only one reader, and we find it easier to use a slice in that case.

@neelance (Member) commented

While writing bufferedPipe of net_fake.go for the js/wasm architecture, my goal was to make the existing tests pass. For example there are tests that fail if the fake connection has an unbounded buffer. I did not second guess those requirements on a theoretical level since I wanted to avoid having to modify any tests while adding a new architecture.

@bcmills (Contributor) commented Nov 16, 2018

@neelance, that sounds like (4), but with the “without questioning” part as a very intentional (and reasonable!) decision. 🙂

@DeedleFake commented

The linked program in the prior example had a subtle bug: the pipe closed before the buffer flushed.

Ah, whoops. Got the defers backwards. I threw it together real quick and wasn't paying much attention.

@mvdan (Member, Author) commented Nov 26, 2018

Thanks all for the input. It seems clear that there are multiple understandings of what a "buffered pipe" should be and how it should behave, so I'm thinking it's best if we don't proceed with the proposal for now. I'll leave it open for a little longer, in case anyone has anything else to add.

@rsc (Contributor) commented Nov 28, 2018

Per @mvdan, closing proposal.
For what it's worth, the reason I made io.Pipe unbuffered is that if you get your code working without assuming any buffering at all, it's certain to work with more buffering.

@rsc rsc closed this as completed Nov 28, 2018
@acomagu commented Jun 18, 2019

I implemented a (variable-sized) buffered pipe by simply wrapping bytes.Buffer:
https://github.com/acomagu/bufpipe

jwgcarlson added a commit to lanikai/alohartc that referenced this issue Jun 19, 2019
This is necessary to be able to unblock `Buffer` readers that are
waiting for the next packet.

Note: The way `Buffer` is currently used it acts much like `io.Pipe`,
except that it allows packets to be buffered. There's probably a way to
achieve this without requiring an allocation on every `Write`. See also
golang/go#28790.
@israel-lugo commented Sep 4, 2019

I would like to add a use case, and rationale for why having a limit to the input buffer size is important.

Consider a remote data source streaming bytes over the network or the Internet. This could be, for example, video, or a large file. A consumer, running locally, receives the byte stream and does some processing on the data; let's say it compresses the data, or exhibits some similarly bursty behaviour.

The data source's transfer speed may vary, which is common, especially when the data travels through a wide area network.

So, you have a data source with varying transfer speed, and a bursty consumer. You want a buffer in between, to avoid stalling either end. And you want the buffer to be asynchronous, so you can have one goroutine reading from the network and another consuming the data.

Think e.g. of a streaming RPC method, where you receive an arbitrarily large file. Your RPC handler spawns a data processing goroutine that reads from the buffered pipe. Then, the handler loops reading chunks of data from the stream and writing them to the buffered pipe. Finally, the handler waits on the data processing goroutine, and returns.

Why a limit to the buffer size? Because RAM has a physical limit, and incoming data doesn't. You may not want to accumulate 1 GiB of streamed input in RAM if the data processor becomes slower than the network for a while. Or maybe you can live with 1 GiB, but not with 10 GiB. Or 100 GiB. So you specify a buffer size.

Now, instead of letting one client kill the serving task, or bring the machine to a halt through thrashing, you get automatic throttling. Once the buffer is full, the RPC handler will block on the write to the buffered pipe, so the client will block on sending the stream.

For a more general use case, just think of anything where you have a producer sending arbitrary amounts of data at variable or bursty rates, and a consumer receiving at variable or bursty rates. Even writing to tape drives (or optical discs, for that matter) will benefit from a memory buffer.

Regarding the understanding of what a "buffered pipe" is, I would point to the arguably most basic of interprocess communication mechanisms, after which I presume io.Pipe was named. Having a bounded buffer and being asynchronous (in that writes are decoupled from reads) are pretty much the defining characteristics of a pipe :)

Could you please revisit the decision to close?

@ianlancetaylor (Contributor) commented

@israel-lugo This proposal was closed because it was not clear what it meant. As you seem to have a specific meaning in mind, I recommend that you open a new proposal describing the exact API you are after.

Note in particular that io.Pipe is synchronous, so to describe what you want as a buffered io.Pipe is going to mislead people. Don't call it that.

@golang golang locked and limited conversation to collaborators Sep 3, 2020