瀏覽代碼

start refactoring frame

Chris Bannister 10 年之前
父節點
當前提交
5c9c6cfea0
共有 1 個文件被更改,包括 313 次插入23 次删除
  1. 313 23
      frame.go

+ 313 - 23
frame.go

@@ -5,7 +5,9 @@
 package gocql
 
 import (
+	"errors"
 	"fmt"
+	"io"
 	"net"
 )
 
@@ -15,37 +17,58 @@ const (
 	protoVersion1      = 0x01
 	protoVersion2      = 0x02
 	protoVersion3      = 0x03
+)
 
+const (
+	// header ops
 	opError         byte = 0x00
-	opStartup       byte = 0x01
-	opReady         byte = 0x02
-	opAuthenticate  byte = 0x03
-	opOptions       byte = 0x05
-	opSupported     byte = 0x06
-	opQuery         byte = 0x07
-	opResult        byte = 0x08
-	opPrepare       byte = 0x09
-	opExecute       byte = 0x0A
-	opRegister      byte = 0x0B
-	opEvent         byte = 0x0C
-	opBatch         byte = 0x0D
-	opAuthChallenge byte = 0x0E
-	opAuthResponse  byte = 0x0F
-	opAuthSuccess   byte = 0x10
-
+	opStartup            = 0x01
+	opReady              = 0x02
+	opAuthenticate       = 0x03
+	opOptions            = 0x05
+	opSupported          = 0x06
+	opQuery              = 0x07
+	opResult             = 0x08
+	opPrepare            = 0x09
+	opExecute            = 0x0A
+	opRegister           = 0x0B
+	opEvent              = 0x0C
+	opBatch              = 0x0D
+	opAuthChallenge      = 0x0E
+	opAuthResponse       = 0x0F
+	opAuthSuccess        = 0x10
+
+	// result kind
 	resultKindVoid          = 1
 	resultKindRows          = 2
 	resultKindKeyspace      = 3
 	resultKindPrepared      = 4
 	resultKindSchemaChanged = 5
 
-	flagQueryValues uint8 = 1
-	flagCompress    uint8 = 1
-	flagTrace       uint8 = 2
-	flagPageSize    uint8 = 4
-	flagPageState   uint8 = 8
-	flagHasMore     uint8 = 2
+	// rows flags
+	flagGlobalTableSpec int = 0x01
+	flagHasMorePages        = 0x02
+	flagNoMetaData          = 0x04
+
+	// query flags
+	flagValues                byte = 0x01
+	flagSkipMetaData               = 0x02
+	flagPageSize                   = 0x04
+	flagWithPagingState            = 0x08
+	flagWithSerialConsistency      = 0x10
+
+	// header flags
+	flagCompression byte = 0x01
+	flagTracing          = 0x02
+
+	flagQueryValues byte = 1
+	flagCompress         = 1
+	flagTrace            = 2
+	flagPageState        = 8
+	flagHasMore          = 2
+)
 
+const (
 	apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
 )
 
@@ -55,10 +78,277 @@ var headerProtoSize = [...]int{
 	protoVersion3: 9,
 }
 
+func writeInt(p []byte, n int) {
+	p[0] = byte(n >> 24)
+	p[1] = byte(n >> 16)
+	p[2] = byte(n >> 8)
+	p[3] = byte(n)
+}
+
+func readInt(p []byte) int {
+	if len(p) < 4 {
+		panic("readInt requires 4 bytes")
+	}
+
+	return int(p[0])<<24 | int(p[1])<<16 | int(p[2])<<8 | int(p[3])
+}
+
+func writeShort(p []byte, n int) {
+	p[0] = byte(n >> 8)
+	p[1] = byte(n)
+}
+
+func readShort(p []byte) int16 {
+	if len(p) < 2 {
+		panic("readShort requires 2 bytes")
+	}
+
+	return int16(p[0])<<8 | int16(p[1])
+}
+
+type frameHeader interface {
+	Version() byte
+	Flags() byte
+	Stream() int
+	Op() byte
+	Length() int
+
+	HeaderSize() int
+
+	io.Writer
+	io.Reader
+}
+
+type frameHeaderV1 struct {
+	version byte
+	flags   byte
+	// stream is an int8 on the wire
+	stream int
+	op     byte
+	lenth  int
+}
+
+func (f *frameHeaderV1) HeaderSize() int {
+	return 8
+}
+
+func (f *frameHeaderV1) Header() frameHeader {
+	return f
+}
+
+func (f *frameHeaderV1) appendWrite(p []byte) []byte {
+	return append(p,
+		f.version,
+		f.flags,
+		f.stream,
+		f.op,
+		byte(f.lenth>>24),
+		byte(f.lenth>>16),
+		byte(f.lenth>>8),
+		byte(f.lenth),
+	)
+}
+
+func (f *frameHeaderV1) Op() byte {
+	return f.op()
+}
+
+func (f *frameHeaderV1) Flags() byte {
+	return f.flags
+}
+
+func (f *frameHeaderV1) Stream() int {
+	return f.stream
+}
+
+func (f *frameHeaderV1) Length() int {
+	return f.lenth
+}
+
+type frameHeaderV3 struct {
+	version byte
+	flags   byte
+	// stream is an int16 on the wire
+	stream int
+	op     byte
+	lenth  int
+}
+
+func (f *frameHeaderV3) HeaderSize() int {
+	return 9
+}
+
+func (f *frameHeaderV3) Header() frameHeader {
+	return f
+}
+
+func (f *frameHeaderV3) Read(p []byte) (int64, error) {
+	if len(p) < 9 {
+		return 0, errors.New("require 9 bytes to read v3 header")
+	}
+
+	f.version = b[0]
+	f.flags = b[1]
+	f.stream = int(readShort(b[2:]))
+	f.op = b[4]
+	f.lenth = readInt(b[5:])
+
+	return 9, nil
+}
+
+func (f *frameHeaderV3) Op() byte {
+	return f.op
+}
+
+func (f *frameHeaderV3) Flags() byte {
+	return f.flags
+}
+
+func (f *frameHeaderV3) Stream() int {
+	return f.stream
+}
+
+func (f *frameHeaderV3) Length() int {
+	return f.lenth
+}
+
+func (f *frameHeaderV3) appendWrite(p []byte) []byte {
+	return append(p,
+		f.version,
+		f.flags,
+		byte(f.stream>>8),
+		byte(f.stream),
+		f.op,
+		byte(f.lenth>>24),
+		byte(f.lenth>>16),
+		byte(f.lenth>>8),
+		byte(f.lenth),
+	)
+}
+
+type frame interface {
+	Header() frameHeader
+}
+
+// a frame is responsible for reading and writing frames on a single stream for
+type framer struct {
+	r io.Reader
+	w io.Writer
+
+	// the size which has been written or read of the body
+	bodySize int
+	proto    byte
+	buf      []byte
+}
+
+func (f *framer) encodeFrame(frame frame) []byte {
+	header := frame.Header()
+	hsize := header.HeaderSize()
+	// TODO: can we reuse an underlying buf slice here instead of allocating a new
+	// one for writeFrame?
+	if cap(f.buf) > hsize {
+		// make sure there is enough room for the header
+		f.buf = f.buf[0:hsize]
+	}
+
+	body := f.buf[hsize:]
+
+	// write body
+	body = frame.appendBody()
+
+	// TODO: can this be done without a type switch and widthout having a SetSize()
+	// method on the header?
+	switch v := header.(type) {
+	case *frameHeaderV1:
+		v.lenth = len(body)
+	case *frameHeaderV3:
+		v.lenth = len(body)
+	default:
+		panic(fmt.Sprintf("encodeFrame: unknown header type: %T", v))
+	}
+
+	_, err := header.Write(f.buf[0:hsize])
+	if err != nil {
+		panic(err)
+	}
+
+	return f.buf[:]
+}
+
+// these are protocol level binary types
+func (f *framer) writeInt(n int) {
+	f.buf = append(f.buf,
+		byte(n>>24),
+		byte(n>>16),
+		byte(n>>8),
+		byte(n),
+	)
+}
+
+func (f *framer) writeShort(n int) {
+	f.buf = append(f.buf,
+		byte(n>>8),
+		byte(n),
+	)
+}
+
+func (f *framer) writeString(s string) {
+	f.writeShort(len(s))
+	f.buf = append(f.buf, s...)
+}
+
+func (f *framer) writeLongString(s string) {
+	f.writeInt(len(s))
+	f.buf = append(f.buf, s...)
+}
+
+func (f *framer) writeUUID(u *UUID) {
+	f.buf = append(f.buf, u[:]...)
+}
+
+func (f *framer) writeStringList(l []string) {
+	f.writeShort(len(l))
+	for _, s := range l {
+		f.writeString(s)
+	}
+}
+
+func (f *framer) writeBytes(p []byte) {
+	// TODO: handle null case correctly,
+	//     [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
+	//					  no byte should follow and the value represented is `null`.
+	if p == nil {
+		f.writeInt(-1)
+	} else {
+		f.writeInt(len(p))
+		f.buf = append(f.buf, p...)
+	}
+}
+
+func (f *framer) writeShortBytes(p []byte) {
+	f.writeShort(len(p))
+	f.fbuf = append(f.buf, p...)
+}
+
+// TODO: add writeOption, though no frame actually writes an option so probably
+// just need a read
+
+func (f *framer) writeInet(ip net.IP, port int) {
+	f.buf = append(f.buf,
+		byte(len(ip)),
+		ip...,
+	)
+	f.writeInt(port)
+}
+
+func (f *framer) writeConsistency(cons Consistency) {
+	f.writeShort(cons)
+}
+
 // TODO: replace with a struct which has a header and a body buffer,
 // header just has methods like, set/get the options in its backing array
 // then in a writeTo we write the header then the body.
-type frame []byte
+// type frame []byte
 
 func newFrame(version uint8) frame {
 	// TODO: pool these at the session level incase anyone is using different