// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package raft provides an implementation of the raft consensus algorithm.

Usage

The primary object in raft is a Node. You either start a Node from scratch
using raft.StartNode or start a Node from some initial state using
raft.RestartNode. To start a node from scratch:
  storage := raft.NewMemoryStorage()
  c := &Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }
  n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
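
If you are restarting a node from previous state, repopulate the Storage
before calling raft.RestartNode, which takes no peer list; the membership
is recovered from the persisted log and snapshot. A sketch, assuming
snapshot, state, and entries were previously saved by the application:

  storage := raft.NewMemoryStorage()
  storage.ApplySnapshot(snapshot)
  storage.SetHardState(state)
  storage.Append(entries)
  c := &Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }
  n := raft.RestartNode(c)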

Now that you are holding onto a Node you have a few responsibilities:

First, you must read from the Node.Ready() channel and process the updates
it contains. These steps may be performed in parallel, except as noted in
step 2.

1. Write HardState, Entries, and Snapshot to persistent storage if they are
not empty. Note that when writing an Entry with Index i, any
previously-persisted entries with Index >= i must be discarded (a sketch of
this step follows the list).

2. Send all Messages to the nodes named in the To field. It is important
that no messages be sent until the latest HardState has been persisted to
disk, and until all Entries written by any previous Ready batch have been
persisted (Messages may be sent while entries from the same batch are being
persisted). If any Message has type MsgSnap, call Node.ReportSnapshot()
after it has been sent (these messages may be large).

3. Apply Snapshot (if any) and CommittedEntries to the state machine.
If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
to apply it to the node. The configuration change may be cancelled at this
point by setting the NodeID field to zero before calling ApplyConfChange
(but ApplyConfChange must be called one way or the other, and the decision
to cancel must be based solely on the state machine and not external
information such as the observed health of the node).

4. Call Node.Advance() to signal readiness for the next batch of updates.
This may be done at any time after step 1, although all updates must be
processed in the order they were returned by Ready.
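
As the sketch promised in step 1, a saveToStorage helper backed by the
MemoryStorage from the example above might look as follows (this is a
minimal sketch; a real application would write to durable storage and
sync it before returning, and would handle the errors ignored here):

  func saveToStorage(hardState raftpb.HardState, entries []raftpb.Entry, snapshot raftpb.Snapshot) {
    if !raft.IsEmptySnap(snapshot) {
      storage.ApplySnapshot(snapshot)
    }
    if !raft.IsEmptyHardState(hardState) {
      storage.SetHardState(hardState)
    }
    // MemoryStorage.Append discards previously-appended entries that
    // conflict with the new ones, matching the requirement in step 1.
    storage.Append(entries)
  }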

Second, all persisted log entries must be made available via an
implementation of the Storage interface. The provided MemoryStorage
type can be used for this (if you repopulate its state upon a
restart), or you can supply your own disk-backed implementation.

Third, when you receive a message from another node, pass it to Node.Step:

  func recvRaftRPC(ctx context.Context, m raftpb.Message) {
    n.Step(ctx, m)
  }

Finally, you need to call Node.Tick() at regular intervals (probably via a
time.Ticker). Raft has two important timeouts: heartbeat and the election
timeout. However, internally to the raft package, time is represented by an
abstract "tick".
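
For example, ticks can be driven by a time.Ticker; the 100ms interval here
is an arbitrary choice (ElectionTick and HeartbeatTick in the Config above
are expressed as multiples of it), and s.Ticker is assumed to be a
<-chan time.Time read by the loop below:

  ticker := time.NewTicker(100 * time.Millisecond)
  defer ticker.Stop()
  s.Ticker = ticker.C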

The total state machine handling loop will look something like this:

  for {
    select {
    case <-s.Ticker:
      n.Tick()
    case rd := <-s.Node.Ready():
      saveToStorage(rd.State, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      if !raft.IsEmptySnap(rd.Snapshot) {
        processSnapshot(rd.Snapshot)
      }
      for _, entry := range rd.CommittedEntries {
        process(entry)
        if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
        }
      }
      s.Node.Advance()
    case <-s.done:
      return
    }
  }
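
The send, processSnapshot, and process helpers in the loop above are
application-defined. As one hypothetical sketch, process might apply
normal entries to the application's state machine, skipping the empty
entry a newly elected leader appends at the start of its term:

  func process(entry raftpb.Entry) {
    if entry.Type == raftpb.EntryNormal && len(entry.Data) > 0 {
      // applyToStateMachine is a hypothetical application function
      // that interprets the data originally passed to Propose.
      applyToStateMachine(entry.Data)
    }
  }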

To propose changes to the state machine from your node, take your
application data, serialize it into a byte slice, and call:

  n.Propose(ctx, data)

If the proposal is committed, data will appear in committed entries with
type raftpb.EntryNormal. There is no guarantee that a proposed command will
be committed; you may have to re-propose after a timeout.
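
A sketch of proposing with a deadline; the retry policy and the check for
the data appearing in CommittedEntries are left to the application, which
must also tolerate the original proposal committing after a retry (for
example, by deduplicating at apply time):

  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()
  if err := n.Propose(ctx, data); err != nil {
    // The proposal was dropped or the context expired; it may be
    // re-proposed later, but it may still commit regardless.
  }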

To add or remove a node in a cluster, build a ConfChange struct 'cc' and
call:

  n.ProposeConfChange(ctx, cc)
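
For example, a sketch that proposes adding a hypothetical node 0x04 (the
Context field may carry application data, such as the new node's address):

  cc := raftpb.ConfChange{
    Type:   raftpb.ConfChangeAddNode,
    NodeID: 0x04,
  }
  n.ProposeConfChange(ctx, cc)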

After the config change is committed, some committed entry with type
raftpb.EntryConfChange will be returned. You must apply it to the node
through:

  var cc raftpb.ConfChange
  cc.Unmarshal(data)
  n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A
given ID MUST be used only once even if the old node has been removed.
This means that for example IP addresses make poor node IDs since they
may be reused. Node IDs must be non-zero.

Implementation notes

This implementation is up to date with the final Raft thesis
(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
implementation of the membership change protocol differs somewhat from
that described in chapter 4. The key invariant that membership changes
happen one node at a time is preserved, but in our implementation the
membership change takes effect when its entry is applied, not when it
is added to the log (so the entry is committed under the old
membership instead of the new). This is equivalent in terms of safety,
since the old and new configurations are guaranteed to overlap.

To ensure that we do not attempt to commit two membership changes at
once by matching log positions (which would be unsafe since they
should have different quorum requirements), we simply disallow any
proposed membership change while any uncommitted change appears in
the leader's log.

This approach introduces a problem when you try to remove a member
from a two-member cluster: if one of the members dies before the
other one receives the commit of the confchange entry, then the member
cannot be removed any more since the cluster cannot make progress.
For this reason it is highly recommended to use three or more nodes in
every cluster.
*/
package raft