functional_offset_manager_test.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. )
  6. func TestFuncOffsetManager(t *testing.T) {
  7. checkKafkaVersion(t, "0.8.2")
  8. if os.Getenv("KAFKA_VERSION") == "0.9.0.0" {
  9. t.Skip("Offset manager is broken with kafka 0.9 at the moment.")
  10. }
  11. setupFunctionalTest(t)
  12. defer teardownFunctionalTest(t)
  13. client, err := NewClient(kafkaBrokers, nil)
  14. if err != nil {
  15. t.Fatal(err)
  16. }
  17. offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
  18. if err != nil {
  19. t.Fatal(err)
  20. }
  21. if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
  22. t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
  23. }
  24. pom1, err := offsetManager.ManagePartition("test.1", 0)
  25. if err != nil {
  26. t.Fatal(err)
  27. }
  28. pom1.MarkOffset(10, "test metadata")
  29. safeClose(t, pom1)
  30. pom2, err := offsetManager.ManagePartition("test.1", 0)
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. offset, metadata := pom2.NextOffset()
  35. if offset != 10+1 {
  36. t.Errorf("Expected the next offset to be 11, found %d.", offset)
  37. }
  38. if metadata != "test metadata" {
  39. t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
  40. }
  41. safeClose(t, pom2)
  42. safeClose(t, offsetManager)
  43. safeClose(t, client)
  44. }