소스 검색

Merge pull request #1644 from zendesk/ktsanaktsidis/fix_session_id_thrashing

Fix brokers continually allocating new Session IDs
Vlad Gorodetsky 4 년 전
부모
커밋
6159078aac
2개의 변경된 파일58개의 추가작업 그리고 1개의 파일을 삭제
  1. 8 0
      consumer.go
  2. 50 1
      consumer_test.go

+ 8 - 0
consumer.go

@@ -887,6 +887,14 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		request.Version = 4
 		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
 	}
+	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
+		request.Version = 7
+		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
+		// and the epoch to -1 tells the broker not to generate as session ID we're going
+		// to just ignore anyway.
+		request.SessionID = 0
+		request.SessionEpoch = -1
+	}
 	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
 		request.Version = 10
 	}

+ 50 - 1
consumer_test.go

@@ -488,7 +488,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
 
 	cfg := NewConfig()
 	cfg.Consumer.Return.Errors = true
-	cfg.Version = V1_1_0_0
+	cfg.Version = V0_11_0_0
 
 	broker0 := NewMockBroker(t, 0)
 
@@ -569,6 +569,55 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
 	broker0.Close()
 }
 
+func TestConsumeMessageWithSessionIDs(t *testing.T) {
+	// Given
+	fetchResponse1 := &FetchResponse{Version: 7}
+	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
+	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
+
+	cfg := NewConfig()
+	cfg.Version = V1_1_0_0
+
+	broker0 := NewMockBroker(t, 0)
+	fetchResponse2 := &FetchResponse{}
+	fetchResponse2.Version = 7
+	fetchResponse2.AddError("my_topic", 0, ErrNoError)
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetVersion(1).
+			SetOffset("my_topic", 0, OffsetNewest, 1234).
+			SetOffset("my_topic", 0, OffsetOldest, 0),
+		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
+	})
+
+	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assertMessageOffset(t, <-consumer.Messages(), 1)
+	assertMessageOffset(t, <-consumer.Messages(), 2)
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+
+	fetchReq := broker0.History()[3].Request.(*FetchRequest)
+	if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
+		t.Error("Expected session ID to be zero & Epoch to be -1")
+	}
+}
+
 // It is fine if offsets of fetched messages are not sequential (although
 // strictly increasing!).
 func TestConsumerNonSequentialOffsets(t *testing.T) {