1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288 |
- package sarama
- import (
- "log"
- "os"
- "os/signal"
- "reflect"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- )
- var testMsg = StringEncoder("Foo")
- // If a particular offset is provided then messages are consumed starting from
- // that offset.
- func TestConsumerOffsetManual(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 10; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 2345),
- "FetchRequest": mockFetchResponse,
- })
- // When
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 1234)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages starting from offset 1234 are consumed.
- for i := 0; i < 10; i++ {
- select {
- case message := <-consumer.Messages():
- assertMessageOffset(t, message, int64(i+1234))
- case err := <-consumer.Errors():
- t.Error(err)
- }
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // If `OffsetNewest` is passed as the initial offset then the first consumed
- // message is indeed corresponds to the offset that broker claims to be the
- // newest in its metadata response.
- func TestConsumerOffsetNewest(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 10).
- SetOffset("my_topic", 0, OffsetOldest, 7),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 9, testMsg).
- SetMessage("my_topic", 0, 10, testMsg).
- SetMessage("my_topic", 0, 11, testMsg).
- SetHighWaterMark("my_topic", 0, 14),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
- if err != nil {
- t.Fatal(err)
- }
- // Then
- assertMessageOffset(t, <-consumer.Messages(), 10)
- if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
- t.Errorf("Expected high water mark offset 14, found %d", hwmo)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // It is possible to close a partition consumer and create the same anew.
- func TestConsumerRecreate(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 10, testMsg),
- })
- c, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- pc, err := c.ConsumePartition("my_topic", 0, 10)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-pc.Messages(), 10)
- // When
- safeClose(t, pc)
- pc, err = c.ConsumePartition("my_topic", 0, 10)
- if err != nil {
- t.Fatal(err)
- }
- // Then
- assertMessageOffset(t, <-pc.Messages(), 10)
- safeClose(t, pc)
- safeClose(t, c)
- broker0.Close()
- }
- // An attempt to consume the same partition twice should fail.
- func TestConsumerDuplicate(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1),
- })
- config := NewConfig()
- config.ChannelBufferSize = 0
- c, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- pc1, err := c.ConsumePartition("my_topic", 0, 0)
- if err != nil {
- t.Fatal(err)
- }
- // When
- pc2, err := c.ConsumePartition("my_topic", 0, 0)
- // Then
- if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
- t.Fatal("A partition cannot be consumed twice at the same time")
- }
- safeClose(t, pc1)
- safeClose(t, c)
- broker0.Close()
- }
- func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
- // Given
- broker0 := NewMockBroker(t, 100)
- // Stage 1: my_topic/0 served by broker0
- Logger.Printf(" STAGE 1")
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 123).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 123, testMsg),
- })
- c, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-pc.Messages(), 123)
- // Stage 2: broker0 says that it is no longer the leader for my_topic/0,
- // but the requests to retrieve metadata fail with network timeout.
- Logger.Printf(" STAGE 2")
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockWrapper(fetchResponse2),
- })
- if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
- t.Errorf("Unexpected error: %v", consErr.Err)
- }
- // Stage 3: finally the metadata returned by broker0 tells that broker1 is
- // a new leader for my_topic/0. Consumption resumes.
- Logger.Printf(" STAGE 3")
- broker1 := NewMockBroker(t, 101)
- broker1.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 124, testMsg),
- })
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetBroker(broker1.Addr(), broker1.BrokerID()).
- SetLeader("my_topic", 0, broker1.BrokerID()),
- })
- assertMessageOffset(t, <-pc.Messages(), 124)
- safeClose(t, pc)
- safeClose(t, c)
- broker1.Close()
- broker0.Close()
- }
- // If consumer fails to refresh metadata it keeps retrying with frequency
- // specified by `Config.Consumer.Retry.Backoff`.
- func TestConsumerLeaderRefreshError(t *testing.T) {
- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.Backoff = 200 * time.Millisecond
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
- runConsumerLeaderRefreshErrorTestWithConfig(t, config)
- }
- func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
- var calls int32 = 0
- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
- atomic.AddInt32(&calls, 1)
- return 200 * time.Millisecond
- }
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
- runConsumerLeaderRefreshErrorTestWithConfig(t, config)
- // we expect at least one call to our backoff function
- if calls == 0 {
- t.Fail()
- }
- }
- func TestConsumerInvalidTopic(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 100)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()),
- })
- c, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When
- pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
- // Then
- if pc != nil || err != ErrUnknownTopicOrPartition {
- t.Errorf("Should fail with, err=%v", err)
- }
- safeClose(t, c)
- broker0.Close()
- }
- // Nothing bad happens if a partition consumer that has no leader assigned at
- // the moment is closed.
- func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 100)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 123).
- SetOffset("my_topic", 0, OffsetNewest, 1000),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 123, testMsg),
- })
- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.Backoff = 100 * time.Millisecond
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
- c, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-pc.Messages(), 123)
- // broker0 says that it is no longer the leader for my_topic/0, but the
- // requests to retrieve metadata fail with network timeout.
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockWrapper(fetchResponse2),
- })
- // When
- if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
- t.Errorf("Unexpected error: %v", consErr.Err)
- }
- // Then: the partition consumer can be closed without any problem.
- safeClose(t, pc)
- safeClose(t, c)
- broker0.Close()
- }
- // If the initial offset passed on partition consumer creation is out of the
- // actual offset range for the partition, then the partition consumer stops
- // immediately closing its output channels.
- func TestConsumerShutsDownOutOfRange(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- fetchResponse := new(FetchResponse)
- fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 7),
- "FetchRequest": NewMockWrapper(fetchResponse),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 101)
- if err != nil {
- t.Fatal(err)
- }
- // Then: consumer should shut down closing its messages and errors channels.
- if _, ok := <-consumer.Messages(); ok {
- t.Error("Expected the consumer to shut down")
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // If a fetch response contains messages with offsets that are smaller then
- // requested, then such messages are ignored.
- func TestConsumerExtraOffsets(t *testing.T) {
- // Given
- legacyFetchResponse := &FetchResponse{}
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
- newFetchResponse := &FetchResponse{Version: 4}
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
- newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
- newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
- for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
- var offsetResponseVersion int16
- cfg := NewConfig()
- cfg.Consumer.Return.Errors = true
- if fetchResponse1.Version >= 4 {
- cfg.Version = V0_11_0_0
- offsetResponseVersion = 1
- }
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.Version = fetchResponse1.Version
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(offsetResponseVersion).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 3)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages with offsets 1 and 2 are not returned even though they
- // are present in the response.
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, 3)
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, 4)
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- }
- // In some situations broker may return a block containing only
- // messages older then requested, even though there would be
- // more messages if higher offset was requested.
- func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
- // Given
- fetchResponse1 := &FetchResponse{Version: 4}
- fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
- fetchResponse2 := &FetchResponse{Version: 4}
- fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
- cfg := NewConfig()
- cfg.Consumer.Return.Errors = true
- cfg.Version = V1_1_0_0
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 2)
- if err != nil {
- t.Fatal(err)
- }
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, 1000000)
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
- // Given
- fetchResponse1 := &FetchResponse{Version: 4}
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
- cfg := NewConfig()
- cfg.Version = V0_11_0_0
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{}
- fetchResponse2.Version = 4
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- assertMessageOffset(t, <-consumer.Messages(), 1)
- assertMessageOffset(t, <-consumer.Messages(), 2)
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- // It is fine if offsets of fetched messages are not sequential (although
- // strictly increasing!).
- func TestConsumerNonSequentialOffsets(t *testing.T) {
- // Given
- legacyFetchResponse := &FetchResponse{}
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
- legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
- newFetchResponse := &FetchResponse{Version: 4}
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
- newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
- newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
- newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
- for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
- var offsetResponseVersion int16
- cfg := NewConfig()
- if fetchResponse1.Version >= 4 {
- cfg.Version = V0_11_0_0
- offsetResponseVersion = 1
- }
- broker0 := NewMockBroker(t, 0)
- fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
- fetchResponse2.AddError("my_topic", 0, ErrNoError)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(offsetResponseVersion).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 3)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages with offsets 1 and 2 are not returned even though they
- // are present in the response.
- assertMessageOffset(t, <-consumer.Messages(), 5)
- assertMessageOffset(t, <-consumer.Messages(), 7)
- assertMessageOffset(t, <-consumer.Messages(), 11)
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- }
- // If leadership for a partition is changing then consumer resolves the new
- // leader and switches to it.
- func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
- // initial setup
- seedBroker := NewMockBroker(t, 10)
- leader0 := NewMockBroker(t, 0)
- leader1 := NewMockBroker(t, 1)
- seedBroker.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(leader0.Addr(), leader0.BrokerID()).
- SetBroker(leader1.Addr(), leader1.BrokerID()).
- SetLeader("my_topic", 0, leader0.BrokerID()).
- SetLeader("my_topic", 1, leader1.BrokerID()),
- })
- mockOffsetResponse1 := NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1000).
- SetOffset("my_topic", 1, OffsetOldest, 0).
- SetOffset("my_topic", 1, OffsetNewest, 1000)
- leader0.SetHandlerByMap(map[string]MockResponse{
- "OffsetRequest": mockOffsetResponse1,
- "FetchRequest": NewMockFetchResponse(t, 1),
- })
- leader1.SetHandlerByMap(map[string]MockResponse{
- "OffsetRequest": mockOffsetResponse1,
- "FetchRequest": NewMockFetchResponse(t, 1),
- })
- // launch test goroutines
- config := NewConfig()
- config.Consumer.Retry.Backoff = 50
- master, err := NewConsumer([]string{seedBroker.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- // we expect to end up (eventually) consuming exactly ten messages on each partition
- var wg sync.WaitGroup
- for i := int32(0); i < 2; i++ {
- consumer, err := master.ConsumePartition("my_topic", i, 0)
- if err != nil {
- t.Error(err)
- }
- go func(c PartitionConsumer) {
- for err := range c.Errors() {
- t.Error(err)
- }
- }(consumer)
- wg.Add(1)
- go func(partition int32, c PartitionConsumer) {
- for i := 0; i < 10; i++ {
- message := <-consumer.Messages()
- if message.Offset != int64(i) {
- t.Error("Incorrect message offset!", i, partition, message.Offset)
- }
- if message.Partition != partition {
- t.Error("Incorrect message partition!")
- }
- }
- safeClose(t, consumer)
- wg.Done()
- }(i, consumer)
- }
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 1")
- // Stage 1:
- // * my_topic/0 -> leader0 serves 4 messages
- // * my_topic/1 -> leader1 serves 0 messages
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 4; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
- }
- leader0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse,
- })
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 2")
- // Stage 2:
- // * leader0 says that it is no longer serving my_topic/0
- // * seedBroker tells that leader1 is serving my_topic/0 now
- // seed broker tells that the new partition 0 leader is leader1
- seedBroker.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetLeader("my_topic", 0, leader1.BrokerID()).
- SetLeader("my_topic", 1, leader1.BrokerID()),
- })
- // leader0 says no longer leader of partition 0
- fetchResponse := new(FetchResponse)
- fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
- leader0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockWrapper(fetchResponse),
- })
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 3")
- // Stage 3:
- // * my_topic/0 -> leader1 serves 3 messages
- // * my_topic/1 -> leader1 server 8 messages
- // leader1 provides 3 message on partition 0, and 8 messages on partition 1
- mockFetchResponse2 := NewMockFetchResponse(t, 2)
- for i := 4; i < 7; i++ {
- mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
- }
- for i := 0; i < 8; i++ {
- mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
- }
- leader1.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse2,
- })
- time.Sleep(50 * time.Millisecond)
- Logger.Printf(" STAGE 4")
- // Stage 4:
- // * my_topic/0 -> leader1 serves 3 messages
- // * my_topic/1 -> leader1 tells that it is no longer the leader
- // * seedBroker tells that leader0 is a new leader for my_topic/1
- // metadata assigns 0 to leader1 and 1 to leader0
- seedBroker.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetLeader("my_topic", 0, leader1.BrokerID()).
- SetLeader("my_topic", 1, leader0.BrokerID()),
- })
- // leader1 provides three more messages on partition0, says no longer leader of partition1
- mockFetchResponse3 := NewMockFetchResponse(t, 3).
- SetMessage("my_topic", 0, int64(7), testMsg).
- SetMessage("my_topic", 0, int64(8), testMsg).
- SetMessage("my_topic", 0, int64(9), testMsg)
- fetchResponse4 := new(FetchResponse)
- fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
- leader1.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
- })
- // leader0 provides two messages on partition 1
- mockFetchResponse4 := NewMockFetchResponse(t, 2)
- for i := 8; i < 10; i++ {
- mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
- }
- leader0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse4,
- })
- wg.Wait()
- safeClose(t, master)
- leader1.Close()
- leader0.Close()
- seedBroker.Close()
- }
- // When two partitions have the same broker as the leader, if one partition
- // consumer channel buffer is full then that does not affect the ability to
- // read messages by the other consumer.
- func TestConsumerInterleavedClose(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()).
- SetLeader("my_topic", 1, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 1000).
- SetOffset("my_topic", 0, OffsetNewest, 1100).
- SetOffset("my_topic", 1, OffsetOldest, 2000).
- SetOffset("my_topic", 1, OffsetNewest, 2100),
- "FetchRequest": NewMockFetchResponse(t, 1).
- SetMessage("my_topic", 0, 1000, testMsg).
- SetMessage("my_topic", 0, 1001, testMsg).
- SetMessage("my_topic", 0, 1002, testMsg).
- SetMessage("my_topic", 1, 2000, testMsg),
- })
- config := NewConfig()
- config.ChannelBufferSize = 0
- master, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- c0, err := master.ConsumePartition("my_topic", 0, 1000)
- if err != nil {
- t.Fatal(err)
- }
- c1, err := master.ConsumePartition("my_topic", 1, 2000)
- if err != nil {
- t.Fatal(err)
- }
- // When/Then: we can read from partition 0 even if nobody reads from partition 1
- assertMessageOffset(t, <-c0.Messages(), 1000)
- assertMessageOffset(t, <-c0.Messages(), 1001)
- assertMessageOffset(t, <-c0.Messages(), 1002)
- safeClose(t, c1)
- safeClose(t, c0)
- safeClose(t, master)
- broker0.Close()
- }
- func TestConsumerBounceWithReferenceOpen(t *testing.T) {
- broker0 := NewMockBroker(t, 0)
- broker0Addr := broker0.Addr()
- broker1 := NewMockBroker(t, 1)
- mockMetadataResponse := NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetBroker(broker1.Addr(), broker1.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()).
- SetLeader("my_topic", 1, broker1.BrokerID())
- mockOffsetResponse := NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetOldest, 1000).
- SetOffset("my_topic", 0, OffsetNewest, 1100).
- SetOffset("my_topic", 1, OffsetOldest, 2000).
- SetOffset("my_topic", 1, OffsetNewest, 2100)
- mockFetchResponse := NewMockFetchResponse(t, 1)
- for i := 0; i < 10; i++ {
- mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
- mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "OffsetRequest": mockOffsetResponse,
- "FetchRequest": mockFetchResponse,
- })
- broker1.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": mockMetadataResponse,
- "OffsetRequest": mockOffsetResponse,
- "FetchRequest": mockFetchResponse,
- })
- config := NewConfig()
- config.Consumer.Return.Errors = true
- config.Consumer.Retry.Backoff = 100 * time.Millisecond
- config.ChannelBufferSize = 1
- master, err := NewConsumer([]string{broker1.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- c0, err := master.ConsumePartition("my_topic", 0, 1000)
- if err != nil {
- t.Fatal(err)
- }
- c1, err := master.ConsumePartition("my_topic", 1, 2000)
- if err != nil {
- t.Fatal(err)
- }
- // read messages from both partition to make sure that both brokers operate
- // normally.
- assertMessageOffset(t, <-c0.Messages(), 1000)
- assertMessageOffset(t, <-c1.Messages(), 2000)
- // Simulate broker shutdown. Note that metadata response does not change,
- // that is the leadership does not move to another broker. So partition
- // consumer will keep retrying to restore the connection with the broker.
- broker0.Close()
- // Make sure that while the partition/0 leader is down, consumer/partition/1
- // is capable of pulling messages from broker1.
- for i := 1; i < 7; i++ {
- offset := (<-c1.Messages()).Offset
- if offset != int64(2000+i) {
- t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
- }
- }
- // Bring broker0 back to service.
- broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "FetchRequest": mockFetchResponse,
- })
- // Read the rest of messages from both partitions.
- for i := 7; i < 10; i++ {
- assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
- }
- for i := 1; i < 10; i++ {
- assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
- }
- select {
- case <-c0.Errors():
- default:
- t.Errorf("Partition consumer should have detected broker restart")
- }
- safeClose(t, c1)
- safeClose(t, c0)
- safeClose(t, master)
- broker0.Close()
- broker1.Close()
- }
- func TestConsumerOffsetOutOfRange(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 2)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 2345),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, nil)
- if err != nil {
- t.Fatal(err)
- }
- // When/Then
- if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
- t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
- }
- if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
- t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
- }
- if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
- t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
- }
- safeClose(t, master)
- broker0.Close()
- }
- func TestConsumerExpiryTicker(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- fetchResponse1 := &FetchResponse{}
- for i := 1; i <= 8; i++ {
- fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
- }
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 1),
- "FetchRequest": NewMockSequence(fetchResponse1),
- })
- config := NewConfig()
- config.ChannelBufferSize = 0
- config.Consumer.MaxProcessingTime = 10 * time.Millisecond
- master, err := NewConsumer([]string{broker0.Addr()}, config)
- if err != nil {
- t.Fatal(err)
- }
- // When
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- // Then: messages with offsets 1 through 8 are read
- for i := 1; i <= 8; i++ {
- assertMessageOffset(t, <-consumer.Messages(), int64(i))
- time.Sleep(2 * time.Millisecond)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- func TestConsumerTimestamps(t *testing.T) {
- now := time.Now().Truncate(time.Millisecond)
- type testMessage struct {
- key Encoder
- offset int64
- timestamp time.Time
- }
- for _, d := range []struct {
- kversion KafkaVersion
- logAppendTime bool
- messages []testMessage
- expectedTimestamp []time.Time
- }{
- {MinVersion, false, []testMessage{
- {testMsg, 1, now},
- {testMsg, 2, now},
- }, []time.Time{{}, {}}},
- {V0_9_0_0, false, []testMessage{
- {testMsg, 1, now},
- {testMsg, 2, now},
- }, []time.Time{{}, {}}},
- {V0_10_0_0, false, []testMessage{
- {testMsg, 1, now},
- {testMsg, 2, now},
- }, []time.Time{{}, {}}},
- {V0_10_2_1, false, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
- {V0_10_2_1, true, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now, now}},
- {V0_11_0_0, false, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
- {V0_11_0_0, true, []testMessage{
- {testMsg, 1, now.Add(time.Second)},
- {testMsg, 2, now.Add(2 * time.Second)},
- }, []time.Time{now, now}},
- } {
- var fr *FetchResponse
- var offsetResponseVersion int16
- cfg := NewConfig()
- cfg.Version = d.kversion
- switch {
- case d.kversion.IsAtLeast(V0_11_0_0):
- offsetResponseVersion = 1
- fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
- for _, m := range d.messages {
- fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
- }
- fr.SetLastOffsetDelta("my_topic", 0, 2)
- fr.SetLastStableOffset("my_topic", 0, 2)
- case d.kversion.IsAtLeast(V0_10_1_0):
- offsetResponseVersion = 1
- fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
- for _, m := range d.messages {
- fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
- }
- default:
- var version int16
- switch {
- case d.kversion.IsAtLeast(V0_10_0_0):
- version = 2
- case d.kversion.IsAtLeast(V0_9_0_0):
- version = 1
- }
- fr = &FetchResponse{Version: version}
- for _, m := range d.messages {
- fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
- }
- }
- broker0 := NewMockBroker(t, 0)
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(offsetResponseVersion).
- SetOffset("my_topic", 0, OffsetNewest, 1234).
- SetOffset("my_topic", 0, OffsetOldest, 0),
- "FetchRequest": NewMockSequence(fr),
- })
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 1)
- if err != nil {
- t.Fatal(err)
- }
- for i, ts := range d.expectedTimestamp {
- select {
- case msg := <-consumer.Messages():
- assertMessageOffset(t, msg, int64(i)+1)
- if msg.Timestamp != ts {
- t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
- d.kversion, d.logAppendTime, msg.Timestamp, ts)
- }
- case err := <-consumer.Errors():
- t.Fatal(err)
- }
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- }
- // When set to ReadCommitted, no uncommitted message should be available in messages channel
- func TestExcludeUncommitted(t *testing.T) {
- // Given
- broker0 := NewMockBroker(t, 0)
- fetchResponse := &FetchResponse{
- Version: 4,
- Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
- AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
- }}},
- }
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true) // committed msg
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true) // uncommitted msg
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true) // uncommitted msg
- fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
- fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true) // committed msg
- broker0.SetHandlerByMap(map[string]MockResponse{
- "MetadataRequest": NewMockMetadataResponse(t).
- SetBroker(broker0.Addr(), broker0.BrokerID()).
- SetLeader("my_topic", 0, broker0.BrokerID()),
- "OffsetRequest": NewMockOffsetResponse(t).
- SetVersion(1).
- SetOffset("my_topic", 0, OffsetOldest, 0).
- SetOffset("my_topic", 0, OffsetNewest, 1237),
- "FetchRequest": NewMockWrapper(fetchResponse),
- })
- cfg := NewConfig()
- cfg.Consumer.Return.Errors = true
- cfg.Version = V0_11_0_0
- cfg.Consumer.IsolationLevel = ReadCommitted
- // When
- master, err := NewConsumer([]string{broker0.Addr()}, cfg)
- if err != nil {
- t.Fatal(err)
- }
- consumer, err := master.ConsumePartition("my_topic", 0, 1234)
- if err != nil {
- t.Fatal(err)
- }
- // Then: only the 2 committed messages are returned
- select {
- case message := <-consumer.Messages():
- assertMessageOffset(t, message, int64(1234))
- case err := <-consumer.Errors():
- t.Error(err)
- }
- select {
- case message := <-consumer.Messages():
- assertMessageOffset(t, message, int64(1238))
- case err := <-consumer.Errors():
- t.Error(err)
- }
- safeClose(t, consumer)
- safeClose(t, master)
- broker0.Close()
- }
- func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
- if msg.Offset != expectedOffset {
- t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
- }
- }
- // This example shows how to use the consumer to read messages
- // from a single partition.
- func ExampleConsumer() {
- consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
- if err != nil {
- panic(err)
- }
- defer func() {
- if err := consumer.Close(); err != nil {
- log.Fatalln(err)
- }
- }()
- partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
- if err != nil {
- panic(err)
- }
- defer func() {
- if err := partitionConsumer.Close(); err != nil {
- log.Fatalln(err)
- }
- }()
- // Trap SIGINT to trigger a shutdown.
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- consumed := 0
- ConsumerLoop:
- for {
- select {
- case msg := <-partitionConsumer.Messages():
- log.Printf("Consumed message offset %d\n", msg.Offset)
- consumed++
- case <-signals:
- break ConsumerLoop
- }
- }
- log.Printf("Consumed: %d\n", consumed)
- }
- func Test_partitionConsumer_parseResponse(t *testing.T) {
- type args struct {
- response *FetchResponse
- }
- tests := []struct {
- name string
- args args
- want []*ConsumerMessage
- wantErr bool
- }{
- {
- name: "empty but throttled FetchResponse is not considered an error",
- args: args{
- response: &FetchResponse{
- ThrottleTime: time.Millisecond,
- },
- },
- },
- {
- name: "empty FetchResponse is considered an incomplete response by default",
- args: args{
- response: &FetchResponse{},
- },
- wantErr: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- child := &partitionConsumer{
- broker: &brokerConsumer{
- broker: &Broker{},
- },
- conf: &Config{},
- }
- got, err := child.parseResponse(tt.args.response)
- if (err != nil) != tt.wantErr {
- t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if !reflect.DeepEqual(got, tt.want) {
- t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
- }
- })
- }
- }
|