
add support for events

Chris Bannister 10 years ago
parent
commit
732282f2ca
6 changed files with 331 additions and 8 deletions
  1. conn.go (+5 -1)
  2. connectionpool.go (+37 -0)
  3. control.go (+46 -0)
  4. events.go (+80 -0)
  5. frame.go (+44 -0)
  6. policies.go (+119 -7)

+ 5 - 1
conn.go

@@ -397,7 +397,11 @@ func (c *Conn) recv() error {
 		return fmt.Errorf("gocql: frame header stream is beyond call exepected bounds: %d", head.stream)
 	} else if head.stream == -1 {
 		// TODO: handle cassandra event frames, we shouldnt get any currently
-		return c.discardFrame(head)
+		framer := newFramer(c, c, c.compressor, c.version)
+		if err := framer.readFrame(&head); err != nil {
+			return err
+		}
+		go c.session.handleEvent(framer)
 	} else if head.stream <= 0 {
 		// reserved stream that we dont use, probably due to a protocol error
 		// or a bug in Cassandra, this should be an error, parse it and return.

+ 37 - 0
connectionpool.go

@@ -218,6 +218,43 @@ func (p *policyConnPool) Close() {
 	}
 }
 
+func (p *policyConnPool) addHost(host *HostInfo) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	pool, ok := p.hostConnPools[host.Peer]
+	if ok {
+		return
+	}
+
+	pool = newHostConnPool(
+		p.session,
+		host.Peer,
+		p.port,
+		p.numConns,
+		p.connCfg,
+		p.keyspace,
+		p.connPolicy(),
+	)
+
+	p.hostConnPools[host.Peer] = pool
+}
+
+func (p *policyConnPool) removeHost(addr string) {
+	p.mu.Lock()
+
+	pool, ok := p.hostConnPools[addr]
+	if !ok {
+		p.mu.Unlock()
+		return
+	}
+
+	delete(p.hostConnPools, addr)
+	p.mu.Unlock()
+
+	pool.Close()
+}
+
 // hostConnPool is a connection pool for a single host.
 // Connection selection is based on a provided ConnSelectionPolicy
 type hostConnPool struct {

+ 46 - 0
control.go

@@ -3,6 +3,8 @@ package gocql
 import (
 	"errors"
 	"fmt"
+	"log"
+	"net"
 	"sync/atomic"
 	"time"
 )
@@ -98,6 +100,18 @@ func (c *controlConn) reconnect(refreshring bool) {
 		return
 	}
 
+	frame, err := c.writeFrame(&writeRegisterFrame{
+		events: []string{"TOPOLOGY_CHANGE", "STATUS_CHANGE", "SCHEMA_CHANGE"},
+	})
+
+	if err != nil {
+		host.Mark(err)
+		return
+	} else if _, ok := frame.(*readyFrame); !ok {
+		log.Printf("gocql: unexpected frame in response to register: got %T: %v\n", frame, frame)
+		return
+	}
+
 	host.Mark(nil)
 	c.conn.Store(newConn)
 	success = true
@@ -179,6 +193,38 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
 	return
 }
 
+func (c *controlConn) fetchHostInfo(addr net.IP, port int) (*HostInfo, error) {
+	// TODO(zariel): we should probably move this into host_source or atleast
+	// share code with it.
+	isLocal := c.addr() == addr.String()
+
+	var fn func(*HostInfo) error
+
+	if isLocal {
+		fn = func(host *HostInfo) error {
+			// TODO(zariel): should we fetch rpc_address from here?
+			iter := c.query("SELECT data_center, rack, host_id, tokens FROM system.local WHERE key='local'")
+			iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens)
+			return iter.Close()
+		}
+	} else {
+		fn = func(host *HostInfo) error {
+			// TODO(zariel): should we fetch rpc_address from here?
+			iter := c.query("SELECT data_center, rack, host_id, tokens FROM system.peers WHERE peer=?", addr)
+			iter.Scan(&host.DataCenter, &host.Rack, &host.HostId, &host.Tokens)
+			return iter.Close()
+		}
+	}
+
+	host := &HostInfo{}
+	if err := fn(host); err != nil {
+		return nil, err
+	}
+	host.Peer = addr.String()
+
+	return host, nil
+}
+
 func (c *controlConn) awaitSchemaAgreement() error {
 	return c.withConn(func(conn *Conn) *Iter {
 		return &Iter{err: conn.awaitSchemaAgreement()}

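For context on the new writeRegisterFrame above: per the Cassandra native protocol, a REGISTER body is simply a string list of event type names, and the server acknowledges it with a READY frame before it starts pushing events on stream -1. A minimal standalone sketch of that wire layout (an illustration of the protocol encoding only, not gocql's framer API) might look like:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRegisterBody lays out a REGISTER body: a 2-byte big-endian count,
// then each event name as a 2-byte length followed by its UTF-8 bytes.
func encodeRegisterBody(events []string) []byte {
	buf := make([]byte, 2)
	binary.BigEndian.PutUint16(buf, uint16(len(events)))
	for _, e := range events {
		var l [2]byte
		binary.BigEndian.PutUint16(l[:], uint16(len(e)))
		buf = append(buf, l[:]...)
		buf = append(buf, e...)
	}
	return buf
}

func main() {
	body := encodeRegisterBody([]string{"TOPOLOGY_CHANGE", "STATUS_CHANGE", "SCHEMA_CHANGE"})
	fmt.Printf("% x\n", body) // count=3, then the length-prefixed names
}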
+ 80 - 0
events.go

@@ -0,0 +1,80 @@
+package gocql
+
+import (
+	"log"
+	"net"
+)
+
+func (s *Session) handleEvent(framer *framer) {
+	defer framerPool.Put(framer)
+
+	frame, err := framer.parseFrame()
+	if err != nil {
+		// TODO: logger
+		log.Printf("gocql: unable to parse event frame: %v\n", err)
+		return
+	}
+
+	// TODO: handle medatadata events
+	switch f := frame.(type) {
+	case *schemaChangeKeyspace:
+	case *schemaChangeFunction:
+	case *schemaChangeTable:
+	case *topologyChangeEventFrame:
+		switch f.change {
+		case "NEW_NODE":
+			s.handleNewNode(f.host, f.port)
+		case "REMOVED_NODE":
+			s.handleRemovedNode(f.host, f.port)
+		case "MOVED_NODE":
+			// java-driver handles this, not mentioned in the spec
+			// TODO(zariel): refresh token map
+		}
+	case *statusChangeEventFrame:
+		// TODO(zariel): is it worth having 2 methods for these?
+		switch f.change {
+		case "UP":
+			s.handleNodeUp(f.host, f.port)
+		case "DOWN":
+		}
+	default:
+		log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
+	}
+}
+
+func (s *Session) handleNewNode(host net.IP, port int) {
+	if !s.cfg.DiscoverHosts || s.control == nil {
+		return
+	}
+
+	if s.control.addr() == host.String() {
+		go s.control.reconnect(false)
+	}
+
+	hostInfo, err := s.control.fetchHostInfo(host, port)
+	if err != nil {
+		log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
+		return
+	}
+
+	s.pool.addHost(hostInfo)
+}
+
+func (s *Session) handleRemovedNode(host net.IP, port int) {
+	if !s.cfg.DiscoverHosts {
+		return
+	}
+
+	s.pool.removeHost(host.String())
+}
+
+func (s *Session) handleNodeUp(host net.IP, port int) {
+	// even if were not disconvering new nodes we should still handle nodes going
+	// up.
+
+	s.pool.hostUp(host.String())
+}
+
+func (s *Session) handleNodeDown(host net.IP, port int) {
+	s.pool.hostDown(host.String())
+}

+ 44 - 0
frame.go

@@ -462,6 +462,8 @@ func (f *framer) parseFrame() (frame frame, err error) {
 		frame = f.parseAuthChallengeFrame()
 	case opAuthSuccess:
 		frame = f.parseAuthSuccessFrame()
+	case opEvent:
+		frame = f.parseEventFrame()
 	default:
 		return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
 	}
@@ -1154,6 +1156,48 @@ func (f *framer) parseAuthChallengeFrame() frame {
 	}
 }
 
+type statusChangeEventFrame struct {
+	frameHeader
+
+	change string
+	host   net.IP
+	port   int
+}
+
+// essentially the same as statusChange
+type topologyChangeEventFrame struct {
+	frameHeader
+
+	change string
+	host   net.IP
+	port   int
+}
+
+func (f *framer) parseEventFrame() frame {
+	eventType := f.readString()
+
+	switch eventType {
+	case "TOPOLOGY_CHANGE":
+		frame := &topologyChangeEventFrame{frameHeader: *f.header}
+		frame.change = f.readString()
+		frame.host, frame.port = f.readInet()
+
+		return frame
+	case "STATUS_CHANGE":
+		frame := &statusChangeEventFrame{frameHeader: *f.header}
+		frame.change = f.readString()
+		frame.host, frame.port = f.readInet()
+
+		return frame
+	case "SCHEMA_CHANGE":
+		// this should work for all versions
+		return f.parseResultSchemaChange()
+	default:
+		panic(fmt.Errorf("gocql: unknown event type: %q", eventType))
+	}
+
+}
+
 type writeAuthResponseFrame struct {
 	data []byte
 }

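Both new event frame structs are populated via readInet. In the native protocol an [inet] value is a one-byte address length (4 for IPv4, 16 for IPv6), the raw address bytes, and a four-byte big-endian port. A rough self-contained sketch of that decoding (an illustration of the wire format, not the framer's internals) could be:

package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

// decodeInet reads a native-protocol [inet]: 1-byte address length
// (4 or 16), the address bytes, then a 4-byte big-endian port.
func decodeInet(b []byte) (net.IP, int, error) {
	if len(b) < 1 {
		return nil, 0, fmt.Errorf("short buffer")
	}
	size := int(b[0])
	if size != 4 && size != 16 {
		return nil, 0, fmt.Errorf("invalid inet size: %d", size)
	}
	if len(b) < 1+size+4 {
		return nil, 0, fmt.Errorf("short buffer for inet of size %d", size)
	}
	ip := make(net.IP, size)
	copy(ip, b[1:1+size])
	port := int(binary.BigEndian.Uint32(b[1+size:]))
	return ip, port, nil
}

func main() {
	// 4-byte IPv4 address 10.0.0.5 followed by port 9042 (0x2352).
	raw := []byte{4, 10, 0, 0, 5, 0, 0, 0x23, 0x52}
	ip, port, err := decodeInet(raw)
	fmt.Println(ip, port, err) // 10.0.0.5 9042 <nil>
}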
+ 119 - 7
policies.go

@@ -12,6 +12,78 @@ import (
 	"github.com/hailocab/go-hostpool"
 )
 
+// cowHostList implements a copy on write host list, its equivilent type is []HostInfo
+type cowHostList struct {
+	list atomic.Value
+	mu   sync.Mutex
+}
+
+func (c *cowHostList) get() []HostInfo {
+	// TODO(zariel): should we replace this with []*HostInfo?
+	l, ok := c.list.Load().(*[]HostInfo)
+	if !ok {
+		return nil
+	}
+	return *l
+}
+
+func (c *cowHostList) set(list []HostInfo) {
+	c.mu.Lock()
+	c.list.Store(&list)
+	c.mu.Unlock()
+}
+
+func (c *cowHostList) add(host HostInfo) {
+	c.mu.Lock()
+	l := c.get()
+
+	if n := len(l); n == 0 {
+		l = append(l, host)
+	} else {
+		newL := make([]HostInfo, n+1)
+		for i := 0; i < n; i++ {
+			if host.Peer == l[i].Peer && host.HostId == l[i].HostId {
+				c.mu.Unlock()
+				return
+			}
+			newL[i] = l[i]
+		}
+		newL[n] = host
+	}
+
+	c.list.Store(&l)
+	c.mu.Unlock()
+}
+
+func (c *cowHostList) remove(host HostInfo) {
+	c.mu.Lock()
+	l := c.get()
+	size := len(l)
+	if size == 0 {
+		c.mu.Unlock()
+		return
+	}
+
+	found := false
+	newL := make([]HostInfo, 0, size)
+	for i := 0; i < len(l); i++ {
+		if host.Peer != l[i].Peer && host.HostId != l[i].HostId {
+			newL = append(newL, l[i])
+		} else {
+			found = true
+		}
+	}
+
+	if !found {
+		c.mu.Unlock()
+		return
+	}
+
+	newL = newL[:size-1 : size-1]
+	c.list.Store(&newL)
+	c.mu.Unlock()
+}
+
 // RetryableQuery is an interface that represents a query or batch statement that
 // exposes the correct functions for the retry policy logic to evaluate correctly.
 type RetryableQuery interface {
@@ -50,9 +122,14 @@ func (s *SimpleRetryPolicy) Attempt(q RetryableQuery) bool {
 	return q.Attempts() <= s.NumRetries
 }
 
+type HostStateNotifier interface {
+	AddHost(host *HostInfo)
+}
+
 // HostSelectionPolicy is an interface for selecting
 // the most appropriate host to execute a given query.
 type HostSelectionPolicy interface {
+	HostStateNotifier
 	SetHosts
 	SetPartitioner
 	//Pick returns an iteration function over selected hosts
@@ -76,15 +153,13 @@ func RoundRobinHostPolicy() HostSelectionPolicy {
 }
 
 type roundRobinHostPolicy struct {
-	hosts []HostInfo
+	hosts cowHostList
 	pos   uint32
 	mu    sync.RWMutex
 }
 
 func (r *roundRobinHostPolicy) SetHosts(hosts []HostInfo) {
-	r.mu.Lock()
-	r.hosts = hosts
-	r.mu.Unlock()
+	r.hosts.set(hosts)
 }
 
 func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {
@@ -96,9 +171,8 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
 	// to the number of hosts known to this policy
 	var i int
 	return func() SelectedHost {
-		r.mu.RLock()
-		defer r.mu.RUnlock()
-		if len(r.hosts) == 0 {
+		hosts := r.hosts.get()
+		if len(hosts) == 0 {
 			return nil
 		}
 
@@ -114,6 +188,10 @@ func (r *roundRobinHostPolicy) Pick(qry *Query) NextHost {
 	}
 }
 
+func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
+	r.hosts.add(*host)
+}
+
 // selectedRoundRobinHost is a host returned by the roundRobinHostPolicy and
 // implements the SelectedHost interface
 type selectedRoundRobinHost struct {
@@ -166,6 +244,22 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
 	}
 }
 
+func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	t.fallback.AddHost(host)
+	for i := range t.hosts {
+		h := &t.hosts[i]
+		if h.HostId == host.HostId && h.Peer == host.Peer {
+			return
+		}
+	}
+
+	t.hosts = append(t.hosts, *host)
+	t.resetTokenRing()
+}
+
 func (t *tokenAwareHostPolicy) resetTokenRing() {
 	if t.partitioner == "" {
 		// partitioner not yet set
@@ -290,6 +384,24 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []HostInfo) {
 	r.mu.Unlock()
 }
 
+func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if _, ok := r.hostMap[host.Peer]; ok {
+		return
+	}
+
+	hosts := make([]string, 0, len(r.hostMap)+1)
+	for addr := range r.hostMap {
+		hosts = append(hosts, addr)
+	}
+	hosts = append(hosts, host.Peer)
+
+	r.hp.SetHosts(hosts)
+	r.hostMap[host.Peer] = *host
+}
+
 func (r *hostPoolHostPolicy) SetPartitioner(partitioner string) {
 	// noop
 }
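
The cowHostList introduced above is a copy-on-write list: readers load the current slice through atomic.Value without taking a lock, while writers serialise on the mutex, build a fresh slice, and publish it. A small self-contained sketch of the same pattern, simplified to plain strings rather than HostInfo, might look like this:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cowList is a copy-on-write string list: reads are lock-free via
// atomic.Value, writes serialise on the mutex and publish a new slice.
type cowList struct {
	v  atomic.Value // always holds a []string
	mu sync.Mutex
}

func (c *cowList) get() []string {
	l, _ := c.v.Load().([]string)
	return l
}

func (c *cowList) add(s string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	old := c.get()
	for _, e := range old {
		if e == s {
			return // already present
		}
	}
	// copy, never mutate the published slice in place
	fresh := make([]string, len(old), len(old)+1)
	copy(fresh, old)
	c.v.Store(append(fresh, s))
}

func (c *cowList) remove(s string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	old := c.get()
	fresh := make([]string, 0, len(old))
	for _, e := range old {
		if e != s {
			fresh = append(fresh, e)
		}
	}
	c.v.Store(fresh)
}

func main() {
	var hosts cowList
	hosts.add("10.0.0.1")
	hosts.add("10.0.0.2")
	hosts.remove("10.0.0.1")
	fmt.Println(hosts.get()) // [10.0.0.2]
}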