Переглянути джерело

add conviction policy interface (#1081)

* add conviction policy interface and tests

* remove irrelevant test

* add hostinfo as parameter

* fix control.go

* fix comments and Reset function
Chang Liu 7 роки тому
батько
коміт
3a24f01b62
5 змінених файлів з 28 додано та 2 видалено
  1. 2 0
      cluster.go
  2. 2 0
      cluster_test.go
  3. 3 1
      connectionpool.go
  4. 1 1
      control.go
  5. 20 0
      policies.go

+ 2 - 0
cluster.go

@@ -54,6 +54,7 @@ type ClusterConfig struct {
 	Compressor         Compressor         // compression algorithm (default: nil)
 	Authenticator      Authenticator      // authenticator (default: nil)
 	RetryPolicy        RetryPolicy        // Default retry policy to use for queries (default: 0)
+	ConvictionPolicy   ConvictionPolicy   // Decide whether to mark host as down based on the error and host info (default: SimpleConvictionPolicy)
 	ReconnectionPolicy ReconnectionPolicy // Default reconnection policy to use for reconnecting before trying to mark host as down (default: see below)
 	SocketKeepalive    time.Duration      // The keepalive period to use, enabled if > 0 (default: 0)
 	MaxPreparedStmts   int                // Sets the maximum cache size for prepared statements globally for gocql (default: 1000)
@@ -152,6 +153,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
 		DefaultTimestamp:       true,
 		MaxWaitSchemaAgreement: 60 * time.Second,
 		ReconnectInterval:      60 * time.Second,
+		ConvictionPolicy:       &SimpleConvictionPolicy{},
 		ReconnectionPolicy:     &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
 	}
 	return cfg

+ 2 - 0
cluster_test.go

@@ -20,6 +20,8 @@ func TestNewCluster_Defaults(t *testing.T) {
 	assertEqual(t, "cluster config default timestamp", true, cfg.DefaultTimestamp)
 	assertEqual(t, "cluster config max wait schema agreement", 60*time.Second, cfg.MaxWaitSchemaAgreement)
 	assertEqual(t, "cluster config reconnect interval", 60*time.Second, cfg.ReconnectInterval)
+	assertTrue(t, "cluster config conviction policy",
+		reflect.DeepEqual(&SimpleConvictionPolicy{}, cfg.ConvictionPolicy))
 	assertTrue(t, "cluster config reconnection policy",
 		reflect.DeepEqual(&ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second}, cfg.ReconnectionPolicy))
 }

+ 3 - 1
connectionpool.go

@@ -420,7 +420,9 @@ func (pool *hostConnPool) fill() {
 
 			// this is call with the connection pool mutex held, this call will
 			// then recursively try to lock it again. FIXME
-			go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
+			if pool.session.cfg.ConvictionPolicy.AddFailure(err, pool.host) {
+				go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
+			}
 			return
 		}
 

+ 1 - 1
control.go

@@ -341,7 +341,7 @@ func (c *controlConn) reconnect(refreshring bool) {
 	if host != nil {
 		// try to connect to the old host
 		conn, err := c.session.connect(host, c)
-		if err != nil {
+		if err != nil && c.session.cfg.ConvictionPolicy.AddFailure(err, host) {
 			// host is dead
 			// TODO: this is replicated in a few places
 			c.session.handleNodeDown(host.ConnectAddress(), host.Port())

+ 20 - 0
policies.go

@@ -791,6 +791,26 @@ func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
 	}
 }
 
+// ConvictionPolicy interface is used by gocql to determine if a host should be
+// marked as DOWN based on the error and host info
+type ConvictionPolicy interface {
+	// Implementations should return `true` if the host should be convicted, `false` otherwise.
+	AddFailure(error error, host *HostInfo) bool
+	//Implementations should clear out any convictions or state regarding the host.
+	Reset(host *HostInfo)
+}
+
+// SimpleConvictionPolicy implements a ConvictionPolicy which convicts all hosts
+// regardless of error
+type SimpleConvictionPolicy struct {
+}
+
+func (e *SimpleConvictionPolicy) AddFailure(error error, host *HostInfo) bool {
+	return true
+}
+
+func (e *SimpleConvictionPolicy) Reset(host *HostInfo) {}
+
 // ReconnectionPolicy interface is used by gocql to determine if reconnection
 // can be attempted after connection error. The interface allows gocql users
 // to implement their own logic to determine how to attempt reconnection.