submitter.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. // Copyright 2014 The Cockroach Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. // implied. See the License for the specific language governing
  13. // permissions and limitations under the License. See the AUTHORS file
  14. // for names of contributors.
  15. //
  16. // Author: Tyler Neely (t@jujit.su)
  17. package loghisto
  18. import (
  19. "net"
  20. "sync"
  21. "time"
  22. )
  23. type requestable interface{}
  24. type requestableArray interface {
  25. ToRequest() []byte
  26. }
  27. // Submitter encapsulates the state of a metric submitter.
  28. type Submitter struct {
  29. // backlog works as an evicting queue
  30. backlog [60][]byte
  31. backlogHead uint
  32. backlogTail uint
  33. backlogMu sync.Mutex
  34. serializer func(*ProcessedMetricSet) []byte
  35. DestinationNetwork string
  36. DestinationAddress string
  37. metricSystem *MetricSystem
  38. metricChan chan *ProcessedMetricSet
  39. shutdownChan chan struct{}
  40. }
  41. // NewSubmitter creates a Submitter that receives metrics off of a
  42. // specified metric channel, serializes them using the provided
  43. // serialization function, and attempts to send them to the
  44. // specified destination.
  45. func NewSubmitter(metricSystem *MetricSystem,
  46. serializer func(*ProcessedMetricSet) []byte, destinationNetwork string,
  47. destinationAddress string) *Submitter {
  48. metricChan := make(chan *ProcessedMetricSet, 60)
  49. metricSystem.SubscribeToProcessedMetrics(metricChan)
  50. return &Submitter{
  51. backlog: [60][]byte{},
  52. backlogHead: 0,
  53. backlogTail: 0,
  54. serializer: serializer,
  55. DestinationNetwork: destinationNetwork,
  56. DestinationAddress: destinationAddress,
  57. metricSystem: metricSystem,
  58. metricChan: metricChan,
  59. shutdownChan: make(chan struct{}),
  60. }
  61. }
  62. func (s *Submitter) retryBacklog() error {
  63. var request []byte
  64. for {
  65. s.backlogMu.Lock()
  66. head := s.backlogHead
  67. tail := s.backlogTail
  68. if head != tail {
  69. request = s.backlog[head]
  70. }
  71. s.backlogMu.Unlock()
  72. if head == tail {
  73. return nil
  74. }
  75. err := s.submit(request)
  76. if err != nil {
  77. return err
  78. }
  79. s.backlogMu.Lock()
  80. s.backlogHead = (s.backlogHead + 1) % 60
  81. s.backlogMu.Unlock()
  82. }
  83. }
  84. func (s *Submitter) appendToBacklog(request []byte) {
  85. s.backlogMu.Lock()
  86. s.backlog[s.backlogTail] = request
  87. s.backlogTail = (s.backlogTail + 1) % 60
  88. // if we've run into the head, evict it
  89. if s.backlogHead == s.backlogTail {
  90. s.backlogHead = (s.backlogHead + 1) % 60
  91. }
  92. s.backlogMu.Unlock()
  93. }
  94. func (s *Submitter) submit(request []byte) error {
  95. conn, err := net.DialTimeout(s.DestinationNetwork, s.DestinationAddress,
  96. 5*time.Second)
  97. if err != nil {
  98. return err
  99. }
  100. conn.SetDeadline(time.Now().Add(5 * time.Second))
  101. _, err = conn.Write(request)
  102. conn.Close()
  103. return err
  104. }
  105. // Start creates the goroutines that receive, serialize, and send metrics.
  106. func (s *Submitter) Start() {
  107. go func() {
  108. for {
  109. select {
  110. case metrics, ok := <-s.metricChan:
  111. if !ok {
  112. // We can no longer make progress.
  113. return
  114. }
  115. request := s.serializer(metrics)
  116. s.appendToBacklog(request)
  117. case <-s.shutdownChan:
  118. return
  119. }
  120. }
  121. }()
  122. go func() {
  123. for {
  124. select {
  125. case <-s.shutdownChan:
  126. return
  127. default:
  128. s.retryBacklog()
  129. tts := s.metricSystem.interval.Nanoseconds() -
  130. (time.Now().UnixNano() % s.metricSystem.interval.Nanoseconds())
  131. time.Sleep(time.Duration(tts))
  132. }
  133. }
  134. }()
  135. }
  136. // Shutdown shuts down a submitter
  137. func (s *Submitter) Shutdown() {
  138. select {
  139. case <-s.shutdownChan:
  140. // already closed
  141. default:
  142. close(s.shutdownChan)
  143. }
  144. }