ソースを参照

Merge pull request #1322 from kiwicom/ms/refactor-metrics

Refactor query metrics code
Alex Lourie 6 年 前
コミット
a325bb279f
2 ファイル変更74 行追加87 行削除
  1. 1 1
      conn_test.go
  2. 73 86
      session.go

+ 1 - 1
conn_test.go

@@ -418,7 +418,7 @@ func TestQueryMultinodeWithMetrics(t *testing.T) {
 
 	for i, ip := range addresses {
 		host := &HostInfo{connectAddress: net.ParseIP(ip)}
-		queryMetric := qry.getHostMetrics(host)
+		queryMetric := qry.metrics.hostMetrics(host)
 		observedMetrics := observer.GetMetrics(host)
 
 		requests := int(atomic.LoadInt64(&nodes[i].nKillReq))

+ 73 - 86
session.go

@@ -666,6 +666,69 @@ type queryMetrics struct {
 	m map[string]*hostMetrics
 }
 
+// hostMetricsLocked gets or creates host metrics for given host.
+func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics {
+	qm.l.Lock()
+	metrics := qm.hostMetricsLocked(host)
+	qm.l.Unlock()
+	return metrics
+}
+
+// hostMetricsLocked gets or creates host metrics for given host.
+// It must be called only while holding qm.l lock.
+func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics {
+	metrics, exists := qm.m[host.ConnectAddress().String()]
+	if !exists {
+		// if the host is not in the map, it means it's been accessed for the first time
+		metrics = &hostMetrics{}
+		qm.m[host.ConnectAddress().String()] = metrics
+	}
+
+	return metrics
+}
+
+// attempts returns the number of times the query was executed.
+func (qm *queryMetrics) attempts() int {
+	qm.l.Lock()
+	var attempts int
+	for _, metric := range qm.m {
+		attempts += metric.Attempts
+	}
+	qm.l.Unlock()
+	return attempts
+}
+
+func (qm *queryMetrics) addAttempts(i int, host *HostInfo) {
+	qm.l.Lock()
+	hostMetric := qm.hostMetricsLocked(host)
+	hostMetric.Attempts += i
+	qm.l.Unlock()
+}
+
+func (qm *queryMetrics) latency() int64 {
+	qm.l.Lock()
+	var (
+		attempts int
+		latency int64
+	)
+	for _, metric := range qm.m {
+		attempts += metric.Attempts
+		latency += metric.TotalLatency
+	}
+	qm.l.Unlock()
+	if attempts > 0 {
+		return latency / int64(attempts)
+	}
+	return 0
+}
+
+func (qm *queryMetrics) addLatency(l int64, host *HostInfo) {
+	qm.l.Lock()
+	hostMetric := qm.hostMetricsLocked(host)
+	hostMetric.TotalLatency += l
+	qm.l.Unlock()
+}
+
 // Query represents a CQL statement that can be executed.
 type Query struct {
 	stmt                  string
@@ -712,19 +775,6 @@ func (q *Query) defaultsFromSession() {
 	s.mu.RUnlock()
 }
 
-func (q *Query) getHostMetrics(host *HostInfo) *hostMetrics {
-	q.metrics.l.Lock()
-	metrics, exists := q.metrics.m[host.ConnectAddress().String()]
-	if !exists {
-		// if the host is not in the map, it means it's been accessed for the first time
-		metrics = &hostMetrics{}
-		q.metrics.m[host.ConnectAddress().String()] = metrics
-	}
-	q.metrics.l.Unlock()
-
-	return metrics
-}
-
 // Statement returns the statement that was used to generate this query.
 func (q Query) Statement() string {
 	return q.stmt
@@ -737,43 +787,20 @@ func (q Query) String() string {
 
 //Attempts returns the number of times the query was executed.
 func (q *Query) Attempts() int {
-	q.metrics.l.Lock()
-	var attempts int
-	for _, metric := range q.metrics.m {
-		attempts += metric.Attempts
-	}
-	q.metrics.l.Unlock()
-	return attempts
+	return q.metrics.attempts()
 }
 
 func (q *Query) AddAttempts(i int, host *HostInfo) {
-	hostMetric := q.getHostMetrics(host)
-	q.metrics.l.Lock()
-	hostMetric.Attempts += i
-	q.metrics.l.Unlock()
+	q.metrics.addAttempts(i, host)
 }
 
 //Latency returns the average amount of nanoseconds per attempt of the query.
 func (q *Query) Latency() int64 {
-	q.metrics.l.Lock()
-	var attempts int
-	var latency int64
-	for _, metric := range q.metrics.m {
-		attempts += metric.Attempts
-		latency += metric.TotalLatency
-	}
-	q.metrics.l.Unlock()
-	if attempts > 0 {
-		return latency / int64(attempts)
-	}
-	return 0
+	return q.metrics.latency()
 }
 
 func (q *Query) AddLatency(l int64, host *HostInfo) {
-	hostMetric := q.getHostMetrics(host)
-	q.metrics.l.Lock()
-	hostMetric.TotalLatency += l
-	q.metrics.l.Unlock()
+	q.metrics.addLatency(l, host)
 }
 
 // Consistency sets the consistency level for this query. If no consistency
@@ -899,7 +926,7 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host
 			End:       end,
 			Rows:      iter.numRows,
 			Host:      host,
-			Metrics:   q.getHostMetrics(host),
+			Metrics:   q.metrics.hostMetrics(host),
 			Err:       iter.err,
 		})
 	}
@@ -1478,19 +1505,6 @@ func (s *Session) NewBatch(typ BatchType) *Batch {
 	return batch
 }
 
-func (b *Batch) getHostMetrics(host *HostInfo) *hostMetrics {
-	b.metrics.l.Lock()
-	metrics, exists := b.metrics.m[host.ConnectAddress().String()]
-	if !exists {
-		// if the host is not in the map, it means it's been accessed for the first time
-		metrics = &hostMetrics{}
-		b.metrics.m[host.ConnectAddress().String()] = metrics
-	}
-	b.metrics.l.Unlock()
-
-	return metrics
-}
-
 // Observer enables batch-level observer on this batch.
 // The provided observer will be called every time this batched query is executed.
 func (b *Batch) Observer(observer BatchObserver) *Batch {
@@ -1504,47 +1518,20 @@ func (b *Batch) Keyspace() string {
 
 // Attempts returns the number of attempts made to execute the batch.
 func (b *Batch) Attempts() int {
-	b.metrics.l.Lock()
-	defer b.metrics.l.Unlock()
-
-	var attempts int
-	for _, metric := range b.metrics.m {
-		attempts += metric.Attempts
-	}
-	return attempts
+	return b.metrics.attempts()
 }
 
 func (b *Batch) AddAttempts(i int, host *HostInfo) {
-	hostMetric := b.getHostMetrics(host)
-	b.metrics.l.Lock()
-	hostMetric.Attempts += i
-	b.metrics.l.Unlock()
+	b.metrics.addAttempts(i, host)
 }
 
 //Latency returns the average number of nanoseconds to execute a single attempt of the batch.
 func (b *Batch) Latency() int64 {
-	b.metrics.l.Lock()
-	defer b.metrics.l.Unlock()
-
-	var (
-		attempts int
-		latency  int64
-	)
-	for _, metric := range b.metrics.m {
-		attempts += metric.Attempts
-		latency += metric.TotalLatency
-	}
-	if attempts > 0 {
-		return latency / int64(attempts)
-	}
-	return 0
+	return b.metrics.latency()
 }
 
 func (b *Batch) AddLatency(l int64, host *HostInfo) {
-	hostMetric := b.getHostMetrics(host)
-	b.metrics.l.Lock()
-	hostMetric.TotalLatency += l
-	b.metrics.l.Unlock()
+	b.metrics.addLatency(l, host)
 }
 
 // GetConsistency returns the currently configured consistency level for the batch
@@ -1687,7 +1674,7 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
 		End:        end,
 		// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
 		Host:    host,
-		Metrics: b.getHostMetrics(host),
+		Metrics: b.metrics.hostMetrics(host),
 		Err:     iter.err,
 	})
 }