Sfoglia il codice sorgente

Merge pull request #518 from Shopify/offset-manager-close

Add Close() OffsetManager interface
Willem van Bergen 9 anni fa
parent
commit
de8e3123aa
2 ha cambiato i file con 15 aggiunte e 0 eliminazioni
  1. 9 0
      offset_manager.go
  2. 6 0
      offset_manager_test.go

+ 9 - 0
offset_manager.go

@@ -12,6 +12,11 @@ type OffsetManager interface {
 	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
 	// return an error if this OffsetManager is already managing the given topic/partition.
 	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
+
+	// Close stops the OffsetManager from managing offsets. It is required to call this function
+	// before an OffsetManager object passes out of scope, as it will otherwise
+	// leak memory. You must call this after all the PartitionOffsetManagers are closed.
+	Close() error
 }
 
 type offsetManager struct {
@@ -66,6 +71,10 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
 	return pom, nil
 }
 
+func (om *offsetManager) Close() error {
+	return nil
+}
+
 func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
 	om.lock.Lock()
 	defer om.lock.Unlock()

+ 6 - 0
offset_manager_test.go

@@ -124,6 +124,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
 	coordinator.Close()
 	newCoordinator.Close()
 	safeClose(t, pom)
+	safeClose(t, om)
 	safeClose(t, testClient)
 }
 
@@ -157,6 +158,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
 	broker.Close()
 	coordinator.Close()
 	safeClose(t, pom)
+	safeClose(t, om)
 	safeClose(t, testClient)
 }
 
@@ -173,6 +175,7 @@ func TestPartitionOffsetManagerOffset(t *testing.T) {
 	}
 
 	safeClose(t, pom)
+	safeClose(t, om)
 	broker.Close()
 	coordinator.Close()
 	safeClose(t, testClient)
@@ -197,6 +200,7 @@ func TestPartitionOffsetManagerSetOffset(t *testing.T) {
 	}
 
 	safeClose(t, pom)
+	safeClose(t, om)
 	safeClose(t, testClient)
 	broker.Close()
 	coordinator.Close()
@@ -271,6 +275,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	broker.Close()
 	coordinator.Close()
 	newCoordinator.Close()
+	safeClose(t, om)
 	safeClose(t, testClient)
 }
 
@@ -298,6 +303,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) {
 	pom.SetOffset(100, "modified_meta")
 
 	safeClose(t, pom)
+	safeClose(t, om)
 	broker.Close()
 	safeClose(t, testClient)
 }