|
|
@@ -5,10 +5,11 @@
|
|
|
package gocql
|
|
|
|
|
|
import (
|
|
|
- "errors"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
"net"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
const (
|
|
|
@@ -17,27 +18,96 @@ const (
|
|
|
protoVersion1 = 0x01
|
|
|
protoVersion2 = 0x02
|
|
|
protoVersion3 = 0x03
|
|
|
+
|
|
|
+ protoVersionRequest = 0x00
|
|
|
+ protoVersionResponse = 0x80
|
|
|
)
|
|
|
|
|
|
+type protoVersion byte
|
|
|
+
|
|
|
+func (p protoVersion) request() bool {
|
|
|
+ return p&protoDirectionMask == 0x00
|
|
|
+}
|
|
|
+
|
|
|
+func (p protoVersion) response() bool {
|
|
|
+ return p&protoDirectionMask == 0x80
|
|
|
+}
|
|
|
+
|
|
|
+func (p protoVersion) version() byte {
|
|
|
+ return byte(p) & protoVersionMask
|
|
|
+}
|
|
|
+
|
|
|
+func (p protoVersion) String() string {
|
|
|
+ dir := "REQ"
|
|
|
+ if p.response() {
|
|
|
+ dir = "RESP"
|
|
|
+ }
|
|
|
+
|
|
|
+ return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir)
|
|
|
+}
|
|
|
+
|
|
|
+type frameOp byte
|
|
|
+
|
|
|
const (
|
|
|
// header ops
|
|
|
- opError byte = 0x00
|
|
|
- opStartup = 0x01
|
|
|
- opReady = 0x02
|
|
|
- opAuthenticate = 0x03
|
|
|
- opOptions = 0x05
|
|
|
- opSupported = 0x06
|
|
|
- opQuery = 0x07
|
|
|
- opResult = 0x08
|
|
|
- opPrepare = 0x09
|
|
|
- opExecute = 0x0A
|
|
|
- opRegister = 0x0B
|
|
|
- opEvent = 0x0C
|
|
|
- opBatch = 0x0D
|
|
|
- opAuthChallenge = 0x0E
|
|
|
- opAuthResponse = 0x0F
|
|
|
- opAuthSuccess = 0x10
|
|
|
+ opError frameOp = 0x00
|
|
|
+ opStartup = 0x01
|
|
|
+ opReady = 0x02
|
|
|
+ opAuthenticate = 0x03
|
|
|
+ opOptions = 0x05
|
|
|
+ opSupported = 0x06
|
|
|
+ opQuery = 0x07
|
|
|
+ opResult = 0x08
|
|
|
+ opPrepare = 0x09
|
|
|
+ opExecute = 0x0A
|
|
|
+ opRegister = 0x0B
|
|
|
+ opEvent = 0x0C
|
|
|
+ opBatch = 0x0D
|
|
|
+ opAuthChallenge = 0x0E
|
|
|
+ opAuthResponse = 0x0F
|
|
|
+ opAuthSuccess = 0x10
|
|
|
+)
|
|
|
|
|
|
+func (f frameOp) String() string {
|
|
|
+ switch f {
|
|
|
+ case opError:
|
|
|
+ return "ERROR"
|
|
|
+ case opStartup:
|
|
|
+ return "STARTUP"
|
|
|
+ case opReady:
|
|
|
+ return "READY"
|
|
|
+ case opAuthenticate:
|
|
|
+ return "AUTHENTICATE"
|
|
|
+ case opOptions:
|
|
|
+ return "OPTIONS"
|
|
|
+ case opSupported:
|
|
|
+ return "SUPPORTED"
|
|
|
+ case opQuery:
|
|
|
+ return "QUERY"
|
|
|
+ case opResult:
|
|
|
+ return "RESULT"
|
|
|
+ case opPrepare:
|
|
|
+ return "PREPARE"
|
|
|
+ case opExecute:
|
|
|
+ return "EXECUTE"
|
|
|
+ case opRegister:
|
|
|
+ return "REGISTER"
|
|
|
+ case opEvent:
|
|
|
+ return "EVENT"
|
|
|
+ case opBatch:
|
|
|
+ return "BATCH"
|
|
|
+ case opAuthChallenge:
|
|
|
+ return "AUTH_CHALLENGE"
|
|
|
+ case opAuthResponse:
|
|
|
+ return "AUTH_RESPONSE"
|
|
|
+ case opAuthSuccess:
|
|
|
+ return "AUTH_SUCCESS"
|
|
|
+ default:
|
|
|
+ return fmt.Sprintf("UNKNOWN_OP_%d", f)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
// result kind
|
|
|
resultKindVoid = 1
|
|
|
resultKindRows = 2
|
|
|
@@ -56,6 +126,8 @@ const (
|
|
|
flagPageSize = 0x04
|
|
|
flagWithPagingState = 0x08
|
|
|
flagWithSerialConsistency = 0x10
|
|
|
+ flagDefaultTimestamp = 0x20
|
|
|
+ flagWithNameValues = 0x40
|
|
|
|
|
|
// header flags
|
|
|
flagCompression byte = 0x01
|
|
|
@@ -68,16 +140,55 @@ const (
|
|
|
flagHasMore = 2
|
|
|
)
|
|
|
|
|
|
+type Consistency uint16
|
|
|
+
|
|
|
const (
|
|
|
- apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
|
|
|
+ Any Consistency = 0x00
|
|
|
+ One = 0x01
|
|
|
+ Two = 0x02
|
|
|
+ Three = 0x03
|
|
|
+ Quorum = 0x04
|
|
|
+ All = 0x05
|
|
|
+ LocalQuorum = 0x06
|
|
|
+ EachQuorum = 0x07
|
|
|
+ Serial = 0x08
|
|
|
+ LocalSerial = 0x09
|
|
|
+ LocalOne = 0x0A
|
|
|
)
|
|
|
|
|
|
-var headerProtoSize = [...]int{
|
|
|
- protoVersion1: 8,
|
|
|
- protoVersion2: 8,
|
|
|
- protoVersion3: 9,
|
|
|
+func (c Consistency) String() string {
|
|
|
+ switch c {
|
|
|
+ case Any:
|
|
|
+ return "ANY"
|
|
|
+ case One:
|
|
|
+ return "ONE"
|
|
|
+ case Two:
|
|
|
+ return "TWO"
|
|
|
+ case Three:
|
|
|
+ return "THREE"
|
|
|
+ case Quorum:
|
|
|
+ return "QUORUM"
|
|
|
+ case All:
|
|
|
+ return "ALL"
|
|
|
+ case LocalQuorum:
|
|
|
+ return "LOCAL_QUORUM"
|
|
|
+ case EachQuorum:
|
|
|
+ return "EACH_QUORUM"
|
|
|
+ case Serial:
|
|
|
+ return "SERIAL"
|
|
|
+ case LocalSerial:
|
|
|
+ return "LOCAL_SERIAL"
|
|
|
+ case LocalOne:
|
|
|
+ return "LOCAL_ONE"
|
|
|
+ default:
|
|
|
+ return fmt.Sprintf("UNKNOWN_CONS_0x%x", c)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+const (
|
|
|
+ apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
|
|
|
+)
|
|
|
+
|
|
|
func writeInt(p []byte, n int) {
|
|
|
p[0] = byte(n >> 24)
|
|
|
p[1] = byte(n >> 16)
|
|
|
@@ -85,823 +196,1112 @@ func writeInt(p []byte, n int) {
|
|
|
p[3] = byte(n)
|
|
|
}
|
|
|
|
|
|
-func readInt(p []byte) int {
|
|
|
- if len(p) < 4 {
|
|
|
- panic("readInt requires 4 bytes")
|
|
|
- }
|
|
|
-
|
|
|
- return int(p[0])<<24 | int(p[1])<<16 | int(p[2])<<8 | int(p[3])
|
|
|
+func readInt(p []byte) int32 {
|
|
|
+ return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
|
|
|
}
|
|
|
|
|
|
-func writeShort(p []byte, n int) {
|
|
|
+func writeShort(p []byte, n uint16) {
|
|
|
p[0] = byte(n >> 8)
|
|
|
p[1] = byte(n)
|
|
|
}
|
|
|
|
|
|
-func readShort(p []byte) int16 {
|
|
|
- if len(p) < 2 {
|
|
|
- panic("readShort requires 2 bytes")
|
|
|
- }
|
|
|
-
|
|
|
- return int16(p[0])<<8 | int16(p[1])
|
|
|
-}
|
|
|
-
|
|
|
-type frameHeader interface {
|
|
|
- Version() byte
|
|
|
- Flags() byte
|
|
|
- Stream() int
|
|
|
- Op() byte
|
|
|
- Length() int
|
|
|
-
|
|
|
- HeaderSize() int
|
|
|
-
|
|
|
- io.Writer
|
|
|
- io.Reader
|
|
|
+func readShort(p []byte) int {
|
|
|
+ return int(p[0])<<8 | int(p[1])
|
|
|
}
|
|
|
|
|
|
-type frameHeaderV1 struct {
|
|
|
- version byte
|
|
|
+type frameHeader struct {
|
|
|
+ version protoVersion
|
|
|
flags byte
|
|
|
- // stream is an int8 on the wire
|
|
|
- stream int
|
|
|
- op byte
|
|
|
- lenth int
|
|
|
+ stream int
|
|
|
+ op frameOp
|
|
|
+ length int
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV1) HeaderSize() int {
|
|
|
- return 8
|
|
|
+func (f frameHeader) String() string {
|
|
|
+ return fmt.Sprintf("[header version=%s flags=0x%x stream=%d op=%s length=%d]", f.version, f.flags, f.stream, f.op, f.length)
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV1) Header() frameHeader {
|
|
|
+func (f frameHeader) Header() frameHeader {
|
|
|
return f
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV1) appendWrite(p []byte) []byte {
|
|
|
- return append(p,
|
|
|
- f.version,
|
|
|
- f.flags,
|
|
|
- f.stream,
|
|
|
- f.op,
|
|
|
- byte(f.lenth>>24),
|
|
|
- byte(f.lenth>>16),
|
|
|
- byte(f.lenth>>8),
|
|
|
- byte(f.lenth),
|
|
|
- )
|
|
|
+var framerPool = sync.Pool{
|
|
|
+ New: func() interface{} {
|
|
|
+ return &framer{
|
|
|
+ buf: make([]byte, 0, 4096),
|
|
|
+ }
|
|
|
+ },
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV1) Op() byte {
|
|
|
- return f.op()
|
|
|
-}
|
|
|
+// a framer is responsible for reading, writing and parsing frames on a single stream
|
|
|
+type framer struct {
|
|
|
+ r io.Reader
|
|
|
+ w io.Writer
|
|
|
|
|
|
-func (f *frameHeaderV1) Flags() byte {
|
|
|
- return f.flags
|
|
|
-}
|
|
|
+ proto byte
|
|
|
+ // flags are for outgoing flags, enabling compression and tracing etc
|
|
|
+ flags byte
|
|
|
+ compres Compressor
|
|
|
+ headSize int
|
|
|
+ // if this frame was read then the header will be here
|
|
|
+ header *frameHeader
|
|
|
|
|
|
-func (f *frameHeaderV1) Stream() int {
|
|
|
- return f.stream
|
|
|
+ buf []byte
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV1) Length() int {
|
|
|
- return f.lenth
|
|
|
-}
|
|
|
+func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer {
|
|
|
+ f := framerPool.Get().(*framer)
|
|
|
+ var flags byte
|
|
|
+ if compressor != nil {
|
|
|
+ flags |= flagCompress
|
|
|
+ }
|
|
|
|
|
|
-type frameHeaderV3 struct {
|
|
|
- version byte
|
|
|
- flags byte
|
|
|
- // stream is an int16 on the wire
|
|
|
- stream int
|
|
|
- op byte
|
|
|
- lenth int
|
|
|
-}
|
|
|
+ headSize := 8
|
|
|
+ if version > protoVersion2 {
|
|
|
+ headSize = 9
|
|
|
+ }
|
|
|
|
|
|
-func (f *frameHeaderV3) HeaderSize() int {
|
|
|
- return 9
|
|
|
-}
|
|
|
+ f.r = r
|
|
|
+ f.w = w
|
|
|
+ f.compres = compressor
|
|
|
+ f.proto = version
|
|
|
+ f.flags = flags
|
|
|
+ f.headSize = headSize
|
|
|
+ f.buf = f.buf[:0]
|
|
|
+ f.header = nil
|
|
|
|
|
|
-func (f *frameHeaderV3) Header() frameHeader {
|
|
|
return f
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV3) Read(p []byte) (int64, error) {
|
|
|
- if len(p) < 9 {
|
|
|
- return 0, errors.New("require 9 bytes to read v3 header")
|
|
|
+type frame interface {
|
|
|
+ Header() frameHeader
|
|
|
+}
|
|
|
+
|
|
|
+func readHeader(r io.Reader, p []byte) (frameHeader, error) {
|
|
|
+ _, err := io.ReadFull(r, p)
|
|
|
+ if err != nil {
|
|
|
+ return frameHeader{}, err
|
|
|
}
|
|
|
|
|
|
- f.version = b[0]
|
|
|
- f.flags = b[1]
|
|
|
- f.stream = int(readShort(b[2:]))
|
|
|
- f.op = b[4]
|
|
|
- f.lenth = readInt(b[5:])
|
|
|
+ head := frameHeader{}
|
|
|
+ version := p[0] & protoVersionMask
|
|
|
+ direction := p[0] & protoDirectionMask
|
|
|
+ head.version = protoVersion(p[0])
|
|
|
+ if direction == protoVersionRequest {
|
|
|
+ return frameHeader{}, NewErrProtocol("got a request frame from server: %v", head.version)
|
|
|
+ }
|
|
|
|
|
|
- return 9, nil
|
|
|
-}
|
|
|
+ head.flags = p[1]
|
|
|
+ if version > protoVersion2 {
|
|
|
+ head.stream = readShort(p[2:])
|
|
|
+ head.op = frameOp(p[4])
|
|
|
+ head.length = int(readInt(p[5:]))
|
|
|
+ } else {
|
|
|
+ head.stream = int(p[2])
|
|
|
+ head.op = frameOp(p[3])
|
|
|
+ head.length = int(readInt(p[4:]))
|
|
|
+ }
|
|
|
|
|
|
-func (f *frameHeaderV3) Op() byte {
|
|
|
- return f.op
|
|
|
+ return head, nil
|
|
|
}
|
|
|
|
|
|
-func (f *frameHeaderV3) Flags() byte {
|
|
|
- return f.flags
|
|
|
-}
|
|
|
+// reads a frame form the wire into the framers buffer
|
|
|
+func (f *framer) readFrame(head *frameHeader) error {
|
|
|
+ // assume the underlying reader takes care of timeouts and retries
|
|
|
+ if cap(f.buf) > head.length {
|
|
|
+ f.buf = f.buf[:head.length]
|
|
|
+ } else {
|
|
|
+ f.buf = make([]byte, head.length)
|
|
|
+ }
|
|
|
|
|
|
-func (f *frameHeaderV3) Stream() int {
|
|
|
- return f.stream
|
|
|
-}
|
|
|
+ _, err := io.ReadFull(f.r, f.buf)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
-func (f *frameHeaderV3) Length() int {
|
|
|
- return f.lenth
|
|
|
-}
|
|
|
+ // TODO: move frame processing out of framer and onto the requesting callers
|
|
|
+ // this means that we will not be able to reuse buffers between streams, which
|
|
|
+ // may end up being slower than parsing on the IO thread.
|
|
|
+ if head.flags&flagCompress == flagCompress {
|
|
|
+ if f.compres == nil {
|
|
|
+ return NewErrProtocol("no compressor available with compressed frame body")
|
|
|
+ }
|
|
|
|
|
|
-func (f *frameHeaderV3) appendWrite(p []byte) []byte {
|
|
|
- return append(p,
|
|
|
- f.version,
|
|
|
- f.flags,
|
|
|
- byte(f.stream>>8),
|
|
|
- byte(f.stream),
|
|
|
- f.op,
|
|
|
- byte(f.lenth>>24),
|
|
|
- byte(f.lenth>>16),
|
|
|
- byte(f.lenth>>8),
|
|
|
- byte(f.lenth),
|
|
|
- )
|
|
|
+ f.buf, err = f.compres.Decode(f.buf)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ f.header = head
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) parseFrame() (frame, error) {
|
|
|
+ // asumes that the frame body has been read into buf
|
|
|
+ switch f.header.op {
|
|
|
+ case opError:
|
|
|
+ return f.parseErrorFrame()
|
|
|
+ case opReady:
|
|
|
+ return f.parseReadyFrame(), nil
|
|
|
+ case opResult:
|
|
|
+ return f.parseResultFrame()
|
|
|
+ case opSupported:
|
|
|
+ return f.parseSupportedFrame(), nil
|
|
|
+ case opAuthenticate:
|
|
|
+ return f.parseAuthenticateFrame(), nil
|
|
|
+ }
|
|
|
+ return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
|
|
|
}
|
|
|
|
|
|
-type frame interface {
|
|
|
- Header() frameHeader
|
|
|
+func (f *framer) parseErrorFrame() (frame, error) {
|
|
|
+ code := f.readInt()
|
|
|
+ msg := f.readString()
|
|
|
+
|
|
|
+ errD := errorFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ code: code,
|
|
|
+ message: msg,
|
|
|
+ }
|
|
|
+
|
|
|
+ switch code {
|
|
|
+ case errUnavailable:
|
|
|
+ cl := f.readConsistency()
|
|
|
+ required := f.readInt()
|
|
|
+ alive := f.readInt()
|
|
|
+ return RequestErrUnavailable{
|
|
|
+ errorFrame: errD,
|
|
|
+ Consistency: cl,
|
|
|
+ Required: required,
|
|
|
+ Alive: alive,
|
|
|
+ }, nil
|
|
|
+ case errWriteTimeout:
|
|
|
+ cl := f.readConsistency()
|
|
|
+ received := f.readInt()
|
|
|
+ blockfor := f.readInt()
|
|
|
+ writeType := f.readString()
|
|
|
+ return RequestErrWriteTimeout{
|
|
|
+ errorFrame: errD,
|
|
|
+ Consistency: cl,
|
|
|
+ Received: received,
|
|
|
+ BlockFor: blockfor,
|
|
|
+ WriteType: writeType,
|
|
|
+ }, nil
|
|
|
+ case errReadTimeout:
|
|
|
+ cl := f.readConsistency()
|
|
|
+ received := f.readInt()
|
|
|
+ blockfor := f.readInt()
|
|
|
+ dataPresent := f.readByte()
|
|
|
+ return RequestErrReadTimeout{
|
|
|
+ errorFrame: errD,
|
|
|
+ Consistency: cl,
|
|
|
+ Received: received,
|
|
|
+ BlockFor: blockfor,
|
|
|
+ DataPresent: dataPresent,
|
|
|
+ }, nil
|
|
|
+ case errAlreadyExists:
|
|
|
+ ks := f.readString()
|
|
|
+ table := f.readString()
|
|
|
+ return RequestErrAlreadyExists{
|
|
|
+ errorFrame: errD,
|
|
|
+ Keyspace: ks,
|
|
|
+ Table: table,
|
|
|
+ }, nil
|
|
|
+ case errUnprepared:
|
|
|
+ stmtId := f.readShortBytes()
|
|
|
+ return RequestErrUnprepared{
|
|
|
+ errorFrame: errD,
|
|
|
+ StatementId: stmtId,
|
|
|
+ }, nil
|
|
|
+ default:
|
|
|
+ return errD, nil
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-// a frame is responsible for reading and writing frames on a single stream for
|
|
|
-type framer struct {
|
|
|
- r io.Reader
|
|
|
- w io.Writer
|
|
|
+func (f *framer) writeHeader(flags byte, op frameOp, stream int) {
|
|
|
+ f.buf = append(f.buf[0:],
|
|
|
+ f.proto,
|
|
|
+ flags,
|
|
|
+ )
|
|
|
|
|
|
- // the size which has been written or read of the body
|
|
|
- bodySize int
|
|
|
- proto byte
|
|
|
- buf []byte
|
|
|
+ if f.proto > protoVersion2 {
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(stream>>8),
|
|
|
+ byte(stream),
|
|
|
+ )
|
|
|
+ } else {
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(stream),
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
+ // pad out length
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(op),
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
-func (f *framer) encodeFrame(frame frame) []byte {
|
|
|
- header := frame.Header()
|
|
|
- hsize := header.HeaderSize()
|
|
|
- // TODO: can we reuse an underlying buf slice here instead of allocating a new
|
|
|
- // one for writeFrame?
|
|
|
- if cap(f.buf) > hsize {
|
|
|
- // make sure there is enough room for the header
|
|
|
- f.buf = f.buf[0:hsize]
|
|
|
+func (f *framer) setLength(length int) {
|
|
|
+ p := 4
|
|
|
+ if f.proto > protoVersion2 {
|
|
|
+ p = 5
|
|
|
}
|
|
|
|
|
|
- body := f.buf[hsize:]
|
|
|
+ f.buf[p+0] = byte(length >> 24)
|
|
|
+ f.buf[p+1] = byte(length >> 16)
|
|
|
+ f.buf[p+2] = byte(length >> 8)
|
|
|
+ f.buf[p+3] = byte(length)
|
|
|
+}
|
|
|
|
|
|
- // write body
|
|
|
- body = frame.appendBody()
|
|
|
+func (f *framer) finishWrite() error {
|
|
|
+ length := len(f.buf) - f.headSize
|
|
|
+ if f.flags&flagCompress == flagCompress && f.compres != nil {
|
|
|
+ compressed, err := f.compres.Encode(f.buf[f.headSize:])
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
|
|
|
- // TODO: can this be done without a type switch and widthout having a SetSize()
|
|
|
- // method on the header?
|
|
|
- switch v := header.(type) {
|
|
|
- case *frameHeaderV1:
|
|
|
- v.lenth = len(body)
|
|
|
- case *frameHeaderV3:
|
|
|
- v.lenth = len(body)
|
|
|
- default:
|
|
|
- panic(fmt.Sprintf("encodeFrame: unknown header type: %T", v))
|
|
|
+ // assuming that len(compressed) < len(f.buf)
|
|
|
+ length = copy(f.buf[f.headSize:], compressed)
|
|
|
+ f.buf = f.buf[:f.headSize+length]
|
|
|
}
|
|
|
+ f.setLength(length)
|
|
|
+
|
|
|
+ _, err := f.w.Write(f.buf)
|
|
|
+ // log.Printf("OUT wrote=%d header=% X\n", n, f.buf[:f.headSize])
|
|
|
|
|
|
- _, err := header.Write(f.buf[0:hsize])
|
|
|
+ f.buf = f.buf[:0]
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
- return f.buf[:]
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
-// these are protocol level binary types
|
|
|
-func (f *framer) writeInt(n int) {
|
|
|
- f.buf = append(f.buf,
|
|
|
- byte(n>>24),
|
|
|
- byte(n>>16),
|
|
|
- byte(n>>8),
|
|
|
- byte(n),
|
|
|
- )
|
|
|
+type readyFrame struct {
|
|
|
+ frameHeader
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeShort(n int) {
|
|
|
- f.buf = append(f.buf,
|
|
|
- byte(n>>8),
|
|
|
- byte(n),
|
|
|
- )
|
|
|
+func (f *framer) parseReadyFrame() frame {
|
|
|
+ return &readyFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeString(s string) {
|
|
|
- f.writeShort(len(s))
|
|
|
- f.buf = append(f.buf, s...)
|
|
|
+type supportedFrame struct {
|
|
|
+ frameHeader
|
|
|
+
|
|
|
+ supported map[string][]string
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeLongString(s string) {
|
|
|
- f.writeInt(len(s))
|
|
|
- f.buf = append(f.buf, s...)
|
|
|
+// TODO: if we move the body buffer onto the frameHeader then we only need a single
|
|
|
+// framer, and can move the methods onto the header.
|
|
|
+func (f *framer) parseSupportedFrame() frame {
|
|
|
+ return &supportedFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+
|
|
|
+ supported: f.readStringMultiMap(),
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeUUID(u *UUID) {
|
|
|
- f.buf = append(f.buf, u[:]...)
|
|
|
+type writeStartupFrame struct {
|
|
|
+ opts map[string]string
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeStringList(l []string) {
|
|
|
- f.writeShort(len(l))
|
|
|
- for _, s := range l {
|
|
|
- f.writeString(s)
|
|
|
- }
|
|
|
+func (w *writeStartupFrame) writeFrame(framer *framer, streamID int) error {
|
|
|
+ return framer.writeStartupFrame(streamID, w.opts)
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeBytes(p []byte) {
|
|
|
- // TODO: handle null case correctly,
|
|
|
- // [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0,
|
|
|
- // no byte should follow and the value represented is `null`.
|
|
|
- if p == nil {
|
|
|
- f.writeInt(-1)
|
|
|
- } else {
|
|
|
- f.writeInt(len(p))
|
|
|
- f.buf = append(f.buf, p...)
|
|
|
+func (f *framer) writeStartupFrame(streamID int, options map[string]string) error {
|
|
|
+ // startup frame must not have the compress flag set
|
|
|
+ f.writeHeader(f.flags&^flagCompress, opStartup, streamID)
|
|
|
+ f.writeStringMap(options)
|
|
|
+ f.setLength(len(f.buf) - f.headSize)
|
|
|
+
|
|
|
+ // dont use finishWrite here as it will use compression
|
|
|
+ // TODO: have a type which has a writeHeader / writeBody so we can do
|
|
|
+ // transparent compression
|
|
|
+ _, err := f.w.Write(f.buf)
|
|
|
+ f.buf = f.buf[:0]
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
-}
|
|
|
|
|
|
-func (f *framer) writeShortBytes(p []byte) {
|
|
|
- f.writeShort(len(p))
|
|
|
- f.fbuf = append(f.buf, p...)
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
-// TODO: add writeOption, though no frame actually writes an option so probably
|
|
|
-// just need a read
|
|
|
+type writePrepareFrame struct {
|
|
|
+ statement string
|
|
|
+}
|
|
|
|
|
|
-func (f *framer) writeInet(ip net.IP, port int) {
|
|
|
- f.buf = append(f.buf,
|
|
|
- byte(len(ip)),
|
|
|
- ip...,
|
|
|
- )
|
|
|
- f.writeInt(port)
|
|
|
+func (w *writePrepareFrame) writeFrame(framer *framer, streamID int) error {
|
|
|
+ return framer.writePrepareFrame(streamID, w.statement)
|
|
|
}
|
|
|
|
|
|
-func (f *framer) writeConsistency(cons Consistency) {
|
|
|
- f.writeShort(cons)
|
|
|
+func (f *framer) writePrepareFrame(stream int, statement string) error {
|
|
|
+ f.writeHeader(f.flags, opPrepare, stream)
|
|
|
+ f.writeLongString(statement)
|
|
|
+ return f.finishWrite()
|
|
|
}
|
|
|
|
|
|
-// TODO: replace with a struct which has a header and a body buffer,
|
|
|
-// header just has methods like, set/get the options in its backing array
|
|
|
-// then in a writeTo we write the header then the body.
|
|
|
-// type frame []byte
|
|
|
+func (f *framer) readTypeInfo() *TypeInfo {
|
|
|
+ id := f.readShort()
|
|
|
+ typ := &TypeInfo{
|
|
|
+ // we need to pass proto to the marshaller here
|
|
|
+ Proto: f.proto,
|
|
|
+ Type: Type(id),
|
|
|
+ }
|
|
|
+
|
|
|
+ switch typ.Type {
|
|
|
+ case TypeCustom:
|
|
|
+ typ.Custom = f.readString()
|
|
|
+ if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
|
|
|
+ typ = &TypeInfo{
|
|
|
+ Proto: f.proto,
|
|
|
+ Type: cassType,
|
|
|
+ }
|
|
|
+ switch typ.Type {
|
|
|
+ case TypeMap:
|
|
|
+ typ.Key = f.readTypeInfo()
|
|
|
+ fallthrough
|
|
|
+ case TypeList, TypeSet:
|
|
|
+ typ.Elem = f.readTypeInfo()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case TypeMap:
|
|
|
+ typ.Key = f.readTypeInfo()
|
|
|
+ fallthrough
|
|
|
+ case TypeList, TypeSet:
|
|
|
+ typ.Elem = f.readTypeInfo()
|
|
|
+ }
|
|
|
|
|
|
-func newFrame(version uint8) frame {
|
|
|
- // TODO: pool these at the session level incase anyone is using different
|
|
|
- // clusters with different versions in the same application.
|
|
|
- return make(frame, headerProtoSize[version], defaultFrameSize)
|
|
|
+ return typ
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeInt(v int32) {
|
|
|
- p := f.grow(4)
|
|
|
- (*f)[p] = byte(v >> 24)
|
|
|
- (*f)[p+1] = byte(v >> 16)
|
|
|
- (*f)[p+2] = byte(v >> 8)
|
|
|
- (*f)[p+3] = byte(v)
|
|
|
+type resultMetadata struct {
|
|
|
+ flags int
|
|
|
+
|
|
|
+ // only if flagPageState
|
|
|
+ pagingState []byte
|
|
|
+
|
|
|
+ columns []ColumnInfo
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeShort(v uint16) {
|
|
|
- p := f.grow(2)
|
|
|
- (*f)[p] = byte(v >> 8)
|
|
|
- (*f)[p+1] = byte(v)
|
|
|
+func (r resultMetadata) String() string {
|
|
|
+ return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeString(v string) {
|
|
|
- f.writeShort(uint16(len(v)))
|
|
|
- p := f.grow(len(v))
|
|
|
- copy((*f)[p:], v)
|
|
|
+func (f *framer) parseResultMetadata() resultMetadata {
|
|
|
+ meta := resultMetadata{
|
|
|
+ flags: f.readInt(),
|
|
|
+ }
|
|
|
+
|
|
|
+ colCount := f.readInt()
|
|
|
+
|
|
|
+ if meta.flags&flagHasMorePages == flagHasMorePages {
|
|
|
+ meta.pagingState = f.readBytes()
|
|
|
+ }
|
|
|
+
|
|
|
+ if meta.flags&flagNoMetaData == flagNoMetaData {
|
|
|
+ return meta
|
|
|
+ }
|
|
|
+
|
|
|
+ var keyspace, table string
|
|
|
+ globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
|
|
|
+ if globalSpec {
|
|
|
+ keyspace = f.readString()
|
|
|
+ table = f.readString()
|
|
|
+ }
|
|
|
+
|
|
|
+ // log.Printf("flags=% X keyspace=%s table=%s\n", meta.flags, keyspace, table)
|
|
|
+ cols := make([]ColumnInfo, colCount)
|
|
|
+
|
|
|
+ for i := 0; i < colCount; i++ {
|
|
|
+ col := &cols[i]
|
|
|
+
|
|
|
+ if !globalSpec {
|
|
|
+ col.Keyspace = f.readString()
|
|
|
+ col.Table = f.readString()
|
|
|
+ } else {
|
|
|
+ col.Keyspace = keyspace
|
|
|
+ col.Table = table
|
|
|
+ }
|
|
|
+
|
|
|
+ col.Name = f.readString()
|
|
|
+ col.TypeInfo = f.readTypeInfo()
|
|
|
+ }
|
|
|
+
|
|
|
+ meta.columns = cols
|
|
|
+
|
|
|
+ return meta
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeLongString(v string) {
|
|
|
- f.writeInt(int32(len(v)))
|
|
|
- p := f.grow(len(v))
|
|
|
- copy((*f)[p:], v)
|
|
|
+type resultVoidFrame struct {
|
|
|
+ frameHeader
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeUUID() {
|
|
|
+func (f *resultVoidFrame) String() string {
|
|
|
+ return "[result_void]"
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeStringList(v []string) {
|
|
|
- f.writeShort(uint16(len(v)))
|
|
|
- for i := range v {
|
|
|
- f.writeString(v[i])
|
|
|
+func (f *framer) parseResultFrame() (frame, error) {
|
|
|
+ kind := f.readInt()
|
|
|
+
|
|
|
+ switch kind {
|
|
|
+ case resultKindVoid:
|
|
|
+ return &resultVoidFrame{frameHeader: *f.header}, nil
|
|
|
+ case resultKindRows:
|
|
|
+ return f.parseResultRows(), nil
|
|
|
+ case resultKindKeyspace:
|
|
|
+ return f.parseResultSetKeyspace(), nil
|
|
|
+ case resultKindPrepared:
|
|
|
+ return f.parseResultPrepared(), nil
|
|
|
+ case resultKindSchemaChanged:
|
|
|
+ return f.parseResultSchemaChange(), nil
|
|
|
}
|
|
|
+
|
|
|
+ return nil, NewErrProtocol("unknown result kind: %x", kind)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeByte(v byte) {
|
|
|
- p := f.grow(1)
|
|
|
- (*f)[p] = v
|
|
|
+type resultRowsFrame struct {
|
|
|
+ frameHeader
|
|
|
+
|
|
|
+ meta resultMetadata
|
|
|
+ rows [][][]byte
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeBytes(v []byte) {
|
|
|
- if v == nil {
|
|
|
- f.writeInt(-1)
|
|
|
- return
|
|
|
+func (f *resultRowsFrame) String() string {
|
|
|
+ return "[result_rows]"
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) parseResultRows() frame {
|
|
|
+ meta := f.parseResultMetadata()
|
|
|
+ // log.Printf("parsed meta=%v\n", meta)
|
|
|
+
|
|
|
+ numRows := f.readInt()
|
|
|
+ colCount := len(meta.columns)
|
|
|
+
|
|
|
+ rows := make([][][]byte, numRows)
|
|
|
+ for i := 0; i < numRows; i++ {
|
|
|
+ rows[i] = make([][]byte, colCount)
|
|
|
+ for j := 0; j < colCount; j++ {
|
|
|
+ rows[i][j] = f.readBytes()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return &resultRowsFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ meta: meta,
|
|
|
+ rows: rows,
|
|
|
}
|
|
|
- f.writeInt(int32(len(v)))
|
|
|
- p := f.grow(len(v))
|
|
|
- copy((*f)[p:], v)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeShortBytes(v []byte) {
|
|
|
- f.writeShort(uint16(len(v)))
|
|
|
- p := f.grow(len(v))
|
|
|
- copy((*f)[p:], v)
|
|
|
+type resultKeyspaceFrame struct {
|
|
|
+ frameHeader
|
|
|
+ keyspace string
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeInet(ip net.IP, port int) {
|
|
|
- p := f.grow(1 + len(ip))
|
|
|
- (*f)[p] = byte(len(ip))
|
|
|
- copy((*f)[p+1:], ip)
|
|
|
- f.writeInt(int32(port))
|
|
|
+func (r *resultKeyspaceFrame) String() string {
|
|
|
+ return fmt.Sprintf("[result_keyspace keyspace=%s]", r.keyspace)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeStringMap(v map[string]string) {
|
|
|
- f.writeShort(uint16(len(v)))
|
|
|
- for key, value := range v {
|
|
|
- f.writeString(key)
|
|
|
- f.writeString(value)
|
|
|
+func (f *framer) parseResultSetKeyspace() frame {
|
|
|
+ return &resultKeyspaceFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ keyspace: f.readString(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeStringMultimap(v map[string][]string) {
|
|
|
- f.writeShort(uint16(len(v)))
|
|
|
- for key, values := range v {
|
|
|
- f.writeString(key)
|
|
|
- f.writeStringList(values)
|
|
|
- }
|
|
|
+type resultPreparedFrame struct {
|
|
|
+ frameHeader
|
|
|
+ preparedID []byte
|
|
|
+ reqMeta resultMetadata
|
|
|
+ respMeta resultMetadata
|
|
|
}
|
|
|
|
|
|
-func (f *frame) setHeader(version, flags uint8, stream int, opcode uint8) {
|
|
|
- (*f)[0] = version
|
|
|
- (*f)[1] = flags
|
|
|
- p := 2
|
|
|
- if version&maskVersion > protoVersion2 {
|
|
|
- (*f)[2] = byte(stream >> 8)
|
|
|
- (*f)[3] = byte(stream)
|
|
|
- p += 2
|
|
|
- } else {
|
|
|
- (*f)[2] = byte(stream & 0xFF)
|
|
|
- p++
|
|
|
+func (f *framer) parseResultPrepared() frame {
|
|
|
+ frame := &resultPreparedFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ preparedID: f.readShortBytes(),
|
|
|
+ reqMeta: f.parseResultMetadata(),
|
|
|
}
|
|
|
|
|
|
- (*f)[p] = opcode
|
|
|
+ if f.proto < protoVersion2 {
|
|
|
+ return frame
|
|
|
+ }
|
|
|
+
|
|
|
+ frame.respMeta = f.parseResultMetadata()
|
|
|
+
|
|
|
+ return frame
|
|
|
}
|
|
|
|
|
|
-func (f *frame) setStream(stream int, version uint8) {
|
|
|
- if version > protoVersion2 {
|
|
|
- (*f)[2] = byte(stream >> 8)
|
|
|
- (*f)[3] = byte(stream)
|
|
|
- } else {
|
|
|
- (*f)[2] = byte(stream)
|
|
|
- }
|
|
|
+type resultSchemaChangeFrame struct {
|
|
|
+ frameHeader
|
|
|
+
|
|
|
+ change string
|
|
|
+ keyspace string
|
|
|
+ table string
|
|
|
}
|
|
|
|
|
|
-func (f *frame) Stream(version uint8) (n int) {
|
|
|
- if version > protoVersion2 {
|
|
|
- n = int((*f)[2])<<8 | int((*f)[3])
|
|
|
+func (s *resultSchemaChangeFrame) String() string {
|
|
|
+ return fmt.Sprintf("[result_schema_change change=%s keyspace=%s table=%s]", s.change, s.keyspace, s.table)
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) parseResultSchemaChange() frame {
|
|
|
+ frame := &resultSchemaChangeFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ }
|
|
|
+
|
|
|
+ if f.proto < protoVersion3 {
|
|
|
+ frame.change = f.readString()
|
|
|
+ frame.keyspace = f.readString()
|
|
|
+ frame.table = f.readString()
|
|
|
} else {
|
|
|
- n = int((*f)[2])
|
|
|
+ // TODO: improve type representation of this
|
|
|
+ frame.change = f.readString()
|
|
|
+ target := f.readString()
|
|
|
+ switch target {
|
|
|
+ case "KEYSPACE":
|
|
|
+ frame.keyspace = f.readString()
|
|
|
+ case "TABLE", "TYPE":
|
|
|
+ frame.keyspace = f.readString()
|
|
|
+ frame.table = f.readString()
|
|
|
+ }
|
|
|
}
|
|
|
- return
|
|
|
+
|
|
|
+ return frame
|
|
|
}
|
|
|
|
|
|
-func (f *frame) setLength(length int, version uint8) {
|
|
|
- p := 4
|
|
|
- if version > protoVersion2 {
|
|
|
- p = 5
|
|
|
- }
|
|
|
+type authenticateFrame struct {
|
|
|
+ frameHeader
|
|
|
|
|
|
- (*f)[p] = byte(length >> 24)
|
|
|
- (*f)[p+1] = byte(length >> 16)
|
|
|
- (*f)[p+2] = byte(length >> 8)
|
|
|
- (*f)[p+3] = byte(length)
|
|
|
+ class string
|
|
|
}
|
|
|
|
|
|
-func (f *frame) Op(version uint8) byte {
|
|
|
- if version > protoVersion2 {
|
|
|
- return (*f)[4]
|
|
|
- } else {
|
|
|
- return (*f)[3]
|
|
|
+func (f *framer) parseAuthenticateFrame() frame {
|
|
|
+ return &authenticateFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ class: f.readString(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (f *frame) Length(version uint8) int {
|
|
|
- p := 4
|
|
|
- if version > protoVersion2 {
|
|
|
- p = 5
|
|
|
- }
|
|
|
+type authSuccessFrame struct {
|
|
|
+ frameHeader
|
|
|
|
|
|
- return int((*f)[p])<<24 | int((*f)[p+1])<<16 | int((*f)[p+2])<<8 | int((*f)[p+3])
|
|
|
+ data []byte
|
|
|
}
|
|
|
|
|
|
-func (f *frame) grow(n int) int {
|
|
|
- if len(*f)+n >= cap(*f) {
|
|
|
- buf := make(frame, len(*f), len(*f)*2+n)
|
|
|
- copy(buf, *f)
|
|
|
- *f = buf
|
|
|
+func (f *framer) parseAuthSuccessFrame() frame {
|
|
|
+ return &authSuccessFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ data: f.readBytes(),
|
|
|
}
|
|
|
- p := len(*f)
|
|
|
- *f = (*f)[:p+n]
|
|
|
- return p
|
|
|
}
|
|
|
|
|
|
-func (f *frame) skipHeader(version uint8) {
|
|
|
- *f = (*f)[headerProtoSize[version]:]
|
|
|
+type authChallengeFrame struct {
|
|
|
+ frameHeader
|
|
|
+
|
|
|
+ data []byte
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readInt() int {
|
|
|
- if len(*f) < 4 {
|
|
|
- panic(NewErrProtocol("Trying to read an int while <4 bytes in the buffer"))
|
|
|
+func (f *framer) parseAuthChallengeFrame() frame {
|
|
|
+ return &authChallengeFrame{
|
|
|
+ frameHeader: *f.header,
|
|
|
+ data: f.readBytes(),
|
|
|
}
|
|
|
- v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
|
|
|
- *f = (*f)[4:]
|
|
|
- return int(int32(v))
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readShort() uint16 {
|
|
|
- if len(*f) < 2 {
|
|
|
- panic(NewErrProtocol("Trying to read a short while <2 bytes in the buffer"))
|
|
|
- }
|
|
|
- v := uint16((*f)[0])<<8 | uint16((*f)[1])
|
|
|
- *f = (*f)[2:]
|
|
|
- return v
|
|
|
+type writeAuthResponseFrame struct {
|
|
|
+ data []byte
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readString() string {
|
|
|
- n := int(f.readShort())
|
|
|
- if len(*f) < n {
|
|
|
- panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
|
|
|
- }
|
|
|
- v := string((*f)[:n])
|
|
|
- *f = (*f)[n:]
|
|
|
- return v
|
|
|
+func (a *writeAuthResponseFrame) String() string {
|
|
|
+ return fmt.Sprintf("[auth_response data=%q]", a.data)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readLongString() string {
|
|
|
- n := f.readInt()
|
|
|
- if len(*f) < n {
|
|
|
- panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
|
|
|
- }
|
|
|
- v := string((*f)[:n])
|
|
|
- *f = (*f)[n:]
|
|
|
- return v
|
|
|
+func (a *writeAuthResponseFrame) writeFrame(framer *framer, streamID int) error {
|
|
|
+ return framer.writeAuthResponseFrame(streamID, a.data)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readBytes() []byte {
|
|
|
- n := f.readInt()
|
|
|
- if n < 0 {
|
|
|
- return nil
|
|
|
- }
|
|
|
- if len(*f) < n {
|
|
|
- panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
|
|
|
- }
|
|
|
- v := (*f)[:n]
|
|
|
- *f = (*f)[n:]
|
|
|
- return v
|
|
|
+func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error {
|
|
|
+ f.writeHeader(f.flags, opAuthResponse, streamID)
|
|
|
+ f.writeBytes(data)
|
|
|
+ return f.finishWrite()
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readShortBytes() []byte {
|
|
|
- n := int(f.readShort())
|
|
|
- if len(*f) < n {
|
|
|
- panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
|
|
|
- }
|
|
|
- v := (*f)[:n]
|
|
|
- *f = (*f)[n:]
|
|
|
- return v
|
|
|
+type queryValues struct {
|
|
|
+ value []byte
|
|
|
+ // optional name, will set With names for values flag
|
|
|
+ name string
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readTypeInfo(version uint8) *TypeInfo {
|
|
|
- x := f.readShort()
|
|
|
- typ := &TypeInfo{
|
|
|
- Proto: version,
|
|
|
- Type: Type(x),
|
|
|
- }
|
|
|
- switch typ.Type {
|
|
|
- case TypeCustom:
|
|
|
- typ.Custom = f.readString()
|
|
|
- if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
|
|
|
- typ = &TypeInfo{Type: cassType}
|
|
|
- switch typ.Type {
|
|
|
- case TypeMap:
|
|
|
- typ.Key = f.readTypeInfo(version)
|
|
|
- fallthrough
|
|
|
- case TypeList, TypeSet:
|
|
|
- typ.Elem = f.readTypeInfo(version)
|
|
|
- }
|
|
|
- }
|
|
|
- case TypeMap:
|
|
|
- typ.Key = f.readTypeInfo(version)
|
|
|
- fallthrough
|
|
|
- case TypeList, TypeSet:
|
|
|
- typ.Elem = f.readTypeInfo(version)
|
|
|
- }
|
|
|
- return typ
|
|
|
+type queryParams struct {
|
|
|
+ consistency Consistency
|
|
|
+ // v2+
|
|
|
+ skipMeta bool
|
|
|
+ values []queryValues
|
|
|
+ pageSize int
|
|
|
+ pagingState []byte
|
|
|
+ serialConsistency Consistency
|
|
|
+ // v3+
|
|
|
+ timestamp *time.Time
|
|
|
+}
|
|
|
+
|
|
|
+func (q queryParams) String() string {
|
|
|
+ return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v timestamp=%v values=%v]",
|
|
|
+ q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.timestamp, q.values)
|
|
|
}
|
|
|
|
|
|
-func (f *frame) readMetaData(version uint8) ([]ColumnInfo, []byte) {
|
|
|
- flags := f.readInt()
|
|
|
- numColumns := f.readInt()
|
|
|
+func (f *framer) writeQueryParams(opts *queryParams) {
|
|
|
+ f.writeConsistency(opts.consistency)
|
|
|
|
|
|
- var pageState []byte
|
|
|
- if flags&2 != 0 {
|
|
|
- pageState = f.readBytes()
|
|
|
+ if f.proto == protoVersion1 {
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
- globalKeyspace := ""
|
|
|
- globalTable := ""
|
|
|
- if flags&1 != 0 {
|
|
|
- globalKeyspace = f.readString()
|
|
|
- globalTable = f.readString()
|
|
|
+ var flags byte
|
|
|
+ if len(opts.values) > 0 {
|
|
|
+ flags |= flagQueryValues
|
|
|
}
|
|
|
-
|
|
|
- columns := make([]ColumnInfo, numColumns)
|
|
|
- for i := 0; i < numColumns; i++ {
|
|
|
- columns[i].Keyspace = globalKeyspace
|
|
|
- columns[i].Table = globalTable
|
|
|
- if flags&1 == 0 {
|
|
|
- columns[i].Keyspace = f.readString()
|
|
|
- columns[i].Table = f.readString()
|
|
|
- }
|
|
|
- columns[i].Name = f.readString()
|
|
|
- columns[i].TypeInfo = f.readTypeInfo(version)
|
|
|
+ if opts.skipMeta {
|
|
|
+ flags |= flagSkipMetaData
|
|
|
+ }
|
|
|
+ if opts.pageSize > 0 {
|
|
|
+ flags |= flagPageSize
|
|
|
+ }
|
|
|
+ if len(opts.pagingState) > 0 {
|
|
|
+ flags |= flagPageState
|
|
|
+ }
|
|
|
+ if opts.serialConsistency > 0 {
|
|
|
+ flags |= flagWithSerialConsistency
|
|
|
}
|
|
|
- return columns, pageState
|
|
|
-}
|
|
|
|
|
|
-func (f *frame) readError() RequestError {
|
|
|
- code := f.readInt()
|
|
|
- msg := f.readString()
|
|
|
- errD := errorFrame{code, msg}
|
|
|
- switch code {
|
|
|
- case errUnavailable:
|
|
|
- cl := Consistency(f.readShort())
|
|
|
- required := f.readInt()
|
|
|
- alive := f.readInt()
|
|
|
- return RequestErrUnavailable{errorFrame: errD,
|
|
|
- Consistency: cl,
|
|
|
- Required: required,
|
|
|
- Alive: alive}
|
|
|
- case errWriteTimeout:
|
|
|
- cl := Consistency(f.readShort())
|
|
|
- received := f.readInt()
|
|
|
- blockfor := f.readInt()
|
|
|
- writeType := f.readString()
|
|
|
- return RequestErrWriteTimeout{errorFrame: errD,
|
|
|
- Consistency: cl,
|
|
|
- Received: received,
|
|
|
- BlockFor: blockfor,
|
|
|
- WriteType: writeType,
|
|
|
- }
|
|
|
- case errReadTimeout:
|
|
|
- cl := Consistency(f.readShort())
|
|
|
- received := f.readInt()
|
|
|
- blockfor := f.readInt()
|
|
|
- dataPresent := (*f)[0]
|
|
|
- *f = (*f)[1:]
|
|
|
- return RequestErrReadTimeout{errorFrame: errD,
|
|
|
- Consistency: cl,
|
|
|
- Received: received,
|
|
|
- BlockFor: blockfor,
|
|
|
- DataPresent: dataPresent,
|
|
|
+ names := false
|
|
|
+
|
|
|
+ // protoV3 specific things
|
|
|
+ if f.proto > protoVersion2 {
|
|
|
+ if opts.timestamp != nil {
|
|
|
+ flags |= flagDefaultTimestamp
|
|
|
}
|
|
|
- case errAlreadyExists:
|
|
|
- ks := f.readString()
|
|
|
- table := f.readString()
|
|
|
- return RequestErrAlreadyExists{errorFrame: errD,
|
|
|
- Keyspace: ks,
|
|
|
- Table: table,
|
|
|
+ if len(opts.values) > 0 && opts.values[0].name != "" {
|
|
|
+ flags |= flagWithNameValues
|
|
|
+ names = true
|
|
|
}
|
|
|
- case errUnprepared:
|
|
|
- stmtId := f.readShortBytes()
|
|
|
- return RequestErrUnprepared{errorFrame: errD,
|
|
|
- StatementId: stmtId,
|
|
|
+ }
|
|
|
+
|
|
|
+ f.writeByte(flags)
|
|
|
+
|
|
|
+ if n := len(opts.values); n > 0 {
|
|
|
+ f.writeShort(n)
|
|
|
+ for i := 0; i < n; i++ {
|
|
|
+ if names {
|
|
|
+ f.writeString(opts.values[i].name)
|
|
|
+ }
|
|
|
+ f.writeBytes(opts.values[i].value)
|
|
|
}
|
|
|
- default:
|
|
|
- return errD
|
|
|
+ }
|
|
|
+
|
|
|
+ if opts.pageSize > 0 {
|
|
|
+ f.writeInt(opts.pageSize)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(opts.pagingState) > 0 {
|
|
|
+ f.writeBytes(opts.pagingState)
|
|
|
+ }
|
|
|
+
|
|
|
+ if opts.serialConsistency > 0 {
|
|
|
+ f.writeConsistency(opts.serialConsistency)
|
|
|
+ }
|
|
|
+
|
|
|
+ if f.proto > protoVersion2 && opts.timestamp != nil {
|
|
|
+ // timestamp in microseconds
|
|
|
+ // TODO: should the timpestamp be set on the queryParams or should we set
|
|
|
+ // it here?
|
|
|
+ ts := opts.timestamp.UnixNano() / 1000
|
|
|
+ f.writeLong(ts)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (f *frame) writeConsistency(c Consistency) {
|
|
|
- f.writeShort(consistencyCodes[c])
|
|
|
+type writeQueryFrame struct {
|
|
|
+ statement string
|
|
|
+ params queryParams
|
|
|
}
|
|
|
|
|
|
-func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
|
|
|
- return f, nil
|
|
|
+func (w *writeQueryFrame) String() string {
|
|
|
+ return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params)
|
|
|
}
|
|
|
|
|
|
-var consistencyCodes = []uint16{
|
|
|
- Any: 0x0000,
|
|
|
- One: 0x0001,
|
|
|
- Two: 0x0002,
|
|
|
- Three: 0x0003,
|
|
|
- Quorum: 0x0004,
|
|
|
- All: 0x0005,
|
|
|
- LocalQuorum: 0x0006,
|
|
|
- EachQuorum: 0x0007,
|
|
|
- Serial: 0x0008,
|
|
|
- LocalSerial: 0x0009,
|
|
|
- LocalOne: 0x000A,
|
|
|
+func (w *writeQueryFrame) writeFrame(framer *framer, streamID int) error {
|
|
|
+ return framer.writeQueryFrame(streamID, w.statement, &w.params)
|
|
|
}
|
|
|
|
|
|
-type readyFrame struct{}
|
|
|
+func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams) error {
|
|
|
+ f.writeHeader(f.flags, opQuery, streamID)
|
|
|
+ f.writeLongString(statement)
|
|
|
+ f.writeQueryParams(params)
|
|
|
|
|
|
-type supportedFrame struct{}
|
|
|
+ return f.finishWrite()
|
|
|
+}
|
|
|
|
|
|
-type resultVoidFrame struct{}
|
|
|
+type frameWriter interface {
|
|
|
+ writeFrame(framer *framer, streamID int) error
|
|
|
+}
|
|
|
|
|
|
-type resultRowsFrame struct {
|
|
|
- Columns []ColumnInfo
|
|
|
- Rows [][][]byte
|
|
|
- PagingState []byte
|
|
|
+type writeExecuteFrame struct {
|
|
|
+ preparedID []byte
|
|
|
+ params queryParams
|
|
|
}
|
|
|
|
|
|
-type resultKeyspaceFrame struct {
|
|
|
- Keyspace string
|
|
|
+func (e *writeExecuteFrame) String() string {
|
|
|
+ return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params)
|
|
|
}
|
|
|
|
|
|
-type resultPreparedFrame struct {
|
|
|
- PreparedId []byte
|
|
|
- Arguments []ColumnInfo
|
|
|
- ReturnValues []ColumnInfo
|
|
|
+func (e *writeExecuteFrame) writeFrame(fr *framer, streamID int) error {
|
|
|
+ return fr.writeExecuteFrame(streamID, e.preparedID, &e.params)
|
|
|
}
|
|
|
|
|
|
-type operation interface {
|
|
|
- encodeFrame(version uint8, dst frame) (frame, error)
|
|
|
+func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams) error {
|
|
|
+ f.writeHeader(f.flags, opExecute, streamID)
|
|
|
+ f.writeShortBytes(preparedID)
|
|
|
+ if f.proto > protoVersion1 {
|
|
|
+ f.writeQueryParams(params)
|
|
|
+ } else {
|
|
|
+ n := len(params.values)
|
|
|
+ f.writeShort(n)
|
|
|
+ for i := 0; i < n; i++ {
|
|
|
+ f.writeBytes(params.values[i].value)
|
|
|
+ }
|
|
|
+ f.writeConsistency(params.consistency)
|
|
|
+ }
|
|
|
+
|
|
|
+ return f.finishWrite()
|
|
|
}
|
|
|
|
|
|
-type startupFrame struct {
|
|
|
- CQLVersion string
|
|
|
- Compression string
|
|
|
+// TODO: can we replace BatchStatemt with batchStatement? As they prety much
|
|
|
+// duplicate each other
|
|
|
+type batchStatment struct {
|
|
|
+ preparedID []byte
|
|
|
+ statement string
|
|
|
+ values []queryValues
|
|
|
}
|
|
|
|
|
|
-func (op *startupFrame) String() string {
|
|
|
- return fmt.Sprintf("[startup cqlversion=%q compression=%q]", op.CQLVersion, op.Compression)
|
|
|
+type writeBatchFrame struct {
|
|
|
+ typ BatchType
|
|
|
+ statements []batchStatment
|
|
|
+ consistency Consistency
|
|
|
+ serialConsistency Consistency
|
|
|
+ defaultTimestamp bool
|
|
|
}
|
|
|
|
|
|
-func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
|
|
|
- if f == nil {
|
|
|
- f = newFrame(version)
|
|
|
- }
|
|
|
+func (w *writeBatchFrame) writeFrame(framer *framer, streamID int) error {
|
|
|
+ return framer.writeBatchFrame(streamID, w)
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame) error {
|
|
|
+ f.writeHeader(f.flags, opBatch, streamID)
|
|
|
+ f.writeByte(byte(w.typ))
|
|
|
+
|
|
|
+ n := len(w.statements)
|
|
|
+ f.writeShort(n)
|
|
|
|
|
|
- f.setHeader(version, 0, 0, opStartup)
|
|
|
+ var flags byte
|
|
|
|
|
|
- // TODO: fix this, this is actually a StringMap
|
|
|
- var size uint16 = 1
|
|
|
- if op.Compression != "" {
|
|
|
- size++
|
|
|
+ for i := 0; i < n; i++ {
|
|
|
+ b := &w.statements[i]
|
|
|
+ if len(b.preparedID) == 0 {
|
|
|
+ f.writeByte(0)
|
|
|
+ f.writeLongString(b.statement)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ f.writeByte(1)
|
|
|
+ f.writeShortBytes(b.preparedID)
|
|
|
+ f.writeShort(len(b.values))
|
|
|
+ for j := range b.values {
|
|
|
+ col := &b.values[j]
|
|
|
+ if f.proto > protoVersion2 && col.name != "" {
|
|
|
+ // TODO: move this check into the caller and set a flag on writeBatchFrame
|
|
|
+ // to indicate using named values
|
|
|
+ flags |= flagWithNameValues
|
|
|
+ f.writeString(col.name)
|
|
|
+ }
|
|
|
+ f.writeBytes(col.value)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- f.writeShort(size)
|
|
|
- f.writeString("CQL_VERSION")
|
|
|
- f.writeString(op.CQLVersion)
|
|
|
+ f.writeConsistency(w.consistency)
|
|
|
+
|
|
|
+ if f.proto > protoVersion2 {
|
|
|
+ if w.serialConsistency > 0 {
|
|
|
+ flags |= flagWithSerialConsistency
|
|
|
+ }
|
|
|
+ if w.defaultTimestamp {
|
|
|
+ flags |= flagDefaultTimestamp
|
|
|
+ }
|
|
|
|
|
|
- if op.Compression != "" {
|
|
|
- f.writeString("COMPRESSION")
|
|
|
- f.writeString(op.Compression)
|
|
|
+ f.writeByte(flags)
|
|
|
+
|
|
|
+ if w.serialConsistency > 0 {
|
|
|
+ f.writeConsistency(w.serialConsistency)
|
|
|
+ }
|
|
|
+ if w.defaultTimestamp {
|
|
|
+ now := time.Now().UnixNano() / 1000
|
|
|
+ f.writeLong(now)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- return f, nil
|
|
|
+ return f.finishWrite()
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) readByte() byte {
|
|
|
+ b := f.buf[0]
|
|
|
+ f.buf = f.buf[1:]
|
|
|
+ return b
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) readInt() (n int) {
|
|
|
+ n = int(int32(f.buf[0])<<24 | int32(f.buf[1])<<16 | int32(f.buf[2])<<8 | int32(f.buf[3]))
|
|
|
+ f.buf = f.buf[4:]
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) readShort() (n uint16) {
|
|
|
+ n = uint16(f.buf[0])<<8 | uint16(f.buf[1])
|
|
|
+ f.buf = f.buf[2:]
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) readLong() (n int64) {
|
|
|
+ n = int64(f.buf[0])<<56 | int64(f.buf[1])<<48 | int64(f.buf[2])<<40 | int64(f.buf[3])<<32 |
|
|
|
+ int64(f.buf[4])<<24 | int64(f.buf[5])<<16 | int64(f.buf[6])<<8 | int64(f.buf[7])
|
|
|
+ f.buf = f.buf[8:]
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
-type queryFrame struct {
|
|
|
- Stmt string
|
|
|
- Prepared []byte
|
|
|
- Cons Consistency
|
|
|
- Values [][]byte
|
|
|
- PageSize int
|
|
|
- PageState []byte
|
|
|
+func (f *framer) readString() (s string) {
|
|
|
+ size := f.readShort()
|
|
|
+ s = string(f.buf[:size])
|
|
|
+ f.buf = f.buf[size:]
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
-func (op *queryFrame) String() string {
|
|
|
- return fmt.Sprintf("[query statement=%q prepared=%x cons=%v ...]", op.Stmt, op.Prepared, op.Cons)
|
|
|
+func (f *framer) longString() (s string) {
|
|
|
+ size := f.readInt()
|
|
|
+ s = string(f.buf[:size])
|
|
|
+ f.buf = f.buf[size:]
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
-func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
|
|
|
- if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
|
|
|
- (len(op.Values) > 0 && len(op.Prepared) == 0)) {
|
|
|
- return nil, ErrUnsupported
|
|
|
+func (f *framer) readUUID() (u *UUID) {
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) readStringList() []string {
|
|
|
+ size := f.readShort()
|
|
|
+
|
|
|
+ l := make([]string, size)
|
|
|
+ for i := 0; i < int(size); i++ {
|
|
|
+ l[i] = f.readString()
|
|
|
}
|
|
|
|
|
|
- if f == nil {
|
|
|
- f = newFrame(version)
|
|
|
+ return l
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) readBytes() []byte {
|
|
|
+ size := f.readInt()
|
|
|
+ if size < 0 {
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
- if len(op.Prepared) > 0 {
|
|
|
- f.setHeader(version, 0, 0, opExecute)
|
|
|
- f.writeShortBytes(op.Prepared)
|
|
|
- } else {
|
|
|
- f.setHeader(version, 0, 0, opQuery)
|
|
|
- f.writeLongString(op.Stmt)
|
|
|
+ // TODO: should we use copy here? this way it is consistent with the rest of
|
|
|
+ // the readers which do not retain a pointer to the underlying buffer.
|
|
|
+ l := make([]byte, size)
|
|
|
+ n := copy(l, f.buf)
|
|
|
+ if n != size {
|
|
|
+ panic(fmt.Sprintf("not enough bytes to bytes: size=%d read=%d", size, n))
|
|
|
}
|
|
|
|
|
|
- if version >= 2 {
|
|
|
- f.writeConsistency(op.Cons)
|
|
|
- flagPos := len(f)
|
|
|
- f.writeByte(0)
|
|
|
+ f.buf = f.buf[n:]
|
|
|
|
|
|
- if len(op.Values) > 0 {
|
|
|
- f[flagPos] |= flagQueryValues
|
|
|
- f.writeShort(uint16(len(op.Values)))
|
|
|
- for _, value := range op.Values {
|
|
|
- f.writeBytes(value)
|
|
|
- }
|
|
|
- }
|
|
|
+ return l
|
|
|
+}
|
|
|
|
|
|
- if op.PageSize > 0 {
|
|
|
- f[flagPos] |= flagPageSize
|
|
|
- f.writeInt(int32(op.PageSize))
|
|
|
- }
|
|
|
+func (f *framer) readShortBytes() []byte {
|
|
|
+ size := f.readShort()
|
|
|
|
|
|
- if len(op.PageState) > 0 {
|
|
|
- f[flagPos] |= flagPageState
|
|
|
- f.writeBytes(op.PageState)
|
|
|
- }
|
|
|
- } else if version == 1 {
|
|
|
- if len(op.Prepared) > 0 {
|
|
|
- f.writeShort(uint16(len(op.Values)))
|
|
|
- for _, value := range op.Values {
|
|
|
- f.writeBytes(value)
|
|
|
- }
|
|
|
- }
|
|
|
- f.writeConsistency(op.Cons)
|
|
|
+ // TODO: should we use copy here? this way it is consistent with the rest of
|
|
|
+ // the readers which do not retain a pointer to the underlying buffer.
|
|
|
+ l := make([]byte, size)
|
|
|
+ n := copy(l, f.buf)
|
|
|
+ if n != int(size) {
|
|
|
+ panic("not enough space in buffer to read bytes")
|
|
|
}
|
|
|
|
|
|
- return f, nil
|
|
|
+ f.buf = f.buf[n:]
|
|
|
+
|
|
|
+ return l
|
|
|
}
|
|
|
|
|
|
-type prepareFrame struct {
|
|
|
- Stmt string
|
|
|
+func (f *framer) readInet() (net.IP, int) {
|
|
|
+ size := f.buf[0]
|
|
|
+ f.buf = f.buf[1:]
|
|
|
+
|
|
|
+ if !(size == 4 || size == 16) {
|
|
|
+ panic(fmt.Sprintf("invalid IP size: %d", size))
|
|
|
+ }
|
|
|
+
|
|
|
+ ip := make([]byte, size)
|
|
|
+ copy(ip, f.buf)
|
|
|
+
|
|
|
+ f.buf = f.buf[size:]
|
|
|
+
|
|
|
+ port := f.readInt()
|
|
|
+ return net.IP(ip), port
|
|
|
}
|
|
|
|
|
|
-func (op *prepareFrame) String() string {
|
|
|
- return fmt.Sprintf("[prepare statement=%q]", op.Stmt)
|
|
|
+func (f *framer) readConsistency() Consistency {
|
|
|
+ return Consistency(f.readShort())
|
|
|
}
|
|
|
|
|
|
-func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
|
|
|
- if f == nil {
|
|
|
- f = newFrame(version)
|
|
|
+func (f *framer) readStringMap() map[string]string {
|
|
|
+ size := f.readShort()
|
|
|
+ m := make(map[string]string)
|
|
|
+
|
|
|
+ for i := 0; i < int(size); i++ {
|
|
|
+ k := f.readString()
|
|
|
+ v := f.readString()
|
|
|
+ m[k] = v
|
|
|
}
|
|
|
- f.setHeader(version, 0, 0, opPrepare)
|
|
|
- f.writeLongString(op.Stmt)
|
|
|
- return f, nil
|
|
|
+
|
|
|
+ return m
|
|
|
}
|
|
|
|
|
|
-type optionsFrame struct{}
|
|
|
+func (f *framer) readStringMultiMap() map[string][]string {
|
|
|
+ size := f.readShort()
|
|
|
+ m := make(map[string][]string)
|
|
|
|
|
|
-func (op *optionsFrame) String() string {
|
|
|
- return "[options]"
|
|
|
+ for i := 0; i < int(size); i++ {
|
|
|
+ k := f.readString()
|
|
|
+ v := f.readStringList()
|
|
|
+ m[k] = v
|
|
|
+ }
|
|
|
+
|
|
|
+ return m
|
|
|
}
|
|
|
|
|
|
-func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
|
|
|
- if f == nil {
|
|
|
- f = newFrame(version)
|
|
|
- }
|
|
|
- f.setHeader(version, 0, 0, opOptions)
|
|
|
- return f, nil
|
|
|
+func (f *framer) writeByte(b byte) {
|
|
|
+ f.buf = append(f.buf, b)
|
|
|
}
|
|
|
|
|
|
-type authenticateFrame struct {
|
|
|
- Authenticator string
|
|
|
+// these are protocol level binary types
|
|
|
+func (f *framer) writeInt(n int) {
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(n>>24),
|
|
|
+ byte(n>>16),
|
|
|
+ byte(n>>8),
|
|
|
+ byte(n),
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) writeShort(n int) {
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(n>>8),
|
|
|
+ byte(n),
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) writeLong(n int64) {
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(n>>56),
|
|
|
+ byte(n>>48),
|
|
|
+ byte(n>>40),
|
|
|
+ byte(n>>32),
|
|
|
+ byte(n>>24),
|
|
|
+ byte(n>>16),
|
|
|
+ byte(n>>8),
|
|
|
+ byte(n),
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
-func (op *authenticateFrame) String() string {
|
|
|
- return fmt.Sprintf("[authenticate authenticator=%q]", op.Authenticator)
|
|
|
+func (f *framer) writeString(s string) {
|
|
|
+ f.writeShort(len(s))
|
|
|
+ f.buf = append(f.buf, s...)
|
|
|
}
|
|
|
|
|
|
-type authResponseFrame struct {
|
|
|
- Data []byte
|
|
|
+func (f *framer) writeLongString(s string) {
|
|
|
+ f.writeInt(len(s))
|
|
|
+ f.buf = append(f.buf, s...)
|
|
|
}
|
|
|
|
|
|
-func (op *authResponseFrame) String() string {
|
|
|
- return fmt.Sprintf("[auth_response data=%q]", op.Data)
|
|
|
+func (f *framer) writeUUID(u *UUID) {
|
|
|
+ f.buf = append(f.buf, u[:]...)
|
|
|
}
|
|
|
|
|
|
-func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
|
|
|
- if f == nil {
|
|
|
- f = newFrame(version)
|
|
|
+func (f *framer) writeStringList(l []string) {
|
|
|
+ f.writeShort(len(l))
|
|
|
+ for _, s := range l {
|
|
|
+ f.writeString(s)
|
|
|
}
|
|
|
- f.setHeader(version, 0, 0, opAuthResponse)
|
|
|
- f.writeBytes(op.Data)
|
|
|
- return f, nil
|
|
|
}
|
|
|
|
|
|
-type authSuccessFrame struct {
|
|
|
- Data []byte
|
|
|
+func (f *framer) writeBytes(p []byte) {
|
|
|
+ // TODO: handle null case correctly,
|
|
|
+ // [bytes] A [int] n, followed by n bytes if n >= 0. If n < 0,
|
|
|
+ // no byte should follow and the value represented is `null`.
|
|
|
+ if p == nil {
|
|
|
+ f.writeInt(-1)
|
|
|
+ } else {
|
|
|
+ f.writeInt(len(p))
|
|
|
+ f.buf = append(f.buf, p...)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (op *authSuccessFrame) String() string {
|
|
|
- return fmt.Sprintf("[auth_success data=%q]", op.Data)
|
|
|
+func (f *framer) writeShortBytes(p []byte) {
|
|
|
+ f.writeShort(len(p))
|
|
|
+ f.buf = append(f.buf, p...)
|
|
|
}
|
|
|
|
|
|
-type authChallengeFrame struct {
|
|
|
- Data []byte
|
|
|
+// TODO: add writeOption, though no frame actually writes an option so probably
|
|
|
+// just need a read
|
|
|
+
|
|
|
+func (f *framer) writeInet(ip net.IP, port int) {
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ byte(len(ip)),
|
|
|
+ )
|
|
|
+
|
|
|
+ f.buf = append(f.buf,
|
|
|
+ []byte(ip)...,
|
|
|
+ )
|
|
|
+
|
|
|
+ f.writeInt(port)
|
|
|
+}
|
|
|
+
|
|
|
+func (f *framer) writeConsistency(cons Consistency) {
|
|
|
+ f.writeShort(int(cons))
|
|
|
}
|
|
|
|
|
|
-func (op *authChallengeFrame) String() string {
|
|
|
- return fmt.Sprintf("[auth_challenge data=%q]", op.Data)
|
|
|
+func (f *framer) writeStringMap(m map[string]string) {
|
|
|
+ f.writeShort(len(m))
|
|
|
+ for k, v := range m {
|
|
|
+ f.writeString(k)
|
|
|
+ f.writeString(v)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+var consistencyCodes = []uint16{
|
|
|
+ Any: 0x0000,
|
|
|
+ One: 0x0001,
|
|
|
+ Two: 0x0002,
|
|
|
+ Three: 0x0003,
|
|
|
+ Quorum: 0x0004,
|
|
|
+ All: 0x0005,
|
|
|
+ LocalQuorum: 0x0006,
|
|
|
+ EachQuorum: 0x0007,
|
|
|
+ Serial: 0x0008,
|
|
|
+ LocalSerial: 0x0009,
|
|
|
+ LocalOne: 0x000A,
|
|
|
}
|