leader.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package stats
  15. import (
  16. "encoding/json"
  17. "log"
  18. "math"
  19. "sync"
  20. "time"
  21. )
  22. // LeaderStats is used by the leader in an etcd cluster, and encapsulates
  23. // statistics about communication with its followers
  24. type LeaderStats struct {
  25. // TODO(jonboulle): clarify that these are IDs, not names
  26. Leader string `json:"leader"`
  27. Followers map[string]*FollowerStats `json:"followers"`
  28. sync.Mutex
  29. }
  30. // NewLeaderStats generates a new LeaderStats with the given id as leader
  31. func NewLeaderStats(id string) *LeaderStats {
  32. return &LeaderStats{
  33. Leader: id,
  34. Followers: make(map[string]*FollowerStats),
  35. }
  36. }
  37. func (ls *LeaderStats) JSON() []byte {
  38. ls.Lock()
  39. stats := *ls
  40. ls.Unlock()
  41. b, err := json.Marshal(stats)
  42. // TODO(jonboulle): appropriate error handling?
  43. if err != nil {
  44. log.Printf("stats: error marshalling leader stats: %v", err)
  45. }
  46. return b
  47. }
  48. func (ls *LeaderStats) Follower(name string) *FollowerStats {
  49. ls.Lock()
  50. defer ls.Unlock()
  51. fs, ok := ls.Followers[name]
  52. if !ok {
  53. fs = &FollowerStats{}
  54. fs.Latency.Minimum = 1 << 63
  55. ls.Followers[name] = fs
  56. }
  57. return fs
  58. }
  59. // FollowerStats encapsulates various statistics about a follower in an etcd cluster
  60. type FollowerStats struct {
  61. Latency struct {
  62. Current float64 `json:"current"`
  63. Average float64 `json:"average"`
  64. averageSquare float64
  65. StandardDeviation float64 `json:"standardDeviation"`
  66. Minimum float64 `json:"minimum"`
  67. Maximum float64 `json:"maximum"`
  68. } `json:"latency"`
  69. Counts struct {
  70. Fail uint64 `json:"fail"`
  71. Success uint64 `json:"success"`
  72. } `json:"counts"`
  73. sync.Mutex
  74. }
  75. // Succ updates the FollowerStats with a successful send
  76. func (fs *FollowerStats) Succ(d time.Duration) {
  77. fs.Lock()
  78. defer fs.Unlock()
  79. total := float64(fs.Counts.Success) * fs.Latency.Average
  80. totalSquare := float64(fs.Counts.Success) * fs.Latency.averageSquare
  81. fs.Counts.Success++
  82. fs.Latency.Current = float64(d) / (1000000.0)
  83. if fs.Latency.Current > fs.Latency.Maximum {
  84. fs.Latency.Maximum = fs.Latency.Current
  85. }
  86. if fs.Latency.Current < fs.Latency.Minimum {
  87. fs.Latency.Minimum = fs.Latency.Current
  88. }
  89. fs.Latency.Average = (total + fs.Latency.Current) / float64(fs.Counts.Success)
  90. fs.Latency.averageSquare = (totalSquare + fs.Latency.Current*fs.Latency.Current) / float64(fs.Counts.Success)
  91. // sdv = sqrt(avg(x^2) - avg(x)^2)
  92. fs.Latency.StandardDeviation = math.Sqrt(fs.Latency.averageSquare - fs.Latency.Average*fs.Latency.Average)
  93. }
  94. // Fail updates the FollowerStats with an unsuccessful send
  95. func (fs *FollowerStats) Fail() {
  96. fs.Lock()
  97. defer fs.Unlock()
  98. fs.Counts.Fail++
  99. }