| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package gocql
- import (
- "errors"
- "fmt"
- "sync/atomic"
- "time"
- )
- type controlConn struct {
- connecting uint64
- session *Session
- conn atomic.Value
- retry RetryPolicy
- quit chan struct{}
- }
- func createControlConn(session *Session) *controlConn {
- control := &controlConn{
- session: session,
- quit: make(chan struct{}),
- retry: &SimpleRetryPolicy{NumRetries: 3},
- }
- control.conn.Store((*Conn)(nil))
- go control.heartBeat()
- return control
- }
- func (c *controlConn) heartBeat() {
- for {
- select {
- case <-c.quit:
- return
- case <-time.After(5 * time.Second):
- }
- resp, err := c.writeFrame(&writeOptionsFrame{})
- if err != nil {
- goto reconn
- }
- switch resp.(type) {
- case *supportedFrame:
- continue
- case error:
- goto reconn
- default:
- panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp))
- }
- reconn:
- c.reconnect(true)
- // time.Sleep(5 * time.Second)
- continue
- }
- }
- func (c *controlConn) reconnect(refreshring bool) {
- if !atomic.CompareAndSwapUint64(&c.connecting, 0, 1) {
- return
- }
- success := false
- defer func() {
- // debounce reconnect a little
- if success {
- go func() {
- time.Sleep(500 * time.Millisecond)
- atomic.StoreUint64(&c.connecting, 0)
- }()
- } else {
- atomic.StoreUint64(&c.connecting, 0)
- }
- }()
- oldConn := c.conn.Load().(*Conn)
- // TODO: should have our own roundrobbin for hosts so that we can try each
- // in succession and guantee that we get a different host each time.
- host, conn := c.session.pool.Pick(nil)
- if conn == nil {
- return
- }
- newConn, err := Connect(conn.addr, conn.cfg, c, c.session)
- if err != nil {
- host.Mark(err)
- // TODO: add log handler for things like this
- return
- }
- host.Mark(nil)
- c.conn.Store(newConn)
- success = true
- if oldConn != nil {
- oldConn.Close()
- }
- if refreshring && c.session.cfg.DiscoverHosts {
- c.session.hostSource.refreshRing()
- }
- }
- func (c *controlConn) HandleError(conn *Conn, err error, closed bool) {
- if !closed {
- return
- }
- oldConn := c.conn.Load().(*Conn)
- if oldConn != conn {
- return
- }
- c.reconnect(true)
- }
- func (c *controlConn) writeFrame(w frameWriter) (frame, error) {
- conn := c.conn.Load().(*Conn)
- if conn == nil {
- return nil, errNoControl
- }
- framer, err := conn.exec(w, nil)
- if err != nil {
- return nil, err
- }
- return framer.parseFrame()
- }
- func (c *controlConn) withConn(fn func(*Conn) *Iter) *Iter {
- const maxConnectAttempts = 5
- connectAttempts := 0
- for i := 0; i < maxConnectAttempts; i++ {
- conn := c.conn.Load().(*Conn)
- if conn == nil {
- if connectAttempts > maxConnectAttempts {
- break
- }
- connectAttempts++
- c.reconnect(false)
- continue
- }
- return fn(conn)
- }
- return &Iter{err: errNoControl}
- }
- // query will return nil if the connection is closed or nil
- func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter) {
- q := c.session.Query(statement, values...).Consistency(One)
- for {
- iter = c.withConn(func(conn *Conn) *Iter {
- return conn.executeQuery(q)
- })
- q.attempts++
- if iter.err == nil || !c.retry.Attempt(q) {
- break
- }
- }
- return
- }
- func (c *controlConn) awaitSchemaAgreement() error {
- return c.withConn(func(conn *Conn) *Iter {
- return &Iter{err: conn.awaitSchemaAgreement()}
- }).err
- }
- func (c *controlConn) addr() string {
- conn := c.conn.Load().(*Conn)
- if conn == nil {
- return ""
- }
- return conn.addr
- }
- func (c *controlConn) close() {
- // TODO: handle more gracefully
- close(c.quit)
- }
// errNoControl is reported by control-connection operations when no
// connection is currently available to carry the request.
var errNoControl = errors.New("gocql: no control connection available")
|