Browse Source

Merge pull request #3783 from yichengq/merge-logger

rafthttp: use MergeLogger for rafthttp logging
Yicheng Qin 10 years ago
parent
commit
65d153db73
6 changed files with 201 additions and 14 deletions
  1. 192 0
      pkg/logutil/merge_logger.go
  2. 1 1
      rafthttp/peer.go
  3. 4 10
      rafthttp/peer_status.go
  4. 1 1
      rafthttp/remote.go
  5. 1 1
      rafthttp/stream.go
  6. 2 1
      rafthttp/transport.go

+ 192 - 0
pkg/logutil/merge_logger.go

@@ -0,0 +1,192 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package logutil includes utilities to faciliate logging.
+package logutil
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
+)
+
+var (
+	defaultMergePeriod = time.Second
+
+	outputInterval = time.Second
+)
+
+// line represents a log line that can be printed out
+// through capnslog.PackageLogger.
+type line struct {
+	level capnslog.LogLevel
+	str   string
+}
+
+func (l line) append(s string) line {
+	return line{
+		level: l.level,
+		str:   l.str + " " + s,
+	}
+}
+
+// status represents the merge status of a line.
+type status struct {
+	period time.Duration
+
+	start time.Time // start time of latest merge period
+	count int       // number of merged lines from starting
+}
+
+func (s *status) isInMergePeriod(now time.Time) bool {
+	return s.period == 0 || s.start.Add(s.period).After(now)
+}
+
+func (s *status) isEmpty() bool { return s.count == 0 }
+
+func (s *status) summary(now time.Time) string {
+	return fmt.Sprintf("[merged %d repeated lines in %s]", s.count, now.Sub(s.start))
+}
+
+func (s *status) reset(now time.Time) {
+	s.start = now
+	s.count = 0
+}
+
+// MergeLogger supports merge logging, which merges repeated log lines
+// and prints summary log lines instead.
+//
+// For merge logging, MergeLogger prints out the line when the line appears
+// at the first time. MergeLogger holds the same log line printed within
+// defaultMergePeriod, and prints out summary log line at the end of defaultMergePeriod.
+// It stops merging when the line doesn't appear within the
+// defaultMergePeriod.
+type MergeLogger struct {
+	*capnslog.PackageLogger
+
+	mu      sync.Mutex // protect statusm
+	statusm map[line]*status
+}
+
+func NewMergeLogger(logger *capnslog.PackageLogger) *MergeLogger {
+	l := &MergeLogger{
+		PackageLogger: logger,
+		statusm:       make(map[line]*status),
+	}
+	go l.outputLoop()
+	return l
+}
+
+func (l *MergeLogger) MergeInfo(entries ...interface{}) {
+	l.merge(line{
+		level: capnslog.INFO,
+		str:   fmt.Sprint(entries...),
+	})
+}
+
+func (l *MergeLogger) MergeInfof(format string, args ...interface{}) {
+	l.merge(line{
+		level: capnslog.INFO,
+		str:   fmt.Sprintf(format, args...),
+	})
+}
+
+func (l *MergeLogger) MergeNotice(entries ...interface{}) {
+	l.merge(line{
+		level: capnslog.NOTICE,
+		str:   fmt.Sprint(entries...),
+	})
+}
+
+func (l *MergeLogger) MergeNoticef(format string, args ...interface{}) {
+	l.merge(line{
+		level: capnslog.NOTICE,
+		str:   fmt.Sprintf(format, args...),
+	})
+}
+
+func (l *MergeLogger) MergeWarning(entries ...interface{}) {
+	l.merge(line{
+		level: capnslog.WARNING,
+		str:   fmt.Sprint(entries...),
+	})
+}
+
+func (l *MergeLogger) MergeWarningf(format string, args ...interface{}) {
+	l.merge(line{
+		level: capnslog.WARNING,
+		str:   fmt.Sprintf(format, args...),
+	})
+}
+
+func (l *MergeLogger) MergeError(entries ...interface{}) {
+	l.merge(line{
+		level: capnslog.ERROR,
+		str:   fmt.Sprint(entries...),
+	})
+}
+
+func (l *MergeLogger) MergeErrorf(format string, args ...interface{}) {
+	l.merge(line{
+		level: capnslog.ERROR,
+		str:   fmt.Sprintf(format, args...),
+	})
+}
+
+func (l *MergeLogger) merge(ln line) {
+	l.mu.Lock()
+
+	// increase count if the logger is merging the line
+	if status, ok := l.statusm[ln]; ok {
+		status.count++
+		l.mu.Unlock()
+		return
+	}
+
+	// initialize status of the line
+	l.statusm[ln] = &status{
+		period: defaultMergePeriod,
+		start:  time.Now(),
+	}
+	// release the lock before IO operation
+	l.mu.Unlock()
+	// print out the line at its first time
+	l.PackageLogger.Logf(ln.level, ln.str)
+}
+
+func (l *MergeLogger) outputLoop() {
+	for now := range time.Tick(outputInterval) {
+		var outputs []line
+
+		l.mu.Lock()
+		for ln, status := range l.statusm {
+			if status.isInMergePeriod(now) {
+				continue
+			}
+			if status.isEmpty() {
+				delete(l.statusm, ln)
+				continue
+			}
+			outputs = append(outputs, ln.append(status.summary(now)))
+			status.reset(now)
+		}
+		l.mu.Unlock()
+
+		for _, o := range outputs {
+			l.PackageLogger.Logf(o.level, o.str)
+		}
+	}
+}

+ 1 - 1
rafthttp/peer.go

@@ -171,7 +171,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
 						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 					}
 					if status.isActive() {
-						plog.Warningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+						plog.MergeWarningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
 					} else {
 						plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
 					}

+ 4 - 10
rafthttp/peer_status.go

@@ -31,14 +31,12 @@ type peerStatus struct {
 	id          types.ID
 	mu          sync.Mutex // protect variables below
 	active      bool
-	failureMap  map[failureType]string
 	activeSince time.Time
 }
 
 func newPeerStatus(id types.ID) *peerStatus {
 	return &peerStatus{
-		id:         id,
-		failureMap: make(map[failureType]string),
+		id: id,
 	}
 }
 
@@ -49,25 +47,21 @@ func (s *peerStatus) activate() {
 		plog.Infof("the connection with %s became active", s.id)
 		s.active = true
 		s.activeSince = time.Now()
-		s.failureMap = make(map[failureType]string)
 	}
 }
 
 func (s *peerStatus) deactivate(failure failureType, reason string) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
 	if s.active {
+		plog.Errorf(msg)
 		plog.Infof("the connection with %s became inactive", s.id)
 		s.active = false
 		s.activeSince = time.Time{}
-	}
-	logline := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
-	if r, ok := s.failureMap[failure]; ok && r == reason {
-		plog.Debugf(logline)
 		return
 	}
-	s.failureMap[failure] = reason
-	plog.Errorf(logline)
+	plog.Debugf(msg)
 }
 
 func (s *peerStatus) isActive() bool {

+ 1 - 1
rafthttp/remote.go

@@ -42,7 +42,7 @@ func (g *remote) send(m raftpb.Message) {
 	case g.pipeline.msgc <- m:
 	default:
 		if g.status.isActive() {
-			plog.Warningf("dropped %s to %s since sending buffer is full", m.Type, g.id)
+			plog.MergeWarningf("dropped %s to %s since sending buffer is full", m.Type, g.id)
 		} else {
 			plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
 		}

+ 1 - 1
rafthttp/stream.go

@@ -331,7 +331,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 		case recvc <- m:
 		default:
 			if cr.status.isActive() {
-				plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
+				plog.MergeWarningf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
 			} else {
 				plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
 			}

+ 2 - 1
rafthttp/transport.go

@@ -24,13 +24,14 @@ import (
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing"
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/logutil"
 	"github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft/raftpb"
 )
 
-var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")
+var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
 
 type Raft interface {
 	Process(ctx context.Context, m raftpb.Message) error