|
|
@@ -1,7 +1,6 @@
|
|
|
package kafka
|
|
|
|
|
|
import (
|
|
|
- "encoding/binary"
|
|
|
"io"
|
|
|
"math"
|
|
|
"net"
|
|
|
@@ -114,7 +113,7 @@ func (b *broker) sendRequestLoop() {
|
|
|
}
|
|
|
|
|
|
func (b *broker) rcvresponsePromiseLoop() {
|
|
|
- header := make([]byte, 4)
|
|
|
+ header := make([]byte, 8)
|
|
|
for response := range b.responses {
|
|
|
_, err := io.ReadFull(b.conn, header)
|
|
|
if err != nil {
|
|
|
@@ -122,18 +121,15 @@ func (b *broker) rcvresponsePromiseLoop() {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- length := int32(binary.BigEndian.Uint32(header))
|
|
|
+ decoder := realDecoder{raw: header}
|
|
|
+ length, _ := decoder.getInt32()
|
|
|
if length <= 4 || length > 2*math.MaxUint16 {
|
|
|
b.forceDisconnect(&response, DecodingError{})
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- _, err = io.ReadFull(b.conn, header)
|
|
|
- if err != nil {
|
|
|
- b.forceDisconnect(&response, err)
|
|
|
- return
|
|
|
- }
|
|
|
- if response.correlation_id != int32(binary.BigEndian.Uint32(header)) {
|
|
|
+ corr_id, _ := decoder.getInt32()
|
|
|
+ if response.correlation_id != corr_id {
|
|
|
b.forceDisconnect(&response, DecodingError{})
|
|
|
return
|
|
|
}
|