Просмотр исходного кода

Hide the fact that ProduceRequests use messageSets

Just expose straight messages, and build the messageSets dynamically.
Evan Huus 12 лет назад
Родитель
Сommit
805772fbb8
3 измененных файлов с 24 добавлено и 10 удалено
  1. 14 7
      message_set.go
  2. 9 2
      produce_request.go
  3. 1 1
      producer.go

+ 14 - 7
message_set.go

@@ -2,13 +2,13 @@ package kafka
 
 type messageSetBlock struct {
 	offset int64
-	msg    Message
+	msg    *Message
 }
 
 func (msb *messageSetBlock) encode(pe packetEncoder) {
 	pe.putInt64(msb.offset)
 	pe.pushLength32()
-	(&msb.msg).encode(pe)
+	msb.msg.encode(pe)
 	pe.pop()
 }
 
@@ -23,7 +23,8 @@ func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	err = (&msb.msg).decode(pd)
+	msb.msg = new(Message)
+	err = msb.msg.decode(pd)
 	if err != nil {
 		return err
 	}
@@ -61,8 +62,14 @@ func (ms *messageSet) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func newSingletonMessageSet(msg *Message) *messageSet {
-	tmp := make([]*messageSetBlock, 1)
-	tmp[0] = &messageSetBlock{msg: *msg}
-	return &messageSet{tmp}
+func newMessageSet() *messageSet {
+	set := new(messageSet)
+	set.msgs = make([]*messageSetBlock, 0)
+	return set
+}
+
+func (ms *messageSet) addMessage(msg *Message) {
+	block := new(messageSetBlock)
+	block.msg = msg
+	ms.msgs = append(ms.msgs, block)
 }

+ 9 - 2
produce_request.go

@@ -34,7 +34,7 @@ func (p *ProduceRequest) version() int16 {
 	return 0
 }
 
-func (p *ProduceRequest) AddMessageSet(topic *string, partition int32, set *messageSet) {
+func (p *ProduceRequest) AddMessage(topic *string, partition int32, msg *Message) {
 	if p.MsgSets == nil {
 		p.MsgSets = make(map[*string]map[int32]*messageSet)
 	}
@@ -43,5 +43,12 @@ func (p *ProduceRequest) AddMessageSet(topic *string, partition int32, set *mess
 		p.MsgSets[topic] = make(map[int32]*messageSet)
 	}
 
-	p.MsgSets[topic][partition] = set
+	set := p.MsgSets[topic][partition]
+
+	if set == nil {
+		set = newMessageSet()
+		p.MsgSets[topic][partition] = set
+	}
+
+	set.addMessage(msg)
 }

+ 1 - 1
producer.go

@@ -58,7 +58,7 @@ func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
 	}
 
 	request := &ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
-	request.AddMessageSet(&p.topic, partition, newSingletonMessageSet(&Message{Key: keyBytes, Value: valBytes}))
+	request.AddMessage(&p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)
 	if err != nil {