Reduces the number of read syscalls caused by small, sequential reads on the broker's net.Conn.
@@ -85,6 +85,7 @@ func (b *Broker) Open(conf *Config) error {
Logger.Printf("Failed to connect to broker %s: %s\n", b.addr, b.connErr)
return
}
+ b.conn = NewBufConn(b.conn)
b.conf = conf
b.done = make(chan bool)
@@ -1,6 +1,10 @@
package sarama
-import "sort"
+import (
+ "bufio"
+ "net"
+ "sort"
+)
type none struct{}
@@ -87,3 +91,21 @@ func (b ByteEncoder) Encode() ([]byte, error) {
func (b ByteEncoder) Length() int {
return len(b)
+
+// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
+// reads that trigger syscalls.
+type bufConn struct {
+ net.Conn
+ buf *bufio.Reader
+}
+func NewBufConn(conn net.Conn) *bufConn {
+ return &bufConn{
+ Conn: conn,
+ buf: bufio.NewReader(conn),
+ }
+func (bc *bufConn) Read(b []byte) (n int, err error) {
+ return bc.buf.Read(b)