123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- // Copyright 2014 The Cockroach Authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- // implied. See the License for the specific language governing
- // permissions and limitations under the License. See the AUTHORS file
- // for names of contributors.
- //
- // Author: Tyler Neely (t@jujit.su)
- package loghisto
- import (
- "net"
- "sync"
- "time"
- )
- type requestable interface{}
- type requestableArray interface {
- ToRequest() []byte
- }
- // Submitter encapsulates the state of a metric submitter.
- type Submitter struct {
- // backlog works as an evicting queue
- backlog [60][]byte
- backlogHead uint
- backlogTail uint
- backlogMu sync.Mutex
- serializer func(*ProcessedMetricSet) []byte
- DestinationNetwork string
- DestinationAddress string
- metricSystem *MetricSystem
- metricChan chan *ProcessedMetricSet
- shutdownChan chan struct{}
- }
- // NewSubmitter creates a Submitter that receives metrics off of a
- // specified metric channel, serializes them using the provided
- // serialization function, and attempts to send them to the
- // specified destination.
- func NewSubmitter(metricSystem *MetricSystem,
- serializer func(*ProcessedMetricSet) []byte, destinationNetwork string,
- destinationAddress string) *Submitter {
- metricChan := make(chan *ProcessedMetricSet, 60)
- metricSystem.SubscribeToProcessedMetrics(metricChan)
- return &Submitter{
- backlog: [60][]byte{},
- backlogHead: 0,
- backlogTail: 0,
- serializer: serializer,
- DestinationNetwork: destinationNetwork,
- DestinationAddress: destinationAddress,
- metricSystem: metricSystem,
- metricChan: metricChan,
- shutdownChan: make(chan struct{}),
- }
- }
- func (s *Submitter) retryBacklog() error {
- var request []byte
- for {
- s.backlogMu.Lock()
- head := s.backlogHead
- tail := s.backlogTail
- if head != tail {
- request = s.backlog[head]
- }
- s.backlogMu.Unlock()
- if head == tail {
- return nil
- }
- err := s.submit(request)
- if err != nil {
- return err
- }
- s.backlogMu.Lock()
- s.backlogHead = (s.backlogHead + 1) % 60
- s.backlogMu.Unlock()
- }
- }
- func (s *Submitter) appendToBacklog(request []byte) {
- s.backlogMu.Lock()
- s.backlog[s.backlogTail] = request
- s.backlogTail = (s.backlogTail + 1) % 60
- // if we've run into the head, evict it
- if s.backlogHead == s.backlogTail {
- s.backlogHead = (s.backlogHead + 1) % 60
- }
- s.backlogMu.Unlock()
- }
- func (s *Submitter) submit(request []byte) error {
- conn, err := net.DialTimeout(s.DestinationNetwork, s.DestinationAddress,
- 5*time.Second)
- if err != nil {
- return err
- }
- conn.SetDeadline(time.Now().Add(5 * time.Second))
- _, err = conn.Write(request)
- conn.Close()
- return err
- }
- // Start creates the goroutines that receive, serialize, and send metrics.
- func (s *Submitter) Start() {
- go func() {
- for {
- select {
- case metrics, ok := <-s.metricChan:
- if !ok {
- // We can no longer make progress.
- return
- }
- request := s.serializer(metrics)
- s.appendToBacklog(request)
- case <-s.shutdownChan:
- return
- }
- }
- }()
- go func() {
- for {
- select {
- case <-s.shutdownChan:
- return
- default:
- s.retryBacklog()
- tts := s.metricSystem.interval.Nanoseconds() -
- (time.Now().UnixNano() % s.metricSystem.interval.Nanoseconds())
- time.Sleep(time.Duration(tts))
- }
- }
- }()
- }
- // Shutdown shuts down a submitter
- func (s *Submitter) Shutdown() {
- select {
- case <-s.shutdownChan:
- // already closed
- default:
- close(s.shutdownChan)
- }
- }
|