Browse Source

storage: add kv and event proto

Xiang Li 10 years ago
parent
commit
845cb61213
4 changed files with 526 additions and 6 deletions
  1. 1 1
      scripts/genproto.sh
  2. 34 5
      storage/kv.go
  3. 456 0
      storage/storagepb/kv.pb.go
  4. 35 0
      storage/storagepb/kv.proto

+ 1 - 1
scripts/genproto.sh

@@ -5,7 +5,7 @@
 #
 
 PREFIX="github.com/coreos/etcd/Godeps/_workspace/src"
-DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./migrate/etcd4pb"
+DIRS="./wal/walpb ./etcdserver/etcdserverpb ./snap/snappb ./raft/raftpb ./migrate/etcd4pb ./storage/storagepb"
 
 SHA="bc946d07d1016848dfd2507f90f0859c9471681e"
 

+ 34 - 5
storage/kv.go

@@ -2,9 +2,11 @@ package storage
 
 import (
 	"encoding/binary"
+	"log"
 	"time"
 
 	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/etcd/storage/storagepb"
 )
 
 var (
@@ -17,14 +19,16 @@ type store struct {
 	b       backend.Backend
 	kvindex index
 
-	now uint64 // current index of the store
+	now        uint64 // current index of the store
+	marshalBuf []byte // buffer for marshal protobuf
 }
 
 func newStore(path string) *store {
 	s := &store{
-		b:       backend.New(path, batchInterval, batchLimit),
-		kvindex: newTreeIndex(),
-		now:     0,
+		b:          backend.New(path, batchInterval, batchLimit),
+		kvindex:    newTreeIndex(),
+		now:        0,
+		marshalBuf: make([]byte, 1024*1024),
 	}
 
 	tx := s.b.BatchTx()
@@ -47,7 +51,32 @@ func (s *store) Put(key, value []byte) {
 	tx.Lock()
 	defer tx.Unlock()
 	s.now = now
-	tx.UnsafePut(keyBucketName, ibytes, value)
+
+	event := storagepb.Event{
+		Type: storagepb.PUT,
+		Kv: storagepb.KeyValue{
+			Key:   key,
+			Value: value,
+		},
+	}
+
+	var (
+		d   []byte
+		err error
+		n   int
+	)
+
+	if event.Size() < len(s.marshalBuf) {
+		n, err = event.MarshalTo(s.marshalBuf)
+		d = s.marshalBuf[:n]
+	} else {
+		d, err = event.Marshal()
+	}
+	if err != nil {
+		log.Fatalf("storage: cannot marshal event: %v", err)
+	}
+
+	tx.UnsafePut(keyBucketName, ibytes, d)
 }
 
 func (s *store) Get(key []byte) []byte {

+ 456 - 0
storage/storagepb/kv.pb.go

@@ -0,0 +1,456 @@
+// Code generated by protoc-gen-gogo.
+// source: kv.proto
+// DO NOT EDIT!
+
+/*
+	Package storagepb is a generated protocol buffer package.
+
+	It is generated from these files:
+		kv.proto
+
+	It has these top-level messages:
+		KeyValue
+		Event
+*/
+package storagepb
+
+import proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
+import math "math"
+
+// discarding unused import gogoproto "github.com/gogo/protobuf/gogoproto/gogo.pb"
+
+import io "io"
+import fmt "fmt"
+import github_com_gogo_protobuf_proto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = math.Inf
+
+type Event_EventType int32
+
+const (
+	PUT    Event_EventType = 0
+	DELETE Event_EventType = 1
+	EXPIRE Event_EventType = 2
+)
+
+var Event_EventType_name = map[int32]string{
+	0: "PUT",
+	1: "DELETE",
+	2: "EXPIRE",
+}
+var Event_EventType_value = map[string]int32{
+	"PUT":    0,
+	"DELETE": 1,
+	"EXPIRE": 2,
+}
+
+func (x Event_EventType) Enum() *Event_EventType {
+	p := new(Event_EventType)
+	*p = x
+	return p
+}
+func (x Event_EventType) String() string {
+	return proto.EnumName(Event_EventType_name, int32(x))
+}
+func (x *Event_EventType) UnmarshalJSON(data []byte) error {
+	value, err := proto.UnmarshalJSONEnum(Event_EventType_value, data, "Event_EventType")
+	if err != nil {
+		return err
+	}
+	*x = Event_EventType(value)
+	return nil
+}
+
+type KeyValue struct {
+	Key []byte `protobuf:"bytes,1,opt,name=key" json:"key"`
+	// mod_index is the last modified index of the key.
+	CreateIndex int64 `protobuf:"varint,2,opt,name=create_index" json:"create_index"`
+	ModIndex    int64 `protobuf:"varint,3,opt,name=mod_index" json:"mod_index"`
+	// version is the version of the key. A deletion resets
+	// the version to zero and any modification of the key
+	// increases its version.
+	Version          int64  `protobuf:"varint,4,opt,name=version" json:"version"`
+	Value            []byte `protobuf:"bytes,5,opt,name=value" json:"value"`
+	XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *KeyValue) Reset()         { *m = KeyValue{} }
+func (m *KeyValue) String() string { return proto.CompactTextString(m) }
+func (*KeyValue) ProtoMessage()    {}
+
+type Event struct {
+	Type Event_EventType `protobuf:"varint,1,opt,name=type,enum=storagepb.Event_EventType" json:"type"`
+	// a put event contains the current key-value
+	// a delete/expire event contains the previous
+	// key-value
+	Kv               KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv"`
+	XXX_unrecognized []byte   `json:"-"`
+}
+
+func (m *Event) Reset()         { *m = Event{} }
+func (m *Event) String() string { return proto.CompactTextString(m) }
+func (*Event) ProtoMessage()    {}
+
+func init() {
+	proto.RegisterEnum("storagepb.Event_EventType", Event_EventType_name, Event_EventType_value)
+}
+func (m *KeyValue) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Key = append([]byte{}, data[index:postIndex]...)
+			index = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field CreateIndex", wireType)
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.CreateIndex |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field ModIndex", wireType)
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.ModIndex |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 4:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Version |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 5:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Value = append([]byte{}, data[index:postIndex]...)
+			index = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := github_com_gogo_protobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *Event) Unmarshal(data []byte) error {
+	l := len(data)
+	index := 0
+	for index < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if index >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[index]
+			index++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		switch fieldNum {
+		case 1:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+			}
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				m.Type |= (Event_EventType(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Kv", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if index >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[index]
+				index++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			postIndex := index + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			if err := m.Kv.Unmarshal(data[index:postIndex]); err != nil {
+				return err
+			}
+			index = postIndex
+		default:
+			var sizeOfWire int
+			for {
+				sizeOfWire++
+				wire >>= 7
+				if wire == 0 {
+					break
+				}
+			}
+			index -= sizeOfWire
+			skippy, err := github_com_gogo_protobuf_proto.Skip(data[index:])
+			if err != nil {
+				return err
+			}
+			if (index + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.XXX_unrecognized = append(m.XXX_unrecognized, data[index:index+skippy]...)
+			index += skippy
+		}
+	}
+	return nil
+}
+func (m *KeyValue) Size() (n int) {
+	var l int
+	_ = l
+	if m.Key != nil {
+		l = len(m.Key)
+		n += 1 + l + sovKv(uint64(l))
+	}
+	n += 1 + sovKv(uint64(m.CreateIndex))
+	n += 1 + sovKv(uint64(m.ModIndex))
+	n += 1 + sovKv(uint64(m.Version))
+	if m.Value != nil {
+		l = len(m.Value)
+		n += 1 + l + sovKv(uint64(l))
+	}
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func (m *Event) Size() (n int) {
+	var l int
+	_ = l
+	n += 1 + sovKv(uint64(m.Type))
+	l = m.Kv.Size()
+	n += 1 + l + sovKv(uint64(l))
+	if m.XXX_unrecognized != nil {
+		n += len(m.XXX_unrecognized)
+	}
+	return n
+}
+
+func sovKv(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozKv(x uint64) (n int) {
+	return sovKv(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *KeyValue) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *KeyValue) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if m.Key != nil {
+		data[i] = 0xa
+		i++
+		i = encodeVarintKv(data, i, uint64(len(m.Key)))
+		i += copy(data[i:], m.Key)
+	}
+	data[i] = 0x10
+	i++
+	i = encodeVarintKv(data, i, uint64(m.CreateIndex))
+	data[i] = 0x18
+	i++
+	i = encodeVarintKv(data, i, uint64(m.ModIndex))
+	data[i] = 0x20
+	i++
+	i = encodeVarintKv(data, i, uint64(m.Version))
+	if m.Value != nil {
+		data[i] = 0x2a
+		i++
+		i = encodeVarintKv(data, i, uint64(len(m.Value)))
+		i += copy(data[i:], m.Value)
+	}
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+
+func (m *Event) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *Event) MarshalTo(data []byte) (n int, err error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	data[i] = 0x8
+	i++
+	i = encodeVarintKv(data, i, uint64(m.Type))
+	data[i] = 0x12
+	i++
+	i = encodeVarintKv(data, i, uint64(m.Kv.Size()))
+	n1, err := m.Kv.MarshalTo(data[i:])
+	if err != nil {
+		return 0, err
+	}
+	i += n1
+	if m.XXX_unrecognized != nil {
+		i += copy(data[i:], m.XXX_unrecognized)
+	}
+	return i, nil
+}
+
+func encodeFixed64Kv(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Kv(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintKv(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}

+ 35 - 0
storage/storagepb/kv.proto

@@ -0,0 +1,35 @@
+package storagepb;
+
+import "github.com/gogo/protobuf/gogoproto/gogo.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+option (gogoproto.goproto_enum_prefix_all) = false;
+
+message KeyValue {
+  optional bytes key = 1 [(gogoproto.nullable) = false];
+  // mod_index is the last modified index of the key.
+  optional int64 create_index = 2 [(gogoproto.nullable) = false];
+  optional int64 mod_index = 3 [(gogoproto.nullable) = false];
+  // version is the version of the key. A deletion resets
+  // the version to zero and any modification of the key
+  // increases its version.
+  optional int64 version = 4 [(gogoproto.nullable) = false];
+  optional bytes value = 5 [(gogoproto.nullable) = false];
+}
+
+message Event {
+  enum EventType {
+    PUT = 0;
+    DELETE = 1;
+    EXPIRE = 2;
+  }
+  optional EventType type = 1 [(gogoproto.nullable) = false];
+  // a put event contains the current key-value
+  // a delete/expire event contains the previous
+  // key-value
+  optional KeyValue kv = 2 [(gogoproto.nullable) = false];
+}
+