Browse Source

fix raft api

Xiang Li 12 years ago
parent
commit
f813017f1b
3 changed files with 19 additions and 18 deletions
  1. 1 1
      command.go
  2. 14 13
      third_party/github.com/coreos/go-etcd/etcd/client.go
  3. 4 4
      transporter.go

+ 1 - 1
command.go

@@ -155,7 +155,7 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	addNameToURL(c.Name, c.RaftURL, c.EtcdURL)
 
 	// add peer in raft
-	err := raftServer.AddPeer(c.Name)
+	err := raftServer.AddPeer(c.Name, "")
 
 	// add machine in etcd storage
 	key := path.Join("_etcd/machines", c.Name)

+ 14 - 13
third_party/github.com/coreos/go-etcd/etcd/client.go

@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"net/url"
 	"path"
 	"strings"
 	"time"
@@ -39,10 +40,10 @@ func NewClient() *Client {
 
 	// default leader and machines
 	cluster := Cluster{
-		Leader:   "0.0.0.0:4001",
+		Leader:   "http://0.0.0.0:4001",
 		Machines: make([]string, 1),
 	}
-	cluster.Machines[0] = "0.0.0.0:4001"
+	cluster.Machines[0] = "http://0.0.0.0:4001"
 
 	config := Config{
 		// default use http
@@ -145,9 +146,9 @@ func (c *Client) internalSyncCluster(machines []string) bool {
 
 // serverName should contain both hostName and port
 func (c *Client) createHttpPath(serverName string, _path string) string {
-	httpPath := path.Join(serverName, _path)
-	httpPath = c.config.Scheme + "://" + httpPath
-	return httpPath
+	u, _ := url.Parse(serverName)
+	u.Path = path.Join(u.Path, "/", _path)
+	return u.String()
 }
 
 // Dial with timeout.
@@ -156,22 +157,21 @@ func dialTimeout(network, addr string) (net.Conn, error) {
 }
 
 func (c *Client) getHttpPath(s ...string) string {
-	httpPath := path.Join(c.cluster.Leader, version)
+	u, _ := url.Parse(c.cluster.Leader)
+
+	u.Path = path.Join(u.Path, "/", version)
 
 	for _, seg := range s {
-		httpPath = path.Join(httpPath, seg)
+		u.Path = path.Join(u.Path, seg)
 	}
 
-	httpPath = c.config.Scheme + "://" + httpPath
-	return httpPath
+	return u.String()
 }
 
 func (c *Client) updateLeader(httpPath string) {
-	// httpPath http://127.0.0.1:4001/v1...
-	leader := strings.Split(httpPath, "://")[1]
-	// we want to have 127.0.0.1:4001
+	u, _ := url.Parse(httpPath)
+	leader := u.Host
 
-	leader = strings.Split(leader, "/")[0]
 	logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
 	c.cluster.Leader = leader
 }
@@ -188,6 +188,7 @@ func (c *Client) sendRequest(method string, _path string, body string) (*http.Re
 	for {
 
 		httpPath := c.getHttpPath(_path)
+
 		logger.Debug("send.request.to ", httpPath)
 		if body == "" {
 

+ 4 - 4
transporter.go

@@ -47,7 +47,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	u, _ := nameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name)
 	debugf("Send LogEntries to %s ", u)
 
 	resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
@@ -74,7 +74,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	u, _ := nameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name)
 	debugf("Send Vote to %s", u)
 
 	resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
@@ -100,7 +100,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	u, _ := nameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name)
 	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
 		req.LastTerm, req.LastIndex)
 
@@ -128,7 +128,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	u, _ := nameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name)
 	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
 		req.LastTerm, req.LastIndex)