浏览代码

Merge pull request #85 from eapache/proto-updates

Update a few odds and ends to the latest spec
Willem van Bergen 11 年之前
父节点
当前提交
3364fa390e
共有 3 个文件被更改,包括 35 次插入26 次删除
  1. 23 17
      errors.go
  2. 10 8
      offset_commit_request.go
  3. 2 1
      offset_commit_request_test.go

+ 23 - 17
errors.go

@@ -76,20 +76,22 @@ type KError int16
 
 // Numeric error codes returned by the Kafka server.
 const (
-	NoError                  KError = 0
-	Unknown                  KError = -1
-	OffsetOutOfRange         KError = 1
-	InvalidMessage           KError = 2
-	UnknownTopicOrPartition  KError = 3
-	InvalidMessageSize       KError = 4
-	LeaderNotAvailable       KError = 5
-	NotLeaderForPartition    KError = 6
-	RequestTimedOut          KError = 7
-	BrokerNotAvailable       KError = 8
-	ReplicaNotAvailable      KError = 9
-	MessageSizeTooLarge      KError = 10
-	StaleControllerEpochCode KError = 11
-	OffsetMetadataTooLarge   KError = 12
+	NoError                         KError = 0
+	Unknown                         KError = -1
+	OffsetOutOfRange                KError = 1
+	InvalidMessage                  KError = 2
+	UnknownTopicOrPartition         KError = 3
+	InvalidMessageSize              KError = 4
+	LeaderNotAvailable              KError = 5
+	NotLeaderForPartition           KError = 6
+	RequestTimedOut                 KError = 7
+	BrokerNotAvailable              KError = 8
+	MessageSizeTooLarge             KError = 10
+	StaleControllerEpochCode        KError = 11
+	OffsetMetadataTooLarge          KError = 12
+	OffsetsLoadInProgress           KError = 14
+	ConsumerCoordinatorNotAvailable KError = 15
+	NotCoordinatorForConsumer       KError = 16
 )
 
 func (err KError) Error() string {
@@ -116,14 +118,18 @@ func (err KError) Error() string {
 		return "kafka server: Request exceeded the user-specified time limit in the request."
 	case BrokerNotAvailable:
 		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
-	case ReplicaNotAvailable:
-		return "kafka server: Replica not available. No replicas are available to read from this topic-partition."
 	case MessageSizeTooLarge:
 		return "kafka server: Message was too large, server rejected it to avoid allocation error."
 	case StaleControllerEpochCode:
-		return "kafka server: Stale controller epoch code. ???"
+		return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
 	case OffsetMetadataTooLarge:
 		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
+	case OffsetsLoadInProgress:
+		return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
+	case ConsumerCoordinatorNotAvailable:
+		return "kafka server: Offset's topic has not yet been created."
+	case NotCoordinatorForConsumer:
+		return "kafka server: Request was for a consumer group that is not coordinated by this broker."
 	}
 
 	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)

+ 10 - 8
offset_commit_request.go

@@ -1,12 +1,18 @@
 package sarama
 
+// ReceiveTime is a special value for the timestamp field of Offset Commit Requests which
+// tells the broker to set the timestamp to the time at which the request was received.
+const ReceiveTime int64 = -1
+
 type offsetCommitRequestBlock struct {
-	offset   int64
-	metadata string
+	offset    int64
+	timestamp int64
+	metadata  string
 }
 
 func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
 	pe.putInt64(r.offset)
+	pe.putInt64(r.timestamp)
 	return pe.putString(r.metadata)
 }
 
@@ -52,7 +58,7 @@ func (r *OffsetCommitRequest) version() int16 {
 	return 0
 }
 
-func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, metadata string) {
+func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
 	}
@@ -61,9 +67,5 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i
 		r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
 	}
 
-	tmp := new(offsetCommitRequestBlock)
-	tmp.offset = offset
-	tmp.metadata = metadata
-
-	r.blocks[topic][partitionID] = tmp
+	r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
 }

+ 2 - 1
offset_commit_request_test.go

@@ -18,6 +18,7 @@ var (
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x52, 0x21,
 		0x00, 0x00, 0x00, 0x00, 0xDE, 0xAD, 0xBE, 0xEF,
+		0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x08, 'm', 'e', 't', 'a', 'd', 'a', 't', 'a'}
 )
 
@@ -28,6 +29,6 @@ func TestOffsetCommitRequest(t *testing.T) {
 	request.ConsumerGroup = "foobar"
 	testEncodable(t, "no blocks", request, offsetCommitRequestNoBlocks)
 
-	request.AddBlock("topic", 0x5221, 0xDEADBEEF, "metadata")
+	request.AddBlock("topic", 0x5221, 0xDEADBEEF, ReceiveTime, "metadata")
 	testEncodable(t, "one block", request, offsetCommitRequestOneBlock)
 }