multinode.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449
  1. package raft
  2. import (
  3. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  4. pb "github.com/coreos/etcd/raft/raftpb"
  5. )
  6. // MultiNode represents a node that is participating in multiple consensus groups.
  7. // A MultiNode is more efficient than a collection of Nodes.
  8. // The methods of this interface correspond to the methods of Node and are described
  9. // more fully there.
  10. type MultiNode interface {
  11. // CreateGroup adds a new group to the MultiNode. The application must call CreateGroup
  12. // on each particpating node with the same group ID; it may create groups on demand as it
  13. // receives messages. If the given storage contains existing log entries the list of peers
  14. // may be empty.
  15. CreateGroup(group uint64, peers []Peer, storage Storage) error
  16. // RemoveGroup removes a group from the MultiNode.
  17. RemoveGroup(group uint64) error
  18. // Tick advances the internal logical clock by a single tick.
  19. Tick()
  20. // Campaign causes this MultiNode to transition to candidate state in the given group.
  21. Campaign(ctx context.Context, group uint64) error
  22. // Propose proposes that data be appended to the given group's log.
  23. Propose(ctx context.Context, group uint64, data []byte) error
  24. // ProposeConfChange proposes a config change.
  25. ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error
  26. // ApplyConfChange applies a config change to the local node.
  27. ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState
  28. // Step advances the state machine using the given message.
  29. Step(ctx context.Context, group uint64, msg pb.Message) error
  30. // Ready returns a channel that returns the current point-in-time state of any ready
  31. // groups. Only groups with something to report will appear in the map.
  32. Ready() <-chan map[uint64]Ready
  33. // Advance notifies the node that the application has applied and saved progress in the
  34. // last Ready results. It must be called with the last value returned from the Ready()
  35. // channel.
  36. Advance(map[uint64]Ready)
  37. // Status returns the current status of the given group.
  38. Status(group uint64) Status
  39. // Stop performs any necessary termination of the MultiNode.
  40. Stop()
  41. }
  42. // StartMultiNode creates a MultiNode and starts its background goroutine.
  43. // The id identifies this node and will be used as its node ID in all groups.
  44. // The election and heartbeat timers are in units of ticks.
  45. func StartMultiNode(id uint64, election, heartbeat int) MultiNode {
  46. mn := newMultiNode(id, election, heartbeat)
  47. go mn.run()
  48. return &mn
  49. }
  50. // TODO(bdarnell): add group ID to the underlying protos?
  51. type multiMessage struct {
  52. group uint64
  53. msg pb.Message
  54. }
  55. type multiConfChange struct {
  56. group uint64
  57. msg pb.ConfChange
  58. ch chan pb.ConfState
  59. }
  60. type multiStatus struct {
  61. group uint64
  62. ch chan Status
  63. }
  64. type groupCreation struct {
  65. id uint64
  66. peers []Peer
  67. storage Storage
  68. // TODO(bdarnell): do we really need the done channel here? It's
  69. // unlike the rest of this package, but we need the group creation
  70. // to be complete before any Propose or other calls.
  71. done chan struct{}
  72. }
  73. type groupRemoval struct {
  74. id uint64
  75. // TODO(bdarnell): see comment on groupCreation.done
  76. done chan struct{}
  77. }
  78. type multiNode struct {
  79. id uint64
  80. election int
  81. heartbeat int
  82. groupc chan groupCreation
  83. rmgroupc chan groupRemoval
  84. propc chan multiMessage
  85. recvc chan multiMessage
  86. confc chan multiConfChange
  87. readyc chan map[uint64]Ready
  88. advancec chan map[uint64]Ready
  89. tickc chan struct{}
  90. stop chan struct{}
  91. done chan struct{}
  92. status chan multiStatus
  93. }
  94. func newMultiNode(id uint64, election, heartbeat int) multiNode {
  95. return multiNode{
  96. id: id,
  97. election: election,
  98. heartbeat: heartbeat,
  99. groupc: make(chan groupCreation),
  100. rmgroupc: make(chan groupRemoval),
  101. propc: make(chan multiMessage),
  102. recvc: make(chan multiMessage),
  103. confc: make(chan multiConfChange),
  104. readyc: make(chan map[uint64]Ready),
  105. advancec: make(chan map[uint64]Ready),
  106. tickc: make(chan struct{}),
  107. stop: make(chan struct{}),
  108. done: make(chan struct{}),
  109. status: make(chan multiStatus),
  110. }
  111. }
  112. type groupState struct {
  113. id uint64
  114. raft *raft
  115. prevSoftSt *SoftState
  116. prevHardSt pb.HardState
  117. prevSnapi uint64
  118. }
  119. func (g *groupState) newReady() Ready {
  120. return newReady(g.raft, g.prevSoftSt, g.prevHardSt)
  121. }
  122. func (g *groupState) commitReady(rd Ready) {
  123. if rd.SoftState != nil {
  124. g.prevSoftSt = rd.SoftState
  125. }
  126. if !IsEmptyHardState(rd.HardState) {
  127. g.prevHardSt = rd.HardState
  128. }
  129. if !IsEmptySnap(rd.Snapshot) {
  130. g.prevSnapi = rd.Snapshot.Metadata.Index
  131. g.raft.raftLog.stableSnapTo(g.prevSnapi)
  132. }
  133. if len(rd.Entries) > 0 {
  134. // TODO(bdarnell): stableTo(rd.Snapshot.Index) if any
  135. e := rd.Entries[len(rd.Entries)-1]
  136. g.raft.raftLog.stableTo(e.Index, e.Term)
  137. }
  138. // TODO(bdarnell): in node.go, Advance() ignores CommittedEntries and calls
  139. // appliedTo with HardState.Commit, but this causes problems in multinode/cockroach.
  140. // The two should be the same except for the special-casing of the initial ConfChange
  141. // entries.
  142. if len(rd.CommittedEntries) > 0 {
  143. g.raft.raftLog.appliedTo(rd.CommittedEntries[len(rd.CommittedEntries)-1].Index)
  144. }
  145. //g.raft.raftLog.appliedTo(rd.HardState.Commit)
  146. }
  147. func (mn *multiNode) run() {
  148. groups := map[uint64]*groupState{}
  149. rds := map[uint64]Ready{}
  150. var advancec chan map[uint64]Ready
  151. for {
  152. // Only select readyc if we have something to report and we are not
  153. // currently waiting for an advance.
  154. readyc := mn.readyc
  155. if len(rds) == 0 || advancec != nil {
  156. readyc = nil
  157. }
  158. // group points to the group that was touched on this iteration (if any)
  159. var group *groupState
  160. select {
  161. case gc := <-mn.groupc:
  162. // TODO(bdarnell): pass applied through gc and into newRaft. Or get rid of it?
  163. r := newRaft(mn.id, nil, mn.election, mn.heartbeat, gc.storage, 0)
  164. group = &groupState{
  165. id: gc.id,
  166. raft: r,
  167. prevSoftSt: r.softState(),
  168. prevHardSt: r.HardState,
  169. }
  170. groups[gc.id] = group
  171. lastIndex, err := gc.storage.LastIndex()
  172. if err != nil {
  173. panic(err) // TODO(bdarnell)
  174. }
  175. // If the log is empty, this is a new group (like StartNode); otherwise it's
  176. // restoring an existing group (like RestartNode).
  177. // TODO(bdarnell): rethink group initialization and whether the application needs
  178. // to be able to tell us when it expects the group to exist.
  179. if lastIndex == 0 {
  180. r.becomeFollower(1, None)
  181. ents := make([]pb.Entry, len(gc.peers))
  182. for i, peer := range gc.peers {
  183. cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
  184. data, err := cc.Marshal()
  185. if err != nil {
  186. panic("unexpected marshal error")
  187. }
  188. ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
  189. }
  190. r.raftLog.append(ents...)
  191. r.raftLog.committed = uint64(len(ents))
  192. for _, peer := range gc.peers {
  193. r.addNode(peer.ID)
  194. }
  195. }
  196. close(gc.done)
  197. case gr := <-mn.rmgroupc:
  198. delete(groups, gr.id)
  199. delete(rds, gr.id)
  200. close(gr.done)
  201. case mm := <-mn.propc:
  202. // TODO(bdarnell): single-node impl doesn't read from propc unless the group
  203. // has a leader; we can't do that since we have one propc for many groups.
  204. // We'll have to buffer somewhere on a group-by-group basis, or just let
  205. // raft.Step drop any such proposals on the floor.
  206. mm.msg.From = mn.id
  207. group = groups[mm.group]
  208. group.raft.Step(mm.msg)
  209. case mm := <-mn.recvc:
  210. group = groups[mm.group]
  211. if _, ok := group.raft.prs[mm.msg.From]; ok || !IsResponseMsg(mm.msg) {
  212. group.raft.Step(mm.msg)
  213. }
  214. case mcc := <-mn.confc:
  215. group = groups[mcc.group]
  216. if mcc.msg.NodeID == None {
  217. group.raft.resetPendingConf()
  218. select {
  219. case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
  220. case <-mn.done:
  221. }
  222. break
  223. }
  224. switch mcc.msg.Type {
  225. case pb.ConfChangeAddNode:
  226. group.raft.addNode(mcc.msg.NodeID)
  227. case pb.ConfChangeRemoveNode:
  228. group.raft.removeNode(mcc.msg.NodeID)
  229. case pb.ConfChangeUpdateNode:
  230. group.raft.resetPendingConf()
  231. default:
  232. panic("unexpected conf type")
  233. }
  234. select {
  235. case mcc.ch <- pb.ConfState{Nodes: group.raft.nodes()}:
  236. case <-mn.done:
  237. }
  238. case <-mn.tickc:
  239. // TODO(bdarnell): instead of calling every group on every tick,
  240. // we should have a priority queue of groups based on their next
  241. // time-based event.
  242. for _, g := range groups {
  243. g.raft.tick()
  244. rd := g.newReady()
  245. if rd.containsUpdates() {
  246. rds[g.id] = rd
  247. }
  248. }
  249. case readyc <- rds:
  250. // Clear outgoing messages as soon as we've passed them to the application.
  251. for g := range rds {
  252. groups[g].raft.msgs = nil
  253. }
  254. rds = map[uint64]Ready{}
  255. advancec = mn.advancec
  256. case advs := <-advancec:
  257. for groupID, rd := range advs {
  258. group, ok := groups[groupID]
  259. if !ok {
  260. continue
  261. }
  262. group.commitReady(rd)
  263. // We've been accumulating new entries in rds which may now be obsolete.
  264. // Drop the old Ready object and create a new one if needed.
  265. delete(rds, groupID)
  266. newRd := group.newReady()
  267. if newRd.containsUpdates() {
  268. rds[groupID] = newRd
  269. }
  270. }
  271. advancec = nil
  272. case ms := <-mn.status:
  273. ms.ch <- getStatus(groups[ms.group].raft)
  274. case <-mn.stop:
  275. close(mn.done)
  276. return
  277. }
  278. if group != nil {
  279. rd := group.newReady()
  280. if rd.containsUpdates() {
  281. rds[group.id] = rd
  282. }
  283. }
  284. }
  285. }
  286. func (mn *multiNode) CreateGroup(id uint64, peers []Peer, storage Storage) error {
  287. gc := groupCreation{
  288. id: id,
  289. peers: peers,
  290. storage: storage,
  291. done: make(chan struct{}),
  292. }
  293. mn.groupc <- gc
  294. select {
  295. case <-gc.done:
  296. return nil
  297. case <-mn.done:
  298. return ErrStopped
  299. }
  300. }
  301. func (mn *multiNode) RemoveGroup(id uint64) error {
  302. gr := groupRemoval{
  303. id: id,
  304. done: make(chan struct{}),
  305. }
  306. mn.rmgroupc <- gr
  307. select {
  308. case <-gr.done:
  309. return nil
  310. case <-mn.done:
  311. return ErrStopped
  312. }
  313. }
  314. func (mn *multiNode) Stop() {
  315. select {
  316. case mn.stop <- struct{}{}:
  317. case <-mn.done:
  318. }
  319. <-mn.done
  320. }
  321. func (mn *multiNode) Tick() {
  322. select {
  323. case mn.tickc <- struct{}{}:
  324. case <-mn.done:
  325. }
  326. }
  327. func (mn *multiNode) Campaign(ctx context.Context, group uint64) error {
  328. return mn.step(ctx, multiMessage{group,
  329. pb.Message{
  330. Type: pb.MsgHup,
  331. },
  332. })
  333. }
  334. func (mn *multiNode) Propose(ctx context.Context, group uint64, data []byte) error {
  335. return mn.step(ctx, multiMessage{group,
  336. pb.Message{
  337. Type: pb.MsgProp,
  338. Entries: []pb.Entry{
  339. {Data: data},
  340. },
  341. }})
  342. }
  343. func (mn *multiNode) ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error {
  344. data, err := cc.Marshal()
  345. if err != nil {
  346. return err
  347. }
  348. return mn.Step(ctx, group,
  349. pb.Message{
  350. Type: pb.MsgProp,
  351. Entries: []pb.Entry{
  352. {Type: pb.EntryConfChange, Data: data},
  353. },
  354. })
  355. }
  356. func (mn *multiNode) step(ctx context.Context, m multiMessage) error {
  357. ch := mn.recvc
  358. if m.msg.Type == pb.MsgProp {
  359. ch = mn.propc
  360. }
  361. select {
  362. case ch <- m:
  363. return nil
  364. case <-ctx.Done():
  365. return ctx.Err()
  366. case <-mn.done:
  367. return ErrStopped
  368. }
  369. }
  370. func (mn *multiNode) ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState {
  371. mcc := multiConfChange{group, cc, make(chan pb.ConfState)}
  372. select {
  373. case mn.confc <- mcc:
  374. case <-mn.done:
  375. }
  376. select {
  377. case cs := <-mcc.ch:
  378. return &cs
  379. case <-mn.done:
  380. // Per comments on Node.ApplyConfChange, this method should never return nil.
  381. return &pb.ConfState{}
  382. }
  383. }
  384. func (mn *multiNode) Step(ctx context.Context, group uint64, m pb.Message) error {
  385. // ignore unexpected local messages receiving over network
  386. if IsLocalMsg(m) {
  387. // TODO: return an error?
  388. return nil
  389. }
  390. return mn.step(ctx, multiMessage{group, m})
  391. }
  392. func (mn *multiNode) Ready() <-chan map[uint64]Ready {
  393. return mn.readyc
  394. }
  395. func (mn *multiNode) Advance(rds map[uint64]Ready) {
  396. select {
  397. case mn.advancec <- rds:
  398. case <-mn.done:
  399. }
  400. }
  401. func (mn *multiNode) Status(group uint64) Status {
  402. ms := multiStatus{
  403. group: group,
  404. ch: make(chan Status),
  405. }
  406. mn.status <- ms
  407. return <-ms.ch
  408. }