command.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package main
  2. //------------------------------------------------------------------------------
  3. //
  4. // Commands
  5. //
  6. //------------------------------------------------------------------------------
  7. import (
  8. "encoding/json"
  9. "github.com/xiangli-cmu/go-raft"
  10. "github.com/xiangli-cmu/raft-etcd/store"
  11. "time"
  12. )
  13. // A command represents an action to be taken on the replicated state machine.
  14. type Command interface {
  15. CommandName() string
  16. Apply(server *raft.Server) (interface{}, error)
  17. }
  // SetCommand holds the arguments of a set operation. Its fields are
  // serialized to JSON as "key", "value" and "expireTime".
  type SetCommand struct {
  	// Key is the key passed to the store.
  	Key string `json:"key"`
  	// Value is the value stored under Key.
  	Value string `json:"value"`
  	// ExpireTime is forwarded to the store unchanged; presumably the moment
  	// at which the key expires (confirm against the store package).
  	ExpireTime time.Time `json:"expireTime"`
  }
  24. // The name of the command in the log
  25. func (c *SetCommand) CommandName() string {
  26. return "set"
  27. }
  28. // Set the value of key to value
  29. func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
  30. return store.Set(c.Key, c.Value, c.ExpireTime, server.CommittedIndex())
  31. }
  32. // Get the path for http request
  33. func (c *SetCommand) GeneratePath() string {
  34. return "set/" + c.Key
  35. }
  // GetCommand holds the argument of a get operation. Its single field is
  // serialized to JSON as "key".
  type GetCommand struct {
  	// Key is the key to look up in the store.
  	Key string `json:"key"`
  }
  40. // The name of the command in the log
  41. func (c *GetCommand) CommandName() string {
  42. return "get"
  43. }
  44. // Set the value of key to value
  45. func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
  46. res := store.Get(c.Key)
  47. return json.Marshal(res)
  48. }
  49. func (c *GetCommand) GeneratePath() string {
  50. return "get/" + c.Key
  51. }
  // DeleteCommand holds the argument of a delete operation. Its single
  // field is serialized to JSON as "key".
  type DeleteCommand struct {
  	// Key is the key to remove from the store.
  	Key string `json:"key"`
  }
  56. // The name of the command in the log
  57. func (c *DeleteCommand) CommandName() string {
  58. return "delete"
  59. }
  60. // Delete the key
  61. func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
  62. return store.Delete(c.Key, server.CommittedIndex())
  63. }
  // WatchCommand holds the arguments of a watch operation. Its fields are
  // serialized to JSON as "key" and "sinceIndex".
  type WatchCommand struct {
  	// Key is the key whose watcher is registered with the store.
  	Key string `json:"key"`
  	// SinceIndex is forwarded to store.AddWatcher unchanged; presumably the
  	// index from which changes are of interest (confirm against the store
  	// package).
  	SinceIndex uint64 `json:"sinceIndex"`
  }
  69. //The name of the command in the log
  70. func (c *WatchCommand) CommandName() string {
  71. return "watch"
  72. }
  73. func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
  74. ch := make(chan store.Response, 1)
  75. // add to the watchers list
  76. store.AddWatcher(c.Key, ch, c.SinceIndex)
  77. // wait for the notification for any changing
  78. res := <-ch
  79. return json.Marshal(res)
  80. }
  // JoinCommand holds the argument of a join operation. Its single field is
  // serialized to JSON as "name".
  type JoinCommand struct {
  	// Name is the peer name handed to the raft server's AddPeer.
  	Name string `json:"name"`
  }
  85. func (c *JoinCommand) CommandName() string {
  86. return "join"
  87. }
  88. func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
  89. err := server.AddPeer(c.Name)
  90. // no result will be returned
  91. return nil, err
  92. }