Browse Source

conn: add ConnectObserver (#1100)

Much like the QueryObserver, the ConnectObserver is called when a new connection
is made.
Thomas Meson 7 years ago
parent
commit
929922e7e4
3 changed files with 48 additions and 11 deletions
  1. 5 1
      cluster.go
  2. 6 4
      conn.go
  3. 37 6
      session.go

+ 5 - 1
cluster.go

@@ -125,7 +125,11 @@ type ClusterConfig struct {
 	// Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver.
 	BatchObserver BatchObserver
 
-	// FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session.
+	// ConnectObserver will set the provided connect observer on all queries
+	// created from this session.
+	ConnectObserver ConnectObserver
+
+  // FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session.
 	// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
 	FrameHeaderObserver FrameHeaderObserver
 

+ 6 - 4
conn.go

@@ -135,10 +135,11 @@ type Conn struct {
 	mu      sync.RWMutex
 	calls   map[int]*callReq
 
-	errorHandler    ConnErrorHandler
-	compressor      Compressor
-	auth            Authenticator
-	addr            string
+	errorHandler ConnErrorHandler
+	compressor   Compressor
+	auth         Authenticator
+	addr         string
+
 	version         uint8
 	currentKeyspace string
 	host            *HostInfo
@@ -1006,6 +1007,7 @@ func (c *Conn) executeBatch(batch *Batch) *Iter {
 	for i := 0; i < n; i++ {
 		entry := &batch.Entries[i]
 		b := &req.statements[i]
+
 		if len(entry.Args) > 0 || entry.binding != nil {
 			info, err := c.prepareStatement(batch.context, entry.Stmt, nil)
 			if err != nil {

+ 37 - 6
session.go

@@ -39,6 +39,7 @@ type Session struct {
 	trace               Tracer
 	queryObserver       QueryObserver
 	batchObserver       BatchObserver
+	connectObserver     ConnectObserver
 	frameObserver       FrameHeaderObserver
 	hostSource          *ringDescriber
 	stmtsLRU            *preparedLRU
@@ -107,12 +108,13 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	}
 
 	s := &Session{
-		cons:     cfg.Consistency,
-		prefetch: 0.25,
-		cfg:      cfg,
-		pageSize: cfg.PageSize,
-		stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
-		quit:     make(chan struct{}),
+		cons:            cfg.Consistency,
+		prefetch:        0.25,
+		cfg:             cfg,
+		pageSize:        cfg.PageSize,
+		stmtsLRU:        &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)},
+		quit:            make(chan struct{}),
+		connectObserver: cfg.ConnectObserver,
 	}
 
 	s.schemaDescriber = newSchemaDescriber(s)
@@ -139,6 +141,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 
 	s.queryObserver = cfg.QueryObserver
 	s.batchObserver = cfg.BatchObserver
+	s.connectObserver = cfg.ConnectObserver
 	s.frameObserver = cfg.FrameHeaderObserver
 
 	//Check the TLS Config before trying to connect to anything external
@@ -641,6 +644,17 @@ func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{})
 }
 
 func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) {
+	if s.connectObserver != nil {
+		obs := ObservedConnect{
+			Host:  host,
+			Start: time.Now(),
+		}
+		conn, err := s.dial(host, s.connCfg, errorHandler)
+		obs.End = time.Now()
+		obs.Err = err
+		s.connectObserver.ObserveConnect(obs)
+		return conn, err
+	}
 	return s.dial(host, s.connCfg, errorHandler)
 }
 
@@ -1715,6 +1729,23 @@ type BatchObserver interface {
 	ObserveBatch(context.Context, ObservedBatch)
 }
 
+type ObservedConnect struct {
+	// Host is the information about the host about to connect
+	Host *HostInfo
+
+	Start time.Time // time immediately before the dial is called
+	End   time.Time // time immediately after the dial returned
+
+	// Err is the connection error (if any)
+	Err error
+}
+
+// ConnectObserver is the interface implemented by connect observers / stat collectors.
+type ConnectObserver interface {
+	// ObserveConnect gets called when a new connection to cassandra is made.
+	ObserveConnect(ObservedConnect)
+}
+
 type Error struct {
 	Code    int
 	Message string