raftd.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "github.com/benbjohnson/go-raft"
  8. "log"
  9. "io"
  10. "io/ioutil"
  11. "net/http"
  12. "strings"
  13. "os"
  14. "time"
  15. "strconv"
  16. "github.com/xiangli-cmu/raft-etcd/web"
  17. "github.com/xiangli-cmu/raft-etcd/store"
  18. )
  19. //------------------------------------------------------------------------------
  20. //
  21. // Initialization
  22. //
  23. //------------------------------------------------------------------------------
  24. var verbose bool
  25. var leaderHost string
  26. var address string
  27. var webPort int
  28. var cert string
  29. var key string
  30. var CAFile string
  31. func init() {
  32. flag.BoolVar(&verbose, "v", false, "verbose logging")
  33. flag.StringVar(&leaderHost, "c", "", "join to a existing cluster")
  34. flag.StringVar(&address, "a", "", "the address of the local machine")
  35. flag.IntVar(&webPort, "w", -1, "the port of web interface")
  36. }
  37. const (
  38. ELECTIONTIMTOUT = 3 * time.Second
  39. HEARTBEATTIMEOUT = 1 * time.Second
  40. )
  41. //------------------------------------------------------------------------------
  42. //
  43. // Typedefs
  44. //
  45. //------------------------------------------------------------------------------
  46. type Info struct {
  47. Host string `json:"host"`
  48. Port int `json:"port"`
  49. }
  50. //------------------------------------------------------------------------------
  51. //
  52. // Variables
  53. //
  54. //------------------------------------------------------------------------------
  55. var server *raft.Server
  56. var logger *log.Logger
  57. var storeMsg chan string
  58. //------------------------------------------------------------------------------
  59. //
  60. // Functions
  61. //
  62. //------------------------------------------------------------------------------
  63. //--------------------------------------
  64. // Main
  65. //--------------------------------------
  66. func main() {
  67. var err error
  68. logger = log.New(os.Stdout, "", log.LstdFlags)
  69. flag.Parse()
  70. // Setup commands.
  71. raft.RegisterCommand(&JoinCommand{})
  72. raft.RegisterCommand(&SetCommand{})
  73. raft.RegisterCommand(&GetCommand{})
  74. raft.RegisterCommand(&DeleteCommand{})
  75. // Use the present working directory if a directory was not passed in.
  76. var path string
  77. if flag.NArg() == 0 {
  78. path, _ = os.Getwd()
  79. } else {
  80. path = flag.Arg(0)
  81. if err := os.MkdirAll(path, 0744); err != nil {
  82. fatal("Unable to create path: %v", err)
  83. }
  84. }
  85. // Read server info from file or grab it from user.
  86. var info *Info = getInfo(path)
  87. name := fmt.Sprintf("%s:%d", info.Host, info.Port)
  88. fmt.Printf("Name: %s\n\n", name)
  89. t := transHandler{}
  90. // Setup new raft server.
  91. s := store.GetStore()
  92. server, err = raft.NewServer(name, path, t, s, nil)
  93. if err != nil {
  94. fatal("%v", err)
  95. }
  96. server.LoadSnapshot()
  97. debug("%s finished load snapshot", server.Name())
  98. server.Initialize()
  99. debug("%s finished init", server.Name())
  100. server.SetElectionTimeout(ELECTIONTIMTOUT)
  101. server.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
  102. debug("%s finished set timeout", server.Name())
  103. if server.IsLogEmpty() {
  104. // start as a leader in a new cluster
  105. if leaderHost == "" {
  106. server.StartHeartbeatTimeout()
  107. server.StartLeader()
  108. // join self as a peer
  109. command := &JoinCommand{}
  110. command.Name = server.Name()
  111. server.Do(command)
  112. debug("%s start as a leader", server.Name())
  113. // start as a fellower in a existing cluster
  114. } else {
  115. server.StartElectionTimeout()
  116. server.StartFollower()
  117. Join(server, leaderHost)
  118. fmt.Println("success join")
  119. }
  120. // rejoin the previous cluster
  121. } else {
  122. server.StartElectionTimeout()
  123. server.StartFollower()
  124. debug("%s start as a follower", server.Name())
  125. }
  126. // open the snapshot
  127. go server.Snapshot()
  128. // internal commands
  129. http.HandleFunc("/join", JoinHttpHandler)
  130. http.HandleFunc("/vote", VoteHttpHandler)
  131. http.HandleFunc("/log", GetLogHttpHandler)
  132. http.HandleFunc("/log/append", AppendEntriesHttpHandler)
  133. http.HandleFunc("/snapshot", SnapshotHttpHandler)
  134. // external commands
  135. http.HandleFunc("/set/", SetHttpHandler)
  136. http.HandleFunc("/get/", GetHttpHandler)
  137. http.HandleFunc("/delete/", DeleteHttpHandler)
  138. http.HandleFunc("/watch/", WatchHttpHandler)
  139. if webPort != -1 {
  140. // start web
  141. s.SetMessager(&storeMsg)
  142. go webHelper()
  143. go web.Start(server, webPort)
  144. }
  145. // listen on http port
  146. log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
  147. }
  148. func usage() {
  149. fatal("usage: raftd [PATH]")
  150. }
  151. //--------------------------------------
  152. // Config
  153. //--------------------------------------
  154. func getInfo(path string) *Info {
  155. info := &Info{}
  156. // Read in the server info if available.
  157. infoPath := fmt.Sprintf("%s/info", path)
  158. if file, err := os.Open(infoPath); err == nil {
  159. if content, err := ioutil.ReadAll(file); err != nil {
  160. fatal("Unable to read info: %v", err)
  161. } else {
  162. if err = json.Unmarshal(content, &info); err != nil {
  163. fatal("Unable to parse info: %v", err)
  164. }
  165. }
  166. file.Close()
  167. // Otherwise ask user for info and write it to file.
  168. } else {
  169. if address == "" {
  170. fatal("Please give the address of the local machine")
  171. }
  172. input := strings.Split(address, ":")
  173. if len(input) != 2 {
  174. fatal("Wrong address %s", address)
  175. }
  176. info.Host = input[0]
  177. info.Host = strings.TrimSpace(info.Host)
  178. info.Port, err = strconv.Atoi(input[1])
  179. if err != nil {
  180. fatal("Wrong port %s", address)
  181. }
  182. // Write to file.
  183. content, _ := json.Marshal(info)
  184. content = []byte(string(content) + "\n")
  185. if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
  186. fatal("Unable to write info to file: %v", err)
  187. }
  188. }
  189. return info
  190. }
  191. //--------------------------------------
  192. // Handlers
  193. //--------------------------------------
  194. // Send join requests to the leader.
  195. func Join(s *raft.Server, serverName string) error {
  196. var b bytes.Buffer
  197. command := &JoinCommand{}
  198. command.Name = s.Name()
  199. json.NewEncoder(&b).Encode(command)
  200. debug("[send] POST http://%v/join", "localhost:4001")
  201. resp, err := http.Post(fmt.Sprintf("http://%s/join", serverName), "application/json", &b)
  202. if resp != nil {
  203. resp.Body.Close()
  204. if resp.StatusCode == http.StatusOK {
  205. return nil
  206. }
  207. }
  208. return fmt.Errorf("raftd: Unable to join: %v", err)
  209. }
  210. //--------------------------------------
  211. // Web Helper
  212. //--------------------------------------
  213. func webHelper() {
  214. storeMsg = make(chan string)
  215. for {
  216. web.Hub().Send(<-storeMsg)
  217. }
  218. }
  219. //--------------------------------------
  220. // HTTP Utilities
  221. //--------------------------------------
  222. func decodeJsonRequest(req *http.Request, data interface{}) error {
  223. decoder := json.NewDecoder(req.Body)
  224. if err := decoder.Decode(&data); err != nil && err != io.EOF {
  225. logger.Println("Malformed json request: %v", err)
  226. return fmt.Errorf("Malformed json request: %v", err)
  227. }
  228. return nil
  229. }
  230. func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
  231. w.Header().Set("Content-Type", "application/json")
  232. w.WriteHeader(status)
  233. if data != nil {
  234. encoder := json.NewEncoder(w)
  235. encoder.Encode(data)
  236. }
  237. }
  238. //--------------------------------------
  239. // Log
  240. //--------------------------------------
  241. func debug(msg string, v ...interface{}) {
  242. if verbose {
  243. logger.Printf("DEBUG " + msg + "\n", v...)
  244. }
  245. }
  246. func info(msg string, v ...interface{}) {
  247. logger.Printf("INFO " + msg + "\n", v...)
  248. }
  249. func warn(msg string, v ...interface{}) {
  250. logger.Printf("WARN " + msg + "\n", v...)
  251. }
  252. func fatal(msg string, v ...interface{}) {
  253. logger.Printf("FATAL " + msg + "\n", v...)
  254. os.Exit(1)
  255. }