metrics.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package stat
  2. import (
  3. "os"
  4. "sync"
  5. "time"
  6. "github.com/tal-tech/go-zero/core/executors"
  7. "github.com/tal-tech/go-zero/core/logx"
  8. )
  9. var (
  10. LogInterval = time.Minute
  11. writerLock sync.Mutex
  12. reportWriter Writer = nil
  13. )
  14. type (
  15. Writer interface {
  16. Write(report *StatReport) error
  17. }
  18. StatReport struct {
  19. Name string `json:"name"`
  20. Timestamp int64 `json:"tm"`
  21. Pid int `json:"pid"`
  22. ReqsPerSecond float32 `json:"qps"`
  23. Drops int `json:"drops"`
  24. Average float32 `json:"avg"`
  25. Median float32 `json:"med"`
  26. Top90th float32 `json:"t90"`
  27. Top99th float32 `json:"t99"`
  28. Top99p9th float32 `json:"t99p9"`
  29. }
  30. Metrics struct {
  31. executor *executors.PeriodicalExecutor
  32. container *metricsContainer
  33. }
  34. )
  35. func SetReportWriter(writer Writer) {
  36. writerLock.Lock()
  37. reportWriter = writer
  38. writerLock.Unlock()
  39. }
  40. func NewMetrics(name string) *Metrics {
  41. container := &metricsContainer{
  42. name: name,
  43. pid: os.Getpid(),
  44. }
  45. return &Metrics{
  46. executor: executors.NewPeriodicalExecutor(LogInterval, container),
  47. container: container,
  48. }
  49. }
  50. func (m *Metrics) Add(task Task) {
  51. m.executor.Add(task)
  52. }
  53. func (m *Metrics) AddDrop() {
  54. m.executor.Add(Task{
  55. Drop: true,
  56. })
  57. }
  58. func (m *Metrics) SetName(name string) {
  59. m.executor.Sync(func() {
  60. m.container.name = name
  61. })
  62. }
  63. type (
  64. tasksDurationPair struct {
  65. tasks []Task
  66. duration time.Duration
  67. drops int
  68. }
  69. metricsContainer struct {
  70. name string
  71. pid int
  72. tasks []Task
  73. duration time.Duration
  74. drops int
  75. }
  76. )
  77. func (c *metricsContainer) AddTask(v interface{}) bool {
  78. if task, ok := v.(Task); ok {
  79. if task.Drop {
  80. c.drops++
  81. } else {
  82. c.tasks = append(c.tasks, task)
  83. c.duration += task.Duration
  84. }
  85. }
  86. return false
  87. }
  88. func (c *metricsContainer) Execute(v interface{}) {
  89. pair := v.(tasksDurationPair)
  90. tasks := pair.tasks
  91. duration := pair.duration
  92. drops := pair.drops
  93. size := len(tasks)
  94. report := &StatReport{
  95. Name: c.name,
  96. Timestamp: time.Now().Unix(),
  97. Pid: c.pid,
  98. ReqsPerSecond: float32(size) / float32(LogInterval/time.Second),
  99. Drops: drops,
  100. }
  101. if size > 0 {
  102. report.Average = float32(duration/time.Millisecond) / float32(size)
  103. fiftyPercent := size >> 1
  104. if fiftyPercent > 0 {
  105. top50pTasks := topK(tasks, fiftyPercent)
  106. medianTask := top50pTasks[0]
  107. report.Median = float32(medianTask.Duration) / float32(time.Millisecond)
  108. tenPercent := fiftyPercent / 5
  109. if tenPercent > 0 {
  110. top10pTasks := topK(tasks, tenPercent)
  111. task90th := top10pTasks[0]
  112. report.Top90th = float32(task90th.Duration) / float32(time.Millisecond)
  113. onePercent := tenPercent / 10
  114. if onePercent > 0 {
  115. top1pTasks := topK(top10pTasks, onePercent)
  116. task99th := top1pTasks[0]
  117. report.Top99th = float32(task99th.Duration) / float32(time.Millisecond)
  118. pointOnePercent := onePercent / 10
  119. if pointOnePercent > 0 {
  120. topPointOneTasks := topK(top1pTasks, pointOnePercent)
  121. task99Point9th := topPointOneTasks[0]
  122. report.Top99p9th = float32(task99Point9th.Duration) / float32(time.Millisecond)
  123. } else {
  124. report.Top99p9th = getTopDuration(top1pTasks)
  125. }
  126. } else {
  127. mostDuration := getTopDuration(top10pTasks)
  128. report.Top99th = mostDuration
  129. report.Top99p9th = mostDuration
  130. }
  131. } else {
  132. mostDuration := getTopDuration(tasks)
  133. report.Top90th = mostDuration
  134. report.Top99th = mostDuration
  135. report.Top99p9th = mostDuration
  136. }
  137. } else {
  138. mostDuration := getTopDuration(tasks)
  139. report.Median = mostDuration
  140. report.Top90th = mostDuration
  141. report.Top99th = mostDuration
  142. report.Top99p9th = mostDuration
  143. }
  144. }
  145. log(report)
  146. }
  147. func (c *metricsContainer) RemoveAll() interface{} {
  148. tasks := c.tasks
  149. duration := c.duration
  150. drops := c.drops
  151. c.tasks = nil
  152. c.duration = 0
  153. c.drops = 0
  154. return tasksDurationPair{
  155. tasks: tasks,
  156. duration: duration,
  157. drops: drops,
  158. }
  159. }
  160. func getTopDuration(tasks []Task) float32 {
  161. top := topK(tasks, 1)
  162. if len(top) < 1 {
  163. return 0
  164. } else {
  165. return float32(top[0].Duration) / float32(time.Millisecond)
  166. }
  167. }
  168. func log(report *StatReport) {
  169. writeReport(report)
  170. logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+
  171. "90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms",
  172. report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median,
  173. report.Top90th, report.Top99th, report.Top99p9th)
  174. }
  175. func writeReport(report *StatReport) {
  176. writerLock.Lock()
  177. defer writerLock.Unlock()
  178. if reportWriter != nil {
  179. if err := reportWriter.Write(report); err != nil {
  180. logx.Error(err)
  181. }
  182. }
  183. }