functional_offset_manager_test.go 981 B

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. //+build functional
  2. package sarama
  3. import (
  4. "testing"
  5. )
  6. func TestFuncOffsetManager(t *testing.T) {
  7. checkKafkaVersion(t, "0.8.2")
  8. setupFunctionalTest(t)
  9. defer teardownFunctionalTest(t)
  10. client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
  11. if err != nil {
  12. t.Fatal(err)
  13. }
  14. offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
  15. if err != nil {
  16. t.Fatal(err)
  17. }
  18. pom1, err := offsetManager.ManagePartition("test.1", 0)
  19. if err != nil {
  20. t.Fatal(err)
  21. }
  22. pom1.MarkOffset(10, "test metadata")
  23. safeClose(t, pom1)
  24. pom2, err := offsetManager.ManagePartition("test.1", 0)
  25. if err != nil {
  26. t.Fatal(err)
  27. }
  28. offset, metadata := pom2.NextOffset()
  29. if offset != 10 {
  30. t.Errorf("Expected the next offset to be 10, found %d.", offset)
  31. }
  32. if metadata != "test metadata" {
  33. t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
  34. }
  35. safeClose(t, pom2)
  36. safeClose(t, offsetManager)
  37. safeClose(t, client)
  38. }