|
|
@@ -209,6 +209,15 @@ type frameHeader struct {
|
|
|
stream int
|
|
|
op frameOp
|
|
|
length int
|
|
|
+
|
|
|
+ framer *framer
|
|
|
+}
|
|
|
+
|
|
|
+func (f *frameHeader) release() {
|
|
|
+ if f.framer != nil {
|
|
|
+ framerPool.Put(f.framer)
|
|
|
+ f.framer = nil
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func (f frameHeader) String() string {
|
|
|
@@ -224,7 +233,8 @@ const defaultBufSize = 128
|
|
|
var framerPool = sync.Pool{
|
|
|
New: func() interface{} {
|
|
|
return &framer{
|
|
|
- buf: make([]byte, defaultBufSize),
|
|
|
+ wbuf: make([]byte, defaultBufSize),
|
|
|
+ rbuf: make([]byte, defaultBufSize),
|
|
|
}
|
|
|
},
|
|
|
}
|
|
|
@@ -245,7 +255,8 @@ type framer struct {
|
|
|
// if tracing flag is set this is not nil
|
|
|
traceID []byte
|
|
|
|
|
|
- buf []byte
|
|
|
+ rbuf []byte
|
|
|
+ wbuf []byte
|
|
|
}
|
|
|
|
|
|
func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer {
|
|
|
@@ -268,7 +279,8 @@ func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *f
|
|
|
f.proto = version
|
|
|
f.flags = flags
|
|
|
f.headSize = headSize
|
|
|
- f.buf = f.buf[:0]
|
|
|
+ f.rbuf = f.rbuf[:0]
|
|
|
+ f.wbuf = f.wbuf[:0]
|
|
|
f.header = nil
|
|
|
f.traceID = nil
|
|
|
|
|
|
@@ -277,15 +289,15 @@ func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *f
|
|
|
|
|
|
type frame interface {
|
|
|
Header() frameHeader
|
|
|
+ release()
|
|
|
}
|
|
|
|
|
|
-func readHeader(r io.Reader, p []byte) (frameHeader, error) {
|
|
|
- _, err := io.ReadFull(r, p)
|
|
|
+func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {
|
|
|
+ _, err = io.ReadFull(r, p)
|
|
|
if err != nil {
|
|
|
- return frameHeader{}, err
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
- head := frameHeader{}
|
|
|
version := p[0] & protoVersionMask
|
|
|
head.version = protoVersion(p[0])
|
|
|
|
|
|
@@ -300,7 +312,7 @@ func readHeader(r io.Reader, p []byte) (frameHeader, error) {
|
|
|
head.length = int(readInt(p[4:]))
|
|
|
}
|
|
|
|
|
|
- return head, nil
|
|
|
+ return
|
|
|
}
|
|
|
|
|
|
// explicitly enables tracing for the framers outgoing requests
|
|
|
@@ -311,13 +323,13 @@ func (f *framer) trace() {
|
|
|
// reads a frame form the wire into the framers buffer
|
|
|
func (f *framer) readFrame(head *frameHeader) error {
|
|
|
// assume the underlying reader takes care of timeouts and retries
|
|
|
- if cap(f.buf) > head.length {
|
|
|
- f.buf = f.buf[:head.length]
|
|
|
+ if cap(f.rbuf) > head.length {
|
|
|
+ f.rbuf = f.rbuf[:head.length]
|
|
|
} else {
|
|
|
- f.buf = make([]byte, head.length)
|
|
|
+ f.rbuf = make([]byte, head.length)
|
|
|
}
|
|
|
|
|
|
- _, err := io.ReadFull(f.r, f.buf)
|
|
|
+ _, err := io.ReadFull(f.r, f.rbuf)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -327,12 +339,13 @@ func (f *framer) readFrame(head *frameHeader) error {
|
|
|
return NewErrProtocol("no compressor available with compressed frame body")
|
|
|
}
|
|
|
|
|
|
- f.buf, err = f.compres.Decode(f.buf)
|
|
|
+ f.rbuf, err = f.compres.Decode(f.rbuf)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ head.framer = f
|
|
|
f.header = head
|
|
|
return nil
|
|
|
}
|
|
|
@@ -341,7 +354,7 @@ func (f *framer) readFrame(head *frameHeader) error {
|
|
|
// is safe to use its buffer again (it is fresh)
|
|
|
func (f *framer) parseFrame() (frame, error) {
|
|
|
if f.header.version.request() {
|
|
|
- return frameHeader{}, NewErrProtocol("got a request frame from server: %v", f.header.version)
|
|
|
+ return nil, NewErrProtocol("got a request frame from server: %v", f.header.version)
|
|
|
}
|
|
|
|
|
|
if f.header.flags&flagTracing == flagTracing {
|
|
|
@@ -356,7 +369,7 @@ func (f *framer) parseFrame() (frame, error) {
|
|
|
// asumes that the frame body has been read into buf
|
|
|
switch f.header.op {
|
|
|
case opError:
|
|
|
- frame, err = f.parseErrorFrame()
|
|
|
+ frame = f.parseErrorFrame()
|
|
|
case opReady:
|
|
|
frame = f.parseReadyFrame()
|
|
|
case opResult:
|
|
|
@@ -373,12 +386,10 @@ func (f *framer) parseFrame() (frame, error) {
|
|
|
return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
|
|
|
}
|
|
|
|
|
|
- f.buf = make([]byte, defaultBufSize)
|
|
|
-
|
|
|
return frame, err
|
|
|
}
|
|
|
|
|
|
-func (f *framer) parseErrorFrame() (frame, error) {
|
|
|
+func (f *framer) parseErrorFrame() frame {
|
|
|
code := f.readInt()
|
|
|
msg := f.readString()
|
|
|
|
|
|
@@ -393,75 +404,75 @@ func (f *framer) parseErrorFrame() (frame, error) {
|
|
|
cl := f.readConsistency()
|
|
|
required := f.readInt()
|
|
|
alive := f.readInt()
|
|
|
- return RequestErrUnavailable{
|
|
|
+ return &RequestErrUnavailable{
|
|
|
errorFrame: errD,
|
|
|
Consistency: cl,
|
|
|
Required: required,
|
|
|
Alive: alive,
|
|
|
- }, nil
|
|
|
+ }
|
|
|
case errWriteTimeout:
|
|
|
cl := f.readConsistency()
|
|
|
received := f.readInt()
|
|
|
blockfor := f.readInt()
|
|
|
writeType := f.readString()
|
|
|
- return RequestErrWriteTimeout{
|
|
|
+ return &RequestErrWriteTimeout{
|
|
|
errorFrame: errD,
|
|
|
Consistency: cl,
|
|
|
Received: received,
|
|
|
BlockFor: blockfor,
|
|
|
WriteType: writeType,
|
|
|
- }, nil
|
|
|
+ }
|
|
|
case errReadTimeout:
|
|
|
cl := f.readConsistency()
|
|
|
received := f.readInt()
|
|
|
blockfor := f.readInt()
|
|
|
dataPresent := f.readByte()
|
|
|
- return RequestErrReadTimeout{
|
|
|
+ return &RequestErrReadTimeout{
|
|
|
errorFrame: errD,
|
|
|
Consistency: cl,
|
|
|
Received: received,
|
|
|
BlockFor: blockfor,
|
|
|
DataPresent: dataPresent,
|
|
|
- }, nil
|
|
|
+ }
|
|
|
case errAlreadyExists:
|
|
|
ks := f.readString()
|
|
|
table := f.readString()
|
|
|
- return RequestErrAlreadyExists{
|
|
|
+ return &RequestErrAlreadyExists{
|
|
|
errorFrame: errD,
|
|
|
Keyspace: ks,
|
|
|
Table: table,
|
|
|
- }, nil
|
|
|
+ }
|
|
|
case errUnprepared:
|
|
|
stmtId := f.readShortBytes()
|
|
|
- return RequestErrUnprepared{
|
|
|
+ return &RequestErrUnprepared{
|
|
|
errorFrame: errD,
|
|
|
StatementId: stmtId,
|
|
|
- }, nil
|
|
|
+ }
|
|
|
default:
|
|
|
- return errD, nil
|
|
|
+ return &errD
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeHeader(flags byte, op frameOp, stream int) {
|
|
|
- f.buf = f.buf[:0]
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = f.wbuf[:0]
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
f.proto,
|
|
|
flags,
|
|
|
)
|
|
|
|
|
|
if f.proto > protoVersion2 {
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(stream>>8),
|
|
|
byte(stream),
|
|
|
)
|
|
|
} else {
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(stream),
|
|
|
)
|
|
|
}
|
|
|
|
|
|
// pad out length
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(op),
|
|
|
0,
|
|
|
0,
|
|
|
@@ -476,32 +487,31 @@ func (f *framer) setLength(length int) {
|
|
|
p = 5
|
|
|
}
|
|
|
|
|
|
- f.buf[p+0] = byte(length >> 24)
|
|
|
- f.buf[p+1] = byte(length >> 16)
|
|
|
- f.buf[p+2] = byte(length >> 8)
|
|
|
- f.buf[p+3] = byte(length)
|
|
|
+ f.wbuf[p+0] = byte(length >> 24)
|
|
|
+ f.wbuf[p+1] = byte(length >> 16)
|
|
|
+ f.wbuf[p+2] = byte(length >> 8)
|
|
|
+ f.wbuf[p+3] = byte(length)
|
|
|
}
|
|
|
|
|
|
func (f *framer) finishWrite() error {
|
|
|
- if f.buf[1]&flagCompress == flagCompress {
|
|
|
+ if f.wbuf[1]&flagCompress == flagCompress {
|
|
|
if f.compres == nil {
|
|
|
panic("compress flag set with no compressor")
|
|
|
}
|
|
|
|
|
|
// TODO: only compress frames which are big enough
|
|
|
- compressed, err := f.compres.Encode(f.buf[f.headSize:])
|
|
|
+ compressed, err := f.compres.Encode(f.wbuf[f.headSize:])
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- f.buf = append(f.buf[:f.headSize], compressed...)
|
|
|
+ f.wbuf = append(f.wbuf[:f.headSize], compressed...)
|
|
|
}
|
|
|
- length := len(f.buf) - f.headSize
|
|
|
+ length := len(f.wbuf) - f.headSize
|
|
|
f.setLength(length)
|
|
|
|
|
|
- _, err := f.w.Write(f.buf)
|
|
|
-
|
|
|
- f.buf = f.buf[:0]
|
|
|
+ _, err := f.w.Write(f.wbuf)
|
|
|
+ f.wbuf = f.wbuf[:0]
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -551,13 +561,13 @@ func (f *framer) writeStartupFrame(streamID int, options map[string]string) erro
|
|
|
// startup frame must not have the compress flag set
|
|
|
f.writeHeader(f.flags&^flagCompress, opStartup, streamID)
|
|
|
f.writeStringMap(options)
|
|
|
- f.setLength(len(f.buf) - f.headSize)
|
|
|
+ f.setLength(len(f.wbuf) - f.headSize)
|
|
|
|
|
|
// dont use finishWrite here as it will use compression
|
|
|
// TODO: have a type which has a writeHeader / writeBody so we can do
|
|
|
// transparent compression
|
|
|
- _, err := f.w.Write(f.buf)
|
|
|
- f.buf = f.buf[:0]
|
|
|
+ _, err := f.w.Write(f.wbuf)
|
|
|
+ f.wbuf = f.wbuf[:0]
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -747,6 +757,7 @@ func (f *framer) parseResultSetKeyspace() frame {
|
|
|
|
|
|
type resultPreparedFrame struct {
|
|
|
frameHeader
|
|
|
+
|
|
|
preparedID []byte
|
|
|
reqMeta resultMetadata
|
|
|
respMeta resultMetadata
|
|
|
@@ -1099,48 +1110,48 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame) error {
|
|
|
}
|
|
|
|
|
|
func (f *framer) readByte() byte {
|
|
|
- b := f.buf[0]
|
|
|
- f.buf = f.buf[1:]
|
|
|
+ b := f.rbuf[0]
|
|
|
+ f.rbuf = f.rbuf[1:]
|
|
|
return b
|
|
|
}
|
|
|
|
|
|
func (f *framer) readInt() (n int) {
|
|
|
- n = int(int32(f.buf[0])<<24 | int32(f.buf[1])<<16 | int32(f.buf[2])<<8 | int32(f.buf[3]))
|
|
|
- f.buf = f.buf[4:]
|
|
|
+ n = int(int32(f.rbuf[0])<<24 | int32(f.rbuf[1])<<16 | int32(f.rbuf[2])<<8 | int32(f.rbuf[3]))
|
|
|
+ f.rbuf = f.rbuf[4:]
|
|
|
return
|
|
|
}
|
|
|
|
|
|
func (f *framer) readShort() (n uint16) {
|
|
|
- n = uint16(f.buf[0])<<8 | uint16(f.buf[1])
|
|
|
- f.buf = f.buf[2:]
|
|
|
+ n = uint16(f.rbuf[0])<<8 | uint16(f.rbuf[1])
|
|
|
+ f.rbuf = f.rbuf[2:]
|
|
|
return
|
|
|
}
|
|
|
|
|
|
func (f *framer) readLong() (n int64) {
|
|
|
- n = int64(f.buf[0])<<56 | int64(f.buf[1])<<48 | int64(f.buf[2])<<40 | int64(f.buf[3])<<32 |
|
|
|
- int64(f.buf[4])<<24 | int64(f.buf[5])<<16 | int64(f.buf[6])<<8 | int64(f.buf[7])
|
|
|
- f.buf = f.buf[8:]
|
|
|
+ n = int64(f.rbuf[0])<<56 | int64(f.rbuf[1])<<48 | int64(f.rbuf[2])<<40 | int64(f.rbuf[3])<<32 |
|
|
|
+ int64(f.rbuf[4])<<24 | int64(f.rbuf[5])<<16 | int64(f.rbuf[6])<<8 | int64(f.rbuf[7])
|
|
|
+ f.rbuf = f.rbuf[8:]
|
|
|
return
|
|
|
}
|
|
|
|
|
|
func (f *framer) readString() (s string) {
|
|
|
size := f.readShort()
|
|
|
- s = string(f.buf[:size])
|
|
|
- f.buf = f.buf[size:]
|
|
|
+ s = string(f.rbuf[:size])
|
|
|
+ f.rbuf = f.rbuf[size:]
|
|
|
return
|
|
|
}
|
|
|
|
|
|
func (f *framer) readLongString() (s string) {
|
|
|
size := f.readInt()
|
|
|
- s = string(f.buf[:size])
|
|
|
- f.buf = f.buf[size:]
|
|
|
+ s = string(f.rbuf[:size])
|
|
|
+ f.rbuf = f.rbuf[size:]
|
|
|
return
|
|
|
}
|
|
|
|
|
|
func (f *framer) readUUID() *UUID {
|
|
|
// TODO: how to handle this error, if it is a uuid, then sureley, problems?
|
|
|
- u, _ := UUIDFromBytes(f.buf[:16])
|
|
|
- f.buf = f.buf[16:]
|
|
|
+ u, _ := UUIDFromBytes(f.rbuf[:16])
|
|
|
+ f.rbuf = f.rbuf[16:]
|
|
|
return &u
|
|
|
}
|
|
|
|
|
|
@@ -1161,8 +1172,8 @@ func (f *framer) readBytes() []byte {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
- l := f.buf[:size]
|
|
|
- f.buf = f.buf[size:]
|
|
|
+ l := f.rbuf[:size]
|
|
|
+ f.rbuf = f.rbuf[size:]
|
|
|
|
|
|
return l
|
|
|
}
|
|
|
@@ -1170,22 +1181,22 @@ func (f *framer) readBytes() []byte {
|
|
|
func (f *framer) readShortBytes() []byte {
|
|
|
n := f.readShort()
|
|
|
|
|
|
- l := f.buf[:n]
|
|
|
- f.buf = f.buf[n:]
|
|
|
+ l := f.rbuf[:n]
|
|
|
+ f.rbuf = f.rbuf[n:]
|
|
|
|
|
|
return l
|
|
|
}
|
|
|
|
|
|
func (f *framer) readInet() (net.IP, int) {
|
|
|
- size := f.buf[0]
|
|
|
- f.buf = f.buf[1:]
|
|
|
+ size := f.rbuf[0]
|
|
|
+ f.rbuf = f.rbuf[1:]
|
|
|
|
|
|
if !(size == 4 || size == 16) {
|
|
|
panic(fmt.Sprintf("invalid IP size: %d", size))
|
|
|
}
|
|
|
|
|
|
- ip := f.buf[:size]
|
|
|
- f.buf = f.buf[size:]
|
|
|
+ ip := f.rbuf[:size]
|
|
|
+ f.rbuf = f.rbuf[size:]
|
|
|
|
|
|
port := f.readInt()
|
|
|
return net.IP(ip), port
|
|
|
@@ -1222,12 +1233,12 @@ func (f *framer) readStringMultiMap() map[string][]string {
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeByte(b byte) {
|
|
|
- f.buf = append(f.buf, b)
|
|
|
+ f.wbuf = append(f.wbuf, b)
|
|
|
}
|
|
|
|
|
|
// these are protocol level binary types
|
|
|
func (f *framer) writeInt(n int32) {
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(n>>24),
|
|
|
byte(n>>16),
|
|
|
byte(n>>8),
|
|
|
@@ -1236,14 +1247,14 @@ func (f *framer) writeInt(n int32) {
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeShort(n uint16) {
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(n>>8),
|
|
|
byte(n),
|
|
|
)
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeLong(n int64) {
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(n>>56),
|
|
|
byte(n>>48),
|
|
|
byte(n>>40),
|
|
|
@@ -1257,16 +1268,16 @@ func (f *framer) writeLong(n int64) {
|
|
|
|
|
|
func (f *framer) writeString(s string) {
|
|
|
f.writeShort(uint16(len(s)))
|
|
|
- f.buf = append(f.buf, s...)
|
|
|
+ f.wbuf = append(f.wbuf, s...)
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeLongString(s string) {
|
|
|
f.writeInt(int32(len(s)))
|
|
|
- f.buf = append(f.buf, s...)
|
|
|
+ f.wbuf = append(f.wbuf, s...)
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeUUID(u *UUID) {
|
|
|
- f.buf = append(f.buf, u[:]...)
|
|
|
+ f.wbuf = append(f.wbuf, u[:]...)
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeStringList(l []string) {
|
|
|
@@ -1284,21 +1295,21 @@ func (f *framer) writeBytes(p []byte) {
|
|
|
f.writeInt(-1)
|
|
|
} else {
|
|
|
f.writeInt(int32(len(p)))
|
|
|
- f.buf = append(f.buf, p...)
|
|
|
+ f.wbuf = append(f.wbuf, p...)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeShortBytes(p []byte) {
|
|
|
f.writeShort(uint16(len(p)))
|
|
|
- f.buf = append(f.buf, p...)
|
|
|
+ f.wbuf = append(f.wbuf, p...)
|
|
|
}
|
|
|
|
|
|
func (f *framer) writeInet(ip net.IP, port int) {
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
byte(len(ip)),
|
|
|
)
|
|
|
|
|
|
- f.buf = append(f.buf,
|
|
|
+ f.wbuf = append(f.wbuf,
|
|
|
[]byte(ip)...,
|
|
|
)
|
|
|
|