kvstore.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bytes"
  17. "encoding/gob"
  18. "log"
  19. "sync"
  20. )
  21. // a key-value store backed by raft
  22. type kvstore struct {
  23. proposeC chan<- string // channel for proposing updates
  24. mu sync.RWMutex
  25. kvStore map[string]string // current committed key-value pairs
  26. }
  27. type kv struct {
  28. Key string
  29. Val string
  30. }
  31. func newKVStore(proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
  32. s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string)}
  33. // replay log into key-value map
  34. s.readCommits(commitC, errorC)
  35. // read commits from raft into kvStore map until error
  36. go s.readCommits(commitC, errorC)
  37. return s
  38. }
  39. func (s *kvstore) Lookup(key string) (string, bool) {
  40. s.mu.RLock()
  41. v, ok := s.kvStore[key]
  42. s.mu.RUnlock()
  43. return v, ok
  44. }
  45. func (s *kvstore) Propose(k string, v string) {
  46. var buf bytes.Buffer
  47. if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
  48. log.Fatal(err)
  49. }
  50. s.proposeC <- string(buf.Bytes())
  51. }
  52. func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
  53. for data := range commitC {
  54. if data == nil {
  55. // done replaying log; new data incoming
  56. return
  57. }
  58. var dataKv kv
  59. dec := gob.NewDecoder(bytes.NewBufferString(*data))
  60. if err := dec.Decode(&dataKv); err != nil {
  61. log.Fatalf("raftexample: could not decode message (%v)", err)
  62. }
  63. s.mu.Lock()
  64. s.kvStore[dataKv.Key] = dataKv.Val
  65. s.mu.Unlock()
  66. }
  67. if err, ok := <-errorC; ok {
  68. log.Fatal(err)
  69. }
  70. }