Browse Source

etcdserver: wip

Blake Mizerany 11 years ago
parent
commit
edac2e909b

+ 20 - 0
etcdserver2/example_test.go

@@ -0,0 +1,20 @@
+package etcdserver
+
+// func Example_Server() {
+// 	flag.Parse() // fills cfg
+//
+// 	ss, w, err := LoadState(*statedir)
+// 	if err != nil {
+// 		log.Println("main: unable to load state - %s", err)
+// 	}
+//
+// 	s := Server{
+// 		Snapshot: ss,
+// 		WalFile:  w,
+// 		Config:   cfg,
+// 	}
+//
+// 	go func() {
+// 		log.Fatal(http.ListenAndServe(*laddr, s))
+// 	}()
+// }

+ 1 - 0
etcdserver2/genproto.sh

@@ -0,0 +1 @@
+exec protoc --gogo_out=. -I=.:$GOPATH/src/code.google.com/p/gogoprotobuf/protobuf:$GOPATH/src *.proto

+ 380 - 0
etcdserver2/request.pb.go

@@ -0,0 +1,380 @@
+// Code generated by protoc-gen-gogo.
+// source: request.proto
+// DO NOT EDIT!
+
+/*
+	Package etcdserver is a generated protocol buffer package.
+
+	It is generated from these files:
+		request.proto
+
+	It has these top-level messages:
+		Request
+*/
+package etcdserver
+
+import proto "code.google.com/p/gogoprotobuf/proto"
+import json "encoding/json"
+import math "math"
+
+// discarding unused import gogoproto "code.google.com/p/gogoprotobuf/gogoproto/gogo.pb"
+
+import io "io"
+import code_google_com_p_gogoprotobuf_proto "code.google.com/p/gogoprotobuf/proto"
+
+// Reference proto, json, and math imports to suppress error if they are not otherwise used.
+var _ = proto.Marshal
+var _ = &json.SyntaxError{}
+var _ = math.Inf
+
+type Request struct {
+	Id               int64  `protobuf:"varint,1,req,name=id" json:"id"`
+	Method           string `protobuf:"bytes,2,req,name=method" json:"method"`
+	Key              string `protobuf:"bytes,3,req,name=key" json:"key"`
+	Val              string `protobuf:"bytes,4,req,name=val" json:"val"`
+	Dir              bool   `protobuf:"varint,5,req,name=dir" json:"dir"`
+	PrevValue        string `protobuf:"bytes,6,req,name=prevValue" json:"prevValue"`
+	PrevIndex        int64  `protobuf:"varint,7,req,name=prevIndex" json:"prevIndex"`
+	PrevExists       bool   `protobuf:"varint,8,req,name=prevExists" json:"prevExists"`
+	Expiration       int64  `protobuf:"varint,9,req,name=expiration" json:"expiration"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Request) Reset()         { *m = Request{} }
+func (m *Request) String() string { return proto.CompactTextString(m) }
+func (*Request) ProtoMessage()    {}
+
+func init() {
+}
+func (m *Request) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Id |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + int(stringLen)
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Method = string(data[index:postIndex])
+			index = postIndex
+		case 3:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + int(stringLen)
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Key = string(data[index:postIndex])
+			index = postIndex
+		case 4:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + int(stringLen)
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Val = string(data[index:postIndex])
+			index = postIndex
+		case 5:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Dir = bool(v != 0)
+		case 6:
+			if wireType != 2 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + int(stringLen)
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.PrevValue = string(data[index:postIndex])
+			index = postIndex
+		case 7:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.PrevIndex |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 8:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.PrevExists = bool(v != 0)
+		case 9:
+			if wireType != 0 {
+				return code_google_com_p_gogoprotobuf_proto.ErrWrongType
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Expiration |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := code_google_com_p_gogoprotobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *Request) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovRequest(uint64(m.Id))
+	l = len(m.Method)
+	n += 1 + l + sovRequest(uint64(l))
+	l = len(m.Key)
+	n += 1 + l + sovRequest(uint64(l))
+	l = len(m.Val)
+	n += 1 + l + sovRequest(uint64(l))
+	n += 2
+	l = len(m.PrevValue)
+	n += 1 + l + sovRequest(uint64(l))
+	n += 1 + sovRequest(uint64(m.PrevIndex))
+	n += 2
+	n += 1 + sovRequest(uint64(m.Expiration))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func sovRequest(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozRequest(x uint64) (n int) {
+	return sovRequest(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *Request) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Request) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintRequest(data, i, uint64(m.Id))
+	data[i] = 0x12
+	i++
+	i = encodeVarintRequest(data, i, uint64(len(m.Method)))
+	i += copy(data[i:], m.Method)
+	data[i] = 0x1a
+	i++
+	i = encodeVarintRequest(data, i, uint64(len(m.Key)))
+	i += copy(data[i:], m.Key)
+	data[i] = 0x22
+	i++
+	i = encodeVarintRequest(data, i, uint64(len(m.Val)))
+	i += copy(data[i:], m.Val)
+	data[i] = 0x28
+	i++
+	if m.Dir {
+		data[i] = 1
+	} else {
+		data[i] = 0
+	}
+	i++
+	data[i] = 0x32
+	i++
+	i = encodeVarintRequest(data, i, uint64(len(m.PrevValue)))
+	i += copy(data[i:], m.PrevValue)
+	data[i] = 0x38
+	i++
+	i = encodeVarintRequest(data, i, uint64(m.PrevIndex))
+	data[i] = 0x40
+	i++
+	if m.PrevExists {
+		data[i] = 1
+	} else {
+		data[i] = 0
+	}
+	i++
+	data[i] = 0x48
+	i++
+	i = encodeVarintRequest(data, i, uint64(m.Expiration))
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+func encodeFixed64Request(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Request(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintRequest(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}

+ 20 - 0
etcdserver2/request.proto

@@ -0,0 +1,20 @@
+package etcdserver;
+
+import "code.google.com/p/gogoprotobuf/gogoproto/gogo.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+
+message Request {
+	required int64  id         = 1 [(gogoproto.nullable) = false];     
+	required string method     = 2 [(gogoproto.nullable) = false];
+	required string key        = 3 [(gogoproto.nullable) = false];
+	required string val        = 4 [(gogoproto.nullable) = false];
+	required bool   dir        = 5 [(gogoproto.nullable) = false];
+	required string prevValue  = 6 [(gogoproto.nullable) = false];
+	required int64  prevIndex  = 7 [(gogoproto.nullable) = false];
+	required bool   prevExists = 8 [(gogoproto.nullable) = false];
+	required int64  expiration = 9 [(gogoproto.nullable) = false];
+}

+ 55 - 0
etcdserver2/server.go

@@ -0,0 +1,55 @@
+package etcdserver
+
+import (
+	"code.google.com/p/go.net/context"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/wait"
+)
+
+type Response struct {
+	err error
+}
+
+type Server struct {
+	n raft.Node
+	w wait.List
+}
+
+func (s *Server) Run(ctx context.Context) {
+	for {
+		st, ents, cents, msgs, err := s.n.ReadState(ctx)
+		if err != nil {
+			do something here
+		}
+		save state to wal
+		go send messages
+		go func() {
+			for e in cents {
+				req = decode e.Data
+				apply req to state machine
+				build Response from result of apply
+				trigger wait with (r.Id, resp)
+			}
+		}()
+	}
+}
+
+func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
+	if r.Id == 0 {
+		panic("r.Id cannot be 0")
+	}
+	data, err := r.Marshal()
+	if err != nil {
+		return Response{}, err
+	}
+	ch := s.w.Register(r.Id)
+	s.n.Propose(ctx, data)
+	select {
+	case x := <-ch:
+		resp := x.(Response)
+		return resp, resp.err
+	case <-ctx.Done():
+		s.w.Trigger(r.Id, nil) // GC wait
+		return Response{}, ctx.Err()
+	}
+}

+ 5 - 0
etcdserver2/server_test.go

@@ -0,0 +1,5 @@
+package etcdserver
+
+import "testing"
+
+func TestServer(t *testing.T) {}

+ 2 - 2
raft/node.go

@@ -29,8 +29,8 @@ type Node struct {
 	tickc  chan struct{}
 	tickc  chan struct{}
 }
 }
 
 
-func Start(ctx context.Context, id int64, peers []int64) *Node {
-	n := &Node{
+func Start(ctx context.Context, id int64, peers []int64) Node {
+	n := Node{
 		ctx:    ctx,
 		ctx:    ctx,
 		propc:  make(chan Message),
 		propc:  make(chan Message),
 		recvc:  make(chan Message),
 		recvc:  make(chan Message),

+ 5 - 5
wait/wait.go

@@ -2,16 +2,16 @@ package wait
 
 
 import "sync"
 import "sync"
 
 
-type WaitList struct {
+type List struct {
 	l sync.Mutex
 	l sync.Mutex
 	m map[int64]chan interface{}
 	m map[int64]chan interface{}
 }
 }
 
 
-func New() WaitList {
-	return WaitList{m: make(map[int64]chan interface{})}
+func New() List {
+	return List{m: make(map[int64]chan interface{})}
 }
 }
 
 
-func (w WaitList) Register(id int64) <-chan interface{} {
+func (w List) Register(id int64) <-chan interface{} {
 	w.l.Lock()
 	w.l.Lock()
 	defer w.l.Unlock()
 	defer w.l.Unlock()
 	ch := w.m[id]
 	ch := w.m[id]
@@ -22,7 +22,7 @@ func (w WaitList) Register(id int64) <-chan interface{} {
 	return ch
 	return ch
 }
 }
 
 
-func (w WaitList) Trigger(id int64, x interface{}) {
+func (w List) Trigger(id int64, x interface{}) {
 	w.l.Lock()
 	w.l.Lock()
 	ch := w.m[id]
 	ch := w.m[id]
 	delete(w.m, id)
 	delete(w.m, id)