kvstore.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  // Copyright 2015 CoreOS, Inc.
  //
  // Licensed under the Apache License, Version 2.0 (the "License");
  // you may not use this file except in compliance with the License.
  // You may obtain a copy of the License at
  //
  // http://www.apache.org/licenses/LICENSE-2.0
  //
  // Unless required by applicable law or agreed to in writing, software
  // distributed under the License is distributed on an "AS IS" BASIS,
  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  // See the License for the specific language governing permissions and
  // limitations under the License.
  14. package main
  15. import (
  16. "bytes"
  17. "encoding/gob"
  18. "log"
  19. "sync"
  20. )
  // kvstore is a key-value store backed by raft. Updates are sent out on
  // proposeC; kvStore holds the key-value pairs that have been committed.
  type kvstore struct {
  	// proposeC is the send-only channel on which encoded updates are proposed.
  	proposeC chan<- string

  	// mu guards kvStore.
  	mu sync.RWMutex

  	// kvStore maps each committed key to its current value.
  	kvStore map[string]string
  }
  // kv is a single key-value pair. Propose gob-encodes a kv to build a
  // proposal, and readCommits decodes one from every committed entry, so both
  // fields are exported for encoding/gob.
  type kv struct {
  	Key, Val string
  }
  31. func newKVStore(proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore {
  32. s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string)}
  33. // replay log into key-value map
  34. s.readCommits(commitC, errorC)
  35. // read commits from raft into kvStore map until error
  36. go s.readCommits(commitC, errorC)
  37. return s
  38. }
  39. func (s *kvstore) Lookup(key string) (string, bool) {
  40. s.mu.RLock()
  41. v, ok := s.kvStore[key]
  42. s.mu.RUnlock()
  43. return v, ok
  44. }
  45. func (s *kvstore) Propose(k string, v string) {
  46. var buf bytes.Buffer
  47. if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
  48. log.Fatal(err)
  49. }
  50. s.proposeC <- string(buf.Bytes())
  51. }
  52. func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
  53. for {
  54. select {
  55. case data := <-commitC:
  56. if data == nil {
  57. // done replaying log; new data incoming
  58. return
  59. }
  60. var data_kv kv
  61. dec := gob.NewDecoder(bytes.NewBufferString(*data))
  62. if err := dec.Decode(&data_kv); err != nil {
  63. log.Fatalf("raftexample: could not decode message (%v)", err)
  64. }
  65. s.mu.Lock()
  66. s.kvStore[data_kv.Key] = data_kv.Val
  67. s.mu.Unlock()
  68. case err := <-errorC:
  69. log.Println(err)
  70. return
  71. }
  72. }
  73. }