|
|
@@ -12,15 +12,9 @@ import (
|
|
|
"net/url"
|
|
|
"os"
|
|
|
"path"
|
|
|
- "strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-const (
|
|
|
- HTTP = iota
|
|
|
- HTTPS
|
|
|
-)
|
|
|
-
|
|
|
// See SetConsistency for how to use these constants.
|
|
|
const (
|
|
|
// Using strings rather than iota because the consistency level
|
|
|
@@ -34,121 +28,174 @@ const (
|
|
|
defaultBufferSize = 10
|
|
|
)
|
|
|
|
|
|
-type Cluster struct {
|
|
|
- Leader string `json:"leader"`
|
|
|
- Machines []string `json:"machines"`
|
|
|
-}
|
|
|
-
|
|
|
type Config struct {
|
|
|
CertFile string `json:"certFile"`
|
|
|
KeyFile string `json:"keyFile"`
|
|
|
- CaCertFile string `json:"caCertFile"`
|
|
|
- Scheme string `json:"scheme"`
|
|
|
+ CaCertFile []string `json:"caCertFiles"`
|
|
|
Timeout time.Duration `json:"timeout"`
|
|
|
Consistency string `json: "consistency"`
|
|
|
}
|
|
|
|
|
|
type Client struct {
|
|
|
- cluster Cluster `json:"cluster"`
|
|
|
- config Config `json:"config"`
|
|
|
+ config Config `json:"config"`
|
|
|
+ cluster *Cluster `json:"cluster"`
|
|
|
httpClient *http.Client
|
|
|
persistence io.Writer
|
|
|
cURLch chan string
|
|
|
+ keyPrefix string
|
|
|
}
|
|
|
|
|
|
// NewClient create a basic client that is configured to be used
|
|
|
// with the given machine list.
|
|
|
func NewClient(machines []string) *Client {
|
|
|
- // if an empty slice was sent in then just assume localhost
|
|
|
- if len(machines) == 0 {
|
|
|
- machines = []string{"http://127.0.0.1:4001"}
|
|
|
+ config := Config{
|
|
|
+ // default timeout is one second
|
|
|
+ Timeout: time.Second,
|
|
|
+ // default consistency level is STRONG
|
|
|
+ Consistency: STRONG_CONSISTENCY,
|
|
|
}
|
|
|
|
|
|
- // default leader and machines
|
|
|
- cluster := Cluster{
|
|
|
- Leader: machines[0],
|
|
|
- Machines: machines,
|
|
|
+ client := &Client{
|
|
|
+ cluster: NewCluster(machines),
|
|
|
+ config: config,
|
|
|
+ keyPrefix: path.Join(version, "keys"),
|
|
|
+ }
|
|
|
+
|
|
|
+ client.initHTTPClient()
|
|
|
+ client.saveConfig()
|
|
|
+
|
|
|
+ return client
|
|
|
+}
|
|
|
+
|
|
|
+// NewTLSClient create a basic client with TLS configuration
|
|
|
+func NewTLSClient(machines []string, cert, key, caCert string) (*Client, error) {
|
|
|
+ // overwrite the default machine to use https
|
|
|
+ if len(machines) == 0 {
|
|
|
+ machines = []string{"https://127.0.0.1:4001"}
|
|
|
}
|
|
|
|
|
|
config := Config{
|
|
|
- // default use http
|
|
|
- Scheme: "http",
|
|
|
// default timeout is one second
|
|
|
Timeout: time.Second,
|
|
|
// default consistency level is STRONG
|
|
|
Consistency: STRONG_CONSISTENCY,
|
|
|
+ CertFile: cert,
|
|
|
+ KeyFile: key,
|
|
|
+ CaCertFile: make([]string, 0),
|
|
|
}
|
|
|
|
|
|
client := &Client{
|
|
|
- cluster: cluster,
|
|
|
- config: config,
|
|
|
+ cluster: NewCluster(machines),
|
|
|
+ config: config,
|
|
|
+ keyPrefix: path.Join(version, "keys"),
|
|
|
}
|
|
|
|
|
|
- err := setupHttpClient(client)
|
|
|
+ err := client.initHTTPSClient(cert, key)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
- return client
|
|
|
+ err = client.AddRootCA(caCert)
|
|
|
+
|
|
|
+ client.saveConfig()
|
|
|
+
|
|
|
+ return client, nil
|
|
|
}
|
|
|
|
|
|
-// NewClientFile creates a client from a given file path.
|
|
|
+// NewClientFromFile creates a client from a given file path.
|
|
|
// The given file is expected to use the JSON format.
|
|
|
-func NewClientFile(fpath string) (*Client, error) {
|
|
|
+func NewClientFromFile(fpath string) (*Client, error) {
|
|
|
fi, err := os.Open(fpath)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+
|
|
|
defer func() {
|
|
|
if err := fi.Close(); err != nil {
|
|
|
panic(err)
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
- return NewClientReader(fi)
|
|
|
+ return NewClientFromReader(fi)
|
|
|
}
|
|
|
|
|
|
-// NewClientReader creates a Client configured from a given reader.
|
|
|
-// The config is expected to use the JSON format.
|
|
|
-func NewClientReader(reader io.Reader) (*Client, error) {
|
|
|
- var client Client
|
|
|
+// NewClientFromReader creates a Client configured from a given reader.
|
|
|
+// The configuration is expected to use the JSON format.
|
|
|
+func NewClientFromReader(reader io.Reader) (*Client, error) {
|
|
|
+ c := new(Client)
|
|
|
|
|
|
b, err := ioutil.ReadAll(reader)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- err = json.Unmarshal(b, &client)
|
|
|
+ err = json.Unmarshal(b, c)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+ if c.config.CertFile == "" {
|
|
|
+ c.initHTTPClient()
|
|
|
+ } else {
|
|
|
+ err = c.initHTTPSClient(c.config.CertFile, c.config.KeyFile)
|
|
|
+ }
|
|
|
|
|
|
- err = setupHttpClient(&client)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- return &client, nil
|
|
|
+ for _, caCert := range c.config.CaCertFile {
|
|
|
+ if err := c.AddRootCA(caCert); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return c, nil
|
|
|
}
|
|
|
|
|
|
-func setupHttpClient(client *Client) error {
|
|
|
- if client.config.CertFile != "" && client.config.KeyFile != "" {
|
|
|
- err := client.SetCertAndKey(client.config.CertFile, client.config.KeyFile, client.config.CaCertFile)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- } else {
|
|
|
- client.config.CertFile = ""
|
|
|
- client.config.KeyFile = ""
|
|
|
- tr := &http.Transport{
|
|
|
- Dial: dialTimeout,
|
|
|
- TLSClientConfig: &tls.Config{
|
|
|
- InsecureSkipVerify: true,
|
|
|
- },
|
|
|
- }
|
|
|
- client.httpClient = &http.Client{Transport: tr}
|
|
|
+// Override the Client's HTTP Transport object
|
|
|
+func (c *Client) SetTransport(tr *http.Transport) {
|
|
|
+ c.httpClient.Transport = tr
|
|
|
+}
|
|
|
+
|
|
|
+// SetKeyPrefix changes the key prefix from the default `/v2/keys` to whatever
|
|
|
+// is set.
|
|
|
+func (c *Client) SetKeyPrefix(prefix string) {
|
|
|
+ c.keyPrefix = prefix
|
|
|
+}
|
|
|
+
|
|
|
+// initHTTPClient initializes a HTTP client for etcd client
|
|
|
+func (c *Client) initHTTPClient() {
|
|
|
+ tr := &http.Transport{
|
|
|
+ Dial: dialTimeout,
|
|
|
+ TLSClientConfig: &tls.Config{
|
|
|
+ InsecureSkipVerify: true,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ c.httpClient = &http.Client{Transport: tr}
|
|
|
+}
|
|
|
+
|
|
|
+// initHTTPClient initializes a HTTPS client for etcd client
|
|
|
+func (c *Client) initHTTPSClient(cert, key string) error {
|
|
|
+ if cert == "" || key == "" {
|
|
|
+ return errors.New("Require both cert and key path")
|
|
|
+ }
|
|
|
+
|
|
|
+ tlsCert, err := tls.LoadX509KeyPair(cert, key)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ tlsConfig := &tls.Config{
|
|
|
+ Certificates: []tls.Certificate{tlsCert},
|
|
|
+ InsecureSkipVerify: true,
|
|
|
+ }
|
|
|
+
|
|
|
+ tr := &http.Transport{
|
|
|
+ TLSClientConfig: tlsConfig,
|
|
|
+ Dial: dialTimeout,
|
|
|
}
|
|
|
|
|
|
+ c.httpClient = &http.Client{Transport: tr}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -179,114 +226,45 @@ func (c *Client) SetConsistency(consistency string) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// MarshalJSON implements the Marshaller interface
|
|
|
-// as defined by the standard JSON package.
|
|
|
-func (c *Client) MarshalJSON() ([]byte, error) {
|
|
|
- b, err := json.Marshal(struct {
|
|
|
- Config Config `json:"config"`
|
|
|
- Cluster Cluster `json:"cluster"`
|
|
|
- }{
|
|
|
- Config: c.config,
|
|
|
- Cluster: c.cluster,
|
|
|
- })
|
|
|
-
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
+// AddRootCA adds a root CA cert for the etcd client
|
|
|
+func (c *Client) AddRootCA(caCert string) error {
|
|
|
+ if c.httpClient == nil {
|
|
|
+ return errors.New("Client has not been initialized yet!")
|
|
|
}
|
|
|
|
|
|
- return b, nil
|
|
|
-}
|
|
|
-
|
|
|
-// UnmarshalJSON implements the Unmarshaller interface
|
|
|
-// as defined by the standard JSON package.
|
|
|
-func (c *Client) UnmarshalJSON(b []byte) error {
|
|
|
- temp := struct {
|
|
|
- Config Config `json: "config"`
|
|
|
- Cluster Cluster `json: "cluster"`
|
|
|
- }{}
|
|
|
- err := json.Unmarshal(b, &temp)
|
|
|
+ certBytes, err := ioutil.ReadFile(caCert)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- c.cluster = temp.Cluster
|
|
|
- c.config = temp.Config
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// saveConfig saves the current config using c.persistence.
|
|
|
-func (c *Client) saveConfig() error {
|
|
|
- if c.persistence != nil {
|
|
|
- b, err := json.Marshal(c)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ tr, ok := c.httpClient.Transport.(*http.Transport)
|
|
|
|
|
|
- _, err = c.persistence.Write(b)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
+ if !ok {
|
|
|
+ panic("AddRootCA(): Transport type assert should not fail")
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (c *Client) SetCertAndKey(cert string, key string, caCert string) error {
|
|
|
- if cert != "" && key != "" {
|
|
|
- tlsCert, err := tls.LoadX509KeyPair(cert, key)
|
|
|
-
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- tlsConfig := &tls.Config{
|
|
|
- Certificates: []tls.Certificate{tlsCert},
|
|
|
- }
|
|
|
-
|
|
|
- if caCert != "" {
|
|
|
- caCertPool := x509.NewCertPool()
|
|
|
-
|
|
|
- certBytes, err := ioutil.ReadFile(caCert)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- if !caCertPool.AppendCertsFromPEM(certBytes) {
|
|
|
- return errors.New("Unable to load caCert")
|
|
|
- }
|
|
|
-
|
|
|
- tlsConfig.RootCAs = caCertPool
|
|
|
- } else {
|
|
|
- tlsConfig.InsecureSkipVerify = true
|
|
|
- }
|
|
|
-
|
|
|
- tr := &http.Transport{
|
|
|
- TLSClientConfig: tlsConfig,
|
|
|
- Dial: dialTimeout,
|
|
|
+ if tr.TLSClientConfig.RootCAs == nil {
|
|
|
+ caCertPool := x509.NewCertPool()
|
|
|
+ ok = caCertPool.AppendCertsFromPEM(certBytes)
|
|
|
+ if ok {
|
|
|
+ tr.TLSClientConfig.RootCAs = caCertPool
|
|
|
}
|
|
|
-
|
|
|
- c.httpClient = &http.Client{Transport: tr}
|
|
|
- c.saveConfig()
|
|
|
- return nil
|
|
|
+ tr.TLSClientConfig.InsecureSkipVerify = false
|
|
|
+ } else {
|
|
|
+ ok = tr.TLSClientConfig.RootCAs.AppendCertsFromPEM(certBytes)
|
|
|
}
|
|
|
- return errors.New("Require both cert and key path")
|
|
|
-}
|
|
|
|
|
|
-func (c *Client) SetScheme(scheme int) error {
|
|
|
- if scheme == HTTP {
|
|
|
- c.config.Scheme = "http"
|
|
|
- c.saveConfig()
|
|
|
- return nil
|
|
|
+ if !ok {
|
|
|
+ err = errors.New("Unable to load caCert")
|
|
|
}
|
|
|
- if scheme == HTTPS {
|
|
|
- c.config.Scheme = "https"
|
|
|
- c.saveConfig()
|
|
|
- return nil
|
|
|
- }
|
|
|
- return errors.New("Unknown Scheme")
|
|
|
+
|
|
|
+ c.config.CaCertFile = append(c.config.CaCertFile, caCert)
|
|
|
+ c.saveConfig()
|
|
|
+
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
-// SetCluster updates config using the given machine list.
|
|
|
+// SetCluster updates cluster information using the given machine list.
|
|
|
func (c *Client) SetCluster(machines []string) bool {
|
|
|
success := c.internalSyncCluster(machines)
|
|
|
return success
|
|
|
@@ -296,16 +274,15 @@ func (c *Client) GetCluster() []string {
|
|
|
return c.cluster.Machines
|
|
|
}
|
|
|
|
|
|
-// SyncCluster updates config using the internal machine list.
|
|
|
+// SyncCluster updates the cluster information using the internal machine list.
|
|
|
func (c *Client) SyncCluster() bool {
|
|
|
- success := c.internalSyncCluster(c.cluster.Machines)
|
|
|
- return success
|
|
|
+ return c.internalSyncCluster(c.cluster.Machines)
|
|
|
}
|
|
|
|
|
|
// internalSyncCluster syncs cluster information using the given machine list.
|
|
|
func (c *Client) internalSyncCluster(machines []string) bool {
|
|
|
for _, machine := range machines {
|
|
|
- httpPath := c.createHttpPath(machine, version+"/machines")
|
|
|
+ httpPath := c.createHttpPath(machine, path.Join(version, "machines"))
|
|
|
resp, err := c.httpClient.Get(httpPath)
|
|
|
if err != nil {
|
|
|
// try another machine in the cluster
|
|
|
@@ -319,12 +296,11 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
|
|
}
|
|
|
|
|
|
// update Machines List
|
|
|
- c.cluster.Machines = strings.Split(string(b), ", ")
|
|
|
+ c.cluster.updateFromStr(string(b))
|
|
|
|
|
|
// update leader
|
|
|
// the first one in the machine list is the leader
|
|
|
- logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
|
|
|
- c.cluster.Leader = c.cluster.Machines[0]
|
|
|
+ c.cluster.switchLeader(0)
|
|
|
|
|
|
logger.Debug("sync.machines ", c.cluster.Machines)
|
|
|
c.saveConfig()
|
|
|
@@ -337,8 +313,12 @@ func (c *Client) internalSyncCluster(machines []string) bool {
|
|
|
// createHttpPath creates a complete HTTP URL.
|
|
|
// serverName should contain both the host name and a port number, if any.
|
|
|
func (c *Client) createHttpPath(serverName string, _path string) string {
|
|
|
- u, _ := url.Parse(serverName)
|
|
|
- u.Path = path.Join(u.Path, "/", _path)
|
|
|
+ u, err := url.Parse(serverName)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ u.Path = path.Join(u.Path, _path)
|
|
|
|
|
|
if u.Scheme == "" {
|
|
|
u.Scheme = "http"
|
|
|
@@ -351,27 +331,6 @@ func dialTimeout(network, addr string) (net.Conn, error) {
|
|
|
return net.DialTimeout(network, addr, time.Second)
|
|
|
}
|
|
|
|
|
|
-func (c *Client) updateLeader(u *url.URL) {
|
|
|
- var leader string
|
|
|
- if u.Scheme == "" {
|
|
|
- leader = "http://" + u.Host
|
|
|
- } else {
|
|
|
- leader = u.Scheme + "://" + u.Host
|
|
|
- }
|
|
|
-
|
|
|
- logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
|
|
|
- c.cluster.Leader = leader
|
|
|
- c.saveConfig()
|
|
|
-}
|
|
|
-
|
|
|
-// switchLeader switch the current leader to machines[num]
|
|
|
-func (c *Client) switchLeader(num int) {
|
|
|
- logger.Debugf("switch.leader[from %v to %v]",
|
|
|
- c.cluster.Leader, c.cluster.Machines[num])
|
|
|
-
|
|
|
- c.cluster.Leader = c.cluster.Machines[num]
|
|
|
-}
|
|
|
-
|
|
|
func (c *Client) OpenCURL() {
|
|
|
c.cURLch = make(chan string, defaultBufferSize)
|
|
|
}
|
|
|
@@ -392,3 +351,55 @@ func (c *Client) sendCURL(command string) {
|
|
|
func (c *Client) RecvCURL() string {
|
|
|
return <-c.cURLch
|
|
|
}
|
|
|
+
|
|
|
+// saveConfig saves the current config using c.persistence.
|
|
|
+func (c *Client) saveConfig() error {
|
|
|
+ if c.persistence != nil {
|
|
|
+ b, err := json.Marshal(c)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ _, err = c.persistence.Write(b)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// MarshalJSON implements the Marshaller interface
|
|
|
+// as defined by the standard JSON package.
|
|
|
+func (c *Client) MarshalJSON() ([]byte, error) {
|
|
|
+ b, err := json.Marshal(struct {
|
|
|
+ Config Config `json:"config"`
|
|
|
+ Cluster *Cluster `json:"cluster"`
|
|
|
+ }{
|
|
|
+ Config: c.config,
|
|
|
+ Cluster: c.cluster,
|
|
|
+ })
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return b, nil
|
|
|
+}
|
|
|
+
|
|
|
+// UnmarshalJSON implements the Unmarshaller interface
|
|
|
+// as defined by the standard JSON package.
|
|
|
+func (c *Client) UnmarshalJSON(b []byte) error {
|
|
|
+ temp := struct {
|
|
|
+ Config Config `json: "config"`
|
|
|
+ Cluster *Cluster `json: "cluster"`
|
|
|
+ }{}
|
|
|
+ err := json.Unmarshal(b, &temp)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ c.cluster = temp.Cluster
|
|
|
+ c.config = temp.Config
|
|
|
+ return nil
|
|
|
+}
|