raftd.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "github.com/benbjohnson/go-raft"
  8. "log"
  9. "io"
  10. "io/ioutil"
  11. "net/http"
  12. "strings"
  13. "os"
  14. "time"
  15. "strconv"
  16. )
  17. //------------------------------------------------------------------------------
  18. //
  19. // Initialization
  20. //
  21. //------------------------------------------------------------------------------
  22. var verbose bool
  23. var leaderHost string
  24. var address string
  25. func init() {
  26. flag.BoolVar(&verbose, "v", false, "verbose logging")
  27. flag.StringVar(&leaderHost, "c", "", "join to a existing cluster")
  28. flag.StringVar(&address, "a", "", "the address of the local machine")
  29. }
  30. const (
  31. ELECTIONTIMTOUT = 3 * time.Second
  32. HEARTBEATTIMEOUT = 1 * time.Second
  33. )
  34. //------------------------------------------------------------------------------
  35. //
  36. // Typedefs
  37. //
  38. //------------------------------------------------------------------------------
  39. type Info struct {
  40. Host string `json:"host"`
  41. Port int `json:"port"`
  42. }
  43. //------------------------------------------------------------------------------
  44. //
  45. // Variables
  46. //
  47. //------------------------------------------------------------------------------
  48. var server *raft.Server
  49. var logger *log.Logger
  50. //------------------------------------------------------------------------------
  51. //
  52. // Functions
  53. //
  54. //------------------------------------------------------------------------------
  55. //--------------------------------------
  56. // Main
  57. //--------------------------------------
  58. func main() {
  59. var err error
  60. logger = log.New(os.Stdout, "", log.LstdFlags)
  61. flag.Parse()
  62. // Setup commands.
  63. raft.RegisterCommand(&JoinCommand{})
  64. raft.RegisterCommand(&SetCommand{})
  65. raft.RegisterCommand(&GetCommand{})
  66. raft.RegisterCommand(&DeleteCommand{})
  67. // Use the present working directory if a directory was not passed in.
  68. var path string
  69. if flag.NArg() == 0 {
  70. path, _ = os.Getwd()
  71. } else {
  72. path = flag.Arg(0)
  73. if err := os.MkdirAll(path, 0744); err != nil {
  74. fatal("Unable to create path: %v", err)
  75. }
  76. }
  77. // Read server info from file or grab it from user.
  78. var info *Info = getInfo(path)
  79. name := fmt.Sprintf("%s:%d", info.Host, info.Port)
  80. fmt.Printf("Name: %s\n\n", name)
  81. t := transHandler{}
  82. // Setup new raft server.
  83. server, err = raft.NewServer(name, path, t, s, nil)
  84. if err != nil {
  85. fatal("%v", err)
  86. }
  87. server.LoadSnapshot()
  88. server.Initialize()
  89. server.SetElectionTimeout(ELECTIONTIMTOUT)
  90. server.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
  91. if server.IsLogEmpty() {
  92. // start as a leader in a new cluster
  93. if leaderHost == "" {
  94. server.StartHeartbeatTimeout()
  95. server.StartLeader()
  96. // join self as a peer
  97. command := &JoinCommand{}
  98. command.Name = server.Name()
  99. server.Do(command)
  100. // start as a fellower in a existing cluster
  101. } else {
  102. server.StartElectionTimeout()
  103. server.StartFollower()
  104. Join(server, leaderHost)
  105. fmt.Println("success join")
  106. }
  107. // rejoin the previous cluster
  108. } else {
  109. server.StartElectionTimeout()
  110. server.StartFollower()
  111. }
  112. // open the snapshot
  113. go server.Snapshot()
  114. // internal commands
  115. http.HandleFunc("/join", JoinHttpHandler)
  116. http.HandleFunc("/vote", VoteHttpHandler)
  117. http.HandleFunc("/log", GetLogHttpHandler)
  118. http.HandleFunc("/log/append", AppendEntriesHttpHandler)
  119. http.HandleFunc("/snapshot", SnapshotHttpHandler)
  120. // external commands
  121. http.HandleFunc("/set/", SetHttpHandler)
  122. http.HandleFunc("/get/", GetHttpHandler)
  123. http.HandleFunc("/delete/", DeleteHttpHandler)
  124. http.HandleFunc("/watch/", WatchHttpHandler)
  125. // listen on http port
  126. log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
  127. }
  128. func usage() {
  129. fatal("usage: raftd [PATH]")
  130. }
  131. //--------------------------------------
  132. // Config
  133. //--------------------------------------
  134. func getInfo(path string) *Info {
  135. info := &Info{}
  136. // Read in the server info if available.
  137. infoPath := fmt.Sprintf("%s/info", path)
  138. if file, err := os.Open(infoPath); err == nil {
  139. if content, err := ioutil.ReadAll(file); err != nil {
  140. fatal("Unable to read info: %v", err)
  141. } else {
  142. if err = json.Unmarshal(content, &info); err != nil {
  143. fatal("Unable to parse info: %v", err)
  144. }
  145. }
  146. file.Close()
  147. // Otherwise ask user for info and write it to file.
  148. } else {
  149. if address == "" {
  150. fatal("Please give the address of the local machine")
  151. }
  152. input := strings.Split(address, ":")
  153. if len(input) != 2 {
  154. fatal("Wrong address %s", address)
  155. }
  156. info.Host = input[0]
  157. info.Host = strings.TrimSpace(info.Host)
  158. info.Port, err = strconv.Atoi(input[1])
  159. if err != nil {
  160. fatal("Wrong port %s", address)
  161. }
  162. // Write to file.
  163. content, _ := json.Marshal(info)
  164. content = []byte(string(content) + "\n")
  165. if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
  166. fatal("Unable to write info to file: %v", err)
  167. }
  168. }
  169. return info
  170. }
  171. //--------------------------------------
  172. // Handlers
  173. //--------------------------------------
  174. // Send join requests to the leader.
  175. func Join(s *raft.Server, serverName string) error {
  176. var b bytes.Buffer
  177. command := &JoinCommand{}
  178. command.Name = s.Name()
  179. json.NewEncoder(&b).Encode(command)
  180. debug("[send] POST http://%v/join", "localhost:4001")
  181. resp, err := http.Post(fmt.Sprintf("http://%s/join", serverName), "application/json", &b)
  182. if resp != nil {
  183. resp.Body.Close()
  184. if resp.StatusCode == http.StatusOK {
  185. return nil
  186. }
  187. }
  188. return fmt.Errorf("raftd: Unable to join: %v", err)
  189. }
  190. //--------------------------------------
  191. // HTTP Utilities
  192. //--------------------------------------
  193. func decodeJsonRequest(req *http.Request, data interface{}) error {
  194. decoder := json.NewDecoder(req.Body)
  195. if err := decoder.Decode(&data); err != nil && err != io.EOF {
  196. logger.Println("Malformed json request: %v", err)
  197. return fmt.Errorf("Malformed json request: %v", err)
  198. }
  199. return nil
  200. }
  201. func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
  202. w.Header().Set("Content-Type", "application/json")
  203. w.WriteHeader(status)
  204. if data != nil {
  205. encoder := json.NewEncoder(w)
  206. encoder.Encode(data)
  207. }
  208. }
  209. //--------------------------------------
  210. // Log
  211. //--------------------------------------
  212. func debug(msg string, v ...interface{}) {
  213. if verbose {
  214. logger.Printf("DEBUG " + msg + "\n", v...)
  215. }
  216. }
  217. func info(msg string, v ...interface{}) {
  218. logger.Printf("INFO " + msg + "\n", v...)
  219. }
  220. func warn(msg string, v ...interface{}) {
  221. logger.Printf("WARN " + msg + "\n", v...)
  222. }
  223. func fatal(msg string, v ...interface{}) {
  224. logger.Printf("FATAL " + msg + "\n", v...)
  225. os.Exit(1)
  226. }