acquire_handler.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package lock
  2. import (
  3. "net/http"
  4. "path"
  5. "strconv"
  6. "time"
  7. "github.com/gorilla/mux"
  8. )
  9. // acquireHandler attempts to acquire a lock on the given key.
  10. func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
  11. h.client.SyncCluster()
  12. vars := mux.Vars(req)
  13. keypath := path.Join(prefix, vars["key"])
  14. ttl, err := strconv.Atoi(req.FormValue("ttl"))
  15. if err != nil {
  16. http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
  17. return
  18. }
  19. // Create an incrementing id for the lock.
  20. resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
  21. if err != nil {
  22. http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
  23. return
  24. }
  25. indexpath := resp.Key
  26. // Keep updating TTL to make sure lock request is not expired before acquisition.
  27. stopChan := make(chan bool)
  28. defer close(stopChan)
  29. go func(k string) {
  30. stopped := false
  31. for {
  32. select {
  33. case <-time.After(time.Duration(ttl / 2) * time.Second):
  34. case <-stopChan:
  35. stopped = true
  36. }
  37. h.client.Update(k, "-", uint64(ttl))
  38. if stopped {
  39. break
  40. }
  41. }
  42. }(indexpath)
  43. // Extract the lock index.
  44. index, _ := strconv.Atoi(path.Base(resp.Key))
  45. for {
  46. // Read all indices.
  47. resp, err = h.client.GetAll(keypath, true)
  48. if err != nil {
  49. http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
  50. return
  51. }
  52. indices := extractResponseIndices(resp)
  53. waitIndex := resp.ModifiedIndex
  54. prevIndex := findPrevIndex(indices, index)
  55. // If there is no previous index then we have the lock.
  56. if prevIndex == 0 {
  57. break
  58. }
  59. // Otherwise watch previous index until it's gone.
  60. _, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, nil, nil)
  61. if err != nil {
  62. http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError)
  63. return
  64. }
  65. }
  66. // Write lock index to response body.
  67. w.Write([]byte(strconv.Itoa(index)))
  68. }