Chris Bannister 10 роки тому
батько
коміт
7080624247
2 змінених файлів з 37 додано та 11 видалено
  1. 4 4
      control.go
  2. 33 7
      host_source.go

+ 4 - 4
control.go

@@ -278,15 +278,15 @@ func (c *controlConn) fetchHostInfo(addr net.IP, port int) (*HostInfo, error) {
 	if isLocal {
 		fn = func(host *HostInfo) error {
 			// TODO(zariel): should we fetch rpc_address from here?
-			iter := c.query("SELECT data_center, rack, host_id, tokens FROM system.local WHERE key='local'")
-			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens)
+			iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'")
+			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
 			return iter.Close()
 		}
 	} else {
 		fn = func(host *HostInfo) error {
 			// TODO(zariel): should we fetch rpc_address from here?
-			iter := c.query("SELECT data_center, rack, host_id, tokens FROM system.peers WHERE peer=?", addr)
-			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens)
+			iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", addr)
+			iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
 			return iter.Close()
 		}
 	}

+ 33 - 7
host_source.go

@@ -2,8 +2,9 @@ package gocql
 
 import (
 	"fmt"
-	"log"
 	"net"
+	"strconv"
+	"strings"
 	"sync"
 )
 
@@ -27,6 +28,31 @@ type cassVersion struct {
 	Major, Minor, Patch int
 }
 
+func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
+	v := strings.Split(string(data), ".")
+	if len(v) != 3 {
+		return fmt.Errorf("invalid schema_version: %v", string(data))
+	}
+
+	var err error
+	c.Major, err = strconv.Atoi(v[0])
+	if err != nil {
+		return fmt.Errorf("invalid major version %v: %v", v[0], err)
+	}
+
+	c.Minor, err = strconv.Atoi(v[1])
+	if err != nil {
+		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
+	}
+
+	c.Patch, err = strconv.Atoi(v[2])
+	if err != nil {
+		return fmt.Errorf("invalid patch version %v: %v", v[2], err)
+	}
+
+	return nil
+}
+
 func (c cassVersion) String() string {
 	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
 }
@@ -192,9 +218,9 @@ func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err e
 	// on the same node to get the whole cluster
 
 	const (
-		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner FROM system.local"
+		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
 		// only supported in 2.2.0, 2.1.6, 2.0.16
-		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner FROM system.local"
+		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
 	)
 
 	localHost := &HostInfo{}
@@ -205,7 +231,7 @@ func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err e
 		}
 
 		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
-			&localHost.hostId, &localHost.tokens, &partitioner)
+			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
 
 		if err = iter.Close(); err != nil {
 			return nil, "", err
@@ -216,7 +242,7 @@ func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err e
 			return r.prevHosts, r.prevPartitioner, nil
 		}
 
-		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner)
+		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
 
 		if err = iter.Close(); err != nil {
 			return nil, "", err
@@ -234,13 +260,13 @@ func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err e
 
 	hosts = []*HostInfo{localHost}
 
-	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens FROM system.peers")
+	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
 	if iter == nil {
 		return r.prevHosts, r.prevPartitioner, nil
 	}
 
 	host := &HostInfo{}
-	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens) {
+	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
 		if r.matchFilter(host) {
 			hosts = append(hosts, host)
 		}