Procházet zdrojové kódy

Merge pull request #238 from phillipCouto/query_stats2

Added stats to Query structure.
Ben Hood před 11 roky
rodič
revize
874844dd3f
3 změnil soubory, kde provedl 111 přidání a 22 odebrání
  1. 45 0
      cassandra_test.go
  2. 9 5
      conn_test.go
  3. 57 17
      session.go

+ 45 - 0
cassandra_test.go

@@ -1169,3 +1169,48 @@ func TestVarint(t *testing.T) {
 		t.Errorf("Expected %v, was %v", nil, *resultBig)
 	}
 }
+
+//TestQueryStats confirms that the stats are returning valid data. Accuracy may be questionable.
+func TestQueryStats(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+	qry := session.Query("SELECT * FROM system.peers")
+	if err := qry.Exec(); err != nil {
+		t.Fatalf("query failed. %v", err)
+	} else {
+		if qry.Attempts() < 1 {
+			t.Fatal("expected at least 1 attempt, but got 0")
+		}
+		if qry.Latency() <= 0 {
+			t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
+		}
+	}
+}
+
+//TestBatchStats confirms that the stats are returning valid data. Accuracy may be questionable.
+func TestBatchStats(t *testing.T) {
+	if *flagProto == 1 {
+		t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
+	}
+	session := createSession(t)
+	defer session.Close()
+
+	if err := createTable(session, "CREATE TABLE batchStats (id int, PRIMARY KEY (id))"); err != nil {
+		t.Fatalf("failed to create table with error '%v'", err)
+	}
+
+	b := session.NewBatch(LoggedBatch)
+	b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
+	b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)
+
+	if err := session.ExecuteBatch(b); err != nil {
+		t.Fatalf("query failed. %v", err)
+	} else {
+		if b.Attempts() < 1 {
+			t.Fatal("expected at least 1 attempt, but got 0")
+		}
+		if b.Latency() <= 0 {
+			t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
+		}
+	}
+}

+ 9 - 5
conn_test.go

@@ -86,13 +86,17 @@ func TestQueryRetry(t *testing.T) {
 		t.Fatal("no timeout")
 	}()
 	rt := RetryPolicy{NumRetries: 1}
-
-	if err := db.Query("kill").RetryPolicy(rt).Exec(); err == nil {
+	qry := db.Query("kill").RetryPolicy(rt)
+	if err := qry.Exec(); err == nil {
 		t.Fatal("expected error")
 	}
-	//Minus 1 from the nKillReq variable since there is the initial query attempt
-	if srv.nKillReq-1 != uint64(rt.NumRetries) {
-		t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, srv.nKillReq-1)
+	requests := srv.nKillReq
+	if requests != uint64(qry.Attempts()) {
+		t.Fatalf("expected requests %v to match query attemps %v", requests, qry.Attempts())
+	}
+	//Minus 1 from the requests variable since there is the initial query attempt
+	if requests-1 != uint64(rt.NumRetries) {
+		t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, requests-1)
 	}
 }
 

+ 57 - 17
session.go

@@ -134,7 +134,9 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 	}
 
 	var iter *Iter
-	for count := 0; count <= qry.rt.NumRetries; count++ {
+	qry.attempts = 0
+	qry.totalLatency = 0
+	for qry.attempts <= qry.rt.NumRetries {
 		conn := s.Pool.Pick(qry)
 
 		//Assign the error unavailable to the iterator
@@ -143,7 +145,11 @@ func (s *Session) executeQuery(qry *Query) *Iter {
 			break
 		}
 
+		t := time.Now()
 		iter = conn.executeQuery(qry)
+		qry.totalLatency += time.Now().Sub(t).Nanoseconds()
+		qry.attempts++
+
 		//Exit for loop if the query was successful
 		if iter.err == nil {
 			break
@@ -169,7 +175,9 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 	}
 
 	var err error
-	for count := 0; count <= batch.rt.NumRetries; count++ {
+	batch.attempts = 0
+	batch.totalLatency = 0
+	for batch.attempts <= batch.rt.NumRetries {
 		conn := s.Pool.Pick(nil)
 
 		//Assign the error unavailable and break loop
@@ -177,8 +185,10 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 			err = ErrNoConnections
 			break
 		}
-
+		t := time.Now()
 		err = conn.executeBatch(batch)
+		batch.totalLatency += time.Now().Sub(t).Nanoseconds()
+		batch.attempts++
 		//Exit loop if operation executed correctly
 		if err == nil {
 			return nil
@@ -190,16 +200,31 @@ func (s *Session) ExecuteBatch(batch *Batch) error {
 
 // Query represents a CQL statement that can be executed.
 type Query struct {
-	stmt      string
-	values    []interface{}
-	cons      Consistency
-	pageSize  int
-	pageState []byte
-	prefetch  float64
-	trace     Tracer
-	session   *Session
-	rt        RetryPolicy
-	binding   func(q *QueryInfo) ([]interface{}, error)
+	stmt         string
+	values       []interface{}
+	cons         Consistency
+	pageSize     int
+	pageState    []byte
+	prefetch     float64
+	trace        Tracer
+	session      *Session
+	rt           RetryPolicy
+	binding      func(q *QueryInfo) ([]interface{}, error)
+	attempts     int
+	totalLatency int64
+}
+
+//Attempts returns the number of times the query was executed.
+func (q *Query) Attempts() int {
+	return q.attempts
+}
+
+//Latency returns the average amount of nanoseconds per attempt of the query.
+func (q *Query) Latency() int64 {
+	if q.attempts > 0 {
+		return q.totalLatency / int64(q.attempts)
+	}
+	return 0
 }
 
 // Consistency sets the consistency level for this query. If no consistency
@@ -397,10 +422,12 @@ func (n *nextIter) fetch() *Iter {
 }
 
 type Batch struct {
-	Type    BatchType
-	Entries []BatchEntry
-	Cons    Consistency
-	rt      RetryPolicy
+	Type         BatchType
+	Entries      []BatchEntry
+	Cons         Consistency
+	rt           RetryPolicy
+	attempts     int
+	totalLatency int64
 }
 
 // NewBatch creates a new batch operation without defaults from the cluster
@@ -413,6 +440,19 @@ func (s *Session) NewBatch(typ BatchType) *Batch {
 	return &Batch{Type: typ, rt: s.cfg.RetryPolicy}
 }
 
+// Attempts returns the number of attempts made to execute the batch.
+func (b *Batch) Attempts() int {
+	return b.attempts
+}
+
+//Latency returns the average number of nanoseconds to execute a single attempt of the batch.
+func (b *Batch) Latency() int64 {
+	if b.attempts > 0 {
+		return b.totalLatency / int64(b.attempts)
+	}
+	return 0
+}
+
 // Query adds the query to the batch operation
 func (b *Batch) Query(stmt string, args ...interface{}) {
 	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})