v2_http_put.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package etcd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "net/url"
  8. "strconv"
  9. "time"
  10. etcdErr "github.com/coreos/etcd/error"
  11. "github.com/coreos/etcd/store"
  12. )
  13. func (p *participant) PutHandler(w http.ResponseWriter, req *http.Request) error {
  14. if !p.node.IsLeader() {
  15. return p.redirect(w, req, p.node.Leader())
  16. }
  17. key := req.URL.Path[len("/v2/keys"):]
  18. req.ParseForm()
  19. value := req.Form.Get("value")
  20. dir := (req.FormValue("dir") == "true")
  21. expireTime, err := store.TTL(req.Form.Get("ttl"))
  22. if err != nil {
  23. return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", p.Store.Index())
  24. }
  25. prevValue, valueOk := firstValue(req.Form, "prevValue")
  26. prevIndexStr, indexOk := firstValue(req.Form, "prevIndex")
  27. prevExist, existOk := firstValue(req.Form, "prevExist")
  28. // Set handler: create a new node or replace the old one.
  29. if !valueOk && !indexOk && !existOk {
  30. return p.serveSet(w, req, key, dir, value, expireTime)
  31. }
  32. // update with test
  33. if existOk {
  34. if prevExist == "false" {
  35. // Create command: create a new node. Fail, if a node already exists
  36. // Ignore prevIndex and prevValue
  37. return p.serveCreate(w, req, key, dir, value, expireTime)
  38. }
  39. if prevExist == "true" && !indexOk && !valueOk {
  40. return p.serveUpdate(w, req, key, value, expireTime)
  41. }
  42. }
  43. var prevIndex uint64
  44. if indexOk {
  45. prevIndex, err = strconv.ParseUint(prevIndexStr, 10, 64)
  46. // bad previous index
  47. if err != nil {
  48. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "CompareAndSwap", p.Store.Index())
  49. }
  50. } else {
  51. prevIndex = 0
  52. }
  53. if valueOk {
  54. if prevValue == "" {
  55. return etcdErr.NewError(etcdErr.EcodePrevValueRequired, "CompareAndSwap", p.Store.Index())
  56. }
  57. }
  58. return p.serveCAS(w, req, key, value, prevValue, prevIndex, expireTime)
  59. }
  60. func (p *participant) handleRet(w http.ResponseWriter, ret *store.Event) {
  61. b, _ := json.Marshal(ret)
  62. w.Header().Set("Content-Type", "application/json")
  63. // etcd index should be the same as the event index
  64. // which is also the last modified index of the node
  65. w.Header().Add("X-Etcd-Index", fmt.Sprint(ret.Index()))
  66. // w.Header().Add("X-Raft-Index", fmt.Sprint(p.CommitIndex()))
  67. // w.Header().Add("X-Raft-Term", fmt.Sprint(p.Term()))
  68. if ret.IsCreated() {
  69. w.WriteHeader(http.StatusCreated)
  70. } else {
  71. w.WriteHeader(http.StatusOK)
  72. }
  73. w.Write(b)
  74. }
  75. func (p *participant) serveSet(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
  76. ret, err := p.Set(key, dir, value, expireTime)
  77. if err == nil {
  78. p.handleRet(w, ret)
  79. return nil
  80. }
  81. log.Println("set:", err)
  82. return err
  83. }
  84. func (p *participant) serveCreate(w http.ResponseWriter, req *http.Request, key string, dir bool, value string, expireTime time.Time) error {
  85. ret, err := p.Create(key, dir, value, expireTime, false)
  86. if err == nil {
  87. p.handleRet(w, ret)
  88. return nil
  89. }
  90. log.Println("create:", err)
  91. return err
  92. }
  93. func (p *participant) serveUpdate(w http.ResponseWriter, req *http.Request, key, value string, expireTime time.Time) error {
  94. // Update should give at least one option
  95. if value == "" && expireTime.Sub(store.Permanent) == 0 {
  96. return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", p.Store.Index())
  97. }
  98. ret, err := p.Update(key, value, expireTime)
  99. if err == nil {
  100. p.handleRet(w, ret)
  101. return nil
  102. }
  103. log.Println("update:", err)
  104. return err
  105. }
  106. func (p *participant) serveCAS(w http.ResponseWriter, req *http.Request, key, value, prevValue string, prevIndex uint64, expireTime time.Time) error {
  107. ret, err := p.CAS(key, value, prevValue, prevIndex, expireTime)
  108. if err == nil {
  109. p.handleRet(w, ret)
  110. return nil
  111. }
  112. log.Println("update:", err)
  113. return err
  114. }
  115. func firstValue(f url.Values, key string) (string, bool) {
  116. l, ok := f[key]
  117. if !ok {
  118. return "", false
  119. }
  120. return l[0], true
  121. }