Kaynağa Gözat

Merge pull request #682 from Shopify/fix-version-check-race

Fix race in broker version check
Evan Huus 9 yıl önce
ebeveyn
işleme
1deca4b0fa

+ 4 - 0
api_versions_request.go

@@ -18,3 +18,7 @@ func (r *ApiVersionsRequest) key() int16 {
 func (r *ApiVersionsRequest) version() int16 {
 	return 0
 }
+
+func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
+	return V0_10_0_0
+}

+ 4 - 0
api_versions_response.go

@@ -80,3 +80,7 @@ func (r *ApiVersionsResponse) key() int16 {
 func (r *ApiVersionsResponse) version() int16 {
 	return 0
 }
+
+func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
+	return V0_10_0_0
+}

+ 4 - 15
broker.go

@@ -198,21 +198,6 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e
 }
 
 func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
-	switch request.version() {
-	case 0:
-		break
-	case 1:
-		if !b.conf.Version.IsAtLeast(V0_9_0_0) {
-			return nil, ErrUnsupportedVersion
-		}
-	case 2:
-		if !b.conf.Version.IsAtLeast(V0_10_0_0) {
-			return nil, ErrUnsupportedVersion
-		}
-	default:
-		return nil, ErrUnsupportedVersion
-	}
-
 	var response *ProduceResponse
 	var err error
 
@@ -343,6 +328,10 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 		return nil, ErrNotConnected
 	}
 
+	if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
+		return nil, ErrUnsupportedVersion
+	}
+
 	req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
 	buf, err := encode(req)
 	if err != nil {

+ 3 - 1
broker_test.go

@@ -56,7 +56,9 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mb.Close()
 
 	broker := NewBroker(mb.Addr())
-	err := broker.Open(nil)
+	conf := NewConfig()
+	conf.Version = V0_10_0_0
+	err := broker.Open(conf)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 1 - 1
config.go

@@ -267,7 +267,7 @@ func NewConfig() *Config {
 
 	c.ClientID = defaultClientID
 	c.ChannelBufferSize = 256
-	c.Version = V0_8_2_0
+	c.Version = minVersion
 
 	return c
 }

+ 4 - 0
consumer_metadata_request.go

@@ -20,3 +20,7 @@ func (r *ConsumerMetadataRequest) key() int16 {
 func (r *ConsumerMetadataRequest) version() int16 {
 	return 0
 }
+
+func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion {
+	return V0_8_2_0
+}

+ 4 - 0
consumer_metadata_response.go

@@ -79,3 +79,7 @@ func (r *ConsumerMetadataResponse) key() int16 {
 func (r *ConsumerMetadataResponse) version() int16 {
 	return 0
 }
+
+func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
+	return V0_8_2_0
+}

+ 4 - 0
describe_groups_request.go

@@ -21,6 +21,10 @@ func (r *DescribeGroupsRequest) version() int16 {
 	return 0
 }
 
+func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}
+
 func (r *DescribeGroupsRequest) AddGroup(group string) {
 	r.Groups = append(r.Groups, group)
 }

+ 4 - 0
describe_groups_response.go

@@ -43,6 +43,10 @@ func (r *DescribeGroupsResponse) version() int16 {
 	return 0
 }
 
+func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}
+
 type GroupDescription struct {
 	Err          KError
 	GroupId      string

+ 4 - 0
fetch_request.go

@@ -106,6 +106,10 @@ func (f *FetchRequest) version() int16 {
 	return 0
 }
 
+func (r *FetchRequest) requiredVersion() KafkaVersion {
+	return minVersion
+}
+
 func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
 	if f.blocks == nil {
 		f.blocks = make(map[string]map[int32]*fetchRequestBlock)

+ 4 - 0
fetch_response.go

@@ -124,6 +124,10 @@ func (r *FetchResponse) version() int16 {
 	return 0
 }
 
+func (r *FetchResponse) requiredVersion() KafkaVersion {
+	return minVersion
+}
+
 func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
 	if fr.Blocks == nil {
 		return nil

+ 4 - 0
heartbeat_request.go

@@ -41,3 +41,7 @@ func (r *HeartbeatRequest) key() int16 {
 func (r *HeartbeatRequest) version() int16 {
 	return 0
 }
+
+func (r *HeartbeatRequest) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
heartbeat_response.go

@@ -26,3 +26,7 @@ func (r *HeartbeatResponse) key() int16 {
 func (r *HeartbeatResponse) version() int16 {
 	return 0
 }
+
+func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
join_group_request.go

@@ -85,6 +85,10 @@ func (r *JoinGroupRequest) version() int16 {
 	return 0
 }
 
+func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}
+
 func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
 	if r.GroupProtocols == nil {
 		r.GroupProtocols = make(map[string][]byte)

+ 4 - 0
join_group_response.go

@@ -108,3 +108,7 @@ func (r *JoinGroupResponse) key() int16 {
 func (r *JoinGroupResponse) version() int16 {
 	return 0
 }
+
+func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
leave_group_request.go

@@ -34,3 +34,7 @@ func (r *LeaveGroupRequest) key() int16 {
 func (r *LeaveGroupRequest) version() int16 {
 	return 0
 }
+
+func (r *LeaveGroupRequest) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
leave_group_response.go

@@ -26,3 +26,7 @@ func (r *LeaveGroupResponse) key() int16 {
 func (r *LeaveGroupResponse) version() int16 {
 	return 0
 }
+
+func (r *LeaveGroupResponse) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
list_groups_request.go

@@ -18,3 +18,7 @@ func (r *ListGroupsRequest) key() int16 {
 func (r *ListGroupsRequest) version() int16 {
 	return 0
 }
+
+func (r *ListGroupsRequest) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
list_groups_response.go

@@ -62,3 +62,7 @@ func (r *ListGroupsResponse) key() int16 {
 func (r *ListGroupsResponse) version() int16 {
 	return 0
 }
+
+func (r *ListGroupsResponse) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 4 - 0
metadata_request.go

@@ -46,3 +46,7 @@ func (mr *MetadataRequest) key() int16 {
 func (mr *MetadataRequest) version() int16 {
 	return 0
 }
+
+func (mr *MetadataRequest) requiredVersion() KafkaVersion {
+	return minVersion
+}

+ 4 - 0
metadata_response.go

@@ -184,6 +184,10 @@ func (r *MetadataResponse) version() int16 {
 	return 0
 }
 
+func (r *MetadataResponse) requiredVersion() KafkaVersion {
+	return minVersion
+}
+
 // testing API
 
 func (m *MetadataResponse) AddBroker(addr string, id int32) {

+ 12 - 1
offset_commit_request.go

@@ -49,7 +49,7 @@ type OffsetCommitRequest struct {
 	// Version can be:
 	// - 0 (kafka 0.8.1 and later)
 	// - 1 (kafka 0.8.2 and later)
-	// - 2 (kafka 0.8.3 and later)
+	// - 2 (kafka 0.9.0 and later)
 	Version int16
 	blocks  map[string]map[int32]*offsetCommitRequestBlock
 }
@@ -166,6 +166,17 @@ func (r *OffsetCommitRequest) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_9_0_0
+	default:
+		return minVersion
+	}
+}
+
 func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)

+ 4 - 0
offset_commit_response.go

@@ -79,3 +79,7 @@ func (r *OffsetCommitResponse) key() int16 {
 func (r *OffsetCommitResponse) version() int16 {
 	return 0
 }
+
+func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
+	return minVersion
+}

+ 9 - 0
offset_fetch_request.go

@@ -63,6 +63,15 @@ func (r *OffsetFetchRequest) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	default:
+		return minVersion
+	}
+}
+
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
 	if r.partitions == nil {
 		r.partitions = make(map[string][]int32)

+ 4 - 0
offset_fetch_response.go

@@ -114,6 +114,10 @@ func (r *OffsetFetchResponse) version() int16 {
 	return 0
 }
 
+func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
+	return minVersion
+}
+
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
 	if r.Blocks == nil {
 		return nil

+ 1 - 0
offset_manager_test.go

@@ -11,6 +11,7 @@ func initOffsetManager(t *testing.T) (om OffsetManager,
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
+	config.Version = V0_9_0_0
 
 	broker = NewMockBroker(t, 1)
 	coordinator = NewMockBroker(t, 2)

+ 4 - 0
offset_request.go

@@ -96,6 +96,10 @@ func (r *OffsetRequest) version() int16 {
 	return 0
 }
 
+func (r *OffsetRequest) requiredVersion() KafkaVersion {
+	return minVersion
+}
+
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)

+ 4 - 0
offset_response.go

@@ -123,6 +123,10 @@ func (r *OffsetResponse) version() int16 {
 	return 0
 }
 
+func (r *OffsetResponse) requiredVersion() KafkaVersion {
+	return minVersion
+}
+
 // testing API
 
 func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {

+ 11 - 0
produce_request.go

@@ -114,6 +114,17 @@ func (p *ProduceRequest) version() int16 {
 	return p.Version
 }
 
+func (p *ProduceRequest) requiredVersion() KafkaVersion {
+	switch p.Version {
+	case 1:
+		return V0_9_0_0
+	case 2:
+		return V0_10_0_0
+	default:
+		return minVersion
+	}
+}
+
 func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
 	if p.msgSets == nil {
 		p.msgSets = make(map[string]map[int32]*MessageSet)

+ 11 - 0
produce_response.go

@@ -110,6 +110,17 @@ func (r *ProduceResponse) version() int16 {
 	return r.Version
 }
 
+func (r *ProduceResponse) requiredVersion() KafkaVersion {
+	switch r.Version {
+	case 1:
+		return V0_9_0_0
+	case 2:
+		return V0_10_0_0
+	default:
+		return minVersion
+	}
+}
+
 func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
 	if pr.Blocks == nil {
 		return nil

+ 1 - 0
request.go

@@ -11,6 +11,7 @@ type protocolBody interface {
 	versionedDecoder
 	key() int16
 	version() int16
+	requiredVersion() KafkaVersion
 }
 
 type request struct {

+ 4 - 0
sasl_handshake_request.go

@@ -27,3 +27,7 @@ func (r *SaslHandshakeRequest) key() int16 {
 func (r *SaslHandshakeRequest) version() int16 {
 	return 0
 }
+
+func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion {
+	return V0_10_0_0
+}

+ 4 - 0
sasl_handshake_response.go

@@ -32,3 +32,7 @@ func (r *SaslHandshakeResponse) key() int16 {
 func (r *SaslHandshakeResponse) version() int16 {
 	return 0
 }
+
+func (r *SaslHandshakeResponse) requiredVersion() KafkaVersion {
+	return V0_10_0_0
+}

+ 4 - 0
sync_group_request.go

@@ -77,6 +77,10 @@ func (r *SyncGroupRequest) version() int16 {
 	return 0
 }
 
+func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}
+
 func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
 	if r.GroupAssignments == nil {
 		r.GroupAssignments = make(map[string][]byte)

+ 4 - 0
sync_group_response.go

@@ -34,3 +34,7 @@ func (r *SyncGroupResponse) key() int16 {
 func (r *SyncGroupResponse) version() int16 {
 	return 0
 }
+
+func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
+	return V0_9_0_0
+}

+ 7 - 6
utils.go

@@ -140,10 +140,11 @@ func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
 
 // Effective constants defining the supported kafka versions.
 var (
-	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
-	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
-	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
-	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
-	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
-	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
+	V0_8_2_0   = newKafkaVersion(0, 8, 2, 0)
+	V0_8_2_1   = newKafkaVersion(0, 8, 2, 1)
+	V0_8_2_2   = newKafkaVersion(0, 8, 2, 2)
+	V0_9_0_0   = newKafkaVersion(0, 9, 0, 0)
+	V0_9_0_1   = newKafkaVersion(0, 9, 0, 1)
+	V0_10_0_0  = newKafkaVersion(0, 10, 0, 0)
+	minVersion = V0_8_2_0
 )