Browse Source

Publish some necessary properties and types

Evan Huus 11 years ago
parent
commit
c8a7da6540
3 changed files with 36 additions and 36 deletions
  1. 6 6
      fetch_response.go
  2. 27 27
      message_set.go
  3. 3 3
      produce_request.go

+ 6 - 6
fetch_response.go

@@ -1,18 +1,18 @@
 package kafka
 
 type FetchResponseBlock struct {
-	err                 KError
-	highWaterMarkOffset int64
-	msgSet              messageSet
+	Err                 KError
+	HighWaterMarkOffset int64
+	MsgSet              MessageSet
 }
 
 func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
-	pr.err, err = pd.getError()
+	pr.Err, err = pd.getError()
 	if err != nil {
 		return err
 	}
 
-	pr.highWaterMarkOffset, err = pd.getInt64()
+	pr.HighWaterMarkOffset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
@@ -26,7 +26,7 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	err = (&pr.msgSet).decode(msgSetDecoder)
+	err = (&pr.MsgSet).decode(msgSetDecoder)
 
 	return err
 }

+ 27 - 27
message_set.go

@@ -1,19 +1,19 @@
 package kafka
 
-type messageSetBlock struct {
-	offset int64
-	msg    *Message
+type MessageBlock struct {
+	Offset int64
+	Msg    *Message
 }
 
-func (msb *messageSetBlock) encode(pe packetEncoder) {
-	pe.putInt64(msb.offset)
+func (msb *MessageBlock) encode(pe packetEncoder) {
+	pe.putInt64(msb.Offset)
 	pe.pushLength32()
-	msb.msg.encode(pe)
+	msb.Msg.encode(pe)
 	pe.pop()
 }
 
-func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
-	msb.offset, err = pd.getInt64()
+func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
+	msb.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
@@ -23,8 +23,8 @@ func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	msb.msg = new(Message)
-	err = msb.msg.decode(pd)
+	msb.Msg = new(Message)
+	err = msb.Msg.decode(pd)
 	if err != nil {
 		return err
 	}
@@ -37,39 +37,39 @@ func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-type messageSet struct {
-	msgs []*messageSetBlock
+type MessageSet struct {
+	Messages []*MessageBlock
 }
 
-func (ms *messageSet) encode(pe packetEncoder) {
-	for i := range ms.msgs {
-		ms.msgs[i].encode(pe)
+func (ms *MessageSet) encode(pe packetEncoder) {
+	for i := range ms.Messages {
+		ms.Messages[i].encode(pe)
 	}
 }
 
-func (ms *messageSet) decode(pd packetDecoder) (err error) {
-	ms.msgs = nil
+func (ms *MessageSet) decode(pd packetDecoder) (err error) {
+	ms.Messages = nil
 
 	for pd.remaining() > 0 {
-		msb := new(messageSetBlock)
+		msb := new(MessageBlock)
 		err = msb.decode(pd)
 		if err != nil {
 			return err
 		}
-		ms.msgs = append(ms.msgs, msb)
+		ms.Messages = append(ms.Messages, msb)
 	}
 
 	return nil
 }
 
-func newMessageSet() *messageSet {
-	set := new(messageSet)
-	set.msgs = make([]*messageSetBlock, 0)
-	return set
+func (ms *MessageSet) addMessage(msg *Message) {
+	block := new(MessageBlock)
+	block.Msg = msg
+	ms.Messages = append(ms.Messages, block)
 }
 
-func (ms *messageSet) addMessage(msg *Message) {
-	block := new(messageSetBlock)
-	block.msg = msg
-	ms.msgs = append(ms.msgs, block)
+func newMessageSet() *MessageSet {
+	set := new(MessageSet)
+	set.Messages = make([]*MessageBlock, 0)
+	return set
 }

+ 3 - 3
produce_request.go

@@ -10,7 +10,7 @@ const (
 type ProduceRequest struct {
 	ResponseCondition int16
 	Timeout           int32
-	msgSets           map[*string]map[int32]*messageSet
+	msgSets           map[*string]map[int32]*MessageSet
 }
 
 func (p *ProduceRequest) encode(pe packetEncoder) {
@@ -37,11 +37,11 @@ func (p *ProduceRequest) version() int16 {
 
 func (p *ProduceRequest) AddMessage(topic *string, partition int32, msg *Message) {
 	if p.msgSets == nil {
-		p.msgSets = make(map[*string]map[int32]*messageSet)
+		p.msgSets = make(map[*string]map[int32]*MessageSet)
 	}
 
 	if p.msgSets[topic] == nil {
-		p.msgSets[topic] = make(map[int32]*messageSet)
+		p.msgSets[topic] = make(map[int32]*MessageSet)
 	}
 
 	set := p.msgSets[topic][partition]