Browse Source

Last bits of the framework seem to be in place

Evan Huus 11 years ago
parent
commit
9eaf799481
3 changed files with 37 additions and 4 deletions
  1. 7 4
      client.go
  2. 14 0
      encoderDecoder.go
  3. 16 0
      request.go

+ 7 - 4
client.go

@@ -116,17 +116,20 @@ func (client *Client) encode(api API, body []byte, pe packetEncoder) {
 	//pe.putRaw(body)
 }
 
-func (client *Client) sendRequest(api API, body []byte) (chan []byte, error) {
+func (client *Client) sendRequest(api API, body encoder) (chan []byte, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 
-	client.encode(api, body, &prepEnc)
+	req := request{api, client.correlation_id, client.id, body}
+
+	req.encode(&prepEnc)
 	if prepEnc.err {
 		return nil, errors.New("kafka encoding error")
 	}
 
-	realEnc.raw = make([]byte, prepEnc.length)
-	client.encode(api, body, &realEnc)
+	realEnc.raw = make([]byte, prepEnc.length+4)
+	realEnc.putInt32(int32(prepEnc.length))
+	req.encode(&realEnc)
 
 	// we buffer one packet so that we can send our packet to the request queue without
 	// blocking, and so that the responses can be sent to us async if we want them

+ 14 - 0
encoderDecoder.go

@@ -0,0 +1,14 @@
+package kafka
+
+type encoder interface {
+	encode(pe packetEncoder)
+}
+
+type decoder interface {
+	decoder(pd packetDecoder)
+}
+
+type encoderDecoder interface {
+	encoder
+	decoder
+}

+ 16 - 0
request.go

@@ -0,0 +1,16 @@
+package kafka
+
+type request struct {
+	api            API
+	correlation_id int32
+	id             *string
+	body           encoder
+}
+
+func (r *request) encode(pe packetEncoder) {
+	pe.putInt16(r.api.key)
+	pe.putInt16(r.api.version)
+	pe.putInt32(r.correlation_id)
+	pe.putString(r.id)
+	r.body.encode(pe)
+}