Browse Source

many: marshal message

Blake Mizerany 11 years ago
parent
commit
ddd219f297
10 changed files with 818 additions and 327 deletions
  1. 11 0
      elog/elog.go
  2. 30 1
      etcdserver2/etcdhttp/http.go
  3. 0 264
      raft/entry.pb.go
  4. 0 16
      raft/entry.proto
  5. 1 1
      raft/log.go
  6. 734 0
      raft/protos.pb.go
  7. 35 0
      raft/protos.proto
  8. 2 23
      raft/raft.go
  9. 5 5
      raft/raft_test.go
  10. 0 17
      raft/snapshot.go

+ 11 - 0
elog/elog.go

@@ -0,0 +1,11 @@
+package elog
+
+import (
+	. "log"
+	"runtime"
+)
+
+func TODO() {
+	_, file, line, _ := runtime.Caller(1)
+	Printf("TODO: %s:%d", file, line)
+}

+ 30 - 1
etcdserver2/etcdhttp/http.go

@@ -4,11 +4,15 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"io/ioutil"
 	"net/http"
 	"net/http"
+	"strings"
 	"time"
 	"time"
 
 
 	"code.google.com/p/go.net/context"
 	"code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/elog"
 	etcdserver "github.com/coreos/etcd/etcdserver2"
 	etcdserver "github.com/coreos/etcd/etcdserver2"
+	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
 
 
@@ -32,6 +36,17 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	defer cancel()
 
 
+	switch {
+	case strings.HasPrefix(r.URL.Path, "/raft"):
+		h.serveRaft(ctx, w, r)
+	case strings.HasPrefix(r.URL.Path, "/keys"):
+		h.serveKeys(ctx, w, r)
+	default:
+		http.NotFound(w, r)
+	}
+}
+
+func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 	rr, err := parseRequest(r)
 	rr, err := parseRequest(r)
 	if err != nil {
 	if err != nil {
 		http.Error(w, err.Error(), 400)
 		http.Error(w, err.Error(), 400)
@@ -49,6 +64,20 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 	}
 }
 }
 
 
+func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		elog.TODO()
+	}
+	var m raft.Message
+	if err := m.Unmarshal(b); err != nil {
+		elog.TODO()
+	}
+	if err := h.Server.Node.Step(ctx, m); err != nil {
+		elog.TODO()
+	}
+}
+
 func parseRequest(r *http.Request) (etcdserver.Request, error) {
 func parseRequest(r *http.Request) (etcdserver.Request, error) {
 	return etcdserver.Request{}, nil
 	return etcdserver.Request{}, nil
 }
 }
@@ -94,7 +123,7 @@ func waitForEvent(ctx context.Context, w http.ResponseWriter, wa *store.Watcher)
 	case ev := <-wa.EventChan:
 	case ev := <-wa.EventChan:
 		return ev, nil
 		return ev, nil
 	case <-nch:
 	case <-nch:
-		// TODO: log something?
+		elog.TODO()
 		return nil, errClosed
 		return nil, errClosed
 	case <-ctx.Done():
 	case <-ctx.Done():
 		return nil, ctx.Err()
 		return nil, ctx.Err()

+ 0 - 264
raft/entry.pb.go

@@ -1,264 +0,0 @@
-// Code generated by protoc-gen-gogo.
-// source: entry.proto
-// DO NOT EDIT!
-
-/*
-	Package raft is a generated protocol buffer package.
-
-	It is generated from these files:
-		entry.proto
-		state.proto
-
-	It has these top-level messages:
-		Entry
-*/
-package raft
-
-import proto "code.google.com/p/gogoprotobuf/proto"
-import json "encoding/json"
-import math "math"
-
-// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
-
-import io "io"
-import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
-
-// Reference proto, json, and math imports to suppress error if they are not otherwise used.
-var _ = proto.Marshal
-var _ = &json.SyntaxError{}
-var _ = math.Inf
-
-type Entry struct {
-	Type             int64  `protobuf:"varint,1,req,name=type" json:"type"`
-	Term             int64  `protobuf:"varint,2,req,name=term" json:"term"`
-	Index            int64  `protobuf:"varint,3,req,name=index" json:"index"`
-	Data             []byte `protobuf:"bytes,4,opt,name=data" json:"data,omitempty"`
-	Id               int64  `protobuf:"varint,5,req,name=id" json:"id"`
-	XXX_unrecognized []byte `json:"-"`
-}
-
-func (m *Entry) Reset()         { *m = Entry{} }
-func (m *Entry) String() string { return proto.CompactTextString(m) }
-func (*Entry) ProtoMessage()    {}
-
-func init() {
-}
-func (m *Entry) Unmarshal(data []byte) error {
-	l := len(data)
-	index := 0
-	for index < l {
-		var wire uint64
-		for shift := uint(0); ; shift += 7 {
-			if index >= l {
-				return io.ErrUnexpectedEOF
-			}
-			b := data[index]
-			index++
-			wire |= (uint64(b) & 0x7F) << shift
-			if b < 0x80 {
-				break
-			}
-		}
-		fieldNum := int32(wire >> 3)
-		wireType := int(wire & 0x7)
-		switch fieldNum {
-		case 1:
-			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				m.Type |= (int64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-		case 2:
-			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				m.Term |= (int64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-		case 3:
-			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				m.Index |= (int64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-		case 4:
-			if wireType != 2 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			var byteLen int
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				byteLen |= (int(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-			postIndex := index + byteLen
-			if postIndex > l {
-				return io.ErrUnexpectedEOF
-			}
-			m.Data = append(m.Data, data[index:postIndex]...)
-			index = postIndex
-		case 5:
-			if wireType != 0 {
-				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
-			}
-			for shift := uint(0); ; shift += 7 {
-				if index >= l {
-					return io.ErrUnexpectedEOF
-				}
-				b := data[index]
-				index++
-				m.Id |= (int64(b) & 0x7F) << shift
-				if b < 0x80 {
-					break
-				}
-			}
-		default:
-			var sizeOfWire int
-			for {
-				sizeOfWire++
-				wire >>= 7
-				if wire == 0 {
-					break
-				}
-			}
-			index -= sizeOfWire
-			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
-			if err != nil {
-				return err
-			}
-			if (index + skippy) > l {
-				return io.ErrUnexpectedEOF
-			}
-			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
-			index += skippy
-		}
-	}
-	return nil
-}
-func (m *Entry) Size() (n int) {
-	var l int
-	_ = l
-	n += 1 + sovEntry(uint64(m.Type))
-	n += 1 + sovEntry(uint64(m.Term))
-	n += 1 + sovEntry(uint64(m.Index))
-	if m.Data != nil {
-		l = len(m.Data)
-		n += 1 + l + sovEntry(uint64(l))
-	}
-	n += 1 + sovEntry(uint64(m.Id))
-	if m.XXX_unrecognized != nil {
-		n += len(m.XXX_unrecognized)
-	}
-	return n
-}
-
-func sovEntry(x uint64) (n int) {
-	for {
-		n++
-		x >>= 7
-		if x == 0 {
-			break
-		}
-	}
-	return n
-}
-func sozEntry(x uint64) (n int) {
-	return sovEntry(uint64((x << 1) ^ uint64((int64(x) >> 63))))
-}
-func (m *Entry) Marshal() (data []byte, err error) {
-	size := m.Size()
-	data = make([]byte, size)
-	n, err := m.MarshalTo(data)
-	if err != nil {
-		return nil, err
-	}
-	return data[:n], nil
-}
-
-func (m *Entry) MarshalTo(data []byte) (n int, err error) {
-	var i int
-	_ = i
-	var l int
-	_ = l
-	data[i] = 0x8
-	i++
-	i = encodeVarintEntry(data, i, uint64(m.Type))
-	data[i] = 0x10
-	i++
-	i = encodeVarintEntry(data, i, uint64(m.Term))
-	data[i] = 0x18
-	i++
-	i = encodeVarintEntry(data, i, uint64(m.Index))
-	if m.Data != nil {
-		data[i] = 0x22
-		i++
-		i = encodeVarintEntry(data, i, uint64(len(m.Data)))
-		i += copy(data[i:], m.Data)
-	}
-	data[i] = 0x28
-	i++
-	i = encodeVarintEntry(data, i, uint64(m.Id))
-	if m.XXX_unrecognized != nil {
-		i += copy(data[i:], m.XXX_unrecognized)
-	}
-	return i, nil
-}
-func encodeFixed64Entry(data []byte, offset int, v uint64) int {
-	data[offset] = uint8(v)
-	data[offset+1] = uint8(v >> 8)
-	data[offset+2] = uint8(v >> 16)
-	data[offset+3] = uint8(v >> 24)
-	data[offset+4] = uint8(v >> 32)
-	data[offset+5] = uint8(v >> 40)
-	data[offset+6] = uint8(v >> 48)
-	data[offset+7] = uint8(v >> 56)
-	return offset + 8
-}
-func encodeFixed32Entry(data []byte, offset int, v uint32) int {
-	data[offset] = uint8(v)
-	data[offset+1] = uint8(v >> 8)
-	data[offset+2] = uint8(v >> 16)
-	data[offset+3] = uint8(v >> 24)
-	return offset + 4
-}
-func encodeVarintEntry(data []byte, offset int, v uint64) int {
-	for v >= 1<<7 {
-		data[offset] = uint8(v&0x7f | 0x80)
-		v >>= 7
-		offset++
-	}
-	data[offset] = uint8(v)
-	return offset + 1
-}

+ 0 - 16
raft/entry.proto

@@ -1,16 +0,0 @@
-package raft;
-
-import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
-
-option (gogoproto.marshaler_all) = true;
-option (gogoproto.sizer_all) = true;
-option (gogoproto.unmarshaler_all) = true;
-option (gogoproto.goproto_getters_all) = false;
-
-message Entry {
-	required int64 type  = 1 [(gogoproto.nullable) = false];
-	required int64 term  = 2 [(gogoproto.nullable) = false];
-	required int64 index = 3 [(gogoproto.nullable) = false];
-	optional bytes data  = 4;
-	required int64 id = 5 [(gogoproto.nullable) = false];
-}

+ 1 - 1
raft/log.go

@@ -176,7 +176,7 @@ func (l *raftLog) compact(i int64) int64 {
 }
 }
 
 
 func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) {
 func (l *raftLog) snap(d []byte, index, term int64, nodes []int64) {
-	l.snapshot = Snapshot{d, nodes, index, term}
+	l.snapshot = Snapshot{d, nodes, index, term, nil}
 }
 }
 
 
 func (l *raftLog) shouldCompact() bool {
 func (l *raftLog) shouldCompact() bool {

+ 734 - 0
raft/protos.pb.go

@@ -0,0 +1,734 @@
+// Code generated by protoc-gen-gogo.
+// source: protos.proto
+// DO NOT EDIT!
+
+/*
+	Package raft is a generated protocol buffer package.
+
+	It is generated from these files:
+		protos.proto
+		state.proto
+
+	It has these top-level messages:
+		Entry
+		Snapshot
+		Message
+*/
+package raft
+
+import proto "code.google.com/p/gogoprotobuf/proto"
+import json "encoding/json"
+import math "math"
+
+// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
+
+import io "io"
+import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
+
+// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+var _ = proto.Marshal
+var _ = &json.SyntaxError{}
+var _ = math.Inf
+
+type Entry struct {
+	Type             int64  `protobuf:"varint,1,req,name=type" json:"type"`
+	Term             int64  `protobuf:"varint,2,req,name=term" json:"term"`
+	Index            int64  `protobuf:"varint,3,req,name=index" json:"index"`
+	Data             []byte `protobuf:"bytes,4,opt,name=data" json:"data"`
+	Id               int64  `protobuf:"varint,5,req,name=id" json:"id"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Entry) Reset()         { *m = Entry{} }
+func (m *Entry) String() string { return proto.CompactTextString(m) }
+func (*Entry) ProtoMessage()    {}
+
+type Snapshot struct {
+	Data             []byte  `protobuf:"bytes,1,req,name=data" json:"data"`
+	Nodes            []int64 `protobuf:"varint,2,rep,name=nodes" json:"nodes"`
+	Index            int64   `protobuf:"varint,3,req,name=index" json:"index"`
+	Term             int64   `protobuf:"varint,4,req,name=term" json:"term"`
+	XXX_unrecognized []byte  `json:"-"`
+}
+
+func (m *Snapshot) Reset()         { *m = Snapshot{} }
+func (m *Snapshot) String() string { return proto.CompactTextString(m) }
+func (*Snapshot) ProtoMessage()    {}
+
+type Message struct {
+	Type             int64    `protobuf:"varint,1,req,name=type" json:"type"`
+	To               int64    `protobuf:"varint,2,req,name=to" json:"to"`
+	From             int64    `protobuf:"varint,3,req,name=from" json:"from"`
+	Term             int64    `protobuf:"varint,4,req,name=term" json:"term"`
+	LogTerm          int64    `protobuf:"varint,5,req,name=logTerm" json:"logTerm"`
+	Index            int64    `protobuf:"varint,6,req,name=index" json:"index"`
+	Entries          []Entry  `protobuf:"bytes,7,rep,name=entries" json:"entries"`
+	Commit           int64    `protobuf:"varint,8,req,name=commit" json:"commit"`
+	Snapshot         Snapshot `protobuf:"bytes,9,req,name=snapshot" json:"snapshot"`
+	XXX_unrecognized []byte   `json:"-"`
+}
+
+func (m *Message) Reset()         { *m = Message{} }
+func (m *Message) String() string { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage()    {}
+
+func init() {
+}
+func (m *Entry) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Type |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Term |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Index |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 4:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Data = append(m.Data, data[index:postIndex]...)
+			index = postIndex
+		case 5:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Id |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *Snapshot) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Data = append(m.Data, data[index:postIndex]...)
+			index = postIndex
+		case 2:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v int64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Nodes = append(m.Nodes, v)
+		case 3:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Index |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 4:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Term |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *Message) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Type |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.To |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.From |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 4:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Term |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 5:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.LogTerm |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 6:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Index |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 7:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Entries = append(m.Entries, Entry{})
+			m.Entries[len(m.Entries)-1].Unmarshal(data[index:postIndex])
+			index = postIndex
+		case 8:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Commit |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 9:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if err := m.Snapshot.Unmarshal(data[index:postIndex]); err != nil {
+				return err
+			}
+			index = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *Entry) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovProtos(uint64(m.Type))
+	n += 1 + sovProtos(uint64(m.Term))
+	n += 1 + sovProtos(uint64(m.Index))
+	l = len(m.Data)
+	n += 1 + l + sovProtos(uint64(l))
+	n += 1 + sovProtos(uint64(m.Id))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+func (m *Snapshot) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Data)
+	n += 1 + l + sovProtos(uint64(l))
+	if len(m.Nodes) > 0 {
+		for _, e := range m.Nodes {
+			n += 1 + sovProtos(uint64(e))
+		}
+	}
+	n += 1 + sovProtos(uint64(m.Index))
+	n += 1 + sovProtos(uint64(m.Term))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+func (m *Message) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovProtos(uint64(m.Type))
+	n += 1 + sovProtos(uint64(m.To))
+	n += 1 + sovProtos(uint64(m.From))
+	n += 1 + sovProtos(uint64(m.Term))
+	n += 1 + sovProtos(uint64(m.LogTerm))
+	n += 1 + sovProtos(uint64(m.Index))
+	if len(m.Entries) > 0 {
+		for _, e := range m.Entries {
+			l = e.Size()
+			n += 1 + l + sovProtos(uint64(l))
+		}
+	}
+	n += 1 + sovProtos(uint64(m.Commit))
+	l = m.Snapshot.Size()
+	n += 1 + l + sovProtos(uint64(l))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func sovProtos(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozProtos(x uint64) (n int) {
+	return sovProtos(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *Entry) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Entry) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Type))
+	data[i] = 0x10
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Term))
+	data[i] = 0x18
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Index))
+	data[i] = 0x22
+	i++
+	i = encodeVarintProtos(data, i, uint64(len(m.Data)))
+	i += copy(data[i:], m.Data)
+	data[i] = 0x28
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Id))
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+func (m *Snapshot) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Snapshot) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0xa
+	i++
+	i = encodeVarintProtos(data, i, uint64(len(m.Data)))
+	i += copy(data[i:], m.Data)
+	if len(m.Nodes) > 0 {
+		for _, num := range m.Nodes {
+			data[i] = 0x10
+			i++
+			i = encodeVarintProtos(data, i, uint64(num))
+		}
+	}
+	data[i] = 0x18
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Index))
+	data[i] = 0x20
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Term))
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+func (m *Message) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Message) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Type))
+	data[i] = 0x10
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.To))
+	data[i] = 0x18
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.From))
+	data[i] = 0x20
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Term))
+	data[i] = 0x28
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.LogTerm))
+	data[i] = 0x30
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Index))
+	if len(m.Entries) > 0 {
+		for _, msg := range m.Entries {
+			data[i] = 0x3a
+			i++
+			i = encodeVarintProtos(data, i, uint64(msg.Size()))
+			n, err := msg.MarshalTo(data[i:])
+			if err != nil {
+				return 0, err
+			}
+			i += n
+		}
+	}
+	data[i] = 0x40
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Commit))
+	data[i] = 0x4a
+	i++
+	i = encodeVarintProtos(data, i, uint64(m.Snapshot.Size()))
+	n1, err := m.Snapshot.MarshalTo(data[i:])
+	if err != nil {
+		return 0, err
+	}
+	i += n1
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+func encodeFixed64Protos(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Protos(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintProtos(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}

+ 35 - 0
raft/protos.proto

@@ -0,0 +1,35 @@
+package raft;
+
+import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+
+message Entry {
+	required int64 type  = 1 [(gogoproto.nullable) = false];
+	required int64 term  = 2 [(gogoproto.nullable) = false];
+	required int64 index = 3 [(gogoproto.nullable) = false];
+	optional bytes data  = 4 [(gogoproto.nullable) = false];
+	required int64 id    = 5 [(gogoproto.nullable) = false];
+}
+
+message Snapshot {
+	required bytes data  = 1 [(gogoproto.nullable) = false];
+	repeated int64 nodes = 2 [(gogoproto.nullable) = false];
+	required int64 index = 3 [(gogoproto.nullable) = false];
+	required int64 term  = 4 [(gogoproto.nullable) = false];
+}
+
+message Message {
+	required int64 type        = 1 [(gogoproto.nullable) = false];
+	required int64 to          = 2 [(gogoproto.nullable) = false];
+	required int64 from        = 3 [(gogoproto.nullable) = false];
+	required int64 term        = 4 [(gogoproto.nullable) = false];
+	required int64 logTerm     = 5 [(gogoproto.nullable) = false];
+	required int64 index       = 6 [(gogoproto.nullable) = false];
+	repeated Entry entries     = 7 [(gogoproto.nullable) = false];
+	required int64 commit      = 8 [(gogoproto.nullable) = false];
+	required Snapshot snapshot = 9 [(gogoproto.nullable) = false];
+}

+ 2 - 23
raft/raft.go

@@ -11,7 +11,7 @@ const none = -1
 type messageType int64
 type messageType int64
 
 
 const (
 const (
-	msgHup messageType = iota
+	msgHup int64 = iota
 	msgBeat
 	msgBeat
 	msgProp
 	msgProp
 	msgApp
 	msgApp
@@ -64,23 +64,6 @@ func (st stateType) String() string {
 	return stmap[int64(st)]
 	return stmap[int64(st)]
 }
 }
 
 
-type Message struct {
-	Type     messageType
-	To       int64
-	From     int64
-	Term     int64
-	LogTerm  int64
-	Index    int64
-	Entries  []Entry
-	Commit   int64
-	Snapshot Snapshot
-}
-
-func (m Message) String() string {
-	return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d",
-		m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries))
-}
-
 type progress struct {
 type progress struct {
 	match, next int64
 	match, next int64
 }
 }
@@ -489,7 +472,7 @@ func (r *raft) restore(s Snapshot) bool {
 
 
 func (r *raft) needSnapshot(i int64) bool {
 func (r *raft) needSnapshot(i int64) bool {
 	if i < r.raftLog.offset {
 	if i < r.raftLog.offset {
-		if r.raftLog.snapshot.IsEmpty() {
+		if r.raftLog.snapshot.Term == 0 {
 			panic("need non-empty snapshot")
 			panic("need non-empty snapshot")
 		}
 		}
 		return true
 		return true
@@ -526,7 +509,3 @@ func (r *raft) loadState(state State) {
 	r.Term = state.Term
 	r.Term = state.Term
 	r.Vote = state.Vote
 	r.Vote = state.Vote
 }
 }
-
-func (s *State) IsEmpty() bool {
-	return s.Term == 0
-}

+ 5 - 5
raft/raft_test.go

@@ -685,7 +685,7 @@ func TestAllServerStepdown(t *testing.T) {
 		{stateLeader, stateFollower, 3, 2},
 		{stateLeader, stateFollower, 3, 2},
 	}
 	}
 
 
-	tmsgTypes := [...]messageType{msgVote, msgApp}
+	tmsgTypes := [...]int64{msgVote, msgApp}
 	tterm := int64(3)
 	tterm := int64(3)
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -920,7 +920,7 @@ func ents(terms ...int64) *raft {
 type network struct {
 type network struct {
 	peers   map[int64]Interface
 	peers   map[int64]Interface
 	dropm   map[connem]float64
 	dropm   map[connem]float64
-	ignorem map[messageType]bool
+	ignorem map[int64]bool
 }
 }
 
 
 // newNetwork initializes a network from peers.
 // newNetwork initializes a network from peers.
@@ -957,7 +957,7 @@ func newNetwork(peers ...Interface) *network {
 	return &network{
 	return &network{
 		peers:   npeers,
 		peers:   npeers,
 		dropm:   make(map[connem]float64),
 		dropm:   make(map[connem]float64),
-		ignorem: make(map[messageType]bool),
+		ignorem: make(map[int64]bool),
 	}
 	}
 }
 }
 
 
@@ -989,13 +989,13 @@ func (nw *network) isolate(id int64) {
 	}
 	}
 }
 }
 
 
-func (nw *network) ignore(t messageType) {
+func (nw *network) ignore(t int64) {
 	nw.ignorem[t] = true
 	nw.ignorem[t] = true
 }
 }
 
 
 func (nw *network) recover() {
 func (nw *network) recover() {
 	nw.dropm = make(map[connem]float64)
 	nw.dropm = make(map[connem]float64)
-	nw.ignorem = make(map[messageType]bool)
+	nw.ignorem = make(map[int64]bool)
 }
 }
 
 
 func (nw *network) filter(msgs []Message) []Message {
 func (nw *network) filter(msgs []Message) []Message {

+ 0 - 17
raft/snapshot.go

@@ -1,17 +0,0 @@
-package raft
-
-var emptySnapshot = Snapshot{}
-
-type Snapshot struct {
-	Data []byte
-	// the configuration
-	Nodes []int64
-	// the index at which the snapshot was taken.
-	Index int64
-	// the log term of the index
-	Term int64
-}
-
-func (s Snapshot) IsEmpty() bool {
-	return s.Term == 0
-}