浏览代码

Merge pull request #514 from Zariel/schema-agreement-conn

wait for schema agreement in conn
Chris Bannister 10 年之前
父节点
当前提交
59a2dc3ba1
共有 7 个文件被更改,包括 142 次插入103 次删除
  1. 2 13
      cassandra_test.go
  2. 19 14
      cluster.go
  3. 65 2
      conn.go
  4. 21 4
      connectionpool.go
  5. 23 59
      control.go
  6. 1 1
      session.go
  7. 11 10
      session_test.go

+ 2 - 13
cassandra_test.go

@@ -62,7 +62,7 @@ func createTable(s *Session, table string) error {
 		return err
 	}
 
-	return s.control.awaitSchemaAgreement()
+	return nil
 }
 
 func createCluster() *ClusterConfig {
@@ -71,6 +71,7 @@ func createCluster() *ClusterConfig {
 	cluster.CQLVersion = *flagCQL
 	cluster.Timeout = *flagTimeout
 	cluster.Consistency = Quorum
+	cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
 	if *flagRetry > 0 {
 		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
 	}
@@ -101,10 +102,6 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 		tb.Fatal(err)
 	}
 
-	if err = session.control.awaitSchemaAgreement(); err != nil {
-		tb.Fatal(err)
-	}
-
 	err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
 	WITH replication = {
 		'class' : 'SimpleStrategy',
@@ -114,14 +111,6 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 	if err != nil {
 		tb.Fatal(err)
 	}
-
-	// the schema version might be out of data between 2 nodes, so wait for the
-	// cluster to settle.
-	// TODO(zariel): use events here to know when the cluster has resolved to the
-	// new schema version
-	if err = session.control.awaitSchemaAgreement(); err != nil {
-		tb.Fatal(err)
-	}
 }
 
 func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {

+ 19 - 14
cluster.go

@@ -63,7 +63,7 @@ type PoolConfig struct {
 	ConnSelectionPolicy func() ConnSelectionPolicy
 }
 
-func (p PoolConfig) buildPool(cfg *ClusterConfig) (*policyConnPool, error) {
+func (p PoolConfig) buildPool(session *Session) (*policyConnPool, error) {
 	hostSelection := p.HostSelectionPolicy
 	if hostSelection == nil {
 		hostSelection = RoundRobinHostPolicy()
@@ -74,7 +74,7 @@ func (p PoolConfig) buildPool(cfg *ClusterConfig) (*policyConnPool, error) {
 		connSelection = RoundRobinConnPolicy()
 	}
 
-	return newPolicyConnPool(cfg, hostSelection, connSelection)
+	return newPolicyConnPool(session, hostSelection, connSelection)
 }
 
 // ClusterConfig is a struct to configure the default cluster implementation
@@ -107,6 +107,10 @@ type ClusterConfig struct {
 	// configuration of host selection and connection selection policies.
 	PoolConfig PoolConfig
 
+	// The maximum amount of time to wait for schema agreement in a cluster after
+	// receiving a schema change frame. (deault: 60s)
+	MaxWaitSchemaAgreement time.Duration
+
 	// internal config for testing
 	disableControlConn bool
 }
@@ -114,18 +118,19 @@ type ClusterConfig struct {
 // NewCluster generates a new config for the default cluster implementation.
 func NewCluster(hosts ...string) *ClusterConfig {
 	cfg := &ClusterConfig{
-		Hosts:             hosts,
-		CQLVersion:        "3.0.0",
-		ProtoVersion:      2,
-		Timeout:           600 * time.Millisecond,
-		Port:              9042,
-		NumConns:          2,
-		Consistency:       Quorum,
-		DiscoverHosts:     false,
-		MaxPreparedStmts:  defaultMaxPreparedStmts,
-		MaxRoutingKeyInfo: 1000,
-		PageSize:          5000,
-		DefaultTimestamp:  true,
+		Hosts:                  hosts,
+		CQLVersion:             "3.0.0",
+		ProtoVersion:           2,
+		Timeout:                600 * time.Millisecond,
+		Port:                   9042,
+		NumConns:               2,
+		Consistency:            Quorum,
+		DiscoverHosts:          false,
+		MaxPreparedStmts:       defaultMaxPreparedStmts,
+		MaxRoutingKeyInfo:      1000,
+		PageSize:               5000,
+		DefaultTimestamp:       true,
+		MaxWaitSchemaAgreement: 60 * time.Second,
 	}
 	return cfg
 }

+ 65 - 2
conn.go

@@ -114,6 +114,8 @@ type Conn struct {
 	currentKeyspace string
 	started         bool
 
+	session *Session
+
 	closed int32
 	quit   chan struct{}
 
@@ -122,7 +124,7 @@ type Conn struct {
 
 // Connect establishes a connection to a Cassandra node.
 // You must also call the Serve method before you can execute any queries.
-func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
+func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, session *Session) (*Conn, error) {
 	var (
 		err  error
 		conn net.Conn
@@ -178,6 +180,7 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn
 		auth:         cfg.Authenticator,
 		headerBuf:    make([]byte, headerSize),
 		quit:         make(chan struct{}),
+		session:      session,
 	}
 
 	if cfg.Keepalive > 0 {
@@ -700,8 +703,15 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 
 		return iter
-	case *resultKeyspaceFrame, *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction:
+	case *resultKeyspaceFrame:
 		return &Iter{framer: framer}
+	case *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction:
+		iter := &Iter{framer: framer}
+		c.awaitSchemaAgreement()
+		// dont return an error from this, might be a good idea to give a warning
+		// though. The impact of this returning an error would be that the cluster
+		// is not consistent with regards to its schema.
+		return iter
 	case *RequestErrUnprepared:
 		stmtsLRU.Lock()
 		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
@@ -886,6 +896,59 @@ func (c *Conn) setKeepalive(d time.Duration) error {
 	return nil
 }
 
+func (c *Conn) query(statement string, values ...interface{}) (iter *Iter) {
+	q := c.session.Query(statement, values...).Consistency(One)
+	return c.executeQuery(q)
+}
+
+func (c *Conn) awaitSchemaAgreement() (err error) {
+	const (
+		peerSchemas  = "SELECT schema_version FROM system.peers"
+		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
+	)
+
+	endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
+	for time.Now().Before(endDeadline) {
+		iter := c.query(peerSchemas)
+
+		versions := make(map[string]struct{})
+
+		var schemaVersion string
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		iter = c.query(localSchemas)
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		if len(versions) <= 1 {
+			return nil
+		}
+
+	cont:
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	if err != nil {
+		return
+	}
+
+	// not exported
+	return errors.New("gocql: cluster schema versions not consistent")
+}
+
 type inflightPrepare struct {
 	info QueryInfo
 	err  error

+ 21 - 4
connectionpool.go

@@ -58,6 +58,8 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
 }
 
 type policyConnPool struct {
+	session *Session
+
 	port     int
 	numConns int
 	connCfg  *ConnConfig
@@ -69,7 +71,7 @@ type policyConnPool struct {
 	hostConnPools map[string]*hostConnPool
 }
 
-func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
+func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
 	connPolicy func() ConnSelectionPolicy) (*policyConnPool, error) {
 
 	var (
@@ -77,6 +79,8 @@ func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
 		tlsConfig *tls.Config
 	)
 
+	cfg := session.cfg
+
 	if cfg.SslOpts != nil {
 		tlsConfig, err = setupTLSConfig(cfg.SslOpts)
 		if err != nil {
@@ -86,6 +90,7 @@ func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
 
 	// create the pool
 	pool := &policyConnPool{
+		session:  session,
 		port:     cfg.Port,
 		numConns: cfg.NumConns,
 		connCfg: &ConnConfig{
@@ -130,6 +135,7 @@ func (p *policyConnPool) SetHosts(hosts []HostInfo) {
 		if !exists {
 			// create a connection pool for the host
 			pool = newHostConnPool(
+				p.session,
 				hosts[i].Peer,
 				p.port,
 				p.numConns,
@@ -184,9 +190,18 @@ func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
 		host = nextHost()
 		if host == nil {
 			break
+		} else if host.Info() == nil {
+			panic(fmt.Sprintf("policy %T returned no host info: %+v", p.hostPolicy, host))
+		}
+
+		pool, ok := p.hostConnPools[host.Info().Peer]
+		if !ok {
+			continue
 		}
-		conn = p.hostConnPools[host.Info().Peer].Pick(qry)
+
+		conn = pool.Pick(qry)
 	}
+
 	p.mu.RUnlock()
 	return host, conn
 }
@@ -208,6 +223,7 @@ func (p *policyConnPool) Close() {
 // hostConnPool is a connection pool for a single host.
 // Connection selection is based on a provided ConnSelectionPolicy
 type hostConnPool struct {
+	session  *Session
 	host     string
 	port     int
 	addr     string
@@ -222,10 +238,11 @@ type hostConnPool struct {
 	filling bool
 }
 
-func newHostConnPool(host string, port int, size int, connCfg *ConnConfig,
+func newHostConnPool(session *Session, host string, port, size int, connCfg *ConnConfig,
 	keyspace string, policy ConnSelectionPolicy) *hostConnPool {
 
 	pool := &hostConnPool{
+		session:  session,
 		host:     host,
 		port:     port,
 		addr:     JoinHostPort(host, port),
@@ -395,7 +412,7 @@ func (pool *hostConnPool) fillingStopped() {
 // create a new connection to the host and add it to the pool
 func (pool *hostConnPool) connect() error {
 	// try to connect
-	conn, err := Connect(pool.addr, pool.connCfg, pool)
+	conn, err := Connect(pool.addr, pool.connCfg, pool, pool.session)
 	if err != nil {
 		return err
 	}

+ 23 - 59
control.go

@@ -88,7 +88,7 @@ func (c *controlConn) reconnect(refreshring bool) {
 		return
 	}
 
-	newConn, err := Connect(conn.addr, conn.cfg, c)
+	newConn, err := Connect(conn.addr, conn.cfg, c, c.session)
 	if err != nil {
 		host.Mark(err)
 		// TODO: add log handler for things like this
@@ -135,18 +135,15 @@ func (c *controlConn) writeFrame(w frameWriter) (frame, error) {
 	return framer.parseFrame()
 }
 
-// query will return nil if the connection is closed or nil
-func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
-	q := c.session.Query(statement, values...).Consistency(One)
-
+func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
 	const maxConnectAttempts = 5
 	connectAttempts := 0
 
-	for {
+	for i := 0; i < maxConnectAttempts; i++ {
 		conn := c.conn.Load().(*Conn)
 		if conn == nil {
 			if connectAttempts > maxConnectAttempts {
-				return &Iter{err: errNoControl}
+				break
 			}
 
 			connectAttempts++
@@ -155,67 +152,34 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
 			continue
 		}
 
-		iter = conn.executeQuery(q)
-		q.attempts++
-		if iter.err == nil || !c.retry.Attempt(q) {
-			break
-		}
+		return fn(conn)
 	}
 
-	return
+	return &Iter{err: errNoControl}
 }
 
-func (c *controlConn) awaitSchemaAgreement() (err error) {
-
-	const (
-		// TODO(zariel): if we export this make this configurable
-		maxWaitTime = 60 * time.Second
-
-		peerSchemas  = "SELECT schema_version FROM system.peers"
-		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
-	)
-
-	endDeadline := time.Now().Add(maxWaitTime)
-
-	for time.Now().Before(endDeadline) {
-		iter := c.query(peerSchemas)
-
-		versions := make(map[string]struct{})
-
-		var schemaVersion string
-		for iter.Scan(&schemaVersion) {
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
-		}
-
-		if err = iter.Close(); err != nil {
-			goto cont
-		}
-
-		iter = c.query(localSchemas)
-		for iter.Scan(&schemaVersion) {
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
-		}
+// query will return nil if the connection is closed or nil
+func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
+	q := c.session.Query(statement, values...).Consistency(One)
 
-		if err = iter.Close(); err != nil {
-			goto cont
-		}
+	for {
+		iter = c.withConn(func(conn *Conn) *Iter {
+			return conn.executeQuery(q)
+		})
 
-		if len(versions) <= 1 {
-			return nil
+		q.attempts++
+		if iter.err == nil || !c.retry.Attempt(q) {
+			break
 		}
-
-	cont:
-		time.Sleep(200 * time.Millisecond)
 	}
 
-	if err != nil {
-		return
-	}
+	return
+}
 
-	// not exported
-	return errors.New("gocql: cluster schema versions not consistent")
+func (c *controlConn) awaitSchemaAgreement() error {
+	return c.withConn(func(conn *Conn) *Iter {
+		return &Iter{err: conn.awaitSchemaAgreement()}
+	}).err
 }
 
 func (c *controlConn) addr() string {
@@ -231,4 +195,4 @@ func (c *controlConn) close() {
 	close(c.quit)
 }
 
-var errNoControl = errors.New("gocql: no controll connection available")
+var errNoControl = errors.New("gocql: no control connection available")

+ 1 - 1
session.go

@@ -75,7 +75,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		pageSize: cfg.PageSize,
 	}
 
-	pool, err := cfg.PoolConfig.buildPool(&s.cfg)
+	pool, err := cfg.PoolConfig.buildPool(s)
 	if err != nil {
 		return nil, err
 	}

+ 11 - 10
session_test.go

@@ -10,17 +10,17 @@ import (
 func TestSessionAPI(t *testing.T) {
 
 	cfg := &ClusterConfig{}
-	pool, err := cfg.PoolConfig.buildPool(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
 
 	s := &Session{
-		pool: pool,
 		cfg:  *cfg,
 		cons: Quorum,
 	}
 
+	var err error
+	s.pool, err = cfg.PoolConfig.buildPool(s)
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer s.Close()
 
 	s.SetConsistency(All)
@@ -160,18 +160,19 @@ func TestQueryShouldPrepare(t *testing.T) {
 func TestBatchBasicAPI(t *testing.T) {
 
 	cfg := &ClusterConfig{RetryPolicy: &SimpleRetryPolicy{NumRetries: 2}}
-	pool, err := cfg.PoolConfig.buildPool(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
 
 	s := &Session{
-		pool: pool,
 		cfg:  *cfg,
 		cons: Quorum,
 	}
 	defer s.Close()
 
+	var err error
+	s.pool, err = cfg.PoolConfig.buildPool(s)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	b := s.NewBatch(UnloggedBatch)
 	if b.Type != UnloggedBatch {
 		t.Fatalf("expceted batch.Type to be '%v', got '%v'", UnloggedBatch, b.Type)