multinode.go
package raft

import (
	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"

	pb "github.com/coreos/etcd/raft/raftpb"
)

// MultiNode represents a node that is participating in multiple consensus groups.
// A MultiNode is more efficient than a collection of Nodes.
// The methods of this interface correspond to the methods of Node and are described
// more fully there.
type MultiNode interface {
	// CreateGroup adds a new group to the MultiNode. The application must call CreateGroup
	// on each participating node with the same group ID; it may create groups on demand as it
	// receives messages. If the given storage contains existing log entries, the list of peers
	// may be empty.
	CreateGroup(group uint64, peers []Peer, storage Storage) error
	// RemoveGroup removes a group from the MultiNode.
	RemoveGroup(group uint64) error
	// Tick advances the internal logical clock by a single tick.
	Tick()
	// Campaign causes this MultiNode to transition to candidate state in the given group.
	Campaign(ctx context.Context, group uint64) error
	// Propose proposes that data be appended to the given group's log.
	Propose(ctx context.Context, group uint64, data []byte) error
	// ProposeConfChange proposes a config change.
	ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error
	// ApplyConfChange applies a config change to the local node.
	ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState
	// Step advances the state machine using the given message.
	Step(ctx context.Context, group uint64, msg pb.Message) error
	// Ready returns a channel that returns the current point-in-time state of any ready
	// groups. Only groups with something to report will appear in the map.
	Ready() <-chan map[uint64]Ready
	// Advance notifies the node that the application has applied and saved progress in the
	// last Ready results. It must be called with the last value returned from the Ready()
	// channel.
	Advance(map[uint64]Ready)
	// Status returns the current status of the given group.
	Status(group uint64) Status
	// ReportUnreachable reports that the given node is not reachable for the last send.
	ReportUnreachable(id, groupID uint64)
	// ReportSnapshot reports the status of the sent snapshot.
	ReportSnapshot(id, groupID uint64, status SnapshotStatus)
	// Stop performs any necessary termination of the MultiNode.
	Stop()
}

// StartMultiNode creates a MultiNode and starts its background goroutine.
// The id identifies this node and will be used as its node ID in all groups.
// The election and heartbeat timers are in units of ticks.
func StartMultiNode(id uint64, election, heartbeat int) MultiNode {
	mn := newMultiNode(id, election, heartbeat)
	go mn.run()
	return &mn
}
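// The function below is an illustrative sketch, not part of the original file:
// it shows one way an application might service the Ready/Advance loop after
// StartMultiNode and CreateGroup. The persist, send, and apply callbacks are
// hypothetical application-side hooks for stable storage, the network
// transport, and the state machine respectively.
func exampleReadyLoop(
	mn MultiNode,
	persist func(group uint64, rd Ready),
	send func(group uint64, msgs []pb.Message),
	apply func(group uint64, ents []pb.Entry),
) {
	// In a real application this loop would also select on a shutdown signal.
	for rds := range mn.Ready() {
		for group, rd := range rds {
			// Durably persist rd.HardState, rd.Entries, and rd.Snapshot
			// before acting on anything else in the Ready.
			persist(group, rd)
			// Deliver outgoing messages to the peers they address.
			send(group, rd.Messages)
			// Apply newly committed entries to the application state machine.
			apply(group, rd.CommittedEntries)
		}
		// Advance must be called with the exact map received from Ready()
		// before the next batch will be produced.
		mn.Advance(rds)
	}
}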
// TODO(bdarnell): add group ID to the underlying protos?
type multiMessage struct {
	group uint64
	msg   pb.Message
}

type multiConfChange struct {
	group uint64
	msg   pb.ConfChange
	ch    chan pb.ConfState
}

type multiStatus struct {
	group uint64
	ch    chan Status
}

type groupCreation struct {
	id      uint64
	peers   []Peer
	storage Storage
	// TODO(bdarnell): do we really need the done channel here? It's
	// unlike the rest of this package, but we need the group creation
	// to be complete before any Propose or other calls.
	done chan struct{}
}

type groupRemoval struct {
	id uint64
	// TODO(bdarnell): see comment on groupCreation.done
	done chan struct{}
}

type multiNode struct {
	id        uint64
	election  int
	heartbeat int
	groupc    chan groupCreation
	rmgroupc  chan groupRemoval
	propc     chan multiMessage
	recvc     chan multiMessage
	confc     chan multiConfChange
	readyc    chan map[uint64]Ready
	advancec  chan map[uint64]Ready
	tickc     chan struct{}
	stop      chan struct{}
	done      chan struct{}
	status    chan multiStatus
}

func newMultiNode(id uint64, election, heartbeat int) multiNode {
	return multiNode{
		id:        id,
		election:  election,
		heartbeat: heartbeat,
		groupc:    make(chan groupCreation),
		rmgroupc:  make(chan groupRemoval),
		propc:     make(chan multiMessage),
		recvc:     make(chan multiMessage),
		confc:     make(chan multiConfChange),
		readyc:    make(chan map[uint64]Ready),
		advancec:  make(chan map[uint64]Ready),
		tickc:     make(chan struct{}),
		stop:      make(chan struct{}),
		done:      make(chan struct{}),
		status:    make(chan multiStatus),
	}
}

type groupState struct {
	id         uint64
	raft       *raft
	prevSoftSt *SoftState
	prevHardSt pb.HardState
	prevSnapi  uint64
}

func (g *groupState) newReady() Ready {
	return newReady(g.raft, g.prevSoftSt, g.prevHardSt)
}

func (g *groupState) commitReady(rd Ready) {
	if rd.SoftState != nil {
		g.prevSoftSt = rd.SoftState
	}
	if !IsEmptyHardState(rd.HardState) {
		g.prevHardSt = rd.HardState
	}
	if g.prevHardSt.Commit != 0 {
		// In most cases, prevHardSt and rd.HardState will be the same
		// because when there are new entries to apply we just sent a
		// HardState with an updated Commit value. However, on initial
		// startup the two are different because we don't send a HardState
		// until something changes, but we do send any un-applied but
		// committed entries (and previously-committed entries may be
		// incorporated into the snapshot, even if rd.CommittedEntries is
		// empty). Therefore we mark all committed entries as applied
		// whether they were included in rd.HardState or not.
		g.raft.raftLog.appliedTo(g.prevHardSt.Commit)
	}
	if len(rd.Entries) > 0 {
		e := rd.Entries[len(rd.Entries)-1]
		g.raft.raftLog.stableTo(e.Index, e.Term)
	}
	if !IsEmptySnap(rd.Snapshot) {
		g.prevSnapi = rd.Snapshot.Metadata.Index
		g.raft.raftLog.stableSnapTo(g.prevSnapi)
	}
}

func (mn *multiNode) run() {
	groups := map[uint64]*groupState{}
	rds := map[uint64]Ready{}
	var advancec chan map[uint64]Ready
	for {
		// Only select readyc if we have something to report and we are not
		// currently waiting for an advance.
		readyc := mn.readyc
		if len(rds) == 0 || advancec != nil {
			readyc = nil
		}

		// group points to the group that was touched on this iteration (if any).
		var group *groupState
		select {
		case gc := <-mn.groupc:
			// TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it?
			// TODO(bdarnell): make maxSizePerMsg(InflightMsgs) configurable
			c := &Config{
				ID:              mn.id,
				ElectionTick:    mn.election,
				HeartbeatTick:   mn.heartbeat,
				Storage:         gc.storage,
				MaxSizePerMsg:   noLimit,
				MaxInflightMsgs: 256,
			}
			r := newRaft(c)
			group = &groupState{
				id:   gc.id,
				raft: r,
			}
			groups[gc.id] = group
			lastIndex, err := gc.storage.LastIndex()
			if err != nil {
				panic(err) // TODO(bdarnell)
			}
			// If the log is empty, this is a new group (like StartNode); otherwise it's
			// restoring an existing group (like RestartNode).
			// TODO(bdarnell): rethink group initialization and whether the application needs
			// to be able to tell us when it expects the group to exist.
			if lastIndex == 0 {
				r.becomeFollower(1, None)
				ents := make([]pb.Entry, len(gc.peers))
				for i, peer := range gc.peers {
					cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
					data, err := cc.Marshal()
					if err != nil {
						panic("unexpected marshal error")
					}
					ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
				}
				r.raftLog.append(ents...)
				r.raftLog.committed = uint64(len(ents))
				for _, peer := range gc.peers {
					r.addNode(peer.ID)
				}
			}

			// Set the initial hard and soft states after performing all initialization.
			group.prevSoftSt = r.softState()
			group.prevHardSt = r.HardState

			close(gc.done)

		case gr := <-mn.rmgroupc:
			delete(groups, gr.id)
			delete(rds, gr.id)
			close(gr.done)

		case mm := <-mn.propc:
			// TODO(bdarnell): single-node impl doesn't read from propc unless the group
			// has a leader; we can't do that since we have one propc for many groups.
			// We'll have to buffer somewhere on a group-by-group basis, or just let
			// raft.Step drop any such proposals on the floor.
			mm.msg.From = mn.id
			group = groups[mm.group]
			group.raft.Step(mm.msg)

		case mm := <-mn.recvc:
			group = groups[mm.group]
			if _, ok := group.raft.prs[mm.msg.From]; ok || !IsResponseMsg(mm.msg) {
				group.raft.Step(mm.msg)
			}

		case mcc := <-mn.confc:
			group = groups[mcc.group]
			if mcc.msg.NodeID == None {
				group.raft.resetPendingConf()
				select {
				case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
				case <-mn.done:
				}
				break
			}
			switch mcc.msg.Type {
			case pb.ConfChangeAddNode:
				group.raft.addNode(mcc.msg.NodeID)
			case pb.ConfChangeRemoveNode:
				group.raft.removeNode(mcc.msg.NodeID)
			case pb.ConfChangeUpdateNode:
				group.raft.resetPendingConf()
			default:
				panic("unexpected conf type")
			}
			select {
			case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
			case <-mn.done:
			}

		case <-mn.tickc:
			// TODO(bdarnell): instead of calling every group on every tick,
			// we should have a priority queue of groups based on their next
			// time-based event.
			for _, g := range groups {
				g.raft.tick()
				rd := g.newReady()
				if rd.containsUpdates() {
					rds[g.id] = rd
				}
			}

		case readyc <- rds:
			// Clear outgoing messages as soon as we've passed them to the application.
			for g := range rds {
				groups[g].raft.msgs = nil
			}
			rds = map[uint64]Ready{}
			advancec = mn.advancec

		case advs := <-advancec:
			for groupID, rd := range advs {
				group, ok := groups[groupID]
				if !ok {
					continue
				}
				group.commitReady(rd)

				// We've been accumulating new entries in rds which may now be obsolete.
				// Drop the old Ready object and create a new one if needed.
				delete(rds, groupID)
				newRd := group.newReady()
				if newRd.containsUpdates() {
					rds[groupID] = newRd
				}
			}
			advancec = nil

		case ms := <-mn.status:
			ms.ch <- getStatus(groups[ms.group].raft)

		case <-mn.stop:
			close(mn.done)
			return
		}

		if group != nil {
			rd := group.newReady()
			if rd.containsUpdates() {
				rds[group.id] = rd
			}
		}
	}
}

func (mn *multiNode) CreateGroup(id uint64, peers []Peer, storage Storage) error {
	gc := groupCreation{
		id:      id,
		peers:   peers,
		storage: storage,
		done:    make(chan struct{}),
	}
	mn.groupc <- gc
	select {
	case <-gc.done:
		return nil
	case <-mn.done:
		return ErrStopped
	}
}

func (mn *multiNode) RemoveGroup(id uint64) error {
	gr := groupRemoval{
		id:   id,
		done: make(chan struct{}),
	}
	mn.rmgroupc <- gr
	select {
	case <-gr.done:
		return nil
	case <-mn.done:
		return ErrStopped
	}
}

func (mn *multiNode) Stop() {
	select {
	case mn.stop <- struct{}{}:
	case <-mn.done:
	}
	<-mn.done
}

func (mn *multiNode) Tick() {
	select {
	case mn.tickc <- struct{}{}:
	case <-mn.done:
	}
}
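// exampleTickLoop is an illustrative sketch, not part of the original file: it
// shows the usual way to drive the logical clock by forwarding a periodic
// signal (e.g. from a time.Ticker owned by the caller) to Tick. Election and
// heartbeat timeouts are measured in these ticks, so the tick interval fixes
// their real-time duration.
func exampleTickLoop(mn MultiNode, ticks <-chan struct{}, stop <-chan struct{}) {
	for {
		select {
		case <-ticks:
			mn.Tick()
		case <-stop:
			return
		}
	}
}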
func (mn *multiNode) Campaign(ctx context.Context, group uint64) error {
	return mn.step(ctx, multiMessage{group,
		pb.Message{
			Type: pb.MsgHup,
		},
	})
}

func (mn *multiNode) Propose(ctx context.Context, group uint64, data []byte) error {
	return mn.step(ctx, multiMessage{group,
		pb.Message{
			Type: pb.MsgProp,
			Entries: []pb.Entry{
				{Data: data},
			},
		}})
}
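// exampleProposeOnce is an illustrative sketch, not part of the original file.
// Propose blocks until the proposal is handed to the raft goroutine, the
// context is done, or the MultiNode stops, so the caller should pass a context
// with a deadline or cancellation.
func exampleProposeOnce(ctx context.Context, mn MultiNode, group uint64, data []byte) error {
	if err := mn.Propose(ctx, group, data); err != nil {
		// ctx.Err() on timeout/cancellation, or ErrStopped after Stop.
		return err
	}
	// A nil error means the proposal was accepted for processing, not that it
	// committed; commitment is observed when the entry appears in
	// rd.CommittedEntries in the Ready loop.
	return nil
}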
func (mn *multiNode) ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error {
	data, err := cc.Marshal()
	if err != nil {
		return err
	}
	return mn.Step(ctx, group,
		pb.Message{
			Type: pb.MsgProp,
			Entries: []pb.Entry{
				{Type: pb.EntryConfChange, Data: data},
			},
		})
}

func (mn *multiNode) step(ctx context.Context, m multiMessage) error {
	ch := mn.recvc
	if m.msg.Type == pb.MsgProp {
		ch = mn.propc
	}

	select {
	case ch <- m:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-mn.done:
		return ErrStopped
	}
}

func (mn *multiNode) ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState {
	mcc := multiConfChange{group, cc, make(chan pb.ConfState)}
	select {
	case mn.confc <- mcc:
	case <-mn.done:
	}
	select {
	case cs := <-mcc.ch:
		return &cs
	case <-mn.done:
		// Per comments on Node.ApplyConfChange, this method should never return nil.
		return &pb.ConfState{}
	}
}
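// exampleApplyCommitted is an illustrative sketch, not part of the original
// file, showing the apply side of the conf-change lifecycle: a change proposed
// via ProposeConfChange eventually surfaces as an EntryConfChange in
// rd.CommittedEntries, at which point the application unmarshals it and feeds
// it back through ApplyConfChange.
func exampleApplyCommitted(mn MultiNode, group uint64, ents []pb.Entry) {
	for _, e := range ents {
		switch e.Type {
		case pb.EntryNormal:
			// Apply e.Data to the application state machine.
		case pb.EntryConfChange:
			var cc pb.ConfChange
			if err := cc.Unmarshal(e.Data); err != nil {
				panic(err)
			}
			// ApplyConfChange returns the resulting membership (never nil);
			// applications typically record it for inclusion in snapshots.
			_ = mn.ApplyConfChange(group, cc)
		}
	}
}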
func (mn *multiNode) Step(ctx context.Context, group uint64, m pb.Message) error {
	// Ignore unexpected local messages received over the network.
	if IsLocalMsg(m) {
		// TODO: return an error?
		return nil
	}
	return mn.step(ctx, multiMessage{group, m})
}

func (mn *multiNode) Ready() <-chan map[uint64]Ready {
	return mn.readyc
}

func (mn *multiNode) Advance(rds map[uint64]Ready) {
	select {
	case mn.advancec <- rds:
	case <-mn.done:
	}
}

func (mn *multiNode) Status(group uint64) Status {
	ms := multiStatus{
		group: group,
		ch:    make(chan Status),
	}
	mn.status <- ms
	return <-ms.ch
}
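// exampleIsLeader is an illustrative sketch, not part of the original file:
// Status exposes per-group raft state (via the embedded SoftState), so a
// caller can, for instance, check whether this node currently leads a group
// before doing leader-only work.
func exampleIsLeader(mn MultiNode, group uint64) bool {
	return mn.Status(group).RaftState == StateLeader
}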
func (mn *multiNode) ReportUnreachable(id, groupID uint64) {
	select {
	case mn.recvc <- multiMessage{
		group: groupID,
		msg:   pb.Message{Type: pb.MsgUnreachable, From: id},
	}:
	case <-mn.done:
	}
}

func (mn *multiNode) ReportSnapshot(id, groupID uint64, status SnapshotStatus) {
	rej := status == SnapshotFailure
	select {
	case mn.recvc <- multiMessage{
		group: groupID,
		msg:   pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej},
	}:
	case <-mn.done:
	}
}
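// The two functions below are illustrative sketches, not part of the original
// file, of the transport glue around MultiNode. Inbound: every message from
// the wire must arrive tagged with its group ID (per the TODO near the top of
// the file, the protos themselves do not carry one) and be routed through
// Step. Outbound: deliver is a hypothetical network call, and failures are
// echoed back via ReportUnreachable/ReportSnapshot so raft can adjust its
// behavior toward that peer.
func exampleReceiveLoop(ctx context.Context, mn MultiNode, incoming <-chan multiMessage) {
	for mm := range incoming {
		if err := mn.Step(ctx, mm.group, mm.msg); err != nil {
			return // context done or MultiNode stopped
		}
	}
}

func exampleSend(mn MultiNode, group uint64, m pb.Message, deliver func(pb.Message) error) {
	if err := deliver(m); err != nil {
		mn.ReportUnreachable(m.To, group)
		if m.Type == pb.MsgSnap {
			// A failed snapshot send must be reported so the leader can
			// resume probing the follower.
			mn.ReportSnapshot(m.To, group, SnapshotFailure)
		}
	}
}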