multinode.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. package raft
  2. import (
  3. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  4. pb "github.com/coreos/etcd/raft/raftpb"
  5. )
// MultiNode represents a node that is participating in multiple consensus groups.
// A MultiNode is more efficient than a collection of Nodes.
// The methods of this interface correspond to the methods of Node and are described
// more fully there.
type MultiNode interface {
	// CreateGroup adds a new group to the MultiNode. The application must call CreateGroup
	// on each participating node with the same group ID; it may create groups on demand as it
	// receives messages. If the given storage contains existing log entries the list of peers
	// may be empty.
	CreateGroup(group uint64, peers []Peer, storage Storage) error
	// RemoveGroup removes a group from the MultiNode.
	RemoveGroup(group uint64) error
	// Tick advances the internal logical clock by a single tick.
	Tick()
	// Campaign causes this MultiNode to transition to candidate state in the given group.
	Campaign(ctx context.Context, group uint64) error
	// Propose proposes that data be appended to the given group's log.
	Propose(ctx context.Context, group uint64, data []byte) error
	// ProposeConfChange proposes a config change for the given group.
	ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error
	// ApplyConfChange applies a config change to the local node for the given group.
	ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState
	// Step advances the state machine of the given group using the given message.
	Step(ctx context.Context, group uint64, msg pb.Message) error
	// Ready returns a channel that returns the current point-in-time state of any ready
	// groups. Only groups with something to report will appear in the map.
	Ready() <-chan map[uint64]Ready
	// Advance notifies the node that the application has applied and saved progress in the
	// last Ready results. It must be called with the last value returned from the Ready()
	// channel.
	Advance(map[uint64]Ready)
	// Status returns the current status of the given group.
	Status(group uint64) Status
	// ReportUnreachable reports that the node with the given id was not reachable
	// for the last send in the given group.
	ReportUnreachable(id, groupID uint64)
	// ReportSnapshot reports the status of the snapshot sent to the node with the
	// given id in the given group.
	ReportSnapshot(id, groupID uint64, status SnapshotStatus)
	// Stop performs any necessary termination of the MultiNode.
	Stop()
}
  46. // StartMultiNode creates a MultiNode and starts its background goroutine.
  47. // The id identifies this node and will be used as its node ID in all groups.
  48. // The election and heartbeat timers are in units of ticks.
  49. func StartMultiNode(id uint64, election, heartbeat int) MultiNode {
  50. mn := newMultiNode(id, election, heartbeat)
  51. go mn.run()
  52. return &mn
  53. }
// multiMessage pairs a raft message with the ID of the group it belongs to.
// It is the element type of the multiNode's propc and recvc channels.
// TODO(bdarnell): add group ID to the underlying protos?
type multiMessage struct {
	group uint64     // ID of the group the message is for
	msg   pb.Message // the raft message itself
}
// multiConfChange is the request ApplyConfChange sends to the run loop over
// confc: a config change for one group plus a channel for the reply.
type multiConfChange struct {
	group uint64            // ID of the group the change applies to
	msg   pb.ConfChange     // the config change to apply
	ch    chan pb.ConfState // run sends the resulting ConfState here
}
// multiStatus is the request Status sends to the run loop over the status
// channel; the run loop answers on ch.
type multiStatus struct {
	group uint64      // ID of the group whose status is requested
	ch    chan Status // run sends the group's Status here
}
// groupCreation is the request CreateGroup sends to the run loop over groupc.
type groupCreation struct {
	id      uint64  // ID of the group to create
	peers   []Peer  // initial peers; only used by run when the storage's last index is 0
	storage Storage // storage handed to newRaft for this group
	// TODO(bdarnell): do we really need the done channel here? It's
	// unlike the rest of this package, but we need the group creation
	// to be complete before any Propose or other calls.
	done chan struct{} // closed by run once the group has been set up
}
// groupRemoval is the request RemoveGroup sends to the run loop over rmgroupc.
type groupRemoval struct {
	id uint64 // ID of the group to remove
	// TODO(bdarnell): see comment on groupCreation.done
	done chan struct{} // closed by run once the group has been deleted
}
// multiNode is the implementation of MultiNode returned by StartMultiNode.
// Its public methods communicate with the run goroutine through the channels
// below.
type multiNode struct {
	id        uint64 // this node's ID, passed to newRaft for every group
	election  int    // election setting passed to newRaft (in ticks per StartMultiNode)
	heartbeat int    // heartbeat setting passed to newRaft (in ticks per StartMultiNode)
	groupc    chan groupCreation      // CreateGroup requests
	rmgroupc  chan groupRemoval       // RemoveGroup requests
	propc     chan multiMessage       // MsgProp messages routed by step
	recvc     chan multiMessage       // all other messages routed by step and the Report* methods
	confc     chan multiConfChange    // ApplyConfChange requests
	readyc    chan map[uint64]Ready   // run publishes pending Ready values here
	advancec  chan map[uint64]Ready   // Advance acknowledgements
	tickc     chan struct{}           // Tick notifications
	stop      chan struct{}           // Stop request
	done      chan struct{}           // closed by run when it exits
	status    chan multiStatus        // Status requests
}
  98. func newMultiNode(id uint64, election, heartbeat int) multiNode {
  99. return multiNode{
  100. id: id,
  101. election: election,
  102. heartbeat: heartbeat,
  103. groupc: make(chan groupCreation),
  104. rmgroupc: make(chan groupRemoval),
  105. propc: make(chan multiMessage),
  106. recvc: make(chan multiMessage),
  107. confc: make(chan multiConfChange),
  108. readyc: make(chan map[uint64]Ready),
  109. advancec: make(chan map[uint64]Ready),
  110. tickc: make(chan struct{}),
  111. stop: make(chan struct{}),
  112. done: make(chan struct{}),
  113. status: make(chan multiStatus),
  114. }
  115. }
// groupState is the per-group bookkeeping kept by the run loop: the group's
// raft state machine plus the state most recently acknowledged by the
// application, which newReady uses as the baseline for the next Ready.
type groupState struct {
	id         uint64       // group ID
	raft       *raft        // the group's raft state machine
	prevSoftSt *SoftState   // last SoftState recorded by commitReady (or the initial one)
	prevHardSt pb.HardState // last non-empty HardState recorded by commitReady (or the initial one)
	prevSnapi  uint64       // index of the last snapshot recorded by commitReady
}
  123. func (g *groupState) newReady() Ready {
  124. return newReady(g.raft, g.prevSoftSt, g.prevHardSt)
  125. }
  126. func (g *groupState) commitReady(rd Ready) {
  127. if rd.SoftState != nil {
  128. g.prevSoftSt = rd.SoftState
  129. }
  130. if !IsEmptyHardState(rd.HardState) {
  131. g.prevHardSt = rd.HardState
  132. }
  133. if !IsEmptySnap(rd.Snapshot) {
  134. g.prevSnapi = rd.Snapshot.Metadata.Index
  135. g.raft.raftLog.stableSnapTo(g.prevSnapi)
  136. }
  137. if len(rd.Entries) > 0 {
  138. // TODO(bdarnell): stableTo(rd.Snapshot.Index) if any
  139. e := rd.Entries[len(rd.Entries)-1]
  140. g.raft.raftLog.stableTo(e.Index, e.Term)
  141. }
  142. // TODO(bdarnell): in node.go, Advance() ignores CommittedEntries and calls
  143. // appliedTo with HardState.Commit, but this causes problems in multinode/cockroach.
  144. // The two should be the same except for the special-casing of the initial ConfChange
  145. // entries.
  146. if len(rd.CommittedEntries) > 0 {
  147. g.raft.raftLog.appliedTo(rd.CommittedEntries[len(rd.CommittedEntries)-1].Index)
  148. }
  149. //g.raft.raftLog.appliedTo(rd.HardState.Commit)
  150. }
  151. func (mn *multiNode) run() {
  152. groups := map[uint64]*groupState{}
  153. rds := map[uint64]Ready{}
  154. var advancec chan map[uint64]Ready
  155. for {
  156. // Only select readyc if we have something to report and we are not
  157. // currently waiting for an advance.
  158. readyc := mn.readyc
  159. if len(rds) == 0 || advancec != nil {
  160. readyc = nil
  161. }
  162. // group points to the group that was touched on this iteration (if any)
  163. var group *groupState
  164. select {
  165. case gc := <-mn.groupc:
  166. // TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it?
  167. r := newRaft(mn.id, nil, mn.election, mn.heartbeat, gc.storage, 0)
  168. group = &groupState{
  169. id: gc.id,
  170. raft: r,
  171. prevSoftSt: r.softState(),
  172. prevHardSt: r.HardState,
  173. }
  174. groups[gc.id] = group
  175. lastIndex, err := gc.storage.LastIndex()
  176. if err != nil {
  177. panic(err) // TODO(bdarnell)
  178. }
  179. // If the log is empty, this is a new group (like StartNode); otherwise it's
  180. // restoring an existing group (like RestartNode).
  181. // TODO(bdarnell): rethink group initialization and whether the application needs
  182. // to be able to tell us when it expects the group to exist.
  183. if lastIndex == 0 {
  184. r.becomeFollower(1, None)
  185. ents := make([]pb.Entry, len(gc.peers))
  186. for i, peer := range gc.peers {
  187. cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
  188. data, err := cc.Marshal()
  189. if err != nil {
  190. panic("unexpected marshal error")
  191. }
  192. ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
  193. }
  194. r.raftLog.append(ents...)
  195. r.raftLog.committed = uint64(len(ents))
  196. for _, peer := range gc.peers {
  197. r.addNode(peer.ID)
  198. }
  199. }
  200. close(gc.done)
  201. case gr := <-mn.rmgroupc:
  202. delete(groups, gr.id)
  203. delete(rds, gr.id)
  204. close(gr.done)
  205. case mm := <-mn.propc:
  206. // TODO(bdarnell): single-node impl doesn't read from propc unless the group
  207. // has a leader; we can't do that since we have one propc for many groups.
  208. // We'll have to buffer somewhere on a group-by-group basis, or just let
  209. // raft.Step drop any such proposals on the floor.
  210. mm.msg.From = mn.id
  211. group = groups[mm.group]
  212. group.raft.Step(mm.msg)
  213. case mm := <-mn.recvc:
  214. group = groups[mm.group]
  215. if _, ok := group.raft.prs[mm.msg.From]; ok || !IsResponseMsg(mm.msg) {
  216. group.raft.Step(mm.msg)
  217. }
  218. case mcc := <-mn.confc:
  219. group = groups[mcc.group]
  220. if mcc.msg.NodeID == None {
  221. group.raft.resetPendingConf()
  222. select {
  223. case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
  224. case <-mn.done:
  225. }
  226. break
  227. }
  228. switch mcc.msg.Type {
  229. case pb.ConfChangeAddNode:
  230. group.raft.addNode(mcc.msg.NodeID)
  231. case pb.ConfChangeRemoveNode:
  232. group.raft.removeNode(mcc.msg.NodeID)
  233. case pb.ConfChangeUpdateNode:
  234. group.raft.resetPendingConf()
  235. default:
  236. panic("unexpected conf type")
  237. }
  238. select {
  239. case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
  240. case <-mn.done:
  241. }
  242. case <-mn.tickc:
  243. // TODO(bdarnell): instead of calling every group on every tick,
  244. // we should have a priority queue of groups based on their next
  245. // time-based event.
  246. for _, g := range groups {
  247. g.raft.tick()
  248. rd := g.newReady()
  249. if rd.containsUpdates() {
  250. rds[g.id] = rd
  251. }
  252. }
  253. case readyc <- rds:
  254. // Clear outgoing messages as soon as we've passed them to the application.
  255. for g := range rds {
  256. groups[g].raft.msgs = nil
  257. }
  258. rds = map[uint64]Ready{}
  259. advancec = mn.advancec
  260. case advs := <-advancec:
  261. for groupID, rd := range advs {
  262. group, ok := groups[groupID]
  263. if !ok {
  264. continue
  265. }
  266. group.commitReady(rd)
  267. // We've been accumulating new entries in rds which may now be obsolete.
  268. // Drop the old Ready object and create a new one if needed.
  269. delete(rds, groupID)
  270. newRd := group.newReady()
  271. if newRd.containsUpdates() {
  272. rds[groupID] = newRd
  273. }
  274. }
  275. advancec = nil
  276. case ms := <-mn.status:
  277. ms.ch <- getStatus(groups[ms.group].raft)
  278. case <-mn.stop:
  279. close(mn.done)
  280. return
  281. }
  282. if group != nil {
  283. rd := group.newReady()
  284. if rd.containsUpdates() {
  285. rds[group.id] = rd
  286. }
  287. }
  288. }
  289. }
  290. func (mn *multiNode) CreateGroup(id uint64, peers []Peer, storage Storage) error {
  291. gc := groupCreation{
  292. id: id,
  293. peers: peers,
  294. storage: storage,
  295. done: make(chan struct{}),
  296. }
  297. mn.groupc <- gc
  298. select {
  299. case <-gc.done:
  300. return nil
  301. case <-mn.done:
  302. return ErrStopped
  303. }
  304. }
  305. func (mn *multiNode) RemoveGroup(id uint64) error {
  306. gr := groupRemoval{
  307. id: id,
  308. done: make(chan struct{}),
  309. }
  310. mn.rmgroupc <- gr
  311. select {
  312. case <-gr.done:
  313. return nil
  314. case <-mn.done:
  315. return ErrStopped
  316. }
  317. }
  318. func (mn *multiNode) Stop() {
  319. select {
  320. case mn.stop <- struct{}{}:
  321. case <-mn.done:
  322. }
  323. <-mn.done
  324. }
  325. func (mn *multiNode) Tick() {
  326. select {
  327. case mn.tickc <- struct{}{}:
  328. case <-mn.done:
  329. }
  330. }
  331. func (mn *multiNode) Campaign(ctx context.Context, group uint64) error {
  332. return mn.step(ctx, multiMessage{group,
  333. pb.Message{
  334. Type: pb.MsgHup,
  335. },
  336. })
  337. }
  338. func (mn *multiNode) Propose(ctx context.Context, group uint64, data []byte) error {
  339. return mn.step(ctx, multiMessage{group,
  340. pb.Message{
  341. Type: pb.MsgProp,
  342. Entries: []pb.Entry{
  343. {Data: data},
  344. },
  345. }})
  346. }
  347. func (mn *multiNode) ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error {
  348. data, err := cc.Marshal()
  349. if err != nil {
  350. return err
  351. }
  352. return mn.Step(ctx, group,
  353. pb.Message{
  354. Type: pb.MsgProp,
  355. Entries: []pb.Entry{
  356. {Type: pb.EntryConfChange, Data: data},
  357. },
  358. })
  359. }
  360. func (mn *multiNode) step(ctx context.Context, m multiMessage) error {
  361. ch := mn.recvc
  362. if m.msg.Type == pb.MsgProp {
  363. ch = mn.propc
  364. }
  365. select {
  366. case ch <- m:
  367. return nil
  368. case <-ctx.Done():
  369. return ctx.Err()
  370. case <-mn.done:
  371. return ErrStopped
  372. }
  373. }
  374. func (mn *multiNode) ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState {
  375. mcc := multiConfChange{group, cc, make(chan pb.ConfState)}
  376. select {
  377. case mn.confc <- mcc:
  378. case <-mn.done:
  379. }
  380. select {
  381. case cs := <-mcc.ch:
  382. return &cs
  383. case <-mn.done:
  384. // Per comments on Node.ApplyConfChange, this method should never return nil.
  385. return &pb.ConfState{}
  386. }
  387. }
  388. func (mn *multiNode) Step(ctx context.Context, group uint64, m pb.Message) error {
  389. // ignore unexpected local messages receiving over network
  390. if IsLocalMsg(m) {
  391. // TODO: return an error?
  392. return nil
  393. }
  394. return mn.step(ctx, multiMessage{group, m})
  395. }
  396. func (mn *multiNode) Ready() <-chan map[uint64]Ready {
  397. return mn.readyc
  398. }
  399. func (mn *multiNode) Advance(rds map[uint64]Ready) {
  400. select {
  401. case mn.advancec <- rds:
  402. case <-mn.done:
  403. }
  404. }
  405. func (mn *multiNode) Status(group uint64) Status {
  406. ms := multiStatus{
  407. group: group,
  408. ch: make(chan Status),
  409. }
  410. mn.status <- ms
  411. return <-ms.ch
  412. }
  413. func (mn *multiNode) ReportUnreachable(id, groupID uint64) {
  414. select {
  415. case mn.recvc <- multiMessage{
  416. group: groupID,
  417. msg: pb.Message{Type: pb.MsgUnreachable, From: id},
  418. }:
  419. case <-mn.done:
  420. }
  421. }
  422. func (mn *multiNode) ReportSnapshot(id, groupID uint64, status SnapshotStatus) {
  423. rej := status == SnapshotFailure
  424. select {
  425. case mn.recvc <- multiMessage{
  426. group: groupID,
  427. msg: pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej},
  428. }:
  429. case <-mn.done:
  430. }
  431. }