Browse Source

server: add first level logging

Yicheng Qin 11 years ago
parent
commit
e7bc7becf3
12 changed files with 49 additions and 51 deletions
  1. 3 4
      etcd/discovery.go
  2. 12 2
      etcd/etcd.go
  3. 24 21
      etcd/participant.go
  4. 1 1
      etcd/peer.go
  5. 2 2
      etcd/raft_handler.go
  6. 3 2
      etcd/standby.go
  7. 2 2
      etcd/v2_apply.go
  8. 0 3
      etcd/v2_client.go
  9. 2 4
      etcd/v2_http.go
  10. 0 3
      etcd/v2_http_delete.go
  11. 0 2
      etcd/v2_http_post.go
  12. 0 5
      etcd/v2_http_put.go

+ 3 - 4
etcd/discovery.go

@@ -53,7 +53,6 @@ func newDiscoverer(u *url.URL, name, raftPubAddr string) *discoverer {
 	u.Path = ""
 
 	// Connect to a scheme://host not a full URL with path
-	log.Printf("Discovery via %s using prefix %s.\n", u.String(), d.prefix)
 	d.client = etcd.NewClient([]string{u.String()})
 
 	if !strings.HasPrefix(oldPath, "/v2/keys") {
@@ -63,6 +62,8 @@ func newDiscoverer(u *url.URL, name, raftPubAddr string) *discoverer {
 }
 
 func (d *discoverer) discover() ([]string, error) {
+	log.Printf("discoverer name=%s target=\"%q\" prefix=%s\n", d.name, d.client.GetCluster(), d.prefix)
+
 	if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil {
 		return nil, fmt.Errorf("discovery service error: %v", err)
 	}
@@ -79,7 +80,6 @@ func (d *discoverer) discover() ([]string, error) {
 	// If we got a response then the CAS was successful, we are leader
 	if resp != nil && resp.Node.Value == startedState {
 		// We are the leader, we have no peers
-		log.Println("Discovery _state was empty, so this machine is the initial leader.")
 		return nil, nil
 	}
 
@@ -111,7 +111,6 @@ func (d *discoverer) findPeers() (peers []string, err error) {
 		return nil, errors.New("Discovery found an initialized cluster but no reachable peers are registered.")
 	}
 
-	log.Printf("Discovery found peers %v\n", peers)
 	return
 }
 
@@ -122,7 +121,7 @@ func (d *discoverer) heartbeat(stopc <-chan struct{}) {
 	defer ticker.Stop()
 	for {
 		if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil {
-			log.Println("Discovery heartbeat failed: %v", err)
+			log.Println("discoverer heartbeatErr=\"%v\"", err)
 		}
 
 		select {

+ 12 - 2
etcd/etcd.go

@@ -51,11 +51,12 @@ type Server struct {
 	stopped bool
 	mu      sync.Mutex
 	stopc   chan struct{}
+	log     *log.Logger
 }
 
 func New(c *config.Config) *Server {
 	if err := c.Sanitize(); err != nil {
-		log.Fatalf("failed sanitizing configuration: %v", err)
+		log.Fatalf("server.new sanitizeErr=\"%v\"\n", err)
 	}
 
 	tc := &tls.Config{
@@ -65,7 +66,7 @@ func New(c *config.Config) *Server {
 	if c.PeerTLSInfo().Scheme() == "https" {
 		tc, err = c.PeerTLSInfo().ClientConfig()
 		if err != nil {
-			log.Fatal("failed to create raft transporter tls:", err)
+			log.Fatalf("server.new ClientConfigErr=\"%v\"\n", err)
 		}
 	}
 
@@ -87,12 +88,14 @@ func New(c *config.Config) *Server {
 
 		stopc: make(chan struct{}),
 	}
+	log.Printf("server.new id=%x raftPubAddr=%s\n", s.id, s.raftPubAddr)
 
 	return s
 }
 
 func (s *Server) SetTick(tick time.Duration) {
 	s.tickDuration = tick
+	log.Printf("server.setTick id=%x tick=%q\n", s.id, s.tickDuration)
 }
 
 // Stop stops the server elegantly.
@@ -112,6 +115,7 @@ func (s *Server) Stop() {
 	<-s.stopc
 	s.client.CloseConnections()
 	s.peerHub.stop()
+	log.Printf("server.stop id=%x\n", s.id)
 }
 
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -151,8 +155,10 @@ func (s *Server) Run() error {
 		if seeds, err = d.discover(); err != nil {
 			return err
 		}
+		log.Printf("server.run id=%x source=-discovery seeds=\"%v\"\n", s.id, seeds)
 	} else {
 		seeds = s.config.Peers
+		log.Printf("server.run id=%x source=-peers seeds=\"%v\"\n", s.id, seeds)
 	}
 	s.peerHub.setSeeds(seeds)
 
@@ -170,6 +176,7 @@ func (s *Server) Run() error {
 				go d.heartbeat(dStopc)
 			}
 			s.mode.Set(participantMode)
+			log.Printf("server.run id=%x mode=participantMode\n", s.id)
 			s.mu.Unlock()
 			next = s.p.run()
 			if d != nil {
@@ -178,10 +185,12 @@ func (s *Server) Run() error {
 		case standbyMode:
 			s.s = newStandby(s.client, s.peerHub)
 			s.mode.Set(standbyMode)
+			log.Printf("server.run id=%x mode=standbyMode\n", s.id)
 			s.mu.Unlock()
 			next = s.s.run()
 		case stopMode:
 			s.mode.Set(stopMode)
+			log.Printf("server.run id=%x mode=stopMode\n", s.id)
 			s.mu.Unlock()
 			s.stopc <- struct{}{}
 			return nil
@@ -194,5 +203,6 @@ func (s *Server) Run() error {
 
 // setId sets the id for the participant. This should only be used for testing.
 func (s *Server) setId(id int64) {
+	log.Printf("server.setId id=%x oldId=%x\n", id, s.id)
 	s.id = id
 }

+ 24 - 21
etcd/participant.go

@@ -121,13 +121,13 @@ func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2clie
 func (p *participant) run() int64 {
 	seeds := p.peerHub.getSeeds()
 	if len(seeds) == 0 {
-		log.Println("starting a bootstrap node")
+		log.Printf("participant.run id=%x action=bootstrap\n", p.id)
 		p.node.Campaign()
 		p.node.InitCluster(genId())
 		p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
 		p.apply(p.node.Next())
 	} else {
-		log.Println("joining cluster via peers", seeds)
+		log.Printf("participant.run id=%x action=join seeds=\"%v\"\n", p.id, seeds)
 		p.join()
 	}
 
@@ -167,14 +167,14 @@ func (p *participant) run() int64 {
 		case <-v2SyncTicker.C:
 			node.Sync()
 		case <-p.stopc:
-			log.Printf("Participant %x stopped\n", p.id)
+			log.Printf("participant.stop id=%x\n", p.id)
 			return stopMode
 		}
 		p.apply(node.Next())
 		p.send(node.Msgs())
 		if node.IsRemoved() {
-			log.Printf("Participant %x return\n", p.id)
 			p.stop()
+			log.Printf("participant.end id=%x\n", p.id)
 			return standbyMode
 		}
 	}
@@ -195,6 +195,7 @@ func (p *participant) raftHandler() http.Handler {
 }
 
 func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
+	log.Printf("participant.add id=%x nodeId=%x raftPubAddr=%s pubAddr=%s\n", p.id, id, raftPubAddr, pubAddr)
 	pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
 
 	_, err := p.Get(pp, false, false)
@@ -202,12 +203,13 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
 		return nil
 	}
 	if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
+		log.Printf("participant.add id=%x getErr=\"%v\"\n", p.id, err)
 		return err
 	}
 
 	w, err := p.Watch(pp, true, false, 0)
 	if err != nil {
-		log.Println("add error:", err)
+		log.Printf("participant.add id=%x watchErr=\"%v\"\n", p.id, err)
 		return tmpErr
 	}
 
@@ -215,7 +217,7 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
 	case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
 	default:
 		w.Remove()
-		log.Println("unable to send out addNode proposal")
+		log.Printf("participant.add id=%x proposeErr=\"unable to send out addNode proposal\"\n", p.id)
 		return tmpErr
 	}
 
@@ -224,11 +226,11 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
 		if v.Action == store.Set {
 			return nil
 		}
-		log.Println("add error: action =", v.Action)
+		log.Printf("participant.add id=%x watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
 		return tmpErr
 	case <-time.After(6 * defaultHeartbeat * p.tickDuration):
 		w.Remove()
-		log.Println("add error: wait timeout")
+		log.Printf("participant.add id=%x watchErr=timeout\n", p.id)
 		return tmpErr
 	case <-p.stopc:
 		return stopErr
@@ -236,6 +238,7 @@ func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
 }
 
 func (p *participant) remove(id int64) error {
+	log.Printf("participant.remove id=%x nodeId=%x\n", p.id, id)
 	pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
 
 	v, err := p.Get(pp, false, false)
@@ -246,7 +249,7 @@ func (p *participant) remove(id int64) error {
 	select {
 	case p.removeNodeC <- raft.Config{NodeId: id}:
 	default:
-		log.Println("unable to send out removeNode proposal")
+		log.Printf("participant.remove id=%x proposeErr=\"unable to send out removeNode proposal\"\n", p.id)
 		return tmpErr
 	}
 
@@ -254,7 +257,7 @@ func (p *participant) remove(id int64) error {
 	// removal target is self
 	w, err := p.Watch(pp, true, false, v.Index()+1)
 	if err != nil {
-		log.Println("remove error:", err)
+		log.Printf("participant.remove id=%x watchErr=\"%v\"\n", p.id, err)
 		return tmpErr
 	}
 
@@ -263,11 +266,11 @@ func (p *participant) remove(id int64) error {
 		if v.Action == store.Delete {
 			return nil
 		}
-		log.Println("remove error: action =", v.Action)
+		log.Printf("participant.remove id=%x watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
 		return tmpErr
 	case <-time.After(6 * defaultHeartbeat * p.tickDuration):
 		w.Remove()
-		log.Println("remove error: wait timeout")
+		log.Printf("participant.remove id=%x watchErr=timeout\n", p.id)
 		return tmpErr
 	case <-p.stopc:
 		return stopErr
@@ -286,35 +289,36 @@ func (p *participant) apply(ents []raft.Entry) {
 			p.v2apply(offset+int64(i), ent)
 		case raft.ClusterInit:
 			p.clusterId = p.node.ClusterId()
+			log.Printf("participant.cluster.setId id=%x clusterId=%x\n", p.id, p.clusterId)
 		case raft.AddNode:
 			cfg := new(raft.Config)
 			if err := json.Unmarshal(ent.Data, cfg); err != nil {
-				log.Println(err)
+				log.Printf("participant.cluster.addNode id=%x UnmarshalErr=\"%v\"\n", p.id, err)
 				break
 			}
 			peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
 			if err != nil {
-				log.Println(err)
+				log.Printf("participant.cluster.addNode id=%x peerAddErr=\"%v\"\n", p.id, err)
 				break
 			}
 			peer.participate()
-			log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
 			pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
+			log.Printf("participant.cluster.addNode id=%x nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context)
 		case raft.RemoveNode:
 			cfg := new(raft.Config)
 			if err := json.Unmarshal(ent.Data, cfg); err != nil {
-				log.Println(err)
+				log.Printf("participant.cluster.removeNode id=%x UnmarshalErr=\"%v\"\n", p.id, err)
 				break
 			}
-			log.Printf("Remove Node %x\n", cfg.NodeId)
 			peer, err := p.peerHub.peer(cfg.NodeId)
 			if err != nil {
-				log.Fatal("cannot get the added peer:", err)
+				log.Fatal("participant.apply getPeerErr=\"%v\"", err)
 			}
 			peer.idle()
 			pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
 			p.Store.Delete(pp, false, false)
+			log.Printf("participant.cluster.removeNode id=%x nodeId=%x\n", p.id, cfg.NodeId)
 		default:
 			panic("unimplemented")
 		}
@@ -324,7 +328,7 @@ func (p *participant) apply(ents []raft.Entry) {
 func (p *participant) send(msgs []raft.Message) {
 	for i := range msgs {
 		if err := p.peerHub.send(msgs[i]); err != nil {
-			log.Println("send:", err)
+			log.Printf("participant.send id=%x err=\"%v\"\n", p.id, err)
 		}
 	}
 }
@@ -342,12 +346,11 @@ func (p *participant) join() {
 			if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
 				return
 			} else {
-				log.Println(err)
+				log.Printf("participant.join id=%x addMachineErr=\"%v\"\n", p.id, err)
 			}
 		}
 		time.Sleep(100 * time.Millisecond)
 	}
-	log.Println("fail to join the cluster")
 }
 
 func genId() int64 {

+ 1 - 1
etcd/peer.go

@@ -122,7 +122,7 @@ func (p *peer) post(d []byte) {
 	buf := bytes.NewBuffer(d)
 	resp, err := p.c.Post(p.url, "application/octet-stream", buf)
 	if err != nil {
-		log.Println("post:", err)
+		log.Println("peer.post url=%s err=\"%v\"", p.url, err)
 		return
 	}
 	resp.Body.Close()

+ 2 - 2
etcd/raft_handler.go

@@ -74,14 +74,14 @@ func (h *raftHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
 
 	msg := new(raft.Message)
 	if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
-		log.Println(err)
+		log.Printf("raftHandler.serve decodeErr=\"%v\"\n", err)
 		return
 	}
 
 	select {
 	case h.recv <- msg:
 	default:
-		log.Println("drop")
+		log.Printf("raftHandler.serve pushErr=\"recv channel is full\"\n")
 		// drop the incoming package at network layer if the upper layer
 		// cannot consume them in time.
 		// TODO(xiangli): not return 200.

+ 3 - 2
etcd/standby.go

@@ -69,12 +69,12 @@ func (s *standby) run() int64 {
 		select {
 		case <-time.After(syncDuration):
 		case <-s.stopc:
-			log.Printf("Standby stopped\n")
+			log.Printf("standby.stop\n")
 			return stopMode
 		}
 
 		if update, err := s.syncCluster(nodes); err != nil {
-			log.Println("standby sync:", err)
+			log.Println("standby.run syncErr=\"%v\"", err)
 			continue
 		} else {
 			nodes = update
@@ -83,6 +83,7 @@ func (s *standby) run() int64 {
 		if s.clusterConf.ActiveSize <= len(nodes) {
 			continue
 		}
+		log.Printf("standby.end\n")
 		return participantMode
 	}
 }

+ 2 - 2
etcd/v2_apply.go

@@ -32,7 +32,7 @@ func (p *participant) v2apply(index int64, ent raft.Entry) {
 
 	cmd := new(cmd)
 	if err := json.Unmarshal(ent.Data, cmd); err != nil {
-		log.Println("v2apply.decode:", err)
+		log.Printf("participant.store.apply id=%x decodeErr=\"%v\"\n", p.id, err)
 		return
 	}
 
@@ -53,7 +53,7 @@ func (p *participant) v2apply(index int64, ent raft.Entry) {
 		p.Store.DeleteExpiredKeys(cmd.Time)
 		return
 	default:
-		log.Println("unexpected command type:", cmd.Type)
+		log.Printf("participant.store.apply id=%x err=\"unexpected command type %s\"\n", p.id, cmd.Type)
 	}
 
 	if ent.Term > p.node.term {

+ 0 - 3
etcd/v2_client.go

@@ -24,7 +24,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"strconv"
 	"strings"
@@ -155,7 +154,6 @@ func (c *v2client) AddMachine(url string, name string, info *context) *etcdErr.E
 	b, _ := json.Marshal(info)
 	url = url + "/v2/admin/machines/" + name
 
-	log.Printf("Send Join Request to %s", url)
 	resp, err := c.put(url, b)
 	if err != nil {
 		return clientError(err)
@@ -182,7 +180,6 @@ func (c *v2client) readErrorBody(body io.ReadCloser) *etcdErr.Error {
 
 func (c *v2client) readJSONBody(body io.ReadCloser, val interface{}) *etcdErr.Error {
 	if err := json.NewDecoder(body).Decode(val); err != nil {
-		log.Printf("Error parsing join response: %v", err)
 		return clientError(err)
 	}
 	c.readBody(body)

+ 2 - 4
etcd/v2_http.go

@@ -98,7 +98,7 @@ func (eh handlerErr) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	log.Println("http error", err)
+	log.Printf("HTTP.serve: req=%s err=\"%v\"\n", r.URL, err)
 	http.Error(w, "Internal Server Error", http.StatusInternalServerError)
 }
 
@@ -118,7 +118,6 @@ func (w *HEADResponseWriter) Write([]byte) (int, error) {
 func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64) error {
 	e, err := p.Store.Get(fmt.Sprintf("%v/%d", v2machineKVPrefix, p.node.Leader()), false, false)
 	if err != nil {
-		log.Println("redirect cannot find node", id)
 		return fmt.Errorf("redirect cannot find node %d", id)
 	}
 
@@ -129,7 +128,6 @@ func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64)
 
 	redirectAddr, err := buildRedirectURL(m["etcd"][0], r.URL)
 	if err != nil {
-		log.Println("redirect cannot build new url:", err)
 		return err
 	}
 
@@ -140,7 +138,7 @@ func (p *participant) redirect(w http.ResponseWriter, r *http.Request, id int64)
 func buildRedirectURL(redirectAddr string, originalURL *url.URL) (string, error) {
 	redirectURL, err := url.Parse(redirectAddr)
 	if err != nil {
-		return "", fmt.Errorf("redirect cannot parse url: %v", err)
+		return "", fmt.Errorf("cannot parse url: %v", err)
 	}
 
 	redirectURL.Path = originalURL.Path

+ 0 - 3
etcd/v2_http_delete.go

@@ -17,7 +17,6 @@ limitations under the License.
 package etcd
 
 import (
-	"log"
 	"net/http"
 	"strconv"
 
@@ -70,7 +69,6 @@ func (p *participant) serveDelete(w http.ResponseWriter, req *http.Request, key
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("delete:", err)
 	return err
 }
 
@@ -80,6 +78,5 @@ func (p *participant) serveCAD(w http.ResponseWriter, req *http.Request, key str
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("cad:", err)
 	return err
 }

+ 0 - 2
etcd/v2_http_post.go

@@ -17,7 +17,6 @@ limitations under the License.
 package etcd
 
 import (
-	"log"
 	"net/http"
 
 	etcdErr "github.com/coreos/etcd/error"
@@ -43,6 +42,5 @@ func (p *participant) PostHandler(w http.ResponseWriter, req *http.Request) erro
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("unique:", err)
 	return err
 }

+ 0 - 5
etcd/v2_http_put.go

@@ -19,7 +19,6 @@ package etcd
 import (
 	"encoding/json"
 	"fmt"
-	"log"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -115,7 +114,6 @@ func (p *participant) serveSet(w http.ResponseWriter, req *http.Request, key str
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("set:", err)
 	return err
 }
 
@@ -125,7 +123,6 @@ func (p *participant) serveCreate(w http.ResponseWriter, req *http.Request, key
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("create:", err)
 	return err
 }
 
@@ -139,7 +136,6 @@ func (p *participant) serveUpdate(w http.ResponseWriter, req *http.Request, key,
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("update:", err)
 	return err
 }
 
@@ -149,7 +145,6 @@ func (p *participant) serveCAS(w http.ResponseWriter, req *http.Request, key, va
 		p.handleRet(w, ret)
 		return nil
 	}
-	log.Println("update:", err)
 	return err
 }