瀏覽代碼

SendSimpleMessage for producer, doesn't quite work yet

Evan Huus 12 年之前
父節點
當前提交
4960166afb
共有 8 個文件被更改,包括 80 次插入3 次删除
  1. 3 0
      broker.go
  2. 27 1
      broker_manager.go
  3. 2 2
      message.go
  4. 6 0
      message_set.go
  5. 14 0
      partitionChooser.go
  6. 9 0
      produce_request.go
  7. 7 0
      produce_response.go
  8. 12 0
      producer.go

+ 3 - 0
broker.go

@@ -94,6 +94,9 @@ func (b *broker) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
+	if b.port > math.MaxUint16 {
+		return DecodingError{"Broker port > 65536"}
+	}
 
 	err = b.connect()
 	if err != nil {

+ 27 - 1
broker_manager.go

@@ -78,7 +78,33 @@ func (bm *brokerManager) getValidLeader(topic string, partition_id int32) (*brok
 	return leader, nil
 }
 
-func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, res decoder) error {
+func (bm *brokerManager) choosePartition(topic string, p partitionChooser) (int32, error) {
+	bm.lock.RLock()
+	id_map := bm.partitions[topic]
+	if id_map == nil {
+		bm.lock.RUnlock()
+		err := bm.refreshTopic(topic)
+		if err != nil {
+			return -1, err
+		}
+		bm.lock.RLock()
+		id_map = bm.partitions[topic]
+		if id_map == nil {
+			bm.lock.RUnlock()
+			return -1, UNKNOWN_TOPIC_OR_PARTITION
+		}
+	}
+	partitions := make([]int32, len(id_map))
+	i := 0
+	for id, _ := range id_map {
+		partitions[i] = id
+		i++
+	}
+	bm.lock.RUnlock()
+	return p.choosePartition(partitions), nil
+}
+
+func (bm *brokerManager) sendToPartition(topic string, partition int32, req encoder, res decoder) error {
 	b, err := bm.getValidLeader(topic, partition)
 	if err != nil {
 		return err

+ 2 - 2
message.go

@@ -71,7 +71,7 @@ func (m *message) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-func newMessageFromString(in string) message {
+func newMessageFromString(in string) *message {
 	buf := make([]byte, len(in))
-	return message{COMPRESSION_NONE, nil, &buf}
+	return &message{value: &buf}
 }

+ 6 - 0
message_set.go

@@ -60,3 +60,9 @@ func (ms *messageSet) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func newSingletonMessageSet(msg *message) *messageSet {
+	tmp := make([]*messageSetBlock, 1)
+	tmp[0] = &messageSetBlock{msg: *msg}
+	return &messageSet{tmp}
+}

+ 14 - 0
partitionChooser.go

@@ -0,0 +1,14 @@
+package kafka
+
+import "math/rand"
+
+type partitionChooser interface {
+	choosePartition(options []int32) int32
+}
+
+type randomPartitioner struct {
+}
+
+func (p randomPartitioner) choosePartition(options []int32) int32 {
+	return options[rand.Intn(len(options))]
+}

+ 9 - 0
produce_request.go

@@ -39,3 +39,12 @@ func (p *produceRequest) encode(pe packetEncoder) {
 		(&p.topics[i]).encode(pe)
 	}
 }
+
+func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
+	req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
+	req.topics[0].topic = &topic
+	req.topics[0].partitions = make([]produceRequestPartitionBlock, 1)
+	req.topics[0].partitions[0].partition = partition
+	req.topics[0].partitions[0].msgSet = set
+	return req
+}

+ 7 - 0
produce_response.go

@@ -1 +1,8 @@
 package kafka
+
+type produceResponse struct {
+}
+
+func (pr *produceResponse) decode(pd packetDecoder) (err error) {
+	return nil
+}

+ 12 - 0
producer.go

@@ -8,3 +8,15 @@ type Producer struct {
 func NewProducer(client *Client, topic string) *Producer {
 	return &Producer{client, topic}
 }
+
+func (p *Producer) SendSimpleMessage(in string) error {
+	partition, err := p.client.brokers.choosePartition(p.topic, randomPartitioner{})
+	if err != nil {
+		return err
+	}
+
+	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(newMessageFromString(in)))
+	response := new(produceResponse)
+
+	return p.client.brokers.sendToPartition(p.topic, partition, request, response)
+}