etcd_handler_v1.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package main
  2. import (
  3. "encoding/json"
  4. "net/http"
  5. "strconv"
  6. etcdErr "github.com/coreos/etcd/error"
  7. "github.com/coreos/etcd/store"
  8. "github.com/coreos/go-raft"
  9. )
  //-------------------------------------------------------------------
  // Handlers for etcd-store related requests received via the etcd URL
  //-------------------------------------------------------------------
  13. // Multiplex GET/POST/DELETE request to corresponding handlers
  14. func (e *etcdServer) MultiplexerV1(w http.ResponseWriter, req *http.Request) error {
  15. switch req.Method {
  16. case "GET":
  17. return e.GetHttpHandlerV1(w, req)
  18. case "POST":
  19. return e.SetHttpHandlerV1(w, req)
  20. case "PUT":
  21. return e.SetHttpHandlerV1(w, req)
  22. case "DELETE":
  23. return e.DeleteHttpHandlerV1(w, req)
  24. default:
  25. w.WriteHeader(http.StatusMethodNotAllowed)
  26. return nil
  27. }
  28. }
  29. //--------------------------------------
  30. // State sensitive handlers
  31. // Set/Delete will dispatch to leader
  32. //--------------------------------------
  33. // Set Command Handler
  34. func (e *etcdServer) SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
  35. key := req.URL.Path[len("/v1/keys/"):]
  36. debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  37. req.ParseForm()
  38. value := req.Form.Get("value")
  39. if len(value) == 0 {
  40. return etcdErr.NewError(200, "Set", store.UndefIndex, store.UndefTerm)
  41. }
  42. strDuration := req.Form.Get("ttl")
  43. expireTime, err := durationToExpireTime(strDuration)
  44. if err != nil {
  45. return etcdErr.NewError(202, "Set", store.UndefIndex, store.UndefTerm)
  46. }
  47. if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
  48. command := &TestAndSetCommand{
  49. Key: key,
  50. Value: value,
  51. PrevValue: prevValueArr[0],
  52. ExpireTime: expireTime,
  53. }
  54. return dispatchEtcdCommandV1(command, w, req)
  55. } else {
  56. command := &CreateCommand{
  57. Key: key,
  58. Value: value,
  59. ExpireTime: expireTime,
  60. Force: true,
  61. }
  62. return dispatchEtcdCommandV1(command, w, req)
  63. }
  64. }
  65. // Delete Handler
  66. func (e *etcdServer) DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
  67. key := req.URL.Path[len("/v1/keys/"):]
  68. debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  69. command := &DeleteCommand{
  70. Key: key,
  71. }
  72. return dispatchEtcdCommandV1(command, w, req)
  73. }
  //--------------------------------------
  // State non-sensitive handlers
  // These do not dispatch to the leader.
  // TODO: add state-sensitive versions of
  // these commands?
  //--------------------------------------
  80. // Get Handler
  81. func (e *etcdServer) GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
  82. key := req.URL.Path[len("/v1/keys/"):]
  83. r := e.raftServer
  84. debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  85. command := &GetCommand{
  86. Key: key,
  87. }
  88. if event, err := command.Apply(r.Server); err != nil {
  89. return err
  90. } else {
  91. event, _ := event.(*store.Event)
  92. response := eventToResponse(event)
  93. bytes, _ := json.Marshal(response)
  94. w.WriteHeader(http.StatusOK)
  95. w.Write(bytes)
  96. return nil
  97. }
  98. }
  99. // Watch handler
  100. func (e *etcdServer) WatchHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
  101. key := req.URL.Path[len("/v1/watch/"):]
  102. command := &WatchCommand{
  103. Key: key,
  104. }
  105. r := e.raftServer
  106. if req.Method == "GET" {
  107. debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  108. command.SinceIndex = 0
  109. } else if req.Method == "POST" {
  110. // watch from a specific index
  111. debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  112. content := req.FormValue("index")
  113. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  114. if err != nil {
  115. return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm)
  116. }
  117. command.SinceIndex = sinceIndex
  118. } else {
  119. w.WriteHeader(http.StatusMethodNotAllowed)
  120. return nil
  121. }
  122. if event, err := command.Apply(r.Server); err != nil {
  123. return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
  124. } else {
  125. event, _ := event.(*store.Event)
  126. response := eventToResponse(event)
  127. bytes, _ := json.Marshal(response)
  128. w.WriteHeader(http.StatusOK)
  129. w.Write(bytes)
  130. return nil
  131. }
  132. }
  133. // Dispatch the command to leader
  134. func dispatchEtcdCommandV1(c Command, w http.ResponseWriter, req *http.Request) error {
  135. return dispatchV1(c, w, req, nameToEtcdURL)
  136. }
  137. func dispatchV1(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
  138. r := e.raftServer
  139. if r.State() == raft.Leader {
  140. if event, err := r.Do(c); err != nil {
  141. return err
  142. } else {
  143. if event == nil {
  144. return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
  145. }
  146. event, _ := event.(*store.Event)
  147. response := eventToResponse(event)
  148. bytes, _ := json.Marshal(response)
  149. w.WriteHeader(http.StatusOK)
  150. w.Write(bytes)
  151. return nil
  152. }
  153. } else {
  154. leader := r.Leader()
  155. // current no leader
  156. if leader == "" {
  157. return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
  158. }
  159. url, _ := toURL(leader)
  160. redirect(url, w, req)
  161. return nil
  162. }
  163. }
  164. func eventToResponse(event *store.Event) interface{} {
  165. if !event.Dir {
  166. response := &store.Response{
  167. Action: event.Action,
  168. Key: event.Key,
  169. Value: event.Value,
  170. PrevValue: event.PrevValue,
  171. Index: event.Index,
  172. TTL: event.TTL,
  173. Expiration: event.Expiration,
  174. }
  175. if response.Action == store.Create || response.Action == store.Update {
  176. response.Action = "set"
  177. if response.PrevValue == "" {
  178. response.NewKey = true
  179. }
  180. }
  181. return response
  182. } else {
  183. responses := make([]*store.Response, len(event.KVPairs))
  184. for i, kv := range event.KVPairs {
  185. responses[i] = &store.Response{
  186. Action: event.Action,
  187. Key: kv.Key,
  188. Value: kv.Value,
  189. Dir: kv.Dir,
  190. Index: event.Index,
  191. }
  192. }
  193. return responses
  194. }
  195. }