浏览代码

sendMetadataRequest

Slowly figuring out how to do this...
Evan Huus 12 年之前
父节点
当前提交
a4e6a99593
共有 2 个文件被更改,包括 52 次插入24 次删除
  1. 28 6
      client.go
  2. 24 18
      protocol.go

+ 28 - 6
client.go

@@ -75,20 +75,42 @@ func (client *Client) read() (buf []byte, err error) {
 	return buf, nil
 }
 
-func (client *Client) sendRequest(api *API, body []byte) (err error) {
+func (client *Client) sendRequest(api API, body []byte) (err error) {
 	idLen, err := stringLen(client.id)
 	if err != nil {
 		return err
 	}
 	buf := make([]byte, 4+idLen+len(body))
-	binary.BigEndian.PutUint16(buf[0:2], uint16(api.key))
-	binary.BigEndian.PutUint16(buf[2:4], uint16(api.version))
-	binary.BigEndian.PutUint32(buf[4:8], uint32(client.correlation_id))
+	off := 0
+	binary.BigEndian.PutUint16(buf[off:], uint16(api.key))
+	off += 2
+	binary.BigEndian.PutUint16(buf[off:], uint16(api.version))
+	off += 2
+	binary.BigEndian.PutUint32(buf[off:], uint32(client.correlation_id))
+	off += 4
 	client.correlation_id++
-	err = encodeString(client.id, buf[8:])
+	off, err = encodeString(buf, off, client.id)
 	if err != nil {
 		return err
 	}
-	copy(buf[8+idLen:], body)
+	copy(buf[off:], body)
 	return client.write(buf)
 }
+
+func (client *Client) sendMetadataRequest(topics []string) (err error) {
+	bufLen := 4
+	for i := range topics {
+		bufLen += len(topics[i])
+	}
+	buf := make([]byte, bufLen)
+	off := 0
+	binary.BigEndian.PutUint32(buf[off:], uint32(len(topics)))
+	off += 4
+	for i := range topics {
+		off, err = encodeString(buf, off, &topics[i])
+		if err != nil {
+			return err
+		}
+	}
+	return client.sendRequest(REQUEST_METADATA, buf)
+}

+ 24 - 18
protocol.go

@@ -17,25 +17,28 @@ func stringLen(in *string) (n int, err error) {
 	return 2 + n, nil
 }
 
-func encodeString(in *string, buf []byte) (err error) {
-	if len(buf) < 2 {
-		return errors.New("kafka: buffer too short to encode any string")
+func encodeString(buf []byte, off int, in *string) (int, error) {
+	available := len(buf) - off
+	if available < 2 {
+		return -1, errors.New("kafka: buffer too short to encode any string")
 	}
 	n := -1
 	if in != nil {
 		n = len(*in)
 	}
 	if n > math.MaxInt16 {
-		return errors.New("kafka: string too long to encode")
+		return -1, errors.New("kafka: string too long to encode")
 	}
-	if n > len(buf) {
-		return errors.New("kafka: buffer too short to encode string")
+	if n > available-2 {
+		return -1, errors.New("kafka: buffer too short to encode string")
 	}
-	binary.BigEndian.PutUint16(buf, uint16(n))
+	binary.BigEndian.PutUint16(buf[off:], uint16(n))
+	off += 2
 	if n > 0 {
-		copy(buf[2:], *in)
+		copy(buf[off:], *in)
 	}
-	return nil
+	off += n
+	return off, nil
 }
 
 func decodeString(buf []byte) (out *string, err error) {
@@ -70,25 +73,28 @@ func bytesLen(in *[]byte) (n int, err error) {
 	return 4 + n, nil
 }
 
-func encodeBytes(in *[]byte, buf []byte) (err error) {
-	if len(buf) < 4 {
-		return errors.New("kafka: buffer too short to encode any bytes")
+func encodeBytes(buf []byte, off int, in *[]byte) (int, error) {
+	available := len(buf) - off
+	if available < 4 {
+		return -1, errors.New("kafka: buffer too short to encode any bytes")
 	}
 	n := -1
 	if in != nil {
 		n = len(*in)
 	}
 	if n > math.MaxInt32 {
-		return errors.New("kafka: bytes too long to encode")
+		return -1, errors.New("kafka: bytes too long to encode")
 	}
-	if n > len(buf) {
-		return errors.New("kafka: buffer too short to encode bytes")
+	if n > available-4 {
+		return -1, errors.New("kafka: buffer too short to encode bytes")
 	}
-	binary.BigEndian.PutUint32(buf, uint32(n))
+	binary.BigEndian.PutUint32(buf[off:], uint32(n))
+	off += 4
 	if n > 0 {
-		copy(buf[4:], *in)
+		copy(buf[off:], *in)
 	}
-	return nil
+	off += n
+	return off, nil
 }
 
 func decodebyte(buf []byte) (out *[]byte, err error) {