Prechádzať zdrojové kódy

Add frame header observer for per-frame instrumentation

Jacob Greenleaf 7 rokov pred
rodič
commit
cb53b83ccd
6 zmenil súbory, kde vykonal 105 pridanie a 19 odobranie
  1. 1 0
      AUTHORS
  2. 5 1
      cluster.go
  3. 34 18
      conn.go
  4. 41 0
      conn_test.go
  5. 22 0
      frame.go
  6. 2 0
      session.go

+ 1 - 0
AUTHORS

@@ -105,3 +105,4 @@ Zhixin Wen <john.wenzhixin@hotmail.com>
 Chang Liu <changliu.it@gmail.com>
 Ingo Oeser <nightlyone@gmail.com>
 Luke Hines <lukehines@protonmail.com>
+Jacob Greenleaf <jacob@jacobgreenleaf.com>

+ 5 - 1
cluster.go

@@ -122,9 +122,13 @@ type ClusterConfig struct {
 	QueryObserver QueryObserver
 
 	// BatchObserver will set the provided batch observer on all queries created from this session.
-	// Use it to collect metrics / stats from batche queries by providing an implementation of BatchObserver.
+	// Use it to collect metrics / stats from batch queries by providing an implementation of BatchObserver.
 	BatchObserver BatchObserver
 
+	// FrameHeaderObserver will set the provided frame header observer on all frames' headers created from this session.
+	// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
+	FrameHeaderObserver FrameHeaderObserver
+
 	// Default idempotence for queries
 	DefaultIdempotence bool
 

+ 34 - 18
conn.go

@@ -123,10 +123,11 @@ var TimeoutLimit int64 = 10
 // queries, but users are usually advised to use a more reliable, higher
 // level API.
 type Conn struct {
-	conn    net.Conn
-	r       *bufio.Reader
-	timeout time.Duration
-	cfg     *ConnConfig
+	conn          net.Conn
+	r             *bufio.Reader
+	timeout       time.Duration
+	cfg           *ConnConfig
+	frameObserver FrameHeaderObserver
 
 	headerBuf [maxFrameHeaderSize]byte
 
@@ -187,20 +188,21 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
 	}
 
 	c := &Conn{
-		conn:         conn,
-		r:            bufio.NewReader(conn),
-		cfg:          cfg,
-		calls:        make(map[int]*callReq),
-		timeout:      cfg.Timeout,
-		version:      uint8(cfg.ProtoVersion),
-		addr:         conn.RemoteAddr().String(),
-		errorHandler: errorHandler,
-		compressor:   cfg.Compressor,
-		auth:         cfg.Authenticator,
-		quit:         make(chan struct{}),
-		session:      s,
-		streams:      streams.New(cfg.ProtoVersion),
-		host:         host,
+		conn:          conn,
+		r:             bufio.NewReader(conn),
+		cfg:           cfg,
+		calls:         make(map[int]*callReq),
+		timeout:       cfg.Timeout,
+		version:       uint8(cfg.ProtoVersion),
+		addr:          conn.RemoteAddr().String(),
+		errorHandler:  errorHandler,
+		compressor:    cfg.Compressor,
+		auth:          cfg.Authenticator,
+		quit:          make(chan struct{}),
+		session:       s,
+		streams:       streams.New(cfg.ProtoVersion),
+		host:          host,
+		frameObserver: s.frameObserver,
 	}
 
 	if cfg.Keepalive > 0 {
@@ -459,12 +461,26 @@ func (c *Conn) recv() error {
 		c.conn.SetReadDeadline(time.Time{})
 	}
 
+	headStartTime := time.Now()
 	// were just reading headers over and over and copy bodies
 	head, err := readHeader(c.r, c.headerBuf[:])
+	headEndTime := time.Now()
 	if err != nil {
 		return err
 	}
 
+	if c.frameObserver != nil {
+		c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
+			Version: byte(head.version),
+			Flags:   head.flags,
+			Stream:  int16(head.stream),
+			Opcode:  byte(head.op),
+			Length:  int32(head.length),
+			Start:   headStartTime,
+			End:     headEndTime,
+		})
+	}
+
 	if head.stream > c.streams.NumStreams {
 		return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.stream)
 	} else if head.stream == -1 {

+ 41 - 0
conn_test.go

@@ -636,6 +636,47 @@ func TestContext_Timeout(t *testing.T) {
 	}
 }
 
+type recordingFrameHeaderObserver struct {
+	t *testing.T
+	frames []ObservedFrameHeader
+}
+func (r *recordingFrameHeaderObserver) ObserveFrameHeader(ctx context.Context, frm ObservedFrameHeader) {
+	r.frames = append(r.frames, frm)
+}
+
+func TestFrameHeaderObserver(t *testing.T) {
+	srv := NewTestServer(t, defaultProto, context.Background())
+	defer srv.Stop()
+
+	cluster := testCluster(srv.Address, defaultProto)
+	observer := &recordingFrameHeaderObserver{t: t}
+	cluster.FrameHeaderObserver = observer
+
+	db, err := cluster.CreateSession()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := db.Query("void").Exec(); err != nil {
+		t.Fatal(err)
+	}
+
+	if len(observer.frames) != 2 {
+		t.Fatalf("Expected to receive 2 frames, instead received %d", len(observer.frames))
+	}
+	readyFrame := observer.frames[0]
+	if readyFrame.Opcode != byte(opReady) {
+		t.Fatalf("Expected to receive ready frame, instead received frame of opcode %d", readyFrame.Opcode)
+	}
+	voidResultFrame := observer.frames[1]
+	if voidResultFrame.Opcode != byte(opResult) {
+		t.Fatalf("Expected to receive result frame, instead received frame of opcode %d", voidResultFrame.Opcode)
+	}
+	if voidResultFrame.Length != int32(4) {
+		t.Fatalf("Expected to receive frame with body length 4, instead received body length %d", voidResultFrame.Length)
+	}
+}
+
 func NewTestServer(t testing.TB, protocol uint8, ctx context.Context) *TestServer {
 	laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
 	if err != nil {

+ 22 - 0
frame.go

@@ -5,6 +5,7 @@
 package gocql
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -345,6 +346,27 @@ func (f frameHeader) Header() frameHeader {
 
 const defaultBufSize = 128
 
+type ObservedFrameHeader struct {
+	Version byte
+	Flags   byte
+	Stream  int16
+	Opcode  byte
+	Length  int32
+
+	// StartHeader is the time we started reading the frame header off the network connection.
+	Start time.Time
+	// EndHeader is the time we finished reading the frame header off the network connection.
+	End time.Time
+}
+
+// FrameHeaderObserver is the interface implemented by frame observers / stat collectors.
+//
+// Experimental, this interface and use may change
+type FrameHeaderObserver interface {
+	// ObserveFrameHeader gets called on every received frame header.
+	ObserveFrameHeader(context.Context, ObservedFrameHeader)
+}
+
 // a framer is responsible for reading, writing and parsing frames on a single stream
 type framer struct {
 	r io.Reader

+ 2 - 0
session.go

@@ -39,6 +39,7 @@ type Session struct {
 	trace               Tracer
 	queryObserver       QueryObserver
 	batchObserver       BatchObserver
+	frameObserver       FrameHeaderObserver
 	hostSource          *ringDescriber
 	stmtsLRU            *preparedLRU
 
@@ -138,6 +139,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 
 	s.queryObserver = cfg.QueryObserver
 	s.batchObserver = cfg.BatchObserver
+	s.frameObserver = cfg.FrameHeaderObserver
 
 	//Check the TLS Config before trying to connect to anything external
 	connCfg, err := connConfig(&s.cfg)