| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- package v2
- import (
- "net/http"
- "path"
- "strconv"
- "time"
- "github.com/coreos/go-etcd/etcd"
- "github.com/gorilla/mux"
- )
- // acquireHandler attempts to acquire a lock on the given key.
- // The "key" parameter specifies the resource to lock.
- // The "ttl" parameter specifies how long the lock will persist for.
- // The "timeout" parameter specifies how long the request should wait for the lock.
- func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
- h.client.SyncCluster()
- // Setup connection watcher.
- closeNotifier, _ := w.(http.CloseNotifier)
- closeChan := closeNotifier.CloseNotify()
- // Parse "key" and "ttl" query parameters.
- vars := mux.Vars(req)
- keypath := path.Join(prefix, vars["key"])
- ttl, err := strconv.Atoi(req.FormValue("ttl"))
- if err != nil {
- http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
- return
- }
-
- // Parse "timeout" parameter.
- var timeout int
- if len(req.FormValue("timeout")) == 0 {
- timeout = -1
- } else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
- http.Error(w, "invalid timeout: " + err.Error(), http.StatusInternalServerError)
- return
- }
- timeout = timeout + 1
- // Create an incrementing id for the lock.
- resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
- if err != nil {
- http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
- return
- }
- indexpath := resp.Key
- // Keep updating TTL to make sure lock request is not expired before acquisition.
- stop := make(chan bool)
- go h.ttlKeepAlive(indexpath, ttl, stop)
- // Monitor for broken connection.
- stopWatchChan := make(chan bool)
- go func() {
- select {
- case <-closeChan:
- stopWatchChan <- true
- case <-stop:
- // Stop watching for connection disconnect.
- }
- }()
- // Extract the lock index.
- index, _ := strconv.Atoi(path.Base(resp.Key))
- // Wait until we successfully get a lock or we get a failure.
- var success bool
- for {
- // Read all indices.
- resp, err = h.client.GetAll(keypath, true)
- if err != nil {
- http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
- break
- }
- indices := extractResponseIndices(resp)
- waitIndex := resp.ModifiedIndex
- prevIndex := findPrevIndex(indices, index)
- // If there is no previous index then we have the lock.
- if prevIndex == 0 {
- success = true
- break
- }
- // Otherwise watch previous index until it's gone.
- _, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, nil, stopWatchChan)
- if err == etcd.ErrWatchStoppedByUser {
- break
- } else if err != nil {
- http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError)
- break
- }
- }
- // Check for connection disconnect before we write the lock index.
- select {
- case <-stopWatchChan:
- success = false
- default:
- }
- // Stop the ttl keep-alive.
- close(stop)
- if success {
- // Write lock index to response body if we acquire the lock.
- h.client.Update(indexpath, "-", uint64(ttl))
- w.Write([]byte(strconv.Itoa(index)))
- } else {
- // Make sure key is deleted if we couldn't acquire.
- h.client.Delete(indexpath)
- }
- }
- // ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
- func (h *handler) ttlKeepAlive(k string, ttl int, stop chan bool) {
- for {
- select {
- case <-time.After(time.Duration(ttl / 2) * time.Second):
- h.client.Update(k, "-", uint64(ttl))
- case <-stop:
- return
- }
- }
- }
|