acquire_handler.go

package v2

import (
	"errors"
	"fmt"
	"net/http"
	"path"
	"strconv"
	"time"

	etcdErr "github.com/coreos/etcd/error"
	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
	"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
)

// acquireHandler attempts to acquire a lock on the given key.
// The "key" parameter specifies the resource to lock.
// The "value" parameter specifies a value to associate with the lock.
// The "ttl" parameter specifies how long the lock will persist for.
// The "timeout" parameter specifies how long the request should wait for the lock.
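// The lock is modeled as a queue of in-order child nodes under the key: each
// request appends a node, and the request whose node has the lowest index
// currently holds the lock (see createNode, getLockIndex, and watch below).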
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) error {
	h.client.SyncCluster()

	// Setup connection watcher.
	closeNotifier, _ := w.(http.CloseNotifier)
	closeChan := closeNotifier.CloseNotify()

	// Wrap closeChan so we can pass it to subsequent components.
	timeoutChan := make(chan bool)
	stopChan := make(chan bool)
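	// Fan the disconnect and timeout signals into a single stop channel: the
	// goroutine below forwards whichever fires first onto stopChan, which is
	// shared with the keep-alive, timeout, and watch helpers started later.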
	go func() {
		select {
		case <-closeChan:
			// Client closed connection.
			stopChan <- true
		case <-timeoutChan:
			// Timeout expired.
			stopChan <- true
		case <-stopChan:
		}
		close(stopChan)
	}()

	// Parse the lock "key".
	vars := mux.Vars(req)
	keypath := path.Join(prefix, vars["key"])
	value := req.FormValue("value")

	// Parse "timeout" parameter.
	var timeout int
	var err error
	if req.FormValue("timeout") == "" {
		timeout = -1
	} else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
		return etcdErr.NewError(etcdErr.EcodeTimeoutNaN, "Acquire", 0)
	}
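	// A missing "timeout" (-1) waits indefinitely; a timeout of 0 attempts the
	// lock exactly once without waiting.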

	// Parse TTL.
	ttl, err := strconv.Atoi(req.FormValue("ttl"))
	if err != nil {
		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Acquire", 0)
	}

	// Search for the node.
	_, index, pos := h.findExistingNode(keypath, value)
	if index == 0 {
		// Node doesn't exist; create it.
		pos = -1 // Invalidate previous position.
		index, err = h.createNode(keypath, value, ttl)
		if err != nil {
			return err
		}
	}
	indexpath := path.Join(keypath, strconv.Itoa(index))
	// If pos != 0, our node is not first in the queue, so we do not already
	// hold the lock (pos is -1 when the node was just created above).
	if pos != 0 {
		if timeout == 0 {
			// Attempt to get the lock once, no waiting.
			err = h.get(keypath, index)
		} else {
			// Keep updating TTL while we wait.
			go h.ttlKeepAlive(keypath, value, ttl, stopChan)

			// Start timeout.
			go h.timeoutExpire(timeout, timeoutChan, stopChan)

			// Wait for the lock.
			err = h.watch(keypath, index, stopChan)
		}
	}

	// Return on error, deleting our lock request on the way.
	if err != nil {
		if index > 0 {
			h.client.Delete(indexpath, false)
		}
		return err
	}

	// Check for connection disconnect before we write the lock index.
	select {
	case <-stopChan:
		err = errors.New("user interrupted")
	default:
	}

	// Update TTL one last time if the lock was acquired. Otherwise delete.
	if err == nil {
		h.client.Update(indexpath, value, uint64(ttl))
	} else {
		h.client.Delete(indexpath, false)
	}

	// Write response.
	w.Write([]byte(strconv.Itoa(index)))
	return nil
}
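
// Example (assuming the lock module is mounted at its usual /mod/v2/lock/{key}
// route; adjust the host and port for your deployment):
//
//	curl -X POST "http://127.0.0.1:4001/mod/v2/lock/mylock?ttl=60&timeout=30"
//
// On success the response body contains the index of the acquired lock node.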

// createNode creates a new lock node under the given key.
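// The AddChild call creates an in-order key beneath keypath; the numeric
// basename of that key becomes this request's index, i.e. its position in
// the lock queue.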
func (h *handler) createNode(keypath string, value string, ttl int) (int, error) {
	// Default the value to "-" if it is blank.
	if len(value) == 0 {
		value = "-"
	}

	// Create an incrementing id for the lock.
	resp, err := h.client.AddChild(keypath, value, uint64(ttl))
	if err != nil {
		return 0, err
	}
	indexpath := resp.Node.Key
	index, err := strconv.Atoi(path.Base(indexpath))
	return index, err
}

// findExistingNode searches the lock for a node with the given value.
// It returns the node, its index, and its position in the queue; an index of 0
// means no matching node was found.
func (h *handler) findExistingNode(keypath string, value string) (*etcd.Node, int, int) {
	if len(value) > 0 {
		resp, err := h.client.Get(keypath, true, true)
		if err == nil {
			nodes := lockNodes{resp.Node.Nodes}
			if node, pos := nodes.FindByValue(value); node != nil {
				index, _ := strconv.Atoi(path.Base(node.Key))
				return node, index, pos
			}
		}
	}
	return nil, 0, 0
}

// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
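// The key is refreshed every ttl/2 seconds so the lock does not expire while
// the caller is still waiting for it.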
func (h *handler) ttlKeepAlive(k string, value string, ttl int, stopChan chan bool) {
	for {
		select {
		case <-time.After(time.Duration(ttl/2) * time.Second):
			h.client.Update(k, value, uint64(ttl))
		case <-stopChan:
			return
		}
	}
}

// timeoutExpire starts a countdown timer if timeout is a positive integer.
// It sends true on timeoutChan when the timer expires, or returns early if
// stopChan is triggered first.
func (h *handler) timeoutExpire(timeout int, timeoutChan chan bool, stopChan chan bool) {
	// Skip the expiration timer unless timeout is 1 or higher.
	if timeout < 1 {
		return
	}
	select {
	case <-stopChan:
		return
	case <-time.After(time.Duration(timeout) * time.Second):
		timeoutChan <- true
		return
	}
}
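
// getLockIndex returns the index of the node immediately ahead of the given
// index in the lock queue, together with the modified index used to resume
// watching, as reported by lockNodes.PrevIndex. A previous index of 0 means
// the given index is first in line and holds the lock.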
func (h *handler) getLockIndex(keypath string, index int) (int, int, error) {
	// Read all nodes for the lock.
	resp, err := h.client.Get(keypath, true, true)
	if err != nil {
		return 0, 0, fmt.Errorf("lock watch lookup error: %s", err.Error())
	}
	nodes := lockNodes{resp.Node.Nodes}
	prevIndex, modifiedIndex := nodes.PrevIndex(index)
	return prevIndex, modifiedIndex, nil
}

// get tries once to get the lock; no waiting.
func (h *handler) get(keypath string, index int) error {
	prevIndex, _, err := h.getLockIndex(keypath, index)
	if err != nil {
		return err
	}
	if prevIndex == 0 {
		// Lock acquired.
		return nil
	}
	return fmt.Errorf("failed to acquire lock")
}

// watch continuously waits for a given lock index to be acquired or until lock fails.
// Returns nil once the lock has been acquired, or an error if acquisition fails.
func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error {
	// Wrap close chan so we can pass it to Client.Watch().
	stopWatchChan := make(chan bool)
	stopWrapChan := make(chan bool)
	go func() {
		select {
		case <-closeChan:
			stopWatchChan <- true
		case <-stopWrapChan:
			stopWatchChan <- true
		case <-stopWatchChan:
		}
	}()
	defer close(stopWrapChan)

	for {
		prevIndex, modifiedIndex, err := h.getLockIndex(keypath, index)
		if err != nil {
			return err
		}

		// If there is no previous index then we have the lock.
		if prevIndex == 0 {
			return nil
		}

		// Wait from the last modification of the node.
		waitIndex := modifiedIndex + 1
		_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), uint64(waitIndex), false, nil, stopWatchChan)
		if err == etcd.ErrWatchStoppedByUser {
			return fmt.Errorf("lock watch closed")
		} else if err != nil {
			return fmt.Errorf("lock watch error: %s", err.Error())
		}
	}
}