Blake Mizerany 11 years ago
parent
commit
fc9fc3f888
1 changed files with 56 additions and 2 deletions
  1. 56 2
      etcdserver2/etcdhttp/http.go

+ 56 - 2
etcdserver2/etcdhttp/http.go

@@ -1,6 +1,7 @@
 package etcdhttp
 package etcdhttp
 
 
 import (
 import (
+	"bytes"
 	"encoding/binary"
 	"encoding/binary"
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
@@ -13,7 +14,8 @@ import (
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
-	"crypto/rand"
+	crand "crypto/rand"
+	"math/rand"
 	"code.google.com/p/go.net/context"
 	"code.google.com/p/go.net/context"
 	"github.com/coreos/etcd/elog"
 	"github.com/coreos/etcd/elog"
 	etcdserver "github.com/coreos/etcd/etcdserver2"
 	etcdserver "github.com/coreos/etcd/etcdserver2"
@@ -22,10 +24,62 @@ import (
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
 
 
+type Peers map[int64][]string
+
+func (ps Peers) Pick(id int64) string {
+	addrs := ps[id]
+	return addrs[rand.Intn(len(addrs))]
+}
+
 var errClosed = errors.New("etcdhttp: client closed connection")
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 
 const DefaultTimeout = 500 * time.Millisecond
 const DefaultTimeout = 500 * time.Millisecond
 
 
+func Sender(prefix string, p Peers) func(msgs []raftpb.Message) {
+	return func(msgs []raftpb.Message) {
+		for _, m := range msgs {
+			// TODO: create workers that deal with message sending
+			// concurrently as to not block progress
+			for {
+				url := p.Pick(m.To)
+				if url == "" {
+					// TODO: unknown peer id.. what do we do? I
+					// don't think his should ever happen, need to
+					// look into this further.
+					elog.TODO()
+				}
+				// TODO: don't block. we should be able to have 1000s
+				// of messages out at a time.
+				data, err := m.Marshal()
+				if err != nil {
+					elog.TODO()
+					break // drop bad message
+				}
+				if httpPost(url+prefix, data) {
+					break // success
+				}
+
+				// TODO: backoff
+			}
+		}
+	}
+}
+
+func httpPost(url string, data []byte) bool {
+	resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
+	if err != nil {
+		elog.TODO()
+		return false
+	}
+	if resp.StatusCode != 200 {
+		elog.TODO()
+		return false
+	}
+	return true
+}
+
+// Handler implements the http.Handler interface and serves etcd client and
+// raft communication.
 type Handler struct {
 type Handler struct {
 	Timeout time.Duration
 	Timeout time.Duration
 	Server  *etcdserver.Server
 	Server  *etcdserver.Server
@@ -90,7 +144,7 @@ func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.R
 func genId() int64 {
 func genId() int64 {
 	for {
 	for {
 		b := make([]byte, 8)
 		b := make([]byte, 8)
-		if _, err := io.ReadFull(rand.Reader, b); err != nil {
+		if _, err := io.ReadFull(crand.Reader, b); err != nil {
 			panic(err) // really bad stuff happened
 			panic(err) // really bad stuff happened
 		}
 		}
 		n := int64(binary.BigEndian.Uint64(b))
 		n := int64(binary.BigEndian.Uint64(b))