| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- package gocql
- import (
- "errors"
- "fmt"
- "log"
- "math/rand"
- "net"
- "sync"
- "sync/atomic"
- "time"
- )
- // Ensure that the atomic variable is aligned to a 64bit boundary
- // so that atomic operations can be applied on 32bit architectures.
- type controlConn struct {
- connecting uint64
- session *Session
- conn atomic.Value
- retry RetryPolicy
- closeWg sync.WaitGroup
- quit chan struct{}
- }
- func createControlConn(session *Session) *controlConn {
- control := &controlConn{
- session: session,
- quit: make(chan struct{}),
- retry: &SimpleRetryPolicy{NumRetries: 3},
- }
- control.conn.Store((*Conn)(nil))
- return control
- }
- func (c *controlConn) heartBeat() {
- defer c.closeWg.Done()
- for {
- select {
- case <-c.quit:
- return
- case <-time.After(5 * time.Second):
- }
- resp, err := c.writeFrame(&writeOptionsFrame{})
- if err != nil {
- goto reconn
- }
- switch resp.(type) {
- case *supportedFrame:
- continue
- case error:
- goto reconn
- default:
- panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp))
- }
- reconn:
- c.reconnect(true)
- // time.Sleep(5 * time.Second)
- continue
- }
- }
- func (c *controlConn) connect(endpoints []string) error {
- // intial connection attmept, try to connect to each endpoint to get an initial
- // list of nodes.
- // shuffle endpoints so not all drivers will connect to the same initial
- // node.
- r := rand.New(rand.NewSource(time.Now().UnixNano()))
- perm := r.Perm(len(endpoints))
- shuffled := make([]string, len(endpoints))
- for i, endpoint := range endpoints {
- shuffled[perm[i]] = endpoint
- }
- connCfg, err := connConfig(c.session)
- if err != nil {
- return err
- }
- // store that we are not connected so that reconnect wont happen if we error
- atomic.StoreInt64(&c.connecting, -1)
- var (
- conn *Conn
- )
- for _, addr := range shuffled {
- conn, err = Connect(JoinHostPort(addr, c.session.cfg.Port), connCfg, c, c.session)
- if err != nil {
- log.Printf("gocql: unable to dial %v: %v\n", addr, err)
- continue
- }
- // we should fetch the initial ring here and update initial host data. So that
- // when we return from here we have a ring topology ready to go.
- break
- }
- if conn == nil {
- // this is fatal, not going to connect a session
- return err
- }
- c.conn.Store(conn)
- atomic.StoreInt64(&c.connecting, 0)
- c.closeWg.Add(1)
- go c.heartBeat()
- return nil
- }
- func (c *controlConn) reconnect(refreshring bool) {
- if !atomic.CompareAndSwapInt64(&c.connecting, 0, 1) {
- return
- }
- success := false
- defer func() {
- // debounce reconnect a little
- if success {
- go func() {
- time.Sleep(500 * time.Millisecond)
- atomic.StoreInt64(&c.connecting, 0)
- }()
- } else {
- atomic.StoreInt64(&c.connecting, 0)
- }
- }()
- oldConn := c.conn.Load().(*Conn)
- // TODO: should have our own roundrobbin for hosts so that we can try each
- // in succession and guantee that we get a different host each time.
- host, conn := c.session.pool.Pick(nil)
- if conn == nil {
- return
- }
- newConn, err := Connect(conn.addr, conn.cfg, c, c.session)
- if err != nil {
- host.Mark(err)
- // TODO: add log handler for things like this
- return
- }
- frame, err := c.writeFrame(&writeRegisterFrame{
- events: []string{"TOPOLOGY_CHANGE", "STATUS_CHANGE", "STATUS_CHANGE"},
- })
- if err != nil {
- host.Mark(err)
- return
- } else if _, ok := frame.(*readyFrame); !ok {
- log.Printf("gocql: unexpected frame in response to register: got %T: %v\n", frame, frame)
- return
- }
- host.Mark(nil)
- c.conn.Store(newConn)
- success = true
- if oldConn != nil {
- oldConn.Close()
- }
- if refreshring {
- c.session.hostSource.refreshRing()
- }
- }
- func (c *controlConn) HandleError(conn *Conn, err error, closed bool) {
- if !closed {
- return
- }
- oldConn := c.conn.Load().(*Conn)
- if oldConn != conn {
- return
- }
- c.reconnect(true)
- }
- func (c *controlConn) writeFrame(w frameWriter) (frame, error) {
- conn := c.conn.Load().(*Conn)
- if conn == nil {
- return nil, errNoControl
- }
- framer, err := conn.exec(w, nil)
- if err != nil {
- return nil, err
- }
- return framer.parseFrame()
- }
- func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
- const maxConnectAttempts = 5
- connectAttempts := 0
- for i := 0; i < maxConnectAttempts; i++ {
- conn := c.conn.Load().(*Conn)
- if conn == nil {
- if connectAttempts > maxConnectAttempts {
- break
- }
- connectAttempts++
- c.reconnect(false)
- continue
- }
- return fn(conn)
- }
- return &Iter{err: errNoControl}
- }
- // query will return nil if the connection is closed or nil
- func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
- q := c.session.Query(statement, values...).Consistency(One)
- for {
- iter = c.withConn(func(conn *Conn) *Iter {
- return conn.executeQuery(q)
- })
- q.attempts++
- if iter.err == nil || !c.retry.Attempt(q) {
- break
- }
- }
- return
- }
- func (c *controlConn) fetchHostInfo(addr net.IP, port int) (*HostInfo, error) {
- // TODO(zariel): we should probably move this into host_source or atleast
- // share code with it.
- isLocal := c.addr() == addr.String()
- var fn func(*HostInfo) error
- if isLocal {
- fn = func(host *HostInfo) error {
- // TODO(zariel): should we fetch rpc_address from here?
- iter := c.query("SELECT data_center, rack, host_id, tokens FROM system.local WHERE key='local'")
- iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens)
- return iter.Close()
- }
- } else {
- fn = func(host *HostInfo) error {
- // TODO(zariel): should we fetch rpc_address from here?
- iter := c.query("SELECT data_center, rack, host_id, tokens FROM system.peers WHERE peer=?", addr)
- iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens)
- return iter.Close()
- }
- }
- host := &HostInfo{}
- if err := fn(host); err != nil {
- return nil, err
- }
- host.peer = addr.String()
- return host, nil
- }
- func (c *controlConn) awaitSchemaAgreement() error {
- return c.withConn(func(conn *Conn) *Iter {
- return &Iter{err: conn.awaitSchemaAgreement()}
- }).err
- }
- func (c *controlConn) addr() string {
- conn := c.conn.Load().(*Conn)
- if conn == nil {
- return ""
- }
- return conn.addr
- }
- func (c *controlConn) close() {
- // TODO: handle more gracefully
- close(c.quit)
- c.closeWg.Wait()
- conn := c.conn.Load().(*Conn)
- if conn != nil {
- conn.Close()
- }
- }
// errNoControl is the sentinel error reported when an operation needs the
// control connection but none is currently established.
var (
	errNoControl = errors.New("gocql: no control connection available")
)
|