// Source: functional_consumer_test.go (1.2 KB)

  1. package sarama
  2. import (
  3. "math"
  4. "testing"
  5. )
  6. func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
  7. checkKafkaAvailability(t)
  8. consumer, err := NewConsumer(kafkaBrokers, nil)
  9. if err != nil {
  10. t.Fatal(err)
  11. }
  12. if _, err := consumer.ConsumePartition("test.1", 0, -10); err != ErrOffsetOutOfRange {
  13. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  14. }
  15. if _, err := consumer.ConsumePartition("test.1", 0, math.MaxInt64); err != ErrOffsetOutOfRange {
  16. t.Error("Expected ErrOffsetOutOfRange, got:", err)
  17. }
  18. safeClose(t, consumer)
  19. }
  20. func TestConsumerHighWaterMarkOffset(t *testing.T) {
  21. checkKafkaAvailability(t)
  22. p, err := NewSyncProducer(kafkaBrokers, nil)
  23. if err != nil {
  24. t.Fatal(err)
  25. }
  26. defer safeClose(t, p)
  27. _, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. c, err := NewConsumer(kafkaBrokers, nil)
  32. if err != nil {
  33. t.Fatal(err)
  34. }
  35. defer safeClose(t, c)
  36. pc, err := c.ConsumePartition("test.1", 0, OffsetOldest)
  37. if err != nil {
  38. t.Fatal(err)
  39. }
  40. <-pc.Messages()
  41. if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
  42. t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
  43. }
  44. safeClose(t, pc)
  45. }