Browse Source

contrib: example key-value store using raft

Anthony Romano 10 years ago
parent
commit
1f858e10c8

+ 1 - 0
contrib/README.md

@@ -3,3 +3,4 @@
 Scripts and files which may be useful but aren't part of the core etcd project.
 Scripts and files which may be useful but aren't part of the core etcd project.
 
 
 - [systemd](systemd) - an example unit file for deploying etcd on systemd-based distributions
 - [systemd](systemd) - an example unit file for deploying etcd on systemd-based distributions
+- [raftexample](raftexample) - an example distributed key-value store using raft

+ 4 - 0
contrib/raftexample/Procfile

@@ -0,0 +1,4 @@
+# Use goreman to run `go get github.com/mattn/goreman`
+raftexample1: ./raftexample --id 1 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 12380
+raftexample2: ./raftexample --id 2 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 22380
+raftexample3: ./raftexample --id 3 --cluster http://127.0.0.1:12379,http://127.0.0.1:22379,http://127.0.0.1:32379 --port 32380

+ 90 - 0
contrib/raftexample/README.md

@@ -0,0 +1,90 @@
+# raftexample
+
+raftexample is an example usage of etcd's [raft library](../../raft). It provides a simple REST API for a key-value store cluster backed by the [Raft][raft] consensus algorithm.
+
+[raft]: http://raftconsensus.github.io/
+
+## Getting Started
+
+### Running single node raftexample
+
+First start a single-member cluster of raftexample:
+
+```sh
+raftexample --id 1 --cluster http://127.0.0.1:12379 --port 12380
+```
+
+Each raftexample process maintains a single raft instance and a key-value server.
+The process's list of comma separated peers (--cluster), its raft ID index into the peer list (--id), and http key-value server port (--port) are passed through the command line.
+
+Next, store a value ("hello") to a key ("my-key"):
+
+```
+curl -L http://127.0.0.1:12380/my-key -XPUT -d hello
+```
+
+Finally, retrieve the stored key:
+
+```
+curl -L http://127.0.0.1:12380/my-key
+```
+
+### Running a local cluster
+
+First install [goreman](https://github.com/mattn/goreman), which manages Procfile-based applications.
+
+The [Procfile script](./Procfile) will set up a local example cluster. You can start it with:
+
+```sh
+goreman start
+```
+
+This will bring up three raftexample instances.
+
+You can write a key-value pair to any member of the cluster and likewise retrieve it from any member.
+
+### Fault Tolerance
+
+To test cluster recovery, first start a cluster and write a value "foo":
+```sh
+goreman start
+curl -L http://127.0.0.1:12380/my-key -XPUT -d foo
+```
+
+Next, remove a node and replace the value with "bar" to check cluster availability:
+
+```sh
+goreman run stop raftexample2
+curl -L http://127.0.0.1:12380/my-key -XPUT -d bar
+curl -L http://127.0.0.1:32380/my-key
+```
+
+Finally, bring the node back up and verify it recovers with the updated value "bar":
+```sh
+goreman run start raftexample2
+curl -L http://127.0.0.1:22380/my-key
+```
+
+## Design
+
+The raftexample consists of three components: a raft-backed key-value store, a REST API server, and a raft consensus server based on etcd's raft implementation.
+
+The raft-backed key-value store is a key-value map that holds all committed key-values.
+The store bridges communication between the raft server and the REST server.
+Key-value updates are issued through the store to the raft server.
+The store updates its map once raft reports the updates are committed.
+
+The REST server exposes the current raft consensus by accessing the raft-backed key-value store.
+A GET command looks up a key in the store and returns the value, if any.
+A key-value PUT command issues an update proposal to the store.
+
+The raft server participates in consensus with its cluster peers.
+When the REST server submits a proposal, the raft server transmits the proposal to its peers.
+When raft reaches a consensus, the server publishes all committed updates over a commit channel.
+For raftexample, this commit channel is consumed by the key-value store.
+
+## Project Details
+
+### TODO
+- Snapshot support
+- Dynamic reconfiguration

+ 67 - 0
contrib/raftexample/httpapi.go

@@ -0,0 +1,67 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"io/ioutil"
+	"log"
+	"net/http"
+	"strconv"
+)
+
+// Handler for a http based key-value store backed by raft
+type httpKVAPI struct {
+	store *kvstore
+}
+
+func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	key := r.RequestURI
+	switch {
+	case r.Method == "PUT":
+		v, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			log.Printf("Failed to read on PUT (%v)\n", err)
+			http.Error(w, "Failed on PUT", http.StatusBadRequest)
+			return
+		}
+
+		h.store.Propose(key, string(v))
+
+		// Optimistic-- no waiting for ack from raft. Value is not yet
+		// committed so a subsequent GET on the key may return old value
+		w.WriteHeader(http.StatusNoContent)
+	case r.Method == "GET":
+		if v, ok := h.store.Lookup(key); ok {
+			w.Write([]byte(v))
+		} else {
+			http.Error(w, "Failed to GET", http.StatusNotFound)
+		}
+	default:
+		w.Header().Set("Allow", "PUT")
+		w.Header().Add("Allow", "GET")
+		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
+	}
+}
+
+// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
+func serveHttpKVAPI(port int, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) {
+	srv := http.Server{
+		Addr:    ":" + strconv.Itoa(port),
+		Handler: &httpKVAPI{newKVStore(proposeC, commitC, errorC)},
+	}
+	if err := srv.ListenAndServe(); err != nil {
+		log.Fatal(err)
+	}
+}

+ 82 - 0
contrib/raftexample/kvstore.go

@@ -0,0 +1,82 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"bytes"
+	"encoding/gob"
+	"log"
+	"sync"
+)
+
+// a key-value store backed by raft
+type kvstore struct {
+	proposeC chan<- string // channel for proposing updates
+	mu       sync.RWMutex
+	kvStore  map[string]string // current committed key-value pairs
+}
+
+type kv struct {
+	Key string
+	Val string
+}
+
+func newKVStore(proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
+	s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string)}
+	// replay log into key-value map
+	s.readCommits(commitC, errorC)
+	// read commits from raft into kvStore map until error
+	go s.readCommits(commitC, errorC)
+	return s
+}
+
+func (s *kvstore) Lookup(key string) (string, bool) {
+	s.mu.RLock()
+	v, ok := s.kvStore[key]
+	s.mu.RUnlock()
+	return v, ok
+}
+
+func (s *kvstore) Propose(k string, v string) {
+	var buf bytes.Buffer
+	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
+		log.Fatal(err)
+	}
+	s.proposeC <- string(buf.Bytes())
+}
+
+func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
+	for {
+		select {
+		case data := <-commitC:
+			if data == nil {
+				// done replaying log; new data incoming
+				return
+			}
+
+			var data_kv kv
+			dec := gob.NewDecoder(bytes.NewBufferString(*data))
+			if err := dec.Decode(&data_kv); err != nil {
+				log.Fatalf("raftexample: could not decode message (%v)", err)
+			}
+			s.mu.Lock()
+			s.kvStore[data_kv.Key] = data_kv.Val
+			s.mu.Unlock()
+		case err := <-errorC:
+			log.Println(err)
+			return
+		}
+	}
+}

+ 36 - 0
contrib/raftexample/main.go

@@ -0,0 +1,36 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"flag"
+	"strings"
+)
+
+func main() {
+	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
+	id := flag.Int("id", 1, "node ID")
+	kvport := flag.Int("port", 9121, "key-value server port")
+	flag.Parse()
+
+	proposeC := make(chan string)
+	defer close(proposeC)
+
+	// raft provides a commit stream for the proposals from the http api
+	commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), proposeC)
+
+	// the key-value http handler will propose updates to raft
+	serveHttpKVAPI(*kvport, proposeC, commitC, errorC)
+}

+ 234 - 0
contrib/raftexample/raft.go

@@ -0,0 +1,234 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"strconv"
+	"time"
+
+	"net/http"
+	"net/url"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/etcd/etcdserver/stats"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/wal"
+	"github.com/coreos/etcd/wal/walpb"
+)
+
+// A key-value stream backed by raft
+type raftNode struct {
+	proposeC <-chan string // proposed messages (k,v)
+	commitC  chan *string  // entries committed to log (k,v)
+	errorC   chan error    // errors from raft session
+
+	id     int      // client ID for raft session
+	peers  []string // raft peer URLs
+	waldir string   // path to WAL directory
+
+	// raft backing for the commit/error channel
+	node        raft.Node
+	raftStorage *raft.MemoryStorage
+	wal         *wal.WAL
+	transport   *rafthttp.Transport
+}
+
+// newRaftNode initiates a raft instance and returns a committed log entry
+// channel and error channel. Proposals for log updates are sent over the
+// provided the proposal channel. All log entries are replayed over the
+// commit channel, followed by a nil message (to indicate the channel is
+// current), then new log entries.
+func newRaftNode(id int, peers []string, proposeC <-chan string) (<-chan *string, <-chan error) {
+	rc := &raftNode{
+		proposeC:    proposeC,
+		commitC:     make(chan *string),
+		errorC:      make(chan error),
+		id:          id,
+		peers:       peers,
+		waldir:      fmt.Sprintf("raftexample-%d", id),
+		raftStorage: raft.NewMemoryStorage(),
+		// rest of structure populated after WAL replay
+	}
+	go rc.startRaft()
+	return rc.commitC, rc.errorC
+}
+
+// publishEntries writes committed log entries to commit channel.
+func (rc *raftNode) publishEntries(ents []raftpb.Entry) {
+	for i := range ents {
+		if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
+			// ignore conf changes and empty messages
+			continue
+		}
+		s := string(ents[i].Data)
+		rc.commitC <- &s
+	}
+}
+
+// openWAL returns a WAL ready for reading.
+func (rc *raftNode) openWAL() *wal.WAL {
+	if wal.Exist(rc.waldir) == false {
+		if err := os.Mkdir(rc.waldir, 0750); err != nil {
+			log.Fatalf("raftexample: cannot create dir for wal (%v)", err)
+		}
+
+		w, err := wal.Create(rc.waldir, nil)
+		if err != nil {
+			log.Fatalf("raftexample: create wal error (%v)", err)
+		}
+		w.Close()
+	}
+
+	w, err := wal.Open(rc.waldir, walpb.Snapshot{})
+	if err != nil {
+		log.Fatalf("raftexample: error loading wal (%v)", err)
+	}
+
+	return w
+}
+
+// replayWAL replays WAL entries into the raft instance and the commit
+// channel and returns an appendable WAL.
+func (rc *raftNode) replayWAL() *wal.WAL {
+	w := rc.openWAL()
+	_, _, ents, err := w.ReadAll()
+	if err != nil {
+		log.Fatalf("raftexample: failed to read WAL (%v)", err)
+	}
+	// append to storage so raft starts at the right place in log
+	rc.raftStorage.Append(ents)
+	rc.publishEntries(ents)
+	// send nil value so client knows commit channel is current
+	rc.commitC <- nil
+	return w
+}
+
+func (rc *raftNode) writeError(err error) {
+	rc.errorC <- err
+	rc.stop()
+}
+
+func (rc *raftNode) stop() {
+	close(rc.commitC)
+	close(rc.errorC)
+	rc.node.Stop()
+}
+
+func (rc *raftNode) startRaft() {
+	oldwal := wal.Exist(rc.waldir)
+	rc.wal = rc.replayWAL()
+
+	rpeers := make([]raft.Peer, len(rc.peers))
+	for i := range rpeers {
+		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
+	}
+	c := &raft.Config{
+		ID:              uint64(rc.id),
+		ElectionTick:    10,
+		HeartbeatTick:   1,
+		Storage:         rc.raftStorage,
+		MaxSizePerMsg:   1024 * 1024,
+		MaxInflightMsgs: 256,
+	}
+
+	if oldwal {
+		rc.node = raft.RestartNode(c)
+	} else {
+		rc.node = raft.StartNode(c, rpeers)
+	}
+
+	ss := &stats.ServerStats{}
+	ss.Initialize()
+
+	rc.transport = &rafthttp.Transport{
+		ID:          types.ID(rc.id),
+		ClusterID:   0x1000,
+		Raft:        rc,
+		ServerStats: ss,
+		LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
+		ErrorC:      make(chan error),
+	}
+
+	rc.transport.Start()
+	for i := range rc.peers {
+		if i+1 != rc.id {
+			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
+		}
+	}
+
+	go rc.serveRaft()
+	go rc.serveChannels()
+}
+
+func (rc *raftNode) serveChannels() {
+	defer rc.wal.Close()
+
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	// event loop on client proposals and raft updates
+	for {
+		select {
+		case <-ticker.C:
+			rc.node.Tick()
+
+		// send proposals over raft
+		case prop, ok := <-rc.proposeC:
+			if !ok {
+				// client closed channel; shut down
+				rc.stop()
+				return
+			}
+			rc.node.Propose(context.TODO(), []byte(prop))
+
+		// store raft entries to wal, then publish over commit channel
+		case rd := <-rc.node.Ready():
+			rc.wal.Save(rd.HardState, rd.Entries)
+			rc.raftStorage.Append(rd.Entries)
+			rc.transport.Send(rd.Messages)
+			rc.publishEntries(rd.Entries)
+			rc.node.Advance()
+
+		case err := <-rc.transport.ErrorC:
+			rc.writeError(err)
+			return
+		}
+	}
+}
+
+func (rc *raftNode) serveRaft() {
+	url, err := url.Parse(rc.peers[rc.id-1])
+	if err != nil {
+		log.Fatalf("raftexample: Failed parsing URL (%v)", err)
+	}
+
+	srv := http.Server{Addr: url.Host, Handler: rc.transport.Handler()}
+	if err := srv.ListenAndServe(); err != nil {
+		log.Fatalf("raftexample: Failed serving rafthttp (%v)", err)
+	}
+}
+
+func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
+	return rc.node.Step(ctx, m)
+}
+func (rc *raftNode) IsIDRemoved(id uint64) bool                           { return false }
+func (rc *raftNode) ReportUnreachable(id uint64)                          {}
+func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}