|
|
@@ -151,29 +151,33 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
|
|
|
} else {
|
|
|
s, err := p.snapshotter.Load()
|
|
|
if err != nil && err != snap.ErrNoSnapshot {
|
|
|
- log.Printf("id=%x participant.snapload err=%s\n", p.id, err)
|
|
|
+ log.Printf("participant.snapload err=%s\n", err)
|
|
|
return nil, err
|
|
|
}
|
|
|
var snapIndex int64
|
|
|
if s != nil {
|
|
|
if err := p.Recovery(s.Data); err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Printf("store.recover err=%v", err)
|
|
|
+ return nil, err
|
|
|
}
|
|
|
- log.Printf("id=%x participant.store.recovered index=%d\n", p.id, s.Index)
|
|
|
+ log.Printf("participant.store.recovered index=%d\n", s.Index)
|
|
|
|
|
|
for _, node := range s.Nodes {
|
|
|
pp := path.Join(v2machineKVPrefix, fmt.Sprint(node))
|
|
|
ev, err := p.Store.Get(pp, false, false)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Printf("store.get err=%v", err)
|
|
|
+ return nil, err
|
|
|
}
|
|
|
q, err := url.ParseQuery(*ev.Node.Value)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Printf("url.parse err=%v", err)
|
|
|
+ return nil, err
|
|
|
}
|
|
|
peer, err := p.peerHub.add(node, q["raft"][0])
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Printf("peerHub.add err=%v", err)
|
|
|
+ return nil, err
|
|
|
}
|
|
|
peer.participate()
|
|
|
}
|
|
|
@@ -209,7 +213,7 @@ func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDura
|
|
|
return p, nil
|
|
|
}
|
|
|
|
|
|
-func (p *participant) run(stop chan struct{}) {
|
|
|
+func (p *participant) run(stop chan struct{}) error {
|
|
|
defer p.cleanup()
|
|
|
|
|
|
if p.node.IsEmpty() {
|
|
|
@@ -223,7 +227,8 @@ func (p *participant) run(stop chan struct{}) {
|
|
|
} else {
|
|
|
log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
|
|
|
if err := p.join(); err != nil {
|
|
|
- log.Fatalf("id=%x participant.join err=%q", p.id, err)
|
|
|
+ log.Printf("id=%x participant.join err=%q", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -266,45 +271,45 @@ func (p *participant) run(stop chan struct{}) {
|
|
|
node.Sync()
|
|
|
case <-stop:
|
|
|
log.Printf("id=%x participant.stop\n", p.id)
|
|
|
- return
|
|
|
+ return nil
|
|
|
}
|
|
|
if s := node.UnstableSnapshot(); !s.IsEmpty() {
|
|
|
if err := p.Recovery(s.Data); err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Printf("id=%x participant.recover err=%q", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
- log.Printf("id=%x recovered index=%d\n", p.id, s.Index)
|
|
|
+ log.Printf("id=%x participant.recovered index=%d", p.id, s.Index)
|
|
|
}
|
|
|
p.apply(node.Next())
|
|
|
- ents := node.UnstableEnts()
|
|
|
- p.save(ents, node.UnstableState())
|
|
|
+ if err := p.save(node.UnstableEnts(), node.UnstableState()); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
p.send(node.Msgs())
|
|
|
if node.IsRemoved() {
|
|
|
log.Printf("id=%x participant.end\n", p.id)
|
|
|
- return
|
|
|
+ return nil
|
|
|
}
|
|
|
if p.node.EntsLen() > defaultCompact {
|
|
|
d, err := p.Save()
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Printf("id=%x participant.compact err=%q", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
p.node.Compact(d)
|
|
|
snap := p.node.GetSnap()
|
|
|
log.Printf("id=%x compacted index=%d", p.id, snap.Index)
|
|
|
if err := p.snapshotter.Save(&snap); err != nil {
|
|
|
- log.Printf("id=%d snapshot err=%v", p.id, err)
|
|
|
- // todo(xiangli): consume the error?
|
|
|
- panic(err)
|
|
|
+ log.Printf("id=%x snapshot.save err=%v", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
if err := p.w.Cut(p.node.Index()); err != nil {
|
|
|
- log.Printf("id=%d wal.cut err=%v", p.id, err)
|
|
|
- // todo(xiangli): consume the error?
|
|
|
- panic(err)
|
|
|
+ log.Printf("id=%x wal.cut err=%v", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
info := p.node.Info()
|
|
|
if err = p.w.SaveInfo(&info); err != nil {
|
|
|
- log.Printf("id=%d wal.saveInfo err=%v", p.id, err)
|
|
|
- // todo(xiangli): consume the error?
|
|
|
- panic(err)
|
|
|
+ log.Printf("id=%x wal.saveInfo err=%v", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -459,21 +464,24 @@ func (p *participant) apply(ents []raft.Entry) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *participant) save(ents []raft.Entry, state raft.State) {
|
|
|
+func (p *participant) save(ents []raft.Entry, state raft.State) error {
|
|
|
for _, ent := range ents {
|
|
|
if err := p.w.SaveEntry(&ent); err != nil {
|
|
|
- log.Panicf("id=%x participant.save saveEntryErr=%q", p.id, err)
|
|
|
+ log.Printf("id=%x participant.save saveEntryErr=%q", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
}
|
|
|
if !state.IsEmpty() {
|
|
|
if err := p.w.SaveState(&state); err != nil {
|
|
|
- log.Panicf("id=%x participant.save saveStateErr=%q", p.id, err)
|
|
|
+ log.Printf("id=%x participant.save saveStateErr=%q", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
}
|
|
|
if err := p.w.Sync(); err != nil {
|
|
|
- log.Panicf("id=%x participant.save syncErr=%q", p.id, err)
|
|
|
+ log.Printf("id=%x participant.save syncErr=%q", p.id, err)
|
|
|
+ return err
|
|
|
}
|
|
|
-
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
func (p *participant) send(msgs []raft.Message) {
|