|
|
@@ -1,54 +1,18 @@
|
|
|
package etcd
|
|
|
|
|
|
import (
|
|
|
- "encoding/json"
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"io/ioutil"
|
|
|
"math/rand"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"path"
|
|
|
- "reflect"
|
|
|
"strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-// Valid options for GET, PUT, POST, DELETE
|
|
|
-// Using CAPITALIZED_UNDERSCORE to emphasize that these
|
|
|
-// values are meant to be used as constants.
|
|
|
-var (
|
|
|
- VALID_GET_OPTIONS = validOptions{
|
|
|
- "recursive": reflect.Bool,
|
|
|
- "consistent": reflect.Bool,
|
|
|
- "sorted": reflect.Bool,
|
|
|
- "wait": reflect.Bool,
|
|
|
- "waitIndex": reflect.Uint64,
|
|
|
- }
|
|
|
-
|
|
|
- VALID_PUT_OPTIONS = validOptions{
|
|
|
- "prevValue": reflect.String,
|
|
|
- "prevIndex": reflect.Uint64,
|
|
|
- "prevExist": reflect.Bool,
|
|
|
- }
|
|
|
-
|
|
|
- VALID_POST_OPTIONS = validOptions{}
|
|
|
-
|
|
|
- VALID_DELETE_OPTIONS = validOptions{
|
|
|
- "recursive": reflect.Bool,
|
|
|
- }
|
|
|
-
|
|
|
- curlChan chan string
|
|
|
-)
|
|
|
-
|
|
|
-// SetCurlChan sets a channel to which cURL commands which can be used to
|
|
|
-// re-produce requests are sent. This is useful for debugging.
|
|
|
-func SetCurlChan(c chan string) {
|
|
|
- curlChan = c
|
|
|
-}
|
|
|
-
|
|
|
// get issues a GET request
|
|
|
-func (c *Client) get(key string, options options) (*Response, error) {
|
|
|
+func (c *Client) get(key string, options options) (*RawResponse, error) {
|
|
|
logger.Debugf("get %s [%s]", key, c.cluster.Leader)
|
|
|
|
|
|
p := path.Join("keys", key)
|
|
|
@@ -57,15 +21,14 @@ func (c *Client) get(key string, options options) (*Response, error) {
|
|
|
if c.config.Consistency == STRONG_CONSISTENCY {
|
|
|
options["consistent"] = true
|
|
|
}
|
|
|
- if options != nil {
|
|
|
- str, err := optionsToString(options, VALID_GET_OPTIONS)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- p += str
|
|
|
+
|
|
|
+ str, err := options.toParameters(VALID_GET_OPTIONS)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
+ p += str
|
|
|
|
|
|
- resp, err := c.sendRequest("GET", p, url.Values{})
|
|
|
+ resp, err := c.sendRequest("GET", p, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -75,28 +38,19 @@ func (c *Client) get(key string, options options) (*Response, error) {
|
|
|
}
|
|
|
|
|
|
// put issues a PUT request
|
|
|
-func (c *Client) put(key string, value string, ttl uint64, options options) (*Response, error) {
|
|
|
- logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
- v := url.Values{}
|
|
|
-
|
|
|
- if value != "" {
|
|
|
- v.Set("value", value)
|
|
|
- }
|
|
|
-
|
|
|
- if ttl > 0 {
|
|
|
- v.Set("ttl", fmt.Sprintf("%v", ttl))
|
|
|
- }
|
|
|
+func (c *Client) put(key string, value string, ttl uint64,
|
|
|
+ options options) (*RawResponse, error) {
|
|
|
|
|
|
+ logger.Debugf("put %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
p := path.Join("keys", key)
|
|
|
- if options != nil {
|
|
|
- str, err := optionsToString(options, VALID_PUT_OPTIONS)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- p += str
|
|
|
+
|
|
|
+ str, err := options.toParameters(VALID_PUT_OPTIONS)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
+ p += str
|
|
|
|
|
|
- resp, err := c.sendRequest("PUT", p, v)
|
|
|
+ resp, err := c.sendRequest("PUT", p, buildValues(value, ttl))
|
|
|
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -106,19 +60,11 @@ func (c *Client) put(key string, value string, ttl uint64, options options) (*Re
|
|
|
}
|
|
|
|
|
|
// post issues a POST request
|
|
|
-func (c *Client) post(key string, value string, ttl uint64) (*Response, error) {
|
|
|
+func (c *Client) post(key string, value string, ttl uint64) (*RawResponse, error) {
|
|
|
logger.Debugf("post %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
|
|
|
- v := url.Values{}
|
|
|
-
|
|
|
- if value != "" {
|
|
|
- v.Set("value", value)
|
|
|
- }
|
|
|
-
|
|
|
- if ttl > 0 {
|
|
|
- v.Set("ttl", fmt.Sprintf("%v", ttl))
|
|
|
- }
|
|
|
+ p := path.Join("keys", key)
|
|
|
|
|
|
- resp, err := c.sendRequest("POST", path.Join("keys", key), v)
|
|
|
+ resp, err := c.sendRequest("POST", p, buildValues(value, ttl))
|
|
|
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -128,20 +74,18 @@ func (c *Client) post(key string, value string, ttl uint64) (*Response, error) {
|
|
|
}
|
|
|
|
|
|
// delete issues a DELETE request
|
|
|
-func (c *Client) delete(key string, options options) (*Response, error) {
|
|
|
+func (c *Client) delete(key string, options options) (*RawResponse, error) {
|
|
|
logger.Debugf("delete %s [%s]", key, c.cluster.Leader)
|
|
|
- v := url.Values{}
|
|
|
|
|
|
p := path.Join("keys", key)
|
|
|
- if options != nil {
|
|
|
- str, err := optionsToString(options, VALID_DELETE_OPTIONS)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- p += str
|
|
|
+
|
|
|
+ str, err := options.toParameters(VALID_DELETE_OPTIONS)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
+ p += str
|
|
|
|
|
|
- resp, err := c.sendRequest("DELETE", p, v)
|
|
|
+ resp, err := c.sendRequest("DELETE", p, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -151,126 +95,128 @@ func (c *Client) delete(key string, options options) (*Response, error) {
|
|
|
}
|
|
|
|
|
|
// sendRequest sends a HTTP request and returns a Response as defined by etcd
|
|
|
-func (c *Client) sendRequest(method string, _path string, values url.Values) (*Response, error) {
|
|
|
- var body string = values.Encode()
|
|
|
- var resp *http.Response
|
|
|
+func (c *Client) sendRequest(method string, relativePath string,
|
|
|
+ values url.Values) (*RawResponse, error) {
|
|
|
+
|
|
|
var req *http.Request
|
|
|
+ var resp *http.Response
|
|
|
+ var httpPath string
|
|
|
+ var err error
|
|
|
+ var b []byte
|
|
|
+
|
|
|
+ trial := 0
|
|
|
|
|
|
- retry := 0
|
|
|
// if we connect to a follower, we will retry until we found a leader
|
|
|
for {
|
|
|
- var httpPath string
|
|
|
-
|
|
|
- // If _path has schema already, then it's assumed to be
|
|
|
- // a complete URL and therefore needs no further processing.
|
|
|
- u, err := url.Parse(_path)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
+ trial++
|
|
|
+ logger.Debug("begin trail ", trial)
|
|
|
+ if trial > 2*len(c.cluster.Machines) {
|
|
|
+ return nil, fmt.Errorf("Cannot reach servers after %v time", trial)
|
|
|
}
|
|
|
|
|
|
- if u.Scheme != "" {
|
|
|
- httpPath = _path
|
|
|
+ if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
|
|
|
+ // If it's a GET and consistency level is set to WEAK,
|
|
|
+ // then use a random machine.
|
|
|
+ httpPath = c.getHttpPath(true, relativePath)
|
|
|
} else {
|
|
|
- if method == "GET" && c.config.Consistency == WEAK_CONSISTENCY {
|
|
|
- // If it's a GET and consistency level is set to WEAK,
|
|
|
- // then use a random machine.
|
|
|
- httpPath = c.getHttpPath(true, _path)
|
|
|
- } else {
|
|
|
- // Else use the leader.
|
|
|
- httpPath = c.getHttpPath(false, _path)
|
|
|
- }
|
|
|
+ // Else use the leader.
|
|
|
+ httpPath = c.getHttpPath(false, relativePath)
|
|
|
}
|
|
|
|
|
|
// Return a cURL command if curlChan is set
|
|
|
- if curlChan != nil {
|
|
|
+ if c.cURLch != nil {
|
|
|
command := fmt.Sprintf("curl -X %s %s", method, httpPath)
|
|
|
for key, value := range values {
|
|
|
command += fmt.Sprintf(" -d %s=%s", key, value[0])
|
|
|
}
|
|
|
- curlChan <- command
|
|
|
+ c.sendCURL(command)
|
|
|
}
|
|
|
|
|
|
logger.Debug("send.request.to ", httpPath, " | method ", method)
|
|
|
- if body == "" {
|
|
|
|
|
|
+ if values == nil {
|
|
|
req, _ = http.NewRequest(method, httpPath, nil)
|
|
|
-
|
|
|
} else {
|
|
|
- req, _ = http.NewRequest(method, httpPath, strings.NewReader(body))
|
|
|
- req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
|
|
|
- }
|
|
|
+ req, _ = http.NewRequest(method, httpPath,
|
|
|
+ strings.NewReader(values.Encode()))
|
|
|
|
|
|
- resp, err = c.httpClient.Do(req)
|
|
|
+ req.Header.Set("Content-Type",
|
|
|
+ "application/x-www-form-urlencoded; param=value")
|
|
|
+ }
|
|
|
|
|
|
- logger.Debug("recv.response.from ", httpPath)
|
|
|
// network error, change a machine!
|
|
|
- if err != nil {
|
|
|
- retry++
|
|
|
- if retry > 2*len(c.cluster.Machines) {
|
|
|
- return nil, errors.New("Cannot reach servers")
|
|
|
- }
|
|
|
- num := retry % len(c.cluster.Machines)
|
|
|
- logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")
|
|
|
- c.cluster.Leader = c.cluster.Machines[num]
|
|
|
+ if resp, err = c.httpClient.Do(req); err != nil {
|
|
|
+ c.switchLeader(trial % len(c.cluster.Machines))
|
|
|
time.Sleep(time.Millisecond * 200)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
if resp != nil {
|
|
|
- if resp.StatusCode == http.StatusTemporaryRedirect {
|
|
|
- httpPath := resp.Header.Get("Location")
|
|
|
-
|
|
|
- resp.Body.Close()
|
|
|
-
|
|
|
- if httpPath == "" {
|
|
|
- return nil, errors.New("Cannot get redirection location")
|
|
|
- }
|
|
|
+ logger.Debug("recv.response.from ", httpPath)
|
|
|
|
|
|
- c.updateLeader(httpPath)
|
|
|
- logger.Debug("send.redirect")
|
|
|
- // try to connect the leader
|
|
|
- continue
|
|
|
- } else if resp.StatusCode == http.StatusInternalServerError {
|
|
|
- resp.Body.Close()
|
|
|
+ var ok bool
|
|
|
+ ok, b = c.handleResp(resp)
|
|
|
|
|
|
- retry++
|
|
|
- if retry > 2*len(c.cluster.Machines) {
|
|
|
- return nil, errors.New("Cannot reach servers")
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
continue
|
|
|
- } else {
|
|
|
- logger.Debug("send.return.response ", httpPath)
|
|
|
- break
|
|
|
}
|
|
|
|
|
|
+ logger.Debug("recv.success.", httpPath)
|
|
|
+ break
|
|
|
}
|
|
|
- logger.Debug("error.from ", httpPath, " ", err.Error())
|
|
|
+
|
|
|
+ // should not reach here
|
|
|
+ // err and resp should not be nil at the same time
|
|
|
+ logger.Debug("error.from ", httpPath)
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- // Convert HTTP response to etcd response
|
|
|
- b, err := ioutil.ReadAll(resp.Body)
|
|
|
+ r := &RawResponse{
|
|
|
+ StatusCode: resp.StatusCode,
|
|
|
+ Body: b,
|
|
|
+ Header: resp.Header,
|
|
|
+ }
|
|
|
|
|
|
- resp.Body.Close()
|
|
|
+ return r, nil
|
|
|
+}
|
|
|
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
+// handleResp handles the responses from the etcd server
|
|
|
+// If status code is OK, read the http body and return it as byte array
|
|
|
+// If status code is TemporaryRedirect, update leader.
|
|
|
+// If status code is InternalServerError, sleep for 200ms.
|
|
|
+func (c *Client) handleResp(resp *http.Response) (bool, []byte) {
|
|
|
+ defer resp.Body.Close()
|
|
|
|
|
|
- if !(resp.StatusCode == http.StatusOK ||
|
|
|
- resp.StatusCode == http.StatusCreated) {
|
|
|
- return nil, handleError(b)
|
|
|
- }
|
|
|
+ code := resp.StatusCode
|
|
|
|
|
|
- var result Response
|
|
|
+ if code == http.StatusTemporaryRedirect {
|
|
|
+ u, err := resp.Location()
|
|
|
|
|
|
- err = json.Unmarshal(b, &result)
|
|
|
+ if err != nil {
|
|
|
+ logger.Warning(err)
|
|
|
+ } else {
|
|
|
+ c.updateLeader(u)
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
+ return false, nil
|
|
|
+
|
|
|
+ } else if code == http.StatusInternalServerError {
|
|
|
+ time.Sleep(time.Millisecond * 200)
|
|
|
+
|
|
|
+ } else if code == http.StatusOK ||
|
|
|
+ code == http.StatusCreated ||
|
|
|
+ code == http.StatusBadRequest {
|
|
|
+ b, err := ioutil.ReadAll(resp.Body)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return false, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ return true, b
|
|
|
}
|
|
|
|
|
|
- return &result, nil
|
|
|
+ logger.Warning("bad status code ", resp.StatusCode)
|
|
|
+ return false, nil
|
|
|
}
|
|
|
|
|
|
func (c *Client) getHttpPath(random bool, s ...string) string {
|
|
|
@@ -288,3 +234,18 @@ func (c *Client) getHttpPath(random bool, s ...string) string {
|
|
|
|
|
|
return fullPath
|
|
|
}
|
|
|
+
|
|
|
+// buildValues builds a url.Values map according to the given value and ttl
|
|
|
+func buildValues(value string, ttl uint64) url.Values {
|
|
|
+ v := url.Values{}
|
|
|
+
|
|
|
+ if value != "" {
|
|
|
+ v.Set("value", value)
|
|
|
+ }
|
|
|
+
|
|
|
+ if ttl > 0 {
|
|
|
+ v.Set("ttl", fmt.Sprintf("%v", ttl))
|
|
|
+ }
|
|
|
+
|
|
|
+ return v
|
|
|
+}
|