- package sarama
- import (
- "log"
- "os"
- "os/signal"
- "reflect"
- "strconv"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- )
- var testMsg = StringEncoder("Foo")
- // If a particular offset is provided then messages are consumed starting from
- // that offset.
- func TestConsumerOffsetManual(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 10; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 2345),
- "FetchRequest": mockFetchResponse,
- })
- // When
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 1234)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages starting from offset 1234 are consumed.
- for i := 0; i < 10; i++ {
- select {
- case message := <-consumer.Messages():
- assertMessageOffset(t, message, int64(i+1234))
- case err := <-consumer.Errors():
- t.Error(err)
- }
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // If `OffsetNewest` is passed as the initial offset then the first consumed
- // message indeed corresponds to the offset that the broker claims to be the
- // newest in its offset response.
- func TestConsumerOffsetNewest(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 10).
- SetOffset("my_topic", 0, OffsetOldest, 7),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 9, testMsg).
- SetMessage("my_topic", 0, 10, testMsg).
- SetMessage("my_topic", 0, 11, testMsg).
- SetHighWaterMark("my_topic", 0, 14),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
- if err != nil {
- t.Fatal(err)
- }
- // Then
- assertMessageOffset(t, <-consumer.Messages(), 10)
- if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
- t.Errorf("Expected high water mark offset 14, found %d", hwmo)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // It is possible to close a partition consumer and then create it anew.
- func TestConsumerRecreate(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 10, testMsg),
- })
- c, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- pc, err := c.ConsumePartition("my_topic", 0, 10)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-pc.Messages(), 10)
- // When
- safeClose(t, pc)
- pc, err = c.ConsumePartition("my_topic", 0, 10)
- if err != nil {
- t.Fatal(err)
- }
- // Then
- assertMessageOffset(t, <-pc.Messages(), 10)
- safeClose(t, pc)
- safeClose(t, c)
- broker0.Close()
- }
- // An attempt to consume the same partition twice should fail.
- func TestConsumerDuplicate(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1),
- })
- config := NewConfig()
- config.ChannelBufferSize = 0
- c, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- pc1, err := c.ConsumePartition("my_topic", 0, 0)
- if err != nil {
- t.Fatal(err)
- }
- // When
- pc2, err := c.ConsumePartition("my_topic", 0, 0)
- // Then
- if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
- t.Fatal("A partition cannot be consumed twice at the same time")
- }
- safeClose(t, pc1)
- safeClose(t, c)
- broker0.Close()
- }
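- // runConsumerLeaderRefreshErrorTestWithConfig drives a partition consumer
- // through a leader change during which metadata refreshes initially fail,
- // and verifies that consumption resumes once the new leader is advertised.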
- func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
- // Given
- broker0 := NewMockBroker(t, 100)
- // Stage 1: my_topic/0 served by broker0
- Logger.Printf(" STAGE 1")
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 123).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 123, testMsg),
- })
- c, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-pc.Messages(), 123)
- // Stage 2: broker0 says that it is no longer the leader for my_topic/0,
- // but the requests to retrieve metadata fail with a network timeout.
- Logger.Printf(" STAGE 2")
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockWrapper(fetchResponse2),
- })
- if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
- t.Errorf("Unexpected error: %v", consErr.Err)
- }
- // Stage 3: finally the metadata returned by broker0 says that broker1 is
- // the new leader for my_topic/0. Consumption resumes.
- Logger.Printf(" STAGE 3")
- broker1 := NewMockBroker(t, 101)
- broker1.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 124, testMsg),
- })
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetBroker(broker1.Addr(), broker1.BrokerID()).
- SetLeader("my_topic", 0, broker1.BrokerID()),
- })
- assertMessageOffset(t, <-pc.Messages(), 124)
- safeClose(t, pc)
- safeClose(t, c)
- broker1.Close()
- broker0.Close()
- }
- // If the consumer fails to refresh metadata, it keeps retrying with the
- // frequency specified by `Config.Consumer.Retry.Backoff`.
- func TestConsumerLeaderRefreshError(t *testing.T) {
- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.Backoff = 200 * time.Millisecond
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
- runConsumerLeaderRefreshErrorTestWithConfig(t, config)
- }
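- // Same scenario as TestConsumerLeaderRefreshError, but using
- // `Config.Consumer.Retry.BackoffFunc`; the backoff function must be called
- // at least once.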
- func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
- var calls int32 = 0
- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
- atomic.AddInt32(&calls, 1)
- return 200 * time.Millisecond
- }
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
- runConsumerLeaderRefreshErrorTestWithConfig(t, config)
- // we expect at least one call to our backoff function
- if calls == 0 {
- t.Fail()
- }
- }
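- // Consuming a topic that the broker knows nothing about fails with
- // ErrUnknownTopicOrPartition.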
- func TestConsumerInvalidTopic(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 100)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()),
- })
- c, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When
- pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
- // Then
- if pc != nil || err != ErrUnknownTopicOrPartition {
- t.Errorf("Should fail with, err=%v", err)
- }
- safeClose(t, c)
- broker0.Close()
- }
- // Nothing bad happens if a partition consumer that currently has no leader
- // assigned is closed.
- func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 100)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 123).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 123, testMsg),
- })
- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.Backoff = 100 * time.Millisecond
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
- c, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-pc.Messages(), 123)
- // broker0 says that it is no longer the leader for my_topic/0, but the
- // requests to retrieve metadata fail with a network timeout.
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockWrapper(fetchResponse2),
- })
- // When
- if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
- t.Errorf("Unexpected error: %v", consErr.Err)
- }
- // Then: the partition consumer can be closed without any problem.
- safeClose(t, pc)
- safeClose(t, c)
- broker0.Close()
- }
- // If the initial offset passed on partition consumer creation is out of the
- // actual offset range for the partition, then the partition consumer stops
- // immediately, closing its output channels.
- func TestConsumerShutsDownOutOfRange(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- fetchResponse := new(FetchResponse)
- fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 7),
- "FetchRequest": NewMockWrapper(fetchResponse),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 101)
- if err != nil {
- t.Fatal(err)
- }
- // Then: the consumer should shut down, closing its messages and errors channels.
- if _, ok := <-consumer.Messages(); ok {
- t.Error("Expected the consumer to shut down")
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // If a fetch response contains messages with offsets that are smaller than
- // requested, then such messages are ignored.
- func TestConsumerExtraOffsets(t *testing.T) {
- // Given
- legacyFetchResponse := &FetchResponse{}
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
- newFetchResponse := &FetchResponse{Version: 4}
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
- newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
- newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
- for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
- var offsetResponseVersion int16
- cfg := NewConfig()
- cfg.Consumer.Return.Errors = true
- if fetchResponse1.Version >= 4 {
- cfg.Version = V0_11_0_0
- offsetResponseVersion = 1
- }
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.Version = fetchResponse1.Version
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(offsetResponseVersion).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 3)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages with offsets 1 and 2 are not returned even though they
- // are present in the response.
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, 3)
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, 4)
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- }
- // In some situations the broker may return a block containing only
- // messages older than requested, even though there would be
- // more messages if a higher offset were requested.
- func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
- // Given
- fetchResponse1 := &FetchResponse{Version: 4}
- fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
- fetchResponse2 := &FetchResponse{Version: 4}
- fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
- cfg := NewConfig()
- cfg.Consumer.Return.Errors = true
- cfg.Version = V0_11_0_0
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 2)
- if err != nil {
- t.Fatal(err)
- }
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, 1000000)
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
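- // Messages are consumed correctly when the newer fetch API version (v4) is
- // used, i.e. when the client is configured for Kafka 0.11.0.0.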
- func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
- // Given
- fetchResponse1 := &FetchResponse{Version: 4}
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
- cfg := NewConfig()
- cfg.Version = V0_11_0_0
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.Version = 4
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-consumer.Messages(), 1)
- assertMessageOffset(t, <-consumer.Messages(), 2)
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
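- // With fetch API v7 (Kafka 1.1.0.0) messages are consumed correctly and the
- // fetch requests carry a zero session ID and a session epoch of -1.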
- func TestConsumeMessageWithSessionIDs(t *testing.T) {
- // Given
- fetchResponse1 := &FetchResponse{Version: 7}
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
- cfg := NewConfig()
- cfg.Version = V1_1_0_0
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.Version = 7
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-consumer.Messages(), 1)
- assertMessageOffset(t, <-consumer.Messages(), 2)
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- fetchReq := broker0.History()[3].Request.(*FetchRequest)
- if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
- t.Error("Expected session ID to be zero & Epoch to be -1")
- }
- }
- // It is fine if the offsets of fetched messages are not sequential (as long
- // as they are strictly increasing).
- func TestConsumerNonSequentialOffsets(t *testing.T) {
- // Given
- legacyFetchResponse := &FetchResponse{}
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
- newFetchResponse := &FetchResponse{Version: 4}
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
- newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
- newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
- for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
- var offsetResponseVersion int16
- cfg := NewConfig()
- if fetchResponse1.Version >= 4 {
- cfg.Version = V0_11_0_0
- offsetResponseVersion = 1
- }
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(offsetResponseVersion).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 3)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages with the non-sequential offsets 5, 7 and 11 are all
- // returned, even though the requested initial offset was 3.
- assertMessageOffset(t, <-consumer.Messages(), 5)
- assertMessageOffset(t, <-consumer.Messages(), 7)
- assertMessageOffset(t, <-consumer.Messages(), 11)
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- }
- // If leadership for a partition changes, then the consumer resolves the new
- // leader and switches to it.
- func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
- // initial setup
- seedBroker := NewMockBroker(t, 10)
- leader0 := NewMockBroker(t, 0)
- leader1 := NewMockBroker(t, 1)
- seedBroker.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(leader0.Addr(), leader0.BrokerID()).
- SetBroker(leader1.Addr(), leader1.BrokerID()).
- SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
- SetLeader("my_topic", 0, leader0.BrokerID()).
- SetLeader("my_topic", 1, leader1.BrokerID()),
- })
- mockOffsetResponse1 := NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1000).
- SetOffset("my_topic", 1, OffsetOldest, 0).
- SetOffset("my_topic", 1, OffsetNewest, 1000)
- leader0.SetHandlerByMap(map[string]MockResponse{
- "OffsetRequest": mockOffsetResponse1,
- "FetchRequest": NewMockFetchResponse(t, 1),
- })
- leader1.SetHandlerByMap(map[string]MockResponse{
- "OffsetRequest": mockOffsetResponse1,
- "FetchRequest": NewMockFetchResponse(t, 1),
- })
- // launch test goroutines
- config := NewConfig()
- config.Consumer.Retry.Backoff = 50 * time.Millisecond
- master, err := NewConsumer([]string{seedBroker.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- // we expect to end up (eventually) consuming exactly ten messages on each partition
- var wg sync.WaitGroup
- for i := int32(0); i < 2; i++ {
- consumer, err := master.ConsumePartition("my_topic", i, 0)
- if err != nil {
- t.Error(err)
- }
- go func(c PartitionConsumer) {
- for err := range c.Errors() {
- t.Error(err)
- }
- }(consumer)
- wg.Add(1)
- go func(partition int32, c PartitionConsumer) {
- for i := 0; i < 10; i++ {
- message := <-c.Messages()
- if message.Offset != int64(i) {
- t.Error("Incorrect message offset!", i, partition, message.Offset)
- }
- if message.Partition != partition {
- t.Error("Incorrect message partition!")
- }
- }
- safeClose(t, c)
- wg.Done()
- }(i, consumer)
- }
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 1")
- // Stage 1:
- // * my_topic/0 -> leader0 serves 4 messages
- // * my_topic/1 -> leader1 serves 0 messages
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 4; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
- }
- leader0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse,
- })
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 2")
- // Stage 2:
- // * leader0 says that it is no longer serving my_topic/0
- // * seedBroker says that leader1 is now serving my_topic/0
- // seed broker announces that the new leader of partition 0 is leader1
- seedBroker.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetLeader("my_topic", 0, leader1.BrokerID()).
- SetLeader("my_topic", 1, leader1.BrokerID()).
- SetBroker(leader0.Addr(), leader0.BrokerID()).
- SetBroker(leader1.Addr(), leader1.BrokerID()).
- SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
- })
- // leader0 says no longer leader of partition 0
- fetchResponse := new(FetchResponse)
- fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
- leader0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockWrapper(fetchResponse),
- })
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 3")
- // Stage 3:
- // * my_topic/0 -> leader1 serves 3 messages
- // * my_topic/1 -> leader1 serves 8 messages
- // leader1 provides 3 messages on partition 0, and 8 messages on partition 1
- mockFetchResponse2 := NewMockFetchResponse(t, 2)
- for i := 4; i < 7; i++ {
- mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
- }
- for i := 0; i < 8; i++ {
- mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
- }
- leader1.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse2,
- })
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 4")
- // Stage 4:
- // * my_topic/0 -> leader1 serves 3 messages
- // * my_topic/1 -> leader1 says that it is no longer the leader
- // * seedBroker says that leader0 is the new leader for my_topic/1
- // metadata assigns 0 to leader1 and 1 to leader0
- seedBroker.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetLeader("my_topic", 0, leader1.BrokerID()).
- SetLeader("my_topic", 1, leader0.BrokerID()).
- SetBroker(leader0.Addr(), leader0.BrokerID()).
- SetBroker(leader1.Addr(), leader1.BrokerID()).
- SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
- })
- // leader1 provides three more messages on partition 0 and says it is no longer the leader of partition 1
- mockFetchResponse3 := NewMockFetchResponse(t, 3).
- SetMessage("my_topic", 0, int64(7), testMsg).
- SetMessage("my_topic", 0, int64(8), testMsg).
- SetMessage("my_topic", 0, int64(9), testMsg)
- fetchResponse4 := new(FetchResponse)
- fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
- leader1.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
- })
- // leader0 provides two messages on partition 1
- mockFetchResponse4 := NewMockFetchResponse(t, 2)
- for i := 8; i < 10; i++ {
- mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
- }
- leader0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse4,
- })
- wg.Wait()
- safeClose(t, master)
- leader1.Close()
- leader0.Close()
- seedBroker.Close()
- }
- // When two partitions have the same broker as the leader, a full channel
- // buffer on one partition consumer does not affect the ability to read
- // messages from the other.
- func TestConsumerInterleavedClose(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()).
- SetLeader("my_topic", 1, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 1000).
- SetOffset("my_topic", 0, OffsetNewest, 1100).
- SetOffset("my_topic", 1, OffsetOldest, 2000).
- SetOffset("my_topic", 1, OffsetNewest, 2100),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 1000, testMsg).
- SetMessage("my_topic", 0, 1001, testMsg).
- SetMessage("my_topic", 0, 1002, testMsg).
- SetMessage("my_topic", 1, 2000, testMsg),
- })
- config := NewConfig()
- config.ChannelBufferSize = 0
- master, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- c0, err := master.ConsumePartition("my_topic", 0, 1000)
- if err != nil {
- t.Fatal(err)
- }
- c1, err := master.ConsumePartition("my_topic", 1, 2000)
- if err != nil {
- t.Fatal(err)
- }
- // When/Then: we can read from partition 0 even if nobody reads from partition 1
- assertMessageOffset(t, <-c0.Messages(), 1000)
- assertMessageOffset(t, <-c0.Messages(), 1001)
- assertMessageOffset(t, <-c0.Messages(), 1002)
- safeClose(t, c1)
- safeClose(t, c0)
- safeClose(t, master)
- broker0.Close()
- }
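- // If a broker goes down while one of its partitions is being consumed, the
- // consumer of the other partition keeps receiving messages, and consumption
- // from the bounced broker resumes once it comes back up.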
- func TestConsumerBounceWithReferenceOpen(t *testing.T) {
- broker0 := NewMockBroker(t, 0)
- broker0Addr := broker0.Addr()
- broker1 := NewMockBroker(t, 1)
- mockMetadataResponse := NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetBroker(broker1.Addr(), broker1.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()).
- SetLeader("my_topic", 1, broker1.BrokerID())
- mockOffsetResponse := NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 1000).
- SetOffset("my_topic", 0, OffsetNewest, 1100).
- SetOffset("my_topic", 1, OffsetOldest, 2000).
- SetOffset("my_topic", 1, OffsetNewest, 2100)
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 10; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
- mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "OffsetRequest": mockOffsetResponse,
- "FetchRequest": mockFetchResponse,
- })
- broker1.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": mockMetadataResponse,
- "OffsetRequest": mockOffsetResponse,
- "FetchRequest": mockFetchResponse,
- })
- config := NewConfig()
- config.Consumer.Return.Errors = true
- config.Consumer.Retry.Backoff = 100 * time.Millisecond
- config.ChannelBufferSize = 1
- master, err := NewConsumer([]string{broker1.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- c0, err := master.ConsumePartition("my_topic", 0, 1000)
- if err != nil {
- t.Fatal(err)
- }
- c1, err := master.ConsumePartition("my_topic", 1, 2000)
- if err != nil {
- t.Fatal(err)
- }
- // read messages from both partitions to make sure that both brokers operate
- // normally.
- assertMessageOffset(t, <-c0.Messages(), 1000)
- assertMessageOffset(t, <-c1.Messages(), 2000)
- // Simulate broker shutdown. Note that the metadata response does not change,
- // that is, the leadership does not move to another broker, so the partition
- // consumer will keep retrying to restore the connection with the broker.
- broker0.Close()
- // Make sure that while the partition/0 leader is down, consumer/partition/1
- // is capable of pulling messages from broker1.
- for i := 1; i < 7; i++ {
- offset := (<-c1.Messages()).Offset
- if offset != int64(2000+i) {
- t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
- }
- }
- // Bring broker0 back to service.
- broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse,
- })
- // Read the rest of messages from both partitions.
- for i := 7; i < 10; i++ {
- assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
- }
- for i := 1; i < 10; i++ {
- assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
- }
- select {
- case <-c0.Errors():
- default:
- t.Errorf("Partition consumer should have detected broker restart")
- }
- safeClose(t, c1)
- safeClose(t, c0)
- safeClose(t, master)
- broker0.Close()
- broker1.Close()
- }
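- // ConsumePartition returns ErrOffsetOutOfRange if the requested initial
- // offset lies outside the range reported by the broker.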
- func TestConsumerOffsetOutOfRange(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 2)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 2345),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When/Then
- if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
- t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
- }
- if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
- t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
- }
- if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
- t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
- }
- safeClose(t, master)
- broker0.Close()
- }
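- // All eight messages are delivered in order even with an unbuffered channel
- // and a very small `Config.Consumer.MaxProcessingTime`.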
- func TestConsumerExpiryTicker(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- fetchResponse1 := &FetchResponse{}
- for i := 1; i <= 8; i++ {
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 1),
- "FetchRequest": NewMockSequence(fetchResponse1),
- })
- config := NewConfig()
- config.ChannelBufferSize = 0
- config.Consumer.MaxProcessingTime = 10 * time.Millisecond
- master, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages with offsets 1 through 8 are read
- for i := 1; i <= 8; i++ {
- assertMessageOffset(t, <-consumer.Messages(), int64(i))
- time.Sleep(2 * time.Millisecond)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
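- // Message timestamps are decoded according to the configured Kafka version:
- // older versions yield zero timestamps, while newer ones return either the
- // per-message timestamp or the log-append time.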
- func TestConsumerTimestamps(t *testing.T) {
- now := time.Now().Truncate(time.Millisecond)
- type testMessage struct {
- key Encoder
- offset int64
- timestamp time.Time
- }
- for _, d := range []struct {
- kversion KafkaVersion
- logAppendTime bool
- messages []testMessage
- expectedTimestamp []time.Time
- }{
- {MinVersion, false, []testMessage{
- {testMsg, 1, now},
- {testMsg, 2, now},
- }, []time.Time{{}, {}}},
- {V0_9_0_0, false, []testMessage{
- {testMsg, 1, now},
- {testMsg, 2, now},
- }, []time.Time{{}, {}}},
- {V0_10_0_0, false, []testMessage{
- {testMsg, 1, now},
- {testMsg, 2, now},
- }, []time.Time{{}, {}}},
- {V0_10_2_1, false, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
- {V0_10_2_1, true, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now, now}},
- {V0_11_0_0, false, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
- {V0_11_0_0, true, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now, now}},
- } {
- var fr *FetchResponse
- var offsetResponseVersion int16
- cfg := NewConfig()
- cfg.Version = d.kversion
- switch {
- case d.kversion.IsAtLeast(V0_11_0_0):
- offsetResponseVersion = 1
- fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
- for _, m := range d.messages {
- fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
- }
- fr.SetLastOffsetDelta("my_topic", 0, 2)
- fr.SetLastStableOffset("my_topic", 0, 2)
- case d.kversion.IsAtLeast(V0_10_1_0):
- offsetResponseVersion = 1
- fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
- for _, m := range d.messages {
- fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
- }
- default:
- var version int16
- switch {
- case d.kversion.IsAtLeast(V0_10_0_0):
- version = 2
- case d.kversion.IsAtLeast(V0_9_0_0):
- version = 1
- }
- fr = &FetchResponse{Version: version}
- for _, m := range d.messages {
- fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
- }
- }
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(offsetResponseVersion).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fr),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- for i, ts := range d.expectedTimestamp {
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, int64(i)+1)
- if msg.Timestamp != ts {
- t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
- d.kversion, d.logAppendTime, msg.Timestamp, ts)
- }
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- }
- // When the isolation level is set to ReadCommitted, no uncommitted messages
- // should be available on the messages channel.
- func TestExcludeUncommitted(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- fetchResponse := &FetchResponse{
- Version: 4,
- Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
- AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
- }}},
- }
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg
- fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1237),
- "FetchRequest": NewMockWrapper(fetchResponse),
- })
- cfg := NewConfig()
- cfg.Consumer.Return.Errors = true
- cfg.Version = V0_11_0_0
- cfg.Consumer.IsolationLevel = ReadCommitted
- // When
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 1234)
- if err != nil {
- t.Fatal(err)
- }
- // Then: only the 2 committed messages are returned
- select {
- case message := <-consumer.Messages():
- assertMessageOffset(t, message, int64(1234))
- case err := <-consumer.Errors():
- t.Error(err)
- }
- select {
- case message := <-consumer.Messages():
- assertMessageOffset(t, message, int64(1238))
- case err := <-consumer.Errors():
- t.Error(err)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
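- // assertMessageOffset fails the test if the consumed message does not have
- // the expected offset.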
- func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
- if msg.Offset != expectedOffset {
- t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
- }
- }
- // This example shows how to use the consumer to read messages
- // from a single partition.
- func ExampleConsumer() {
- consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
- if err != nil {
- panic(err)
- }
- defer func() {
- if err := consumer.Close(); err != nil {
- log.Fatalln(err)
- }
- }()
- partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
- if err != nil {
- panic(err)
- }
- defer func() {
- if err := partitionConsumer.Close(); err != nil {
- log.Fatalln(err)
- }
- }()
- // Trap SIGINT to trigger a shutdown.
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- consumed := 0
- ConsumerLoop:
- for {
- select {
- case msg := <-partitionConsumer.Messages():
- log.Printf("Consumed message offset %d\n", msg.Offset)
- consumed++
- case <-signals:
- break ConsumerLoop
- }
- }
- log.Printf("Consumed: %d\n", consumed)
- }
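- // parseResponse treats an empty but throttled fetch response as benign,
- // while an empty response without throttling is reported as an incomplete
- // response.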
- func Test_partitionConsumer_parseResponse(t *testing.T) {
- type args struct {
- response *FetchResponse
- }
- tests := []struct {
- name string
- args args
- want []*ConsumerMessage
- wantErr bool
- }{
- {
- name: "empty but throttled FetchResponse is not considered an error",
- args: args{
- response: &FetchResponse{
- ThrottleTime: time.Millisecond,
- },
- },
- },
- {
- name: "empty FetchResponse is considered an incomplete response by default",
- args: args{
- response: &FetchResponse{},
- },
- wantErr: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- child := &partitionConsumer{
- broker: &brokerConsumer{
- broker: &Broker{},
- },
- conf: &Config{},
- }
- got, err := child.parseResponse(tt.args.response)
- if (err != nil) != tt.wantErr {
- t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if !reflect.DeepEqual(got, tt.want) {
- t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
- }
- })
- }
- }
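- // testConsumerInterceptor consumes ten messages with the given interceptors
- // installed and checks every delivered message with expectationFn.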
- func testConsumerInterceptor(
- t *testing.T,
- interceptors []ConsumerInterceptor,
- expectationFn func(*testing.T, int, *ConsumerMessage),
- ) {
- // Given
- broker0 := NewMockBroker(t, 0)
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 10; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 0),
- "FetchRequest": mockFetchResponse,
- })
- config := NewConfig()
- config.Consumer.Interceptors = interceptors
- // When
- master, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 0)
- if err != nil {
- t.Fatal(err)
- }
- for i := 0; i < 10; i++ {
- select {
- case msg := <-consumer.Messages():
- expectationFn(t, i, msg)
- case err := <-consumer.Errors():
- t.Error(err)
- }
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
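- // Consumer interceptors are applied in order, and a failing interceptor is
- // skipped without breaking the rest of the chain.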
- func TestConsumerInterceptors(t *testing.T) {
- tests := []struct {
- name string
- interceptors []ConsumerInterceptor
- expectationFn func(*testing.T, int, *ConsumerMessage)
- }{
- {
- name: "intercept messages",
- interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
- expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
- ev, _ := testMsg.Encode()
- expected := string(ev) + strconv.Itoa(i)
- v := string(msg.Value)
- if v != expected {
- t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
- }
- },
- },
- {
- name: "interceptor chain",
- interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
- expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
- ev, _ := testMsg.Encode()
- expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
- v := string(msg.Value)
- if v != expected {
- t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
- }
- },
- },
- {
- name: "interceptor chain with one interceptor failing",
- interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
- expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
- ev, _ := testMsg.Encode()
- expected := string(ev) + strconv.Itoa(i+1000)
- v := string(msg.Value)
- if v != expected {
- t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
- }
- },
- },
- {
- name: "interceptor chain with all interceptors failing",
- interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
- expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
- ev, _ := testMsg.Encode()
- expected := string(ev)
- v := string(msg.Value)
- if v != expected {
- t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
- }
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
- }
- }