Browse Source

wait for schema agreement in conn

After receiving a schema change frame wait for the cluster schemas
to become consistent.

Move logic for waiting for schema agreement onto Conn, provide
cluster config to change max wait time.
Chris Bannister 10 years ago
parent
commit
c8020c32f0
7 changed files with 142 additions and 103 deletions
  1. 2 13
      cassandra_test.go
  2. 19 14
      cluster.go
  3. 65 2
      conn.go
  4. 21 4
      connectionpool.go
  5. 23 59
      control.go
  6. 1 1
      session.go
  7. 11 10
      session_test.go

+ 2 - 13
cassandra_test.go

@@ -62,7 +62,7 @@ func createTable(s *Session, table string) error {
 		return err
 		return err
 	}
 	}
 
 
-	return s.control.awaitSchemaAgreement()
+	return nil
 }
 }
 
 
 func createCluster() *ClusterConfig {
 func createCluster() *ClusterConfig {
@@ -71,6 +71,7 @@ func createCluster() *ClusterConfig {
 	cluster.CQLVersion = *flagCQL
 	cluster.CQLVersion = *flagCQL
 	cluster.Timeout = *flagTimeout
 	cluster.Timeout = *flagTimeout
 	cluster.Consistency = Quorum
 	cluster.Consistency = Quorum
+	cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
 	if *flagRetry > 0 {
 	if *flagRetry > 0 {
 		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
 		cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
 	}
 	}
@@ -101,10 +102,6 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 		tb.Fatal(err)
 		tb.Fatal(err)
 	}
 	}
 
 
-	if err = session.control.awaitSchemaAgreement(); err != nil {
-		tb.Fatal(err)
-	}
-
 	err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
 	err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
 	WITH replication = {
 	WITH replication = {
 		'class' : 'SimpleStrategy',
 		'class' : 'SimpleStrategy',
@@ -114,14 +111,6 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 	if err != nil {
 	if err != nil {
 		tb.Fatal(err)
 		tb.Fatal(err)
 	}
 	}
-
-	// the schema version might be out of data between 2 nodes, so wait for the
-	// cluster to settle.
-	// TODO(zariel): use events here to know when the cluster has resolved to the
-	// new schema version
-	if err = session.control.awaitSchemaAgreement(); err != nil {
-		tb.Fatal(err)
-	}
 }
 }
 
 
 func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
 func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {

+ 19 - 14
cluster.go

@@ -63,7 +63,7 @@ type PoolConfig struct {
 	ConnSelectionPolicy func() ConnSelectionPolicy
 	ConnSelectionPolicy func() ConnSelectionPolicy
 }
 }
 
 
-func (p PoolConfig) buildPool(cfg *ClusterConfig) (*policyConnPool, error) {
+func (p PoolConfig) buildPool(session *Session) (*policyConnPool, error) {
 	hostSelection := p.HostSelectionPolicy
 	hostSelection := p.HostSelectionPolicy
 	if hostSelection == nil {
 	if hostSelection == nil {
 		hostSelection = RoundRobinHostPolicy()
 		hostSelection = RoundRobinHostPolicy()
@@ -74,7 +74,7 @@ func (p PoolConfig) buildPool(cfg *ClusterConfig) (*policyConnPool, error) {
 		connSelection = RoundRobinConnPolicy()
 		connSelection = RoundRobinConnPolicy()
 	}
 	}
 
 
-	return newPolicyConnPool(cfg, hostSelection, connSelection)
+	return newPolicyConnPool(session, hostSelection, connSelection)
 }
 }
 
 
 // ClusterConfig is a struct to configure the default cluster implementation
 // ClusterConfig is a struct to configure the default cluster implementation
@@ -107,6 +107,10 @@ type ClusterConfig struct {
 	// configuration of host selection and connection selection policies.
 	// configuration of host selection and connection selection policies.
 	PoolConfig PoolConfig
 	PoolConfig PoolConfig
 
 
+	// The maximum amount of time to wait for schema agreement in a cluster after
+	// receiving a schema change frame. (deault: 60s)
+	MaxWaitSchemaAgreement time.Duration
+
 	// internal config for testing
 	// internal config for testing
 	disableControlConn bool
 	disableControlConn bool
 }
 }
@@ -114,18 +118,19 @@ type ClusterConfig struct {
 // NewCluster generates a new config for the default cluster implementation.
 // NewCluster generates a new config for the default cluster implementation.
 func NewCluster(hosts ...string) *ClusterConfig {
 func NewCluster(hosts ...string) *ClusterConfig {
 	cfg := &ClusterConfig{
 	cfg := &ClusterConfig{
-		Hosts:             hosts,
-		CQLVersion:        "3.0.0",
-		ProtoVersion:      2,
-		Timeout:           600 * time.Millisecond,
-		Port:              9042,
-		NumConns:          2,
-		Consistency:       Quorum,
-		DiscoverHosts:     false,
-		MaxPreparedStmts:  defaultMaxPreparedStmts,
-		MaxRoutingKeyInfo: 1000,
-		PageSize:          5000,
-		DefaultTimestamp:  true,
+		Hosts:                  hosts,
+		CQLVersion:             "3.0.0",
+		ProtoVersion:           2,
+		Timeout:                600 * time.Millisecond,
+		Port:                   9042,
+		NumConns:               2,
+		Consistency:            Quorum,
+		DiscoverHosts:          false,
+		MaxPreparedStmts:       defaultMaxPreparedStmts,
+		MaxRoutingKeyInfo:      1000,
+		PageSize:               5000,
+		DefaultTimestamp:       true,
+		MaxWaitSchemaAgreement: 60 * time.Second,
 	}
 	}
 	return cfg
 	return cfg
 }
 }

+ 65 - 2
conn.go

@@ -114,6 +114,8 @@ type Conn struct {
 	currentKeyspace string
 	currentKeyspace string
 	started         bool
 	started         bool
 
 
+	session *Session
+
 	closed int32
 	closed int32
 	quit   chan struct{}
 	quit   chan struct{}
 
 
@@ -122,7 +124,7 @@ type Conn struct {
 
 
 // Connect establishes a connection to a Cassandra node.
 // Connect establishes a connection to a Cassandra node.
 // You must also call the Serve method before you can execute any queries.
 // You must also call the Serve method before you can execute any queries.
-func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) {
+func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler, session *Session) (*Conn, error) {
 	var (
 	var (
 		err  error
 		err  error
 		conn net.Conn
 		conn net.Conn
@@ -178,6 +180,7 @@ func Connect(addr string, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn
 		auth:         cfg.Authenticator,
 		auth:         cfg.Authenticator,
 		headerBuf:    make([]byte, headerSize),
 		headerBuf:    make([]byte, headerSize),
 		quit:         make(chan struct{}),
 		quit:         make(chan struct{}),
+		session:      session,
 	}
 	}
 
 
 	if cfg.Keepalive > 0 {
 	if cfg.Keepalive > 0 {
@@ -700,8 +703,15 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 		}
 
 
 		return iter
 		return iter
-	case *resultKeyspaceFrame, *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction:
+	case *resultKeyspaceFrame:
 		return &Iter{framer: framer}
 		return &Iter{framer: framer}
+	case *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction:
+		iter := &Iter{framer: framer}
+		c.awaitSchemaAgreement()
+		// dont return an error from this, might be a good idea to give a warning
+		// though. The impact of this returning an error would be that the cluster
+		// is not consistent with regards to its schema.
+		return iter
 	case *RequestErrUnprepared:
 	case *RequestErrUnprepared:
 		stmtsLRU.Lock()
 		stmtsLRU.Lock()
 		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
 		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
@@ -886,6 +896,59 @@ func (c *Conn) setKeepalive(d time.Duration) error {
 	return nil
 	return nil
 }
 }
 
 
+func (c *Conn) query(statement string, values ...interface{}) (iter *Iter) {
+	q := c.session.Query(statement, values...).Consistency(One)
+	return c.executeQuery(q)
+}
+
+func (c *Conn) awaitSchemaAgreement() (err error) {
+	const (
+		peerSchemas  = "SELECT schema_version FROM system.peers"
+		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
+	)
+
+	endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
+	for time.Now().Before(endDeadline) {
+		iter := c.query(peerSchemas)
+
+		versions := make(map[string]struct{})
+
+		var schemaVersion string
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		iter = c.query(localSchemas)
+		for iter.Scan(&schemaVersion) {
+			versions[schemaVersion] = struct{}{}
+			schemaVersion = ""
+		}
+
+		if err = iter.Close(); err != nil {
+			goto cont
+		}
+
+		if len(versions) <= 1 {
+			return nil
+		}
+
+	cont:
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	if err != nil {
+		return
+	}
+
+	// not exported
+	return errors.New("gocql: cluster schema versions not consistent")
+}
+
 type inflightPrepare struct {
 type inflightPrepare struct {
 	info QueryInfo
 	info QueryInfo
 	err  error
 	err  error

+ 21 - 4
connectionpool.go

@@ -58,6 +58,8 @@ func setupTLSConfig(sslOpts *SslOptions) (*tls.Config, error) {
 }
 }
 
 
 type policyConnPool struct {
 type policyConnPool struct {
+	session *Session
+
 	port     int
 	port     int
 	numConns int
 	numConns int
 	connCfg  *ConnConfig
 	connCfg  *ConnConfig
@@ -69,7 +71,7 @@ type policyConnPool struct {
 	hostConnPools map[string]*hostConnPool
 	hostConnPools map[string]*hostConnPool
 }
 }
 
 
-func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
+func newPolicyConnPool(session *Session, hostPolicy HostSelectionPolicy,
 	connPolicy func() ConnSelectionPolicy) (*policyConnPool, error) {
 	connPolicy func() ConnSelectionPolicy) (*policyConnPool, error) {
 
 
 	var (
 	var (
@@ -77,6 +79,8 @@ func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
 		tlsConfig *tls.Config
 		tlsConfig *tls.Config
 	)
 	)
 
 
+	cfg := session.cfg
+
 	if cfg.SslOpts != nil {
 	if cfg.SslOpts != nil {
 		tlsConfig, err = setupTLSConfig(cfg.SslOpts)
 		tlsConfig, err = setupTLSConfig(cfg.SslOpts)
 		if err != nil {
 		if err != nil {
@@ -86,6 +90,7 @@ func newPolicyConnPool(cfg *ClusterConfig, hostPolicy HostSelectionPolicy,
 
 
 	// create the pool
 	// create the pool
 	pool := &policyConnPool{
 	pool := &policyConnPool{
+		session:  session,
 		port:     cfg.Port,
 		port:     cfg.Port,
 		numConns: cfg.NumConns,
 		numConns: cfg.NumConns,
 		connCfg: &ConnConfig{
 		connCfg: &ConnConfig{
@@ -130,6 +135,7 @@ func (p *policyConnPool) SetHosts(hosts []HostInfo) {
 		if !exists {
 		if !exists {
 			// create a connection pool for the host
 			// create a connection pool for the host
 			pool = newHostConnPool(
 			pool = newHostConnPool(
+				p.session,
 				hosts[i].Peer,
 				hosts[i].Peer,
 				p.port,
 				p.port,
 				p.numConns,
 				p.numConns,
@@ -184,9 +190,18 @@ func (p *policyConnPool) Pick(qry *Query) (SelectedHost, *Conn) {
 		host = nextHost()
 		host = nextHost()
 		if host == nil {
 		if host == nil {
 			break
 			break
+		} else if host.Info() == nil {
+			panic(fmt.Sprintf("policy %T returned no host info: %+v", p.hostPolicy, host))
+		}
+
+		pool, ok := p.hostConnPools[host.Info().Peer]
+		if !ok {
+			continue
 		}
 		}
-		conn = p.hostConnPools[host.Info().Peer].Pick(qry)
+
+		conn = pool.Pick(qry)
 	}
 	}
+
 	p.mu.RUnlock()
 	p.mu.RUnlock()
 	return host, conn
 	return host, conn
 }
 }
@@ -208,6 +223,7 @@ func (p *policyConnPool) Close() {
 // hostConnPool is a connection pool for a single host.
 // hostConnPool is a connection pool for a single host.
 // Connection selection is based on a provided ConnSelectionPolicy
 // Connection selection is based on a provided ConnSelectionPolicy
 type hostConnPool struct {
 type hostConnPool struct {
+	session  *Session
 	host     string
 	host     string
 	port     int
 	port     int
 	addr     string
 	addr     string
@@ -222,10 +238,11 @@ type hostConnPool struct {
 	filling bool
 	filling bool
 }
 }
 
 
-func newHostConnPool(host string, port int, size int, connCfg *ConnConfig,
+func newHostConnPool(session *Session, host string, port, size int, connCfg *ConnConfig,
 	keyspace string, policy ConnSelectionPolicy) *hostConnPool {
 	keyspace string, policy ConnSelectionPolicy) *hostConnPool {
 
 
 	pool := &hostConnPool{
 	pool := &hostConnPool{
+		session:  session,
 		host:     host,
 		host:     host,
 		port:     port,
 		port:     port,
 		addr:     JoinHostPort(host, port),
 		addr:     JoinHostPort(host, port),
@@ -395,7 +412,7 @@ func (pool *hostConnPool) fillingStopped() {
 // create a new connection to the host and add it to the pool
 // create a new connection to the host and add it to the pool
 func (pool *hostConnPool) connect() error {
 func (pool *hostConnPool) connect() error {
 	// try to connect
 	// try to connect
-	conn, err := Connect(pool.addr, pool.connCfg, pool)
+	conn, err := Connect(pool.addr, pool.connCfg, pool, pool.session)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 23 - 59
control.go

@@ -88,7 +88,7 @@ func (c *controlConn) reconnect(refreshring bool) {
 		return
 		return
 	}
 	}
 
 
-	newConn, err := Connect(conn.addr, conn.cfg, c)
+	newConn, err := Connect(conn.addr, conn.cfg, c, c.session)
 	if err != nil {
 	if err != nil {
 		host.Mark(err)
 		host.Mark(err)
 		// TODO: add log handler for things like this
 		// TODO: add log handler for things like this
@@ -135,18 +135,15 @@ func (c *controlConn) writeFrame(w frameWriter) (frame, error) {
 	return framer.parseFrame()
 	return framer.parseFrame()
 }
 }
 
 
-// query will return nil if the connection is closed or nil
-func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
-	q := c.session.Query(statement, values...).Consistency(One)
-
+func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
 	const maxConnectAttempts = 5
 	const maxConnectAttempts = 5
 	connectAttempts := 0
 	connectAttempts := 0
 
 
-	for {
+	for i := 0; i < maxConnectAttempts; i++ {
 		conn := c.conn.Load().(*Conn)
 		conn := c.conn.Load().(*Conn)
 		if conn == nil {
 		if conn == nil {
 			if connectAttempts > maxConnectAttempts {
 			if connectAttempts > maxConnectAttempts {
-				return &Iter{err: errNoControl}
+				break
 			}
 			}
 
 
 			connectAttempts++
 			connectAttempts++
@@ -155,67 +152,34 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
 			continue
 			continue
 		}
 		}
 
 
-		iter = conn.executeQuery(q)
-		q.attempts++
-		if iter.err == nil || !c.retry.Attempt(q) {
-			break
-		}
+		return fn(conn)
 	}
 	}
 
 
-	return
+	return &Iter{err: errNoControl}
 }
 }
 
 
-func (c *controlConn) awaitSchemaAgreement() (err error) {
-
-	const (
-		// TODO(zariel): if we export this make this configurable
-		maxWaitTime = 60 * time.Second
-
-		peerSchemas  = "SELECT schema_version FROM system.peers"
-		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
-	)
-
-	endDeadline := time.Now().Add(maxWaitTime)
-
-	for time.Now().Before(endDeadline) {
-		iter := c.query(peerSchemas)
-
-		versions := make(map[string]struct{})
-
-		var schemaVersion string
-		for iter.Scan(&schemaVersion) {
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
-		}
-
-		if err = iter.Close(); err != nil {
-			goto cont
-		}
-
-		iter = c.query(localSchemas)
-		for iter.Scan(&schemaVersion) {
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
-		}
+// query will return nil if the connection is closed or nil
+func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
+	q := c.session.Query(statement, values...).Consistency(One)
 
 
-		if err = iter.Close(); err != nil {
-			goto cont
-		}
+	for {
+		iter = c.withConn(func(conn *Conn) *Iter {
+			return conn.executeQuery(q)
+		})
 
 
-		if len(versions) <= 1 {
-			return nil
+		q.attempts++
+		if iter.err == nil || !c.retry.Attempt(q) {
+			break
 		}
 		}
-
-	cont:
-		time.Sleep(200 * time.Millisecond)
 	}
 	}
 
 
-	if err != nil {
-		return
-	}
+	return
+}
 
 
-	// not exported
-	return errors.New("gocql: cluster schema versions not consistent")
+func (c *controlConn) awaitSchemaAgreement() error {
+	return c.withConn(func(conn *Conn) *Iter {
+		return &Iter{err: conn.awaitSchemaAgreement()}
+	}).err
 }
 }
 
 
 func (c *controlConn) addr() string {
 func (c *controlConn) addr() string {
@@ -231,4 +195,4 @@ func (c *controlConn) close() {
 	close(c.quit)
 	close(c.quit)
 }
 }
 
 
-var errNoControl = errors.New("gocql: no controll connection available")
+var errNoControl = errors.New("gocql: no control connection available")

+ 1 - 1
session.go

@@ -75,7 +75,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 		pageSize: cfg.PageSize,
 		pageSize: cfg.PageSize,
 	}
 	}
 
 
-	pool, err := cfg.PoolConfig.buildPool(&s.cfg)
+	pool, err := cfg.PoolConfig.buildPool(s)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 11 - 10
session_test.go

@@ -10,17 +10,17 @@ import (
 func TestSessionAPI(t *testing.T) {
 func TestSessionAPI(t *testing.T) {
 
 
 	cfg := &ClusterConfig{}
 	cfg := &ClusterConfig{}
-	pool, err := cfg.PoolConfig.buildPool(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
 
 
 	s := &Session{
 	s := &Session{
-		pool: pool,
 		cfg:  *cfg,
 		cfg:  *cfg,
 		cons: Quorum,
 		cons: Quorum,
 	}
 	}
 
 
+	var err error
+	s.pool, err = cfg.PoolConfig.buildPool(s)
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer s.Close()
 	defer s.Close()
 
 
 	s.SetConsistency(All)
 	s.SetConsistency(All)
@@ -160,18 +160,19 @@ func TestQueryShouldPrepare(t *testing.T) {
 func TestBatchBasicAPI(t *testing.T) {
 func TestBatchBasicAPI(t *testing.T) {
 
 
 	cfg := &ClusterConfig{RetryPolicy: &SimpleRetryPolicy{NumRetries: 2}}
 	cfg := &ClusterConfig{RetryPolicy: &SimpleRetryPolicy{NumRetries: 2}}
-	pool, err := cfg.PoolConfig.buildPool(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
 
 
 	s := &Session{
 	s := &Session{
-		pool: pool,
 		cfg:  *cfg,
 		cfg:  *cfg,
 		cons: Quorum,
 		cons: Quorum,
 	}
 	}
 	defer s.Close()
 	defer s.Close()
 
 
+	var err error
+	s.pool, err = cfg.PoolConfig.buildPool(s)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	b := s.NewBatch(UnloggedBatch)
 	b := s.NewBatch(UnloggedBatch)
 	if b.Type != UnloggedBatch {
 	if b.Type != UnloggedBatch {
 		t.Fatalf("expceted batch.Type to be '%v', got '%v'", UnloggedBatch, b.Type)
 		t.Fatalf("expceted batch.Type to be '%v', got '%v'", UnloggedBatch, b.Type)