functional_offset_manager_test.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package sarama
  2. import (
  3. "testing"
  4. )
  5. func TestFuncOffsetManager(t *testing.T) {
  6. checkKafkaVersion(t, "0.8.2")
  7. setupFunctionalTest(t)
  8. defer teardownFunctionalTest(t)
  9. client, err := NewClient(kafkaBrokers, nil)
  10. if err != nil {
  11. t.Fatal(err)
  12. }
  13. offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
  14. if err != nil {
  15. t.Fatal(err)
  16. }
  17. if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
  18. t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
  19. }
  20. pom1, err := offsetManager.ManagePartition("test.1", 0)
  21. if err != nil {
  22. t.Fatal(err)
  23. }
  24. pom1.MarkOffset(10, "test metadata")
  25. safeClose(t, pom1)
  26. pom2, err := offsetManager.ManagePartition("test.1", 0)
  27. if err != nil {
  28. t.Fatal(err)
  29. }
  30. offset, metadata := pom2.NextOffset()
  31. if offset != 10+1 {
  32. t.Errorf("Expected the next offset to be 11, found %d.", offset)
  33. }
  34. if metadata != "test metadata" {
  35. t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
  36. }
  37. safeClose(t, pom2)
  38. safeClose(t, offsetManager)
  39. safeClose(t, client)
  40. }