merge_logger.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package logutil
  15. import (
  16. "fmt"
  17. "sync"
  18. "time"
  19. "github.com/coreos/pkg/capnslog"
  20. )
  21. var (
  22. defaultMergePeriod = time.Second
  23. defaultTimeOutputScale = 10 * time.Millisecond
  24. outputInterval = time.Second
  25. )
  26. // line represents a log line that can be printed out
  27. // through capnslog.PackageLogger.
  28. type line struct {
  29. level capnslog.LogLevel
  30. str string
  31. }
  32. func (l line) append(s string) line {
  33. return line{
  34. level: l.level,
  35. str: l.str + " " + s,
  36. }
  37. }
  38. // status represents the merge status of a line.
  39. type status struct {
  40. period time.Duration
  41. start time.Time // start time of latest merge period
  42. count int // number of merged lines from starting
  43. }
  44. func (s *status) isInMergePeriod(now time.Time) bool {
  45. return s.period == 0 || s.start.Add(s.period).After(now)
  46. }
  47. func (s *status) isEmpty() bool { return s.count == 0 }
  48. func (s *status) summary(now time.Time) string {
  49. ts := s.start.Round(defaultTimeOutputScale)
  50. took := now.Round(defaultTimeOutputScale).Sub(ts)
  51. return fmt.Sprintf("[merged %d repeated lines in %s]", s.count, took)
  52. }
  53. func (s *status) reset(now time.Time) {
  54. s.start = now
  55. s.count = 0
  56. }
  57. // MergeLogger supports merge logging, which merges repeated log lines
  58. // and prints summary log lines instead.
  59. //
  60. // For merge logging, MergeLogger prints out the line when the line appears
  61. // at the first time. MergeLogger holds the same log line printed within
  62. // defaultMergePeriod, and prints out summary log line at the end of defaultMergePeriod.
  63. // It stops merging when the line doesn't appear within the
  64. // defaultMergePeriod.
  65. type MergeLogger struct {
  66. *capnslog.PackageLogger
  67. mu sync.Mutex // protect statusm
  68. statusm map[line]*status
  69. }
  70. func NewMergeLogger(logger *capnslog.PackageLogger) *MergeLogger {
  71. l := &MergeLogger{
  72. PackageLogger: logger,
  73. statusm: make(map[line]*status),
  74. }
  75. go l.outputLoop()
  76. return l
  77. }
  78. func (l *MergeLogger) MergeInfo(entries ...interface{}) {
  79. l.merge(line{
  80. level: capnslog.INFO,
  81. str: fmt.Sprint(entries...),
  82. })
  83. }
  84. func (l *MergeLogger) MergeInfof(format string, args ...interface{}) {
  85. l.merge(line{
  86. level: capnslog.INFO,
  87. str: fmt.Sprintf(format, args...),
  88. })
  89. }
  90. func (l *MergeLogger) MergeNotice(entries ...interface{}) {
  91. l.merge(line{
  92. level: capnslog.NOTICE,
  93. str: fmt.Sprint(entries...),
  94. })
  95. }
  96. func (l *MergeLogger) MergeNoticef(format string, args ...interface{}) {
  97. l.merge(line{
  98. level: capnslog.NOTICE,
  99. str: fmt.Sprintf(format, args...),
  100. })
  101. }
  102. func (l *MergeLogger) MergeWarning(entries ...interface{}) {
  103. l.merge(line{
  104. level: capnslog.WARNING,
  105. str: fmt.Sprint(entries...),
  106. })
  107. }
  108. func (l *MergeLogger) MergeWarningf(format string, args ...interface{}) {
  109. l.merge(line{
  110. level: capnslog.WARNING,
  111. str: fmt.Sprintf(format, args...),
  112. })
  113. }
  114. func (l *MergeLogger) MergeError(entries ...interface{}) {
  115. l.merge(line{
  116. level: capnslog.ERROR,
  117. str: fmt.Sprint(entries...),
  118. })
  119. }
  120. func (l *MergeLogger) MergeErrorf(format string, args ...interface{}) {
  121. l.merge(line{
  122. level: capnslog.ERROR,
  123. str: fmt.Sprintf(format, args...),
  124. })
  125. }
  126. func (l *MergeLogger) merge(ln line) {
  127. l.mu.Lock()
  128. // increase count if the logger is merging the line
  129. if status, ok := l.statusm[ln]; ok {
  130. status.count++
  131. l.mu.Unlock()
  132. return
  133. }
  134. // initialize status of the line
  135. l.statusm[ln] = &status{
  136. period: defaultMergePeriod,
  137. start: time.Now(),
  138. }
  139. // release the lock before IO operation
  140. l.mu.Unlock()
  141. // print out the line at its first time
  142. l.PackageLogger.Logf(ln.level, ln.str)
  143. }
  144. func (l *MergeLogger) outputLoop() {
  145. for now := range time.Tick(outputInterval) {
  146. var outputs []line
  147. l.mu.Lock()
  148. for ln, status := range l.statusm {
  149. if status.isInMergePeriod(now) {
  150. continue
  151. }
  152. if status.isEmpty() {
  153. delete(l.statusm, ln)
  154. continue
  155. }
  156. outputs = append(outputs, ln.append(status.summary(now)))
  157. status.reset(now)
  158. }
  159. l.mu.Unlock()
  160. for _, o := range outputs {
  161. l.PackageLogger.Logf(o.level, o.str)
  162. }
  163. }
  164. }