浏览代码

Check if peer valid in awaitSchemaAgreement (#1300)

Signed-off-by: Alex Lourie <djay.il@gmail.com>
Alex Lourie 6 年之前
父节点
当前提交
0a80211e3c
共有 2 个文件被更改,包括 22 次插入8 次删除
  1. 15 8
      conn.go
  2. 7 0
      host_source.go

+ 15 - 8
conn.go

@@ -1353,11 +1353,12 @@ func (c *Conn) query(ctx context.Context, statement string, values ...interface{
 
 func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
 	const (
-		peerSchemas  = "SELECT schema_version, peer FROM system.peers"
+		peerSchemas  = "SELECT * FROM system.peers"
 		localSchemas = "SELECT schema_version FROM system.local WHERE key='local'"
 	)
 
 	var versions map[string]struct{}
+	var schemaVersion string
 
 	endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement)
 	for time.Now().Before(endDeadline) {
@@ -1365,16 +1366,22 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
 
 		versions = make(map[string]struct{})
 
-		var schemaVersion string
-		var peer string
-		for iter.Scan(&schemaVersion, &peer) {
-			if schemaVersion == "" {
-				Logger.Printf("skipping peer entry with empty schema_version: peer=%q", peer)
+		rows, err := iter.SliceMap()
+		if err != nil {
+			goto cont
+		}
+
+		for _, row := range rows {
+			host, err := c.session.hostInfoFromMap(row, c.session.cfg.Port)
+			if err != nil {
+				goto cont
+			}
+			if !isValidPeer(host) || host.schemaVersion == "" {
+				Logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host)
 				continue
 			}
 
-			versions[schemaVersion] = struct{}{}
-			schemaVersion = ""
+			versions[host.schemaVersion] = struct{}{}
 		}
 
 		if err = iter.Close(); err != nil {

+ 7 - 0
host_source.go

@@ -128,6 +128,7 @@ type HostInfo struct {
 	clusterName      string
 	version          cassVersion
 	state            nodeState
+	schemaVersion	 string
 	tokens           []string
 }
 
@@ -547,6 +548,12 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*
 			if !ok {
 				return nil, fmt.Errorf(assertErrorMsg, "dse_version")
 			}
+		case "schema_version":
+			schemaVersion, ok := value.(UUID)
+			if !ok {
+				return nil, fmt.Errorf(assertErrorMsg, "schema_version")
+			}
+			host.schemaVersion = schemaVersion.String()
 		}
 		// TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
 		// Not sure what the port field will be called until the JIRA issue is complete