Browse Source

Merge pull request #339 from Zariel/frame-refactors

refactor all framing protocol code into frame
Ben Hood 11 năm trước cách đây
mục cha
commit
584a266891
10 tập tin đã thay đổi với 1506 bổ sung874 xóa
  1. 5 0
      README.md
  2. 19 16
      cassandra_test.go
  3. 263 331
      conn.go
  4. 55 53
      conn_test.go
  5. 12 0
      errors.go
  6. 1 1
      errors_test.go
  7. 1098 398
      frame.go
  8. 19 19
      marshal.go
  9. 23 44
      session.go
  10. 11 12
      session_test.go

+ 5 - 0
README.md

@@ -11,6 +11,11 @@ Project Website: http://gocql.github.io/<br>
 API documentation: http://godoc.org/github.com/gocql/gocql<br>
 Discussions: https://groups.google.com/forum/#!forum/gocql
 
+Production Stability
+---------
+The underlying framing code was rewritten as part of #339 and as such may have
+unforseen bugs. If you run into a bug related to wire framing, please raise a ticket and we will try to resolve this as soon as we can. If you require a stable version to pin your production app against, we have tagged the previous stable version in source code, so you can build against this. The tag is called 1st_gen_framing (180456fef0a3c6d02c51dc7211f49b55e9315867). This note will be removed as the new generation framing code base matures
+
 Supported Versions
 ------------------
 

+ 19 - 16
cassandra_test.go

@@ -90,7 +90,8 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 	if err != nil {
 		tb.Fatal("createSession:", err)
 	}
-	if err = session.Query(`DROP KEYSPACE ` + keyspace).Exec(); err != nil {
+	defer session.Close()
+	if err = session.Query(`DROP KEYSPACE IF EXISTS ` + keyspace).Exec(); err != nil {
 		tb.Log("drop keyspace:", err)
 	}
 	if err := session.Query(fmt.Sprintf(`CREATE KEYSPACE %s
@@ -101,7 +102,6 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
 		tb.Fatalf("error creating keyspace %s: %v", keyspace, err)
 	}
 	tb.Logf("Created keyspace %s", keyspace)
-	session.Close()
 }
 
 func createSession(tb testing.TB) *Session {
@@ -973,16 +973,19 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
 	stmtsLRU.Lock()
 	stmtsLRU.lru.Add(conn.addr+stmt, flight)
 	stmtsLRU.Unlock()
-	flight.info = &QueryInfo{
-		Id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
-		Args: []ColumnInfo{ColumnInfo{
-			Keyspace: "gocql_test",
-			Table:    table,
-			Name:     "foo",
-			TypeInfo: &TypeInfo{
-				Type: TypeVarchar,
-			},
-		}},
+	flight.info = &resultPreparedFrame{
+		preparedID: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
+		reqMeta: resultMetadata{
+			columns: []ColumnInfo{
+				{
+					Keyspace: "gocql_test",
+					Table:    table,
+					Name:     "foo",
+					TypeInfo: &TypeInfo{
+						Type: TypeVarchar,
+					},
+				},
+			}},
 	}
 	return stmt, conn
 }
@@ -1045,13 +1048,13 @@ func TestQueryInfo(t *testing.T) {
 		t.Fatalf("Failed to execute query for preparing statement: %v", err)
 	}
 
-	if len(info.Args) != 1 {
-		t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, len(info.Args))
+	if len(info.reqMeta.columns) != 1 {
+		t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, len(info.reqMeta.columns))
 	}
 
 	if *flagProto > 1 {
-		if len(info.Rval) != 2 {
-			t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, len(info.Rval))
+		if len(info.respMeta.columns) != 2 {
+			t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, len(info.respMeta.columns))
 		}
 	}
 }

+ 263 - 331
conn.go

@@ -15,16 +15,9 @@ import (
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 )
 
-const (
-	defaultFrameSize = 4096
-	flagResponse     = 0x80
-	maskVersion      = 0x7F
-)
-
 //JoinHostPort is a utility to return a address string that can be used
 //gocql.Conn to form a connection with a host.
 func JoinHostPort(addr string, port int) string {
@@ -90,9 +83,10 @@ type Conn struct {
 	r       *bufio.Reader
 	timeout time.Duration
 
+	headerBuf []byte
+
 	uniq  chan int
 	calls []callReq
-	nwait int32
 
 	pool            ConnectionPool
 	compressor      Compressor
@@ -100,6 +94,7 @@ type Conn struct {
 	addr            string
 	version         uint8
 	currentKeyspace string
+	started         bool
 
 	closedMu sync.RWMutex
 	isClosed bool
@@ -129,9 +124,12 @@ func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
 		cfg.ProtoVersion = 2
 	}
 
+	headerSize := 8
+
 	maxStreams := 128
 	if cfg.ProtoVersion > protoVersion2 {
 		maxStreams = 32768
+		headerSize = 9
 	}
 
 	if cfg.NumStreams <= 0 || cfg.NumStreams > maxStreams {
@@ -149,6 +147,8 @@ func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
 		pool:       pool,
 		compressor: cfg.Compressor,
 		auth:       cfg.Authenticator,
+
+		headerBuf: make([]byte, headerSize),
 	}
 
 	if cfg.Keepalive > 0 {
@@ -156,66 +156,115 @@ func Connect(addr string, cfg ConnConfig, pool ConnectionPool) (*Conn, error) {
 	}
 
 	for i := 0; i < cfg.NumStreams; i++ {
+		c.calls[i].resp = make(chan error, 1)
 		c.uniq <- i
 	}
 
+	go c.serve()
+
 	if err := c.startup(&cfg); err != nil {
 		conn.Close()
 		return nil, err
 	}
-
-	go c.serve()
+	c.started = true
 
 	return c, nil
 }
 
+func (c *Conn) Write(p []byte) (int, error) {
+	if c.timeout > 0 {
+		c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
+	}
+
+	return c.conn.Write(p)
+}
+
+func (c *Conn) Read(p []byte) (n int, err error) {
+	const maxAttempts = 5
+
+	for i := 0; i < maxAttempts; i++ {
+		var nn int
+		if c.timeout > 0 {
+			c.conn.SetReadDeadline(time.Now().Add(c.timeout))
+		}
+
+		nn, err = io.ReadFull(c.r, p[n:])
+		n += nn
+		if err == nil {
+			break
+		}
+
+		if verr, ok := err.(net.Error); !ok || !verr.Temporary() {
+			break
+		}
+	}
+
+	return
+}
+
 func (c *Conn) startup(cfg *ConnConfig) error {
-	compression := ""
+	m := map[string]string{
+		"CQL_VERSION": cfg.CQLVersion,
+	}
+
 	if c.compressor != nil {
-		compression = c.compressor.Name()
+		m["COMPRESSION"] = c.compressor.Name()
+	}
+
+	frame, err := c.exec(&writeStartupFrame{opts: m}, nil)
+	if err != nil {
+		return err
+	}
+
+	switch v := frame.(type) {
+	case error:
+		return v
+	case *readyFrame:
+		return nil
+	case *authenticateFrame:
+		return c.authenticateHandshake(v)
+	default:
+		return NewErrProtocol("Unknown type of response to startup frame: %s", v)
 	}
-	var req operation = &startupFrame{
-		CQLVersion:  cfg.CQLVersion,
-		Compression: compression,
+}
+
+func (c *Conn) authenticateHandshake(authFrame *authenticateFrame) error {
+	if c.auth == nil {
+		return fmt.Errorf("authentication required (using %q)", authFrame.class)
 	}
-	var challenger Authenticator
+
+	resp, challenger, err := c.auth.Challenge([]byte(authFrame.class))
+	if err != nil {
+		return err
+	}
+
+	req := &writeAuthResponseFrame{data: resp}
+
 	for {
-		resp, err := c.execSimple(req)
+		frame, err := c.exec(req, nil)
 		if err != nil {
 			return err
 		}
-		switch x := resp.(type) {
-		case readyFrame:
-			return nil
+
+		switch v := frame.(type) {
 		case error:
-			return x
-		case authenticateFrame:
-			if c.auth == nil {
-				return fmt.Errorf("authentication required (using %q)", x.Authenticator)
-			}
-			var resp []byte
-			resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
-			if err != nil {
-				return err
-			}
-			req = &authResponseFrame{resp}
-		case authChallengeFrame:
-			if challenger == nil {
-				return fmt.Errorf("authentication error (invalid challenge)")
+			return v
+		case *authSuccessFrame:
+			if challenger != nil {
+				return challenger.Success(v.data)
 			}
-			var resp []byte
-			resp, challenger, err = challenger.Challenge(x.Data)
+			return nil
+		case *authChallengeFrame:
+			resp, challenger, err = challenger.Challenge(v.data)
 			if err != nil {
 				return err
 			}
-			req = &authResponseFrame{resp}
-		case authSuccessFrame:
-			if challenger != nil {
-				return challenger.Success(x.Data)
+
+			req = &writeAuthResponseFrame{
+				data: resp,
 			}
-			return nil
 		default:
-			return NewErrProtocol("Unknown type of response to startup frame: %s", x)
+			return fmt.Errorf("unknown frame response during authentication: %v", v)
 		}
 	}
 }
@@ -225,170 +274,112 @@ func (c *Conn) startup(cfg *ConnConfig) error {
 // open and is therefore usually called in a separate goroutine.
 func (c *Conn) serve() {
 	var (
-		err  error
-		resp frame
+		err error
 	)
 
 	for {
-		resp, err = c.recv()
+		err = c.recv()
 		if err != nil {
 			break
 		}
-		c.dispatch(resp)
 	}
 
 	c.Close()
 	for id := 0; id < len(c.calls); id++ {
 		req := &c.calls[id]
-		if atomic.LoadInt32(&req.active) == 1 {
-			req.resp <- callResp{nil, err}
+		// we need to send the error to all waiting queries, put the state
+		// of this conn into not active so that it can not execute any queries.
+		select {
+		case req.resp <- err:
+		default:
 		}
-	}
-	c.pool.HandleError(c, err, true)
-}
 
-func (c *Conn) Write(p []byte) (int, error) {
-	c.conn.SetWriteDeadline(time.Now().Add(c.timeout))
-	return c.conn.Write(p)
-}
+		close(req.resp)
+	}
 
-func (c *Conn) Read(p []byte) (int, error) {
-	return c.r.Read(p)
+	if c.started {
+		c.pool.HandleError(c, err, true)
+	}
 }
 
-func (c *Conn) recv() (frame, error) {
-	size := headerProtoSize[c.version]
-	resp := make(frame, size, size+512)
+func (c *Conn) recv() error {
+	// not safe for concurrent reads
 
 	// read a full header, ignore timeouts, as this is being ran in a loop
-	c.conn.SetReadDeadline(time.Time{})
-	_, err := io.ReadFull(c.r, resp[:size])
-	if err != nil {
-		return nil, err
+	// TODO: TCP level deadlines? or just query level deadlines?
+	if c.timeout > 0 {
+		c.conn.SetReadDeadline(time.Time{})
 	}
 
-	if v := c.version | flagResponse; resp[0] != v {
-		return nil, NewErrProtocol("recv: response protocol version does not match connection protocol version (%d != %d)", resp[0], v)
+	// were just reading headers over and over and copy bodies
+	head, err := readHeader(c.r, c.headerBuf)
+	if err != nil {
+		return err
 	}
 
-	bodySize := resp.Length(c.version)
-	if bodySize == 0 {
-		return resp, nil
+	call := &c.calls[head.stream]
+	err = call.framer.readFrame(&head)
+	if err != nil {
+		return err
 	}
-	resp.grow(bodySize)
-
-	const maxAttempts = 5
-
-	n := size
-	for i := 0; i < maxAttempts; i++ {
-		var nn int
-		c.conn.SetReadDeadline(time.Now().Add(c.timeout))
-		nn, err = io.ReadFull(c.r, resp[n:size+bodySize])
-		if err == nil {
-			break
-		}
-		n += nn
 
-		if verr, ok := err.(net.Error); !ok || !verr.Temporary() {
-			break
-		}
-	}
+	// once we get to here we know that the caller must be waiting and that there
+	// is no error.
+	call.resp <- nil
+	c.uniq <- head.stream
 
-	if err != nil {
-		return nil, err
-	}
+	return nil
+}
 
-	return resp, nil
+type callReq struct {
+	// could use a waitgroup but this allows us to do timeouts on the read/send
+	resp   chan error
+	framer *framer
 }
 
-func (c *Conn) execSimple(op operation) (interface{}, error) {
-	f, err := op.encodeFrame(c.version, nil)
-	if err != nil {
-		// this should be a noop err
-		return nil, err
-	}
+func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
+	// TODO: move tracer onto conn
+	stream := <-c.uniq
 
-	bodyLen := len(f) - headerProtoSize[c.version]
-	f.setLength(bodyLen, c.version)
+	call := &c.calls[stream]
+	// resp is basically a waiting semaphore protecting the framer
+	framer := newFramer(c, c, c.compressor, c.version)
+	call.framer = framer
 
-	if _, err := c.Write([]byte(f)); err != nil {
-		c.Close()
-		return nil, err
+	if tracer != nil {
+		framer.trace()
 	}
 
-	// here recv wont timeout waiting for a header, should it?
-	if f, err = c.recv(); err != nil {
+	err := req.writeFrame(framer, stream)
+	if err != nil {
 		return nil, err
 	}
 
-	return c.decodeFrame(f, nil)
-}
-
-func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
-	req, err := op.encodeFrame(c.version, nil)
+	err = <-call.resp
 	if err != nil {
 		return nil, err
 	}
 
-	if trace != nil {
-		req[1] |= flagTrace
+	if v := framer.header.version.version(); v != c.version {
+		return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
 	}
 
-	headerSize := headerProtoSize[c.version]
-	if len(req) > headerSize && c.compressor != nil {
-		body, err := c.compressor.Encode([]byte(req[headerSize:]))
-		if err != nil {
-			return nil, err
-		}
-		req = append(req[:headerSize], frame(body)...)
-		req[1] |= flagCompress
-	}
-	bodyLen := len(req) - headerSize
-	req.setLength(bodyLen, c.version)
-
-	id := <-c.uniq
-	req.setStream(id, c.version)
-	call := &c.calls[id]
-	call.resp = make(chan callResp, 1)
-	atomic.AddInt32(&c.nwait, 1)
-	atomic.StoreInt32(&call.active, 1)
-
-	if _, err := c.Write(req); err != nil {
-		c.uniq <- id
-		c.Close()
+	frame, err := framer.parseFrame()
+	if err != nil {
 		return nil, err
 	}
 
-	reply := <-call.resp
-	call.resp = nil
-	c.uniq <- id
-
-	if reply.err != nil {
-		return nil, reply.err
+	if len(framer.traceID) > 0 {
+		tracer.Trace(framer.traceID)
 	}
 
-	return c.decodeFrame(reply.buf, trace)
-}
-
-func (c *Conn) dispatch(resp frame) {
-	id := resp.Stream(c.version)
-	if id >= len(c.calls) {
-		return
-	}
-	call := &c.calls[id]
-	if !atomic.CompareAndSwapInt32(&call.active, 1, 0) {
-		return
-	}
-	atomic.AddInt32(&c.nwait, -1)
-	call.resp <- callResp{resp, nil}
-}
+	framerPool.Put(framer)
+	call.framer = nil
 
-func (c *Conn) ping() error {
-	_, err := c.exec(&optionsFrame{}, nil)
-	return err
+	return frame, nil
 }
 
-func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
+func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame, error) {
 	stmtsLRU.Lock()
 	if stmtsLRU.lru == nil {
 		initStmtsLRU(defaultMaxPreparedStmts)
@@ -397,8 +388,8 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
 	stmtCacheKey := c.addr + c.currentKeyspace + stmt
 
 	if val, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
-		flight := val.(*inflightPrepare)
 		stmtsLRU.Unlock()
+		flight := val.(*inflightPrepare)
 		flight.wg.Wait()
 		return flight.info, flight.err
 	}
@@ -408,28 +399,28 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
 	stmtsLRU.lru.Add(stmtCacheKey, flight)
 	stmtsLRU.Unlock()
 
-	resp, err := c.exec(&prepareFrame{Stmt: stmt}, trace)
+	prep := &writePrepareFrame{
+		statement: stmt,
+	}
+
+	resp, err := c.exec(prep, trace)
 	if err != nil {
 		flight.err = err
-	} else {
-		switch x := resp.(type) {
-		case resultPreparedFrame:
-			flight.info = &QueryInfo{
-				Id:   x.PreparedId,
-				Args: x.Arguments,
-				Rval: x.ReturnValues,
-			}
-		case error:
-			flight.err = x
-		default:
-			flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
-		}
-		err = flight.err
+		flight.wg.Done()
+		return nil, err
 	}
 
+	switch x := resp.(type) {
+	case *resultPreparedFrame:
+		flight.info = x
+	case error:
+		flight.err = x
+	default:
+		flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x)
+	}
 	flight.wg.Done()
 
-	if err != nil {
+	if flight.err != nil {
 		stmtsLRU.Lock()
 		stmtsLRU.lru.Remove(stmtCacheKey)
 		stmtsLRU.Unlock()
@@ -439,12 +430,19 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
 }
 
 func (c *Conn) executeQuery(qry *Query) *Iter {
-	op := &queryFrame{
-		Stmt:      qry.stmt,
-		Cons:      qry.cons,
-		PageSize:  qry.pageSize,
-		PageState: qry.pageState,
+	params := queryParams{
+		consistency: qry.cons,
+	}
+
+	// TODO: Add DefaultTimestamp, SerialConsistency
+	if len(qry.pageState) > 0 {
+		params.pagingState = qry.pageState
+	}
+	if qry.pageSize > 0 {
+		params.pageSize = qry.pageSize
 	}
+
+	var frame frameWriter
 	if qry.shouldPrepare() {
 		// Prepare all DML queries. Other queries can not be prepared.
 		info, err := c.prepareStatement(qry.stmt, qry.trace)
@@ -457,48 +455,74 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		if qry.binding == nil {
 			values = qry.values
 		} else {
-			values, err = qry.binding(info)
+			binding := &QueryInfo{
+				Id:   info.preparedID,
+				Args: info.reqMeta.columns,
+				Rval: info.respMeta.columns,
+			}
+
+			values, err = qry.binding(binding)
 			if err != nil {
 				return &Iter{err: err}
 			}
 		}
 
-		if len(values) != len(info.Args) {
+		if len(values) != len(info.reqMeta.columns) {
 			return &Iter{err: ErrQueryArgLength}
 		}
-		op.Prepared = info.Id
-		op.Values = make([][]byte, len(values))
+		params.values = make([]queryValues, len(values))
 		for i := 0; i < len(values); i++ {
-			val, err := Marshal(info.Args[i].TypeInfo, values[i])
+			val, err := Marshal(info.reqMeta.columns[i].TypeInfo, values[i])
 			if err != nil {
 				return &Iter{err: err}
 			}
-			op.Values[i] = val
+
+			v := &params.values[i]
+			v.value = val
+			// TODO: handle query binding names
+		}
+
+		frame = &writeExecuteFrame{
+			preparedID: info.preparedID,
+			params:     params,
+		}
+	} else {
+		frame = &writeQueryFrame{
+			statement: qry.stmt,
+			params:    params,
 		}
 	}
-	resp, err := c.exec(op, qry.trace)
+
+	resp, err := c.exec(frame, qry.trace)
 	if err != nil {
 		return &Iter{err: err}
 	}
+
 	switch x := resp.(type) {
-	case resultVoidFrame:
+	case *resultVoidFrame:
 		return &Iter{}
-	case resultRowsFrame:
-		iter := &Iter{columns: x.Columns, rows: x.Rows}
-		if len(x.PagingState) > 0 {
+	case *resultRowsFrame:
+		iter := &Iter{
+			columns: x.meta.columns,
+			rows:    x.rows,
+		}
+
+		if len(x.meta.pagingState) > 0 {
 			iter.next = &nextIter{
 				qry: *qry,
 				pos: int((1 - qry.prefetch) * float64(len(iter.rows))),
 			}
-			iter.next.qry.pageState = x.PagingState
+
+			iter.next.qry.pageState = x.meta.pagingState
 			if iter.next.pos < 1 {
 				iter.next.pos = 1
 			}
 		}
+
 		return iter
-	case resultKeyspaceFrame:
+	case *resultKeyspaceFrame, *resultSchemaChangeFrame:
 		return &Iter{}
-	case RequestErrUnprepared:
+	case *RequestErrUnprepared:
 		stmtsLRU.Lock()
 		stmtCacheKey := c.addr + c.currentKeyspace + qry.stmt
 		if _, ok := stmtsLRU.lru.Get(stmtCacheKey); ok {
@@ -550,16 +574,20 @@ func (c *Conn) AvailableStreams() int {
 }
 
 func (c *Conn) UseKeyspace(keyspace string) error {
-	resp, err := c.exec(&queryFrame{Stmt: `USE "` + keyspace + `"`, Cons: Any}, nil)
+	q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
+	q.params.consistency = Any
+
+	resp, err := c.exec(q, nil)
 	if err != nil {
 		return err
 	}
+
 	switch x := resp.(type) {
-	case resultKeyspaceFrame:
+	case *resultKeyspaceFrame:
 	case error:
 		return x
 	default:
-		return NewErrProtocol("Unknown type in response to USE: %s", x)
+		return NewErrProtocol("unknown frame in response to USE: %v", x)
 	}
 
 	c.currentKeyspace = keyspace
@@ -571,67 +599,73 @@ func (c *Conn) executeBatch(batch *Batch) error {
 	if c.version == protoVersion1 {
 		return ErrUnsupported
 	}
-	f := newFrame(c.version)
-	f.setHeader(c.version, 0, 0, opBatch)
-	f.writeByte(byte(batch.Type))
-	f.writeShort(uint16(len(batch.Entries)))
+
+	n := len(batch.Entries)
+	req := &writeBatchFrame{
+		typ:         batch.Type,
+		statements:  make([]batchStatment, n),
+		consistency: batch.Cons,
+	}
 
 	stmts := make(map[string]string)
 
-	for i := 0; i < len(batch.Entries); i++ {
+	for i := 0; i < n; i++ {
 		entry := &batch.Entries[i]
-		var info *QueryInfo
-		var args []interface{}
+		b := &req.statements[i]
 		if len(entry.Args) > 0 || entry.binding != nil {
-			var err error
-			info, err = c.prepareStatement(entry.Stmt, nil)
+			info, err := c.prepareStatement(entry.Stmt, nil)
 			if err != nil {
 				return err
 			}
 
+			var args []interface{}
 			if entry.binding == nil {
 				args = entry.Args
 			} else {
-				args, err = entry.binding(info)
+				binding := &QueryInfo{
+					Id:   info.preparedID,
+					Args: info.reqMeta.columns,
+					Rval: info.respMeta.columns,
+				}
+				args, err = entry.binding(binding)
 				if err != nil {
 					return err
 				}
 			}
 
-			if len(args) != len(info.Args) {
+			if len(args) != len(info.reqMeta.columns) {
 				return ErrQueryArgLength
 			}
 
-			stmts[string(info.Id)] = entry.Stmt
-			f.writeByte(1)
-			f.writeShortBytes(info.Id)
-		} else {
-			f.writeByte(0)
-			f.writeLongString(entry.Stmt)
-		}
-		f.writeShort(uint16(len(args)))
-		for j := 0; j < len(args); j++ {
-			val, err := Marshal(info.Args[j].TypeInfo, args[j])
-			if err != nil {
-				return err
+			b.preparedID = info.preparedID
+			stmts[string(info.preparedID)] = entry.Stmt
+
+			b.values = make([]queryValues, len(info.reqMeta.columns))
+
+			for j := 0; j < len(info.reqMeta.columns); j++ {
+				val, err := Marshal(info.reqMeta.columns[j].TypeInfo, args[j])
+				if err != nil {
+					return err
+				}
+
+				b.values[j].value = val
+				// TODO: add names
 			}
-			f.writeBytes(val)
+		} else {
+			b.statement = entry.Stmt
 		}
 	}
-	f.writeConsistency(batch.Cons)
-	if c.version >= protoVersion3 {
-		// TODO: add support for flags here
-		f.writeByte(0)
-	}
 
-	resp, err := c.exec(f, nil)
+	// TODO: should batch support tracing?
+	resp, err := c.exec(req, nil)
 	if err != nil {
 		return err
 	}
+
 	switch x := resp.(type) {
-	case resultVoidFrame:
+	case *resultVoidFrame:
 		return nil
-	case RequestErrUnprepared:
+	case *RequestErrUnprepared:
 		stmt, found := stmts[string(x.StatementId)]
 		if found {
 			stmtsLRU.Lock()
@@ -650,91 +684,6 @@ func (c *Conn) executeBatch(batch *Batch) error {
 	}
 }
 
-func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error) {
-	defer func() {
-		if r := recover(); r != nil {
-			if e, ok := r.(ErrProtocol); ok {
-				err = e
-				return
-			}
-			panic(r)
-		}
-	}()
-
-	headerSize := headerProtoSize[c.version]
-	if len(f) < headerSize {
-		return nil, NewErrProtocol("Decoding frame: less data received than required for header: %d < %d", len(f), headerSize)
-	} else if f[0] != c.version|flagResponse {
-		return nil, NewErrProtocol("Decoding frame: response protocol version does not match connection protocol version (%d != %d)", f[0], c.version|flagResponse)
-	}
-
-	flags, op, f := f[1], f.Op(c.version), f[headerSize:]
-	if flags&flagCompress != 0 && len(f) > 0 && c.compressor != nil {
-		if buf, err := c.compressor.Decode([]byte(f)); err != nil {
-			return nil, err
-		} else {
-			f = frame(buf)
-		}
-	}
-	if flags&flagTrace != 0 {
-		if len(f) < 16 {
-			return nil, NewErrProtocol("Decoding frame: length of frame less than 16 while tracing is enabled")
-		}
-		traceId := []byte(f[:16])
-		f = f[16:]
-		trace.Trace(traceId)
-	}
-
-	switch op {
-	case opReady:
-		return readyFrame{}, nil
-	case opResult:
-		switch kind := f.readInt(); kind {
-		case resultKindVoid:
-			return resultVoidFrame{}, nil
-		case resultKindRows:
-			columns, pageState := f.readMetaData(c.version)
-			numRows := f.readInt()
-			values := make([][]byte, numRows*len(columns))
-			for i := 0; i < len(values); i++ {
-				values[i] = f.readBytes()
-			}
-			rows := make([][][]byte, numRows)
-			for i := 0; i < numRows; i++ {
-				rows[i], values = values[:len(columns)], values[len(columns):]
-			}
-			return resultRowsFrame{columns, rows, pageState}, nil
-		case resultKindKeyspace:
-			keyspace := f.readString()
-			return resultKeyspaceFrame{keyspace}, nil
-		case resultKindPrepared:
-			id := f.readShortBytes()
-			args, _ := f.readMetaData(c.version)
-			if c.version < 2 {
-				return resultPreparedFrame{PreparedId: id, Arguments: args}, nil
-			}
-			rvals, _ := f.readMetaData(c.version)
-			return resultPreparedFrame{PreparedId: id, Arguments: args, ReturnValues: rvals}, nil
-		case resultKindSchemaChanged:
-			return resultVoidFrame{}, nil
-		default:
-			return nil, NewErrProtocol("Decoding frame: unknown result kind %s", kind)
-		}
-	case opAuthenticate:
-		return authenticateFrame{f.readString()}, nil
-	case opAuthChallenge:
-		return authChallengeFrame{f.readBytes()}, nil
-	case opAuthSuccess:
-		return authSuccessFrame{f.readBytes()}, nil
-	case opSupported:
-		return supportedFrame{}, nil
-	case opError:
-		return f.readError(), nil
-	default:
-		return nil, NewErrProtocol("Decoding frame: unknown op", op)
-	}
-}
-
 func (c *Conn) setKeepalive(d time.Duration) error {
 	if tc, ok := c.conn.(*net.TCPConn); ok {
 		err := tc.SetKeepAlivePeriod(d)
@@ -748,25 +697,8 @@ func (c *Conn) setKeepalive(d time.Duration) error {
 	return nil
 }
 
-// QueryInfo represents the meta data associated with a prepared CQL statement.
-type QueryInfo struct {
-	Id   []byte
-	Args []ColumnInfo
-	Rval []ColumnInfo
-}
-
-type callReq struct {
-	active int32
-	resp   chan callResp
-}
-
-type callResp struct {
-	buf frame
-	err error
-}
-
 type inflightPrepare struct {
-	info *QueryInfo
+	info *resultPreparedFrame
 	err  error
 	wg   sync.WaitGroup
 }

+ 55 - 53
conn_test.go

@@ -438,13 +438,14 @@ func NewSSLTestServer(t testing.TB, protocol uint8) *TestServer {
 }
 
 type TestServer struct {
-	Address  string
-	t        testing.TB
-	nreq     uint64
-	listen   net.Listener
-	nKillReq int64
-
-	protocol   uint8
+	Address    string
+	t          testing.TB
+	nreq       uint64
+	listen     net.Listener
+	nKillReq   int64
+	compressor Compressor
+
+	protocol   byte
 	headerSize int
 }
 
@@ -458,16 +459,19 @@ func (srv *TestServer) serve() {
 		go func(conn net.Conn) {
 			defer conn.Close()
 			for {
-				frame, err := srv.readFrame(conn)
-				if err == io.EOF {
-					return
-				} else if err != nil {
+				framer, err := srv.readFrame(conn)
+				if err != nil {
+					if err == io.EOF {
+						return
+					}
+
 					srv.t.Error(err)
-					continue
+					return
 				}
 
 				atomic.AddUint64(&srv.nreq, 1)
-				go srv.process(frame, conn)
+
+				go srv.process(framer)
 			}
 		}(conn)
 	}
@@ -477,24 +481,21 @@ func (srv *TestServer) Stop() {
 	srv.listen.Close()
 }
 
-func (srv *TestServer) process(f frame, conn net.Conn) {
-	headerSize := headerProtoSize[srv.protocol]
-	stream := f.Stream(srv.protocol)
+func (srv *TestServer) process(f *framer) {
+	head := f.header
+	if head == nil {
+		srv.t.Error("process frame with a nil header")
+		return
+	}
 
-	switch f.Op(srv.protocol) {
+	switch head.op {
 	case opStartup:
-		f = f[:headerSize]
-		f.setHeader(protoDirectionMask|srv.protocol, 0, stream, opReady)
+		f.writeHeader(0, opReady, head.stream)
 	case opOptions:
-		f = f[:headerSize]
-		f.setHeader(protoDirectionMask|srv.protocol, 0, stream, opSupported)
+		f.writeHeader(0, opSupported, head.stream)
 		f.writeShort(0)
 	case opQuery:
-		input := f
-		input.skipHeader(srv.protocol)
-		query := strings.TrimSpace(input.readLongString())
-		f = f[:headerSize]
-		f.setHeader(protoDirectionMask|srv.protocol, 0, stream, opResult)
+		query := f.readLongString()
 		first := query
 		if n := strings.Index(query, " "); n > 0 {
 			first = first[:n]
@@ -502,62 +503,63 @@ func (srv *TestServer) process(f frame, conn net.Conn) {
 		switch strings.ToLower(first) {
 		case "kill":
 			atomic.AddInt64(&srv.nKillReq, 1)
-			f = f[:headerSize]
-			f.setHeader(protoDirectionMask|srv.protocol, 0, stream, opError)
+			f.writeHeader(0, opError, head.stream)
 			f.writeInt(0x1001)
 			f.writeString("query killed")
 		case "slow":
 			go func() {
 				<-time.After(1 * time.Second)
+				f.writeHeader(0, opResult, head.stream)
+				f.wbuf[0] = srv.protocol | 0x80
 				f.writeInt(resultKindVoid)
-				f.setLength(len(f)-headerSize, srv.protocol)
-				if _, err := conn.Write(f); err != nil {
-					return
+				if err := f.finishWrite(); err != nil {
+					srv.t.Error(err)
 				}
 			}()
+
 			return
 		case "use":
-			f.writeInt(3)
+			f.writeInt(resultKindKeyspace)
 			f.writeString(strings.TrimSpace(query[3:]))
 		case "void":
+			f.writeHeader(0, opResult, head.stream)
 			f.writeInt(resultKindVoid)
 		default:
+			f.writeHeader(0, opResult, head.stream)
 			f.writeInt(resultKindVoid)
 		}
 	default:
-		f = f[:headerSize]
-		f.setHeader(protoDirectionMask|srv.protocol, 0, stream, opError)
+		f.writeHeader(0, opError, head.stream)
 		f.writeInt(0)
 		f.writeString("not supported")
 	}
 
-	f.setLength(len(f)-headerSize, srv.protocol)
-	if _, err := conn.Write(f); err != nil {
-		srv.t.Log(err)
-		return
+	f.wbuf[0] = srv.protocol | 0x80
+
+	if err := f.finishWrite(); err != nil {
+		srv.t.Error(err)
 	}
 }
 
-func (srv *TestServer) readFrame(conn net.Conn) (frame, error) {
-	frame := make(frame, srv.headerSize, srv.headerSize+512)
-	if _, err := io.ReadFull(conn, frame); err != nil {
+func (srv *TestServer) readFrame(conn net.Conn) (*framer, error) {
+	buf := make([]byte, srv.headerSize)
+	head, err := readHeader(conn, buf)
+	if err != nil {
 		return nil, err
 	}
+	framer := newFramer(conn, conn, nil, srv.protocol)
 
-	// should be a request frame
-	if frame[0]&protoDirectionMask != 0 {
-		return nil, fmt.Errorf("expected to read a request frame got version: 0x%x", frame[0])
-	}
-	if v := frame[0] & protoVersionMask; v != srv.protocol {
-		return nil, fmt.Errorf("expected to read protocol version 0x%x got 0x%x", srv.protocol, v)
+	err = framer.readFrame(&head)
+	if err != nil {
+		return nil, err
 	}
 
-	if n := frame.Length(srv.protocol); n > 0 {
-		frame.grow(n)
-		if _, err := io.ReadFull(conn, frame[srv.headerSize:]); err != nil {
-			return nil, err
-		}
+	// should be a request frame
+	if head.version.response() {
+		return nil, fmt.Errorf("expected to read a request frame got version: %v", head.version)
+	} else if head.version.version() != srv.protocol {
+		return nil, fmt.Errorf("expected to read protocol version 0x%x got 0x%x", srv.protocol, head.version.version())
 	}
 
-	return frame, nil
+	return framer, nil
 }

+ 12 - 0
errors.go

@@ -1,5 +1,7 @@
 package gocql
 
+import "fmt"
+
 const (
 	errServer        = 0x0000
 	errProtocol      = 0x000A
@@ -25,6 +27,8 @@ type RequestError interface {
 }
 
 type errorFrame struct {
+	frameHeader
+
 	code    int
 	message string
 }
@@ -41,6 +45,10 @@ func (e errorFrame) Error() string {
 	return e.Message()
 }
 
+func (e errorFrame) String() string {
+	return fmt.Sprintf("[error code=%x message=%q]", e.code, e.message)
+}
+
 type RequestErrUnavailable struct {
 	errorFrame
 	Consistency Consistency
@@ -48,6 +56,10 @@ type RequestErrUnavailable struct {
 	Alive       int
 }
 
+func (e *RequestErrUnavailable) String() string {
+	return fmt.Sprintf("[request_error_unavailable consistency=%s required=%d alive=%d]", e.Consistency, e.Required, e.Alive)
+}
+
 type RequestErrWriteTimeout struct {
 	errorFrame
 	Consistency Consistency

+ 1 - 1
errors_test.go

@@ -18,7 +18,7 @@ func TestErrorsParse(t *testing.T) {
 		t.Fatal("Should have gotten already exists error from cassandra server.")
 	} else {
 		switch e := err.(type) {
-		case RequestErrAlreadyExists:
+		case *RequestErrAlreadyExists:
 			if e.Table != "errors_parse" {
 				t.Fatal("Failed to parse error response from cassandra for ErrAlreadyExists.")
 			}

+ 1098 - 398
frame.go

@@ -6,7 +6,10 @@ package gocql
 
 import (
 	"fmt"
+	"io"
 	"net"
+	"sync"
+	"time"
 )
 
 const (
@@ -15,603 +18,1300 @@ const (
 	protoVersion1      = 0x01
 	protoVersion2      = 0x02
 	protoVersion3      = 0x03
+)
+
+type protoVersion byte
+
+func (p protoVersion) request() bool {
+	return p&protoDirectionMask == 0x00
+}
+
+func (p protoVersion) response() bool {
+	return p&protoDirectionMask == 0x80
+}
+
+func (p protoVersion) version() byte {
+	return byte(p) & protoVersionMask
+}
+
+func (p protoVersion) String() string {
+	dir := "REQ"
+	if p.response() {
+		dir = "RESP"
+	}
+
+	return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir)
+}
 
-	opError         byte = 0x00
-	opStartup       byte = 0x01
-	opReady         byte = 0x02
-	opAuthenticate  byte = 0x03
-	opOptions       byte = 0x05
-	opSupported     byte = 0x06
-	opQuery         byte = 0x07
-	opResult        byte = 0x08
-	opPrepare       byte = 0x09
-	opExecute       byte = 0x0A
-	opRegister      byte = 0x0B
-	opEvent         byte = 0x0C
-	opBatch         byte = 0x0D
-	opAuthChallenge byte = 0x0E
-	opAuthResponse  byte = 0x0F
-	opAuthSuccess   byte = 0x10
+type frameOp byte
 
+const (
+	// header ops
+	opError         frameOp = 0x00
+	opStartup               = 0x01
+	opReady                 = 0x02
+	opAuthenticate          = 0x03
+	opOptions               = 0x05
+	opSupported             = 0x06
+	opQuery                 = 0x07
+	opResult                = 0x08
+	opPrepare               = 0x09
+	opExecute               = 0x0A
+	opRegister              = 0x0B
+	opEvent                 = 0x0C
+	opBatch                 = 0x0D
+	opAuthChallenge         = 0x0E
+	opAuthResponse          = 0x0F
+	opAuthSuccess           = 0x10
+)
+
+func (f frameOp) String() string {
+	switch f {
+	case opError:
+		return "ERROR"
+	case opStartup:
+		return "STARTUP"
+	case opReady:
+		return "READY"
+	case opAuthenticate:
+		return "AUTHENTICATE"
+	case opOptions:
+		return "OPTIONS"
+	case opSupported:
+		return "SUPPORTED"
+	case opQuery:
+		return "QUERY"
+	case opResult:
+		return "RESULT"
+	case opPrepare:
+		return "PREPARE"
+	case opExecute:
+		return "EXECUTE"
+	case opRegister:
+		return "REGISTER"
+	case opEvent:
+		return "EVENT"
+	case opBatch:
+		return "BATCH"
+	case opAuthChallenge:
+		return "AUTH_CHALLENGE"
+	case opAuthResponse:
+		return "AUTH_RESPONSE"
+	case opAuthSuccess:
+		return "AUTH_SUCCESS"
+	default:
+		return fmt.Sprintf("UNKNOWN_OP_%d", f)
+	}
+}
+
+const (
+	// result kind
 	resultKindVoid          = 1
 	resultKindRows          = 2
 	resultKindKeyspace      = 3
 	resultKindPrepared      = 4
 	resultKindSchemaChanged = 5
 
-	flagQueryValues uint8 = 1
-	flagCompress    uint8 = 1
-	flagTrace       uint8 = 2
-	flagPageSize    uint8 = 4
-	flagPageState   uint8 = 8
-	flagHasMore     uint8 = 2
+	// rows flags
+	flagGlobalTableSpec int = 0x01
+	flagHasMorePages        = 0x02
+	flagNoMetaData          = 0x04
+
+	// query flags
+	flagValues                byte = 0x01
+	flagSkipMetaData               = 0x02
+	flagPageSize                   = 0x04
+	flagWithPagingState            = 0x08
+	flagWithSerialConsistency      = 0x10
+	flagDefaultTimestamp           = 0x20
+	flagWithNameValues             = 0x40
+
+	// header flags
+	flagCompress byte = 0x01
+	flagTracing       = 0x02
+)
+
+type Consistency uint16
+
+const (
+	Any         Consistency = 0x00
+	One                     = 0x01
+	Two                     = 0x02
+	Three                   = 0x03
+	Quorum                  = 0x04
+	All                     = 0x05
+	LocalQuorum             = 0x06
+	EachQuorum              = 0x07
+	Serial                  = 0x08
+	LocalSerial             = 0x09
+	LocalOne                = 0x0A
+)
+
+func (c Consistency) String() string {
+	switch c {
+	case Any:
+		return "ANY"
+	case One:
+		return "ONE"
+	case Two:
+		return "TWO"
+	case Three:
+		return "THREE"
+	case Quorum:
+		return "QUORUM"
+	case All:
+		return "ALL"
+	case LocalQuorum:
+		return "LOCAL_QUORUM"
+	case EachQuorum:
+		return "EACH_QUORUM"
+	case Serial:
+		return "SERIAL"
+	case LocalSerial:
+		return "LOCAL_SERIAL"
+	case LocalOne:
+		return "LOCAL_ONE"
+	default:
+		return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
+	}
+}
 
+const (
 	apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
 )
 
-var headerProtoSize = [...]int{
-	protoVersion1: 8,
-	protoVersion2: 8,
-	protoVersion3: 9,
+func writeInt(p []byte, n int32) {
+	p[0] = byte(n >> 24)
+	p[1] = byte(n >> 16)
+	p[2] = byte(n >> 8)
+	p[3] = byte(n)
 }
 
-// TODO: replace with a struct which has a header and a body buffer,
-// header just has methods like, set/get the options in its backing array
-// then in a writeTo we write the header then the body.
-type frame []byte
+func readInt(p []byte) int32 {
+	return int32(p[0])<<24 | int32(p[1])<<16 | int32(p[2])<<8 | int32(p[3])
+}
 
-func newFrame(version uint8) frame {
-	// TODO: pool these at the session level incase anyone is using different
-	// clusters with different versions in the same application.
-	return make(frame, headerProtoSize[version], defaultFrameSize)
+func writeShort(p []byte, n uint16) {
+	p[0] = byte(n >> 8)
+	p[1] = byte(n)
 }
 
-func (f *frame) writeInt(v int32) {
-	p := f.grow(4)
-	(*f)[p] = byte(v >> 24)
-	(*f)[p+1] = byte(v >> 16)
-	(*f)[p+2] = byte(v >> 8)
-	(*f)[p+3] = byte(v)
+func readShort(p []byte) uint16 {
+	return uint16(p[0])<<8 | uint16(p[1])
 }
 
-func (f *frame) writeShort(v uint16) {
-	p := f.grow(2)
-	(*f)[p] = byte(v >> 8)
-	(*f)[p+1] = byte(v)
+type frameHeader struct {
+	version protoVersion
+	flags   byte
+	stream  int
+	op      frameOp
+	length  int
 }
 
-func (f *frame) writeString(v string) {
-	f.writeShort(uint16(len(v)))
-	p := f.grow(len(v))
-	copy((*f)[p:], v)
+func (f frameHeader) String() string {
+	return fmt.Sprintf("[header version=%s flags=0x%x stream=%d op=%s length=%d]", f.version, f.flags, f.stream, f.op, f.length)
 }
 
-func (f *frame) writeLongString(v string) {
-	f.writeInt(int32(len(v)))
-	p := f.grow(len(v))
-	copy((*f)[p:], v)
+func (f frameHeader) Header() frameHeader {
+	return f
 }
 
-func (f *frame) writeUUID() {
+const defaultBufSize = 128
+
+var framerPool = sync.Pool{
+	New: func() interface{} {
+		return &framer{
+			wbuf:       make([]byte, defaultBufSize),
+			readBuffer: make([]byte, defaultBufSize),
+		}
+	},
 }
 
-func (f *frame) writeStringList(v []string) {
-	f.writeShort(uint16(len(v)))
-	for i := range v {
-		f.writeString(v[i])
+// a framer is responsible for reading, writing and parsing frames on a single stream
+type framer struct {
+	r io.Reader
+	w io.Writer
+
+	proto byte
+	// flags are for outgoing flags, enabling compression and tracing etc
+	flags    byte
+	compres  Compressor
+	headSize int
+	// if this frame was read then the header will be here
+	header *frameHeader
+
+	// if tracing flag is set this is not nil
+	traceID []byte
+
+	// holds a ref to the whole byte slice for rbuf so that it can be reset to
+	// 0 after a read.
+	readBuffer []byte
+
+	rbuf []byte
+	wbuf []byte
+}
+
+func newFramer(r io.Reader, w io.Writer, compressor Compressor, version byte) *framer {
+	f := framerPool.Get().(*framer)
+	var flags byte
+	if compressor != nil {
+		flags |= flagCompress
 	}
+
+	version &= protoVersionMask
+
+	headSize := 8
+	if version > protoVersion2 {
+		headSize = 9
+	}
+
+	f.compres = compressor
+	f.proto = version
+	f.flags = flags
+	f.headSize = headSize
+
+	f.r = r
+	f.rbuf = f.readBuffer[:0]
+
+	f.w = w
+	f.wbuf = f.wbuf[:0]
+
+	f.header = nil
+	f.traceID = nil
+
+	return f
 }
 
-func (f *frame) writeByte(v byte) {
-	p := f.grow(1)
-	(*f)[p] = v
+type frame interface {
+	Header() frameHeader
 }
 
-func (f *frame) writeBytes(v []byte) {
-	if v == nil {
-		f.writeInt(-1)
+func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {
+	_, err = io.ReadFull(r, p)
+	if err != nil {
 		return
 	}
-	f.writeInt(int32(len(v)))
-	p := f.grow(len(v))
-	copy((*f)[p:], v)
-}
 
-func (f *frame) writeShortBytes(v []byte) {
-	f.writeShort(uint16(len(v)))
-	p := f.grow(len(v))
-	copy((*f)[p:], v)
+	version := p[0] & protoVersionMask
+	head.version = protoVersion(p[0])
+
+	head.flags = p[1]
+	if version > protoVersion2 {
+		head.stream = int(readShort(p[2:]))
+		head.op = frameOp(p[4])
+		head.length = int(readInt(p[5:]))
+	} else {
+		head.stream = int(p[2])
+		head.op = frameOp(p[3])
+		head.length = int(readInt(p[4:]))
+	}
+
+	return
 }
 
-func (f *frame) writeInet(ip net.IP, port int) {
-	p := f.grow(1 + len(ip))
-	(*f)[p] = byte(len(ip))
-	copy((*f)[p+1:], ip)
-	f.writeInt(int32(port))
+// explicitly enables tracing for the framers outgoing requests
+func (f *framer) trace() {
+	f.flags |= flagTracing
 }
 
-func (f *frame) writeStringMap(v map[string]string) {
-	f.writeShort(uint16(len(v)))
-	for key, value := range v {
-		f.writeString(key)
-		f.writeString(value)
+// reads a frame form the wire into the framers buffer
+func (f *framer) readFrame(head *frameHeader) error {
+	if cap(f.readBuffer) >= head.length {
+		f.rbuf = f.readBuffer[:head.length]
+	} else {
+		f.readBuffer = make([]byte, head.length)
+		f.rbuf = f.readBuffer
 	}
-}
 
-func (f *frame) writeStringMultimap(v map[string][]string) {
-	f.writeShort(uint16(len(v)))
-	for key, values := range v {
-		f.writeString(key)
-		f.writeStringList(values)
+	// assume the underlying reader takes care of timeouts and retries
+	_, err := io.ReadFull(f.r, f.rbuf)
+	if err != nil {
+		return err
 	}
-}
 
-func (f *frame) setHeader(version, flags uint8, stream int, opcode uint8) {
-	(*f)[0] = version
-	(*f)[1] = flags
-	p := 2
-	if version&maskVersion > protoVersion2 {
-		(*f)[2] = byte(stream >> 8)
-		(*f)[3] = byte(stream)
-		p += 2
-	} else {
-		(*f)[2] = byte(stream & 0xFF)
-		p++
+	if head.flags&flagCompress == flagCompress {
+		if f.compres == nil {
+			return NewErrProtocol("no compressor available with compressed frame body")
+		}
+
+		f.rbuf, err = f.compres.Decode(f.rbuf)
+		if err != nil {
+			return err
+		}
 	}
 
-	(*f)[p] = opcode
+	f.header = head
+	return nil
 }
 
-func (f *frame) setStream(stream int, version uint8) {
-	if version > protoVersion2 {
-		(*f)[2] = byte(stream >> 8)
-		(*f)[3] = byte(stream)
-	} else {
-		(*f)[2] = byte(stream)
+func (f *framer) parseFrame() (frame, error) {
+	if f.header.version.request() {
+		return nil, NewErrProtocol("got a request frame from server: %v", f.header.version)
 	}
-}
 
-func (f *frame) Stream(version uint8) (n int) {
-	if version > protoVersion2 {
-		n = int((*f)[2])<<8 | int((*f)[3])
-	} else {
-		n = int((*f)[2])
+	if f.header.flags&flagTracing == flagTracing {
+		f.readTrace()
 	}
-	return
+
+	var (
+		frame frame
+		err   error
+	)
+
+	// asumes that the frame body has been read into rbuf
+	switch f.header.op {
+	case opError:
+		frame = f.parseErrorFrame()
+	case opReady:
+		frame = f.parseReadyFrame()
+	case opResult:
+		frame, err = f.parseResultFrame()
+	case opSupported:
+		frame = f.parseSupportedFrame()
+	case opAuthenticate:
+		frame = f.parseAuthenticateFrame()
+	case opAuthChallenge:
+		frame = f.parseAuthChallengeFrame()
+	case opAuthSuccess:
+		frame = f.parseAuthSuccessFrame()
+	default:
+		return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
+	}
+
+	return frame, err
 }
 
-func (f *frame) setLength(length int, version uint8) {
-	p := 4
-	if version > protoVersion2 {
-		p = 5
+func (f *framer) parseErrorFrame() frame {
+	code := f.readInt()
+	msg := f.readString()
+
+	errD := errorFrame{
+		frameHeader: *f.header,
+		code:        code,
+		message:     msg,
 	}
 
-	(*f)[p] = byte(length >> 24)
-	(*f)[p+1] = byte(length >> 16)
-	(*f)[p+2] = byte(length >> 8)
-	(*f)[p+3] = byte(length)
+	switch code {
+	case errUnavailable:
+		cl := f.readConsistency()
+		required := f.readInt()
+		alive := f.readInt()
+		return &RequestErrUnavailable{
+			errorFrame:  errD,
+			Consistency: cl,
+			Required:    required,
+			Alive:       alive,
+		}
+	case errWriteTimeout:
+		cl := f.readConsistency()
+		received := f.readInt()
+		blockfor := f.readInt()
+		writeType := f.readString()
+		return &RequestErrWriteTimeout{
+			errorFrame:  errD,
+			Consistency: cl,
+			Received:    received,
+			BlockFor:    blockfor,
+			WriteType:   writeType,
+		}
+	case errReadTimeout:
+		cl := f.readConsistency()
+		received := f.readInt()
+		blockfor := f.readInt()
+		dataPresent := f.readByte()
+		return &RequestErrReadTimeout{
+			errorFrame:  errD,
+			Consistency: cl,
+			Received:    received,
+			BlockFor:    blockfor,
+			DataPresent: dataPresent,
+		}
+	case errAlreadyExists:
+		ks := f.readString()
+		table := f.readString()
+		return &RequestErrAlreadyExists{
+			errorFrame: errD,
+			Keyspace:   ks,
+			Table:      table,
+		}
+	case errUnprepared:
+		stmtId := f.readShortBytes()
+		return &RequestErrUnprepared{
+			errorFrame:  errD,
+			StatementId: stmtId,
+		}
+	default:
+		return &errD
+	}
 }
 
-func (f *frame) Op(version uint8) byte {
-	if version > protoVersion2 {
-		return (*f)[4]
+func (f *framer) writeHeader(flags byte, op frameOp, stream int) {
+	f.wbuf = f.wbuf[:0]
+	f.wbuf = append(f.wbuf,
+		f.proto,
+		flags,
+	)
+
+	if f.proto > protoVersion2 {
+		f.wbuf = append(f.wbuf,
+			byte(stream>>8),
+			byte(stream),
+		)
 	} else {
-		return (*f)[3]
+		f.wbuf = append(f.wbuf,
+			byte(stream),
+		)
 	}
+
+	// pad out length
+	f.wbuf = append(f.wbuf,
+		byte(op),
+		0,
+		0,
+		0,
+		0,
+	)
 }
 
-func (f *frame) Length(version uint8) int {
+func (f *framer) setLength(length int) {
 	p := 4
-	if version > protoVersion2 {
+	if f.proto > protoVersion2 {
 		p = 5
 	}
 
-	return int((*f)[p])<<24 | int((*f)[p+1])<<16 | int((*f)[p+2])<<8 | int((*f)[p+3])
+	f.wbuf[p+0] = byte(length >> 24)
+	f.wbuf[p+1] = byte(length >> 16)
+	f.wbuf[p+2] = byte(length >> 8)
+	f.wbuf[p+3] = byte(length)
 }
 
-func (f *frame) grow(n int) int {
-	if len(*f)+n >= cap(*f) {
-		buf := make(frame, len(*f), len(*f)*2+n)
-		copy(buf, *f)
-		*f = buf
+func (f *framer) finishWrite() error {
+	if f.wbuf[1]&flagCompress == flagCompress {
+		if f.compres == nil {
+			panic("compress flag set with no compressor")
+		}
+
+		// TODO: only compress frames which are big enough
+		compressed, err := f.compres.Encode(f.wbuf[f.headSize:])
+		if err != nil {
+			return err
+		}
+
+		f.wbuf = append(f.wbuf[:f.headSize], compressed...)
 	}
-	p := len(*f)
-	*f = (*f)[:p+n]
-	return p
+	length := len(f.wbuf) - f.headSize
+	f.setLength(length)
+
+	_, err := f.w.Write(f.wbuf)
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
-func (f *frame) skipHeader(version uint8) {
-	*f = (*f)[headerProtoSize[version]:]
+func (f *framer) readTrace() {
+	f.traceID = f.readUUID().Bytes()
 }
 
-func (f *frame) readInt() int {
-	if len(*f) < 4 {
-		panic(NewErrProtocol("Trying to read an int while <4 bytes in the buffer"))
-	}
-	v := uint32((*f)[0])<<24 | uint32((*f)[1])<<16 | uint32((*f)[2])<<8 | uint32((*f)[3])
-	*f = (*f)[4:]
-	return int(int32(v))
+type readyFrame struct {
+	frameHeader
 }
 
-func (f *frame) readShort() uint16 {
-	if len(*f) < 2 {
-		panic(NewErrProtocol("Trying to read a short while <2 bytes in the buffer"))
+func (f *framer) parseReadyFrame() frame {
+	return &readyFrame{
+		frameHeader: *f.header,
 	}
-	v := uint16((*f)[0])<<8 | uint16((*f)[1])
-	*f = (*f)[2:]
-	return v
 }
 
-func (f *frame) readString() string {
-	n := int(f.readShort())
-	if len(*f) < n {
-		panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
-	}
-	v := string((*f)[:n])
-	*f = (*f)[n:]
-	return v
+type supportedFrame struct {
+	frameHeader
+
+	supported map[string][]string
 }
 
-func (f *frame) readLongString() string {
-	n := f.readInt()
-	if len(*f) < n {
-		panic(NewErrProtocol("Trying to read a string of %d bytes from a buffer with %d bytes in it", n, len(*f)))
+// TODO: if we move the body buffer onto the frameHeader then we only need a single
+// framer, and can move the methods onto the header.
+func (f *framer) parseSupportedFrame() frame {
+	return &supportedFrame{
+		frameHeader: *f.header,
+
+		supported: f.readStringMultiMap(),
 	}
-	v := string((*f)[:n])
-	*f = (*f)[n:]
-	return v
 }
 
-func (f *frame) readBytes() []byte {
-	n := f.readInt()
-	if n < 0 {
-		return nil
-	}
-	if len(*f) < n {
-		panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
-	}
-	v := (*f)[:n]
-	*f = (*f)[n:]
-	return v
+type writeStartupFrame struct {
+	opts map[string]string
 }
 
-func (f *frame) readShortBytes() []byte {
-	n := int(f.readShort())
-	if len(*f) < n {
-		panic(NewErrProtocol("Trying to read %d bytes from a buffer with %d bytes in it", n, len(*f)))
-	}
-	v := (*f)[:n]
-	*f = (*f)[n:]
-	return v
+func (w *writeStartupFrame) writeFrame(framer *framer, streamID int) error {
+	return framer.writeStartupFrame(streamID, w.opts)
+}
+
+func (f *framer) writeStartupFrame(streamID int, options map[string]string) error {
+	f.writeHeader(f.flags&^flagCompress, opStartup, streamID)
+	f.writeStringMap(options)
+
+	return f.finishWrite()
+}
+
+type writePrepareFrame struct {
+	statement string
+}
+
+func (w *writePrepareFrame) writeFrame(framer *framer, streamID int) error {
+	return framer.writePrepareFrame(streamID, w.statement)
+}
+
+func (f *framer) writePrepareFrame(stream int, statement string) error {
+	f.writeHeader(f.flags, opPrepare, stream)
+	f.writeLongString(statement)
+	return f.finishWrite()
 }
 
-func (f *frame) readTypeInfo(version uint8) *TypeInfo {
-	x := f.readShort()
+func (f *framer) readTypeInfo() *TypeInfo {
+	id := f.readShort()
 	typ := &TypeInfo{
-		Proto: version,
-		Type:  Type(x),
+		// we need to pass proto to the marshaller here
+		Proto: f.proto,
+		Type:  Type(id),
 	}
+
 	switch typ.Type {
 	case TypeCustom:
 		typ.Custom = f.readString()
 		if cassType := getApacheCassandraType(typ.Custom); cassType != TypeCustom {
-			typ = &TypeInfo{Type: cassType}
+			typ = &TypeInfo{
+				Proto: f.proto,
+				Type:  cassType,
+			}
 			switch typ.Type {
 			case TypeMap:
-				typ.Key = f.readTypeInfo(version)
+				typ.Key = f.readTypeInfo()
 				fallthrough
 			case TypeList, TypeSet:
-				typ.Elem = f.readTypeInfo(version)
+				typ.Elem = f.readTypeInfo()
 			}
 		}
 	case TypeMap:
-		typ.Key = f.readTypeInfo(version)
+		typ.Key = f.readTypeInfo()
 		fallthrough
 	case TypeList, TypeSet:
-		typ.Elem = f.readTypeInfo(version)
+		typ.Elem = f.readTypeInfo()
 	}
+
 	return typ
 }
 
-func (f *frame) readMetaData(version uint8) ([]ColumnInfo, []byte) {
-	flags := f.readInt()
-	numColumns := f.readInt()
+type resultMetadata struct {
+	flags int
 
-	var pageState []byte
-	if flags&2 != 0 {
-		pageState = f.readBytes()
+	// only if flagPageState
+	pagingState []byte
+
+	columns []ColumnInfo
+}
+
+func (r resultMetadata) String() string {
+	return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns)
+}
+
+func (f *framer) parseResultMetadata() resultMetadata {
+	meta := resultMetadata{
+		flags: f.readInt(),
 	}
 
-	globalKeyspace := ""
-	globalTable := ""
-	if flags&1 != 0 {
-		globalKeyspace = f.readString()
-		globalTable = f.readString()
+	colCount := f.readInt()
+
+	if meta.flags&flagHasMorePages == flagHasMorePages {
+		meta.pagingState = f.readBytes()
 	}
 
-	columns := make([]ColumnInfo, numColumns)
-	for i := 0; i < numColumns; i++ {
-		columns[i].Keyspace = globalKeyspace
-		columns[i].Table = globalTable
-		if flags&1 == 0 {
-			columns[i].Keyspace = f.readString()
-			columns[i].Table = f.readString()
-		}
-		columns[i].Name = f.readString()
-		columns[i].TypeInfo = f.readTypeInfo(version)
+	if meta.flags&flagNoMetaData == flagNoMetaData {
+		return meta
 	}
-	return columns, pageState
-}
 
-func (f *frame) readError() RequestError {
-	code := f.readInt()
-	msg := f.readString()
-	errD := errorFrame{code, msg}
-	switch code {
-	case errUnavailable:
-		cl := Consistency(f.readShort())
-		required := f.readInt()
-		alive := f.readInt()
-		return RequestErrUnavailable{errorFrame: errD,
-			Consistency: cl,
-			Required:    required,
-			Alive:       alive}
-	case errWriteTimeout:
-		cl := Consistency(f.readShort())
-		received := f.readInt()
-		blockfor := f.readInt()
-		writeType := f.readString()
-		return RequestErrWriteTimeout{errorFrame: errD,
-			Consistency: cl,
-			Received:    received,
-			BlockFor:    blockfor,
-			WriteType:   writeType,
-		}
-	case errReadTimeout:
-		cl := Consistency(f.readShort())
-		received := f.readInt()
-		blockfor := f.readInt()
-		dataPresent := (*f)[0]
-		*f = (*f)[1:]
-		return RequestErrReadTimeout{errorFrame: errD,
-			Consistency: cl,
-			Received:    received,
-			BlockFor:    blockfor,
-			DataPresent: dataPresent,
-		}
-	case errAlreadyExists:
-		ks := f.readString()
-		table := f.readString()
-		return RequestErrAlreadyExists{errorFrame: errD,
-			Keyspace: ks,
-			Table:    table,
-		}
-	case errUnprepared:
-		stmtId := f.readShortBytes()
-		return RequestErrUnprepared{errorFrame: errD,
-			StatementId: stmtId,
+	var keyspace, table string
+	globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
+	if globalSpec {
+		keyspace = f.readString()
+		table = f.readString()
+	}
+
+	cols := make([]ColumnInfo, colCount)
+
+	for i := 0; i < colCount; i++ {
+		col := &cols[i]
+
+		if !globalSpec {
+			col.Keyspace = f.readString()
+			col.Table = f.readString()
+		} else {
+			col.Keyspace = keyspace
+			col.Table = table
 		}
-	default:
-		return errD
+
+		col.Name = f.readString()
+		col.TypeInfo = f.readTypeInfo()
 	}
+
+	meta.columns = cols
+
+	return meta
 }
 
-func (f *frame) writeConsistency(c Consistency) {
-	f.writeShort(consistencyCodes[c])
+type resultVoidFrame struct {
+	frameHeader
 }
 
-func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
-	return f, nil
+func (f *resultVoidFrame) String() string {
+	return "[result_void]"
 }
 
-var consistencyCodes = []uint16{
-	Any:         0x0000,
-	One:         0x0001,
-	Two:         0x0002,
-	Three:       0x0003,
-	Quorum:      0x0004,
-	All:         0x0005,
-	LocalQuorum: 0x0006,
-	EachQuorum:  0x0007,
-	Serial:      0x0008,
-	LocalSerial: 0x0009,
-	LocalOne:    0x000A,
+func (f *framer) parseResultFrame() (frame, error) {
+	kind := f.readInt()
+
+	switch kind {
+	case resultKindVoid:
+		return &resultVoidFrame{frameHeader: *f.header}, nil
+	case resultKindRows:
+		return f.parseResultRows(), nil
+	case resultKindKeyspace:
+		return f.parseResultSetKeyspace(), nil
+	case resultKindPrepared:
+		return f.parseResultPrepared(), nil
+	case resultKindSchemaChanged:
+		return f.parseResultSchemaChange(), nil
+	}
+
+	return nil, NewErrProtocol("unknown result kind: %x", kind)
 }
 
-type readyFrame struct{}
+type resultRowsFrame struct {
+	frameHeader
 
-type supportedFrame struct{}
+	meta resultMetadata
+	rows [][][]byte
+}
 
-type resultVoidFrame struct{}
+func (f *resultRowsFrame) String() string {
+	return fmt.Sprintf("[result_rows meta=%v]", f.meta)
+}
 
-type resultRowsFrame struct {
-	Columns     []ColumnInfo
-	Rows        [][][]byte
-	PagingState []byte
+func (f *framer) parseResultRows() frame {
+	meta := f.parseResultMetadata()
+
+	numRows := f.readInt()
+	colCount := len(meta.columns)
+
+	rows := make([][][]byte, numRows)
+	for i := 0; i < numRows; i++ {
+		rows[i] = make([][]byte, colCount)
+		for j := 0; j < colCount; j++ {
+			rows[i][j] = f.readBytes()
+		}
+	}
+
+	return &resultRowsFrame{
+		frameHeader: *f.header,
+		meta:        meta,
+		rows:        rows,
+	}
 }
 
 type resultKeyspaceFrame struct {
-	Keyspace string
+	frameHeader
+	keyspace string
+}
+
+func (r *resultKeyspaceFrame) String() string {
+	return fmt.Sprintf("[result_keyspace keyspace=%s]", r.keyspace)
+}
+
+func (f *framer) parseResultSetKeyspace() frame {
+	return &resultKeyspaceFrame{
+		frameHeader: *f.header,
+		keyspace:    f.readString(),
+	}
 }
 
 type resultPreparedFrame struct {
-	PreparedId   []byte
-	Arguments    []ColumnInfo
-	ReturnValues []ColumnInfo
+	frameHeader
+
+	preparedID []byte
+	reqMeta    resultMetadata
+	respMeta   resultMetadata
+}
+
+func (f *framer) parseResultPrepared() frame {
+	frame := &resultPreparedFrame{
+		frameHeader: *f.header,
+		preparedID:  f.readShortBytes(),
+		reqMeta:     f.parseResultMetadata(),
+	}
+
+	if f.proto < protoVersion2 {
+		return frame
+	}
+
+	frame.respMeta = f.parseResultMetadata()
+
+	return frame
 }
 
-type operation interface {
-	encodeFrame(version uint8, dst frame) (frame, error)
+type resultSchemaChangeFrame struct {
+	frameHeader
+
+	change   string
+	keyspace string
+	table    string
 }
 
-type startupFrame struct {
-	CQLVersion  string
-	Compression string
+func (s *resultSchemaChangeFrame) String() string {
+	return fmt.Sprintf("[result_schema_change change=%s keyspace=%s table=%s]", s.change, s.keyspace, s.table)
+}
+
+func (f *framer) parseResultSchemaChange() frame {
+	frame := &resultSchemaChangeFrame{
+		frameHeader: *f.header,
+	}
+
+	if f.proto < protoVersion3 {
+		frame.change = f.readString()
+		frame.keyspace = f.readString()
+		frame.table = f.readString()
+	} else {
+		// TODO: improve type representation of this
+		frame.change = f.readString()
+		target := f.readString()
+		switch target {
+		case "KEYSPACE":
+			frame.keyspace = f.readString()
+		case "TABLE", "TYPE":
+			frame.keyspace = f.readString()
+			frame.table = f.readString()
+		}
+	}
+
+	return frame
+}
+
+type authenticateFrame struct {
+	frameHeader
+
+	class string
 }
 
-func (op *startupFrame) String() string {
-	return fmt.Sprintf("[startup cqlversion=%q compression=%q]", op.CQLVersion, op.Compression)
+func (a *authenticateFrame) String() string {
+	return fmt.Sprintf("[authenticate class=%q]", a.class)
 }
 
-func (op *startupFrame) encodeFrame(version uint8, f frame) (frame, error) {
-	if f == nil {
-		f = newFrame(version)
+func (f *framer) parseAuthenticateFrame() frame {
+	return &authenticateFrame{
+		frameHeader: *f.header,
+		class:       f.readString(),
 	}
+}
+
+type authSuccessFrame struct {
+	frameHeader
 
-	f.setHeader(version, 0, 0, opStartup)
+	data []byte
+}
+
+func (a *authSuccessFrame) String() string {
+	return fmt.Sprintf("[auth_success data=%q]", a.data)
+}
 
-	// TODO: fix this, this is actually a StringMap
-	var size uint16 = 1
-	if op.Compression != "" {
-		size++
+func (f *framer) parseAuthSuccessFrame() frame {
+	return &authSuccessFrame{
+		frameHeader: *f.header,
+		data:        f.readBytes(),
 	}
+}
+
+type authChallengeFrame struct {
+	frameHeader
 
-	f.writeShort(size)
-	f.writeString("CQL_VERSION")
-	f.writeString(op.CQLVersion)
+	data []byte
+}
+
+func (a *authChallengeFrame) String() string {
+	return fmt.Sprintf("[auth_challenge data=%q]", a.data)
+}
 
-	if op.Compression != "" {
-		f.writeString("COMPRESSION")
-		f.writeString(op.Compression)
+func (f *framer) parseAuthChallengeFrame() frame {
+	return &authChallengeFrame{
+		frameHeader: *f.header,
+		data:        f.readBytes(),
 	}
+}
+
+type writeAuthResponseFrame struct {
+	data []byte
+}
 
-	return f, nil
+func (a *writeAuthResponseFrame) String() string {
+	return fmt.Sprintf("[auth_response data=%q]", a.data)
 }
 
-type queryFrame struct {
-	Stmt      string
-	Prepared  []byte
-	Cons      Consistency
-	Values    [][]byte
-	PageSize  int
-	PageState []byte
+func (a *writeAuthResponseFrame) writeFrame(framer *framer, streamID int) error {
+	return framer.writeAuthResponseFrame(streamID, a.data)
 }
 
-func (op *queryFrame) String() string {
-	return fmt.Sprintf("[query statement=%q prepared=%x cons=%v ...]", op.Stmt, op.Prepared, op.Cons)
+func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error {
+	f.writeHeader(f.flags, opAuthResponse, streamID)
+	f.writeBytes(data)
+	return f.finishWrite()
 }
 
-func (op *queryFrame) encodeFrame(version uint8, f frame) (frame, error) {
-	if version == 1 && (op.PageSize != 0 || len(op.PageState) > 0 ||
-		(len(op.Values) > 0 && len(op.Prepared) == 0)) {
-		return nil, ErrUnsupported
+type queryValues struct {
+	value []byte
+	// optional name, will set With names for values flag
+	name string
+}
+
+type queryParams struct {
+	consistency Consistency
+	// v2+
+	skipMeta          bool
+	values            []queryValues
+	pageSize          int
+	pagingState       []byte
+	serialConsistency Consistency
+	// v3+
+	timestamp *time.Time
+}
+
+func (q queryParams) String() string {
+	return fmt.Sprintf("[query_params consistency=%v skip_meta=%v page_size=%d paging_state=%q serial_consistency=%v timestamp=%v values=%v]",
+		q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.timestamp, q.values)
+}
+
+func (f *framer) writeQueryParams(opts *queryParams) {
+	f.writeConsistency(opts.consistency)
+
+	if f.proto == protoVersion1 {
+		return
 	}
 
-	if f == nil {
-		f = newFrame(version)
+	var flags byte
+	if len(opts.values) > 0 {
+		flags |= flagValues
+	}
+	if opts.skipMeta {
+		flags |= flagSkipMetaData
+	}
+	if opts.pageSize > 0 {
+		flags |= flagPageSize
+	}
+	if len(opts.pagingState) > 0 {
+		flags |= flagWithPagingState
+	}
+	if opts.serialConsistency > 0 {
+		flags |= flagWithSerialConsistency
 	}
 
-	if len(op.Prepared) > 0 {
-		f.setHeader(version, 0, 0, opExecute)
-		f.writeShortBytes(op.Prepared)
-	} else {
-		f.setHeader(version, 0, 0, opQuery)
-		f.writeLongString(op.Stmt)
+	names := false
+
+	// protoV3 specific things
+	if f.proto > protoVersion2 {
+		if opts.timestamp != nil {
+			flags |= flagDefaultTimestamp
+		}
+		if len(opts.values) > 0 && opts.values[0].name != "" {
+			flags |= flagWithNameValues
+			names = true
+		}
 	}
 
-	if version >= 2 {
-		f.writeConsistency(op.Cons)
-		flagPos := len(f)
-		f.writeByte(0)
+	f.writeByte(flags)
 
-		if len(op.Values) > 0 {
-			f[flagPos] |= flagQueryValues
-			f.writeShort(uint16(len(op.Values)))
-			for _, value := range op.Values {
-				f.writeBytes(value)
+	if n := len(opts.values); n > 0 {
+		f.writeShort(uint16(n))
+		for i := 0; i < n; i++ {
+			if names {
+				f.writeString(opts.values[i].name)
 			}
+			f.writeBytes(opts.values[i].value)
 		}
+	}
+
+	if opts.pageSize > 0 {
+		f.writeInt(int32(opts.pageSize))
+	}
+
+	if len(opts.pagingState) > 0 {
+		f.writeBytes(opts.pagingState)
+	}
+
+	if opts.serialConsistency > 0 {
+		f.writeConsistency(opts.serialConsistency)
+	}
+
+	if f.proto > protoVersion2 && opts.timestamp != nil {
+		// timestamp in microseconds
+		// TODO: should the timpestamp be set on the queryParams or should we set
+		// it here?
+		ts := opts.timestamp.UnixNano() / 1000
+		f.writeLong(ts)
+	}
+}
+
+type writeQueryFrame struct {
+	statement string
+	params    queryParams
+}
+
+func (w *writeQueryFrame) String() string {
+	return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params)
+}
+
+func (w *writeQueryFrame) writeFrame(framer *framer, streamID int) error {
+	return framer.writeQueryFrame(streamID, w.statement, &w.params)
+}
+
+func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams) error {
+	f.writeHeader(f.flags, opQuery, streamID)
+	f.writeLongString(statement)
+	f.writeQueryParams(params)
+
+	return f.finishWrite()
+}
+
+type frameWriter interface {
+	writeFrame(framer *framer, streamID int) error
+}
+
+type writeExecuteFrame struct {
+	preparedID []byte
+	params     queryParams
+}
+
+func (e *writeExecuteFrame) String() string {
+	return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params)
+}
+
+func (e *writeExecuteFrame) writeFrame(fr *framer, streamID int) error {
+	return fr.writeExecuteFrame(streamID, e.preparedID, &e.params)
+}
 
-		if op.PageSize > 0 {
-			f[flagPos] |= flagPageSize
-			f.writeInt(int32(op.PageSize))
+func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams) error {
+	f.writeHeader(f.flags, opExecute, streamID)
+	f.writeShortBytes(preparedID)
+	if f.proto > protoVersion1 {
+		f.writeQueryParams(params)
+	} else {
+		n := len(params.values)
+		f.writeShort(uint16(n))
+		for i := 0; i < n; i++ {
+			f.writeBytes(params.values[i].value)
 		}
+		f.writeConsistency(params.consistency)
+	}
+
+	return f.finishWrite()
+}
+
+// TODO: can we replace BatchStatemt with batchStatement? As they prety much
+// duplicate each other
+type batchStatment struct {
+	preparedID []byte
+	statement  string
+	values     []queryValues
+}
+
+type writeBatchFrame struct {
+	typ               BatchType
+	statements        []batchStatment
+	consistency       Consistency
+	serialConsistency Consistency
+	defaultTimestamp  bool
+}
+
+func (w *writeBatchFrame) writeFrame(framer *framer, streamID int) error {
+	return framer.writeBatchFrame(streamID, w)
+}
+
+func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame) error {
+	f.writeHeader(f.flags, opBatch, streamID)
+	f.writeByte(byte(w.typ))
 
-		if len(op.PageState) > 0 {
-			f[flagPos] |= flagPageState
-			f.writeBytes(op.PageState)
+	n := len(w.statements)
+	f.writeShort(uint16(n))
+
+	var flags byte
+
+	for i := 0; i < n; i++ {
+		b := &w.statements[i]
+		if len(b.preparedID) == 0 {
+			f.writeByte(0)
+			f.writeLongString(b.statement)
+			continue
 		}
-	} else if version == 1 {
-		if len(op.Prepared) > 0 {
-			f.writeShort(uint16(len(op.Values)))
-			for _, value := range op.Values {
-				f.writeBytes(value)
+
+		f.writeByte(1)
+		f.writeShortBytes(b.preparedID)
+		f.writeShort(uint16(len(b.values)))
+		for j := range b.values {
+			col := &b.values[j]
+			if f.proto > protoVersion2 && col.name != "" {
+				// TODO: move this check into the caller and set a flag on writeBatchFrame
+				// to indicate using named values
+				flags |= flagWithNameValues
+				f.writeString(col.name)
 			}
+			f.writeBytes(col.value)
+		}
+	}
+
+	f.writeConsistency(w.consistency)
+
+	if f.proto > protoVersion2 {
+		if w.serialConsistency > 0 {
+			flags |= flagWithSerialConsistency
+		}
+		if w.defaultTimestamp {
+			flags |= flagDefaultTimestamp
+		}
+
+		f.writeByte(flags)
+
+		if w.serialConsistency > 0 {
+			f.writeConsistency(w.serialConsistency)
+		}
+		if w.defaultTimestamp {
+			now := time.Now().UnixNano() / 1000
+			f.writeLong(now)
 		}
-		f.writeConsistency(op.Cons)
 	}
 
-	return f, nil
+	return f.finishWrite()
+}
+
+func (f *framer) readByte() byte {
+	b := f.rbuf[0]
+	f.rbuf = f.rbuf[1:]
+	return b
+}
+
+func (f *framer) readInt() (n int) {
+	n = int(int32(f.rbuf[0])<<24 | int32(f.rbuf[1])<<16 | int32(f.rbuf[2])<<8 | int32(f.rbuf[3]))
+	f.rbuf = f.rbuf[4:]
+	return
+}
+
+func (f *framer) readShort() (n uint16) {
+	n = uint16(f.rbuf[0])<<8 | uint16(f.rbuf[1])
+	f.rbuf = f.rbuf[2:]
+	return
+}
+
+func (f *framer) readLong() (n int64) {
+	n = int64(f.rbuf[0])<<56 | int64(f.rbuf[1])<<48 | int64(f.rbuf[2])<<40 | int64(f.rbuf[3])<<32 |
+		int64(f.rbuf[4])<<24 | int64(f.rbuf[5])<<16 | int64(f.rbuf[6])<<8 | int64(f.rbuf[7])
+	f.rbuf = f.rbuf[8:]
+	return
+}
+
+func (f *framer) readString() (s string) {
+	size := f.readShort()
+	s = string(f.rbuf[:size])
+	f.rbuf = f.rbuf[size:]
+	return
+}
+
+func (f *framer) readLongString() (s string) {
+	size := f.readInt()
+	s = string(f.rbuf[:size])
+	f.rbuf = f.rbuf[size:]
+	return
+}
+
+func (f *framer) readUUID() *UUID {
+	// TODO: how to handle this error, if it is a uuid, then sureley, problems?
+	u, _ := UUIDFromBytes(f.rbuf[:16])
+	f.rbuf = f.rbuf[16:]
+	return &u
+}
+
+func (f *framer) readStringList() []string {
+	size := f.readShort()
+
+	l := make([]string, size)
+	for i := 0; i < int(size); i++ {
+		l[i] = f.readString()
+	}
+
+	return l
 }
 
-type prepareFrame struct {
-	Stmt string
+func (f *framer) readBytes() []byte {
+	size := f.readInt()
+	if size < 0 {
+		return nil
+	}
+
+	// we cant make assumptions about the length of the life of the supplied byte
+	// slice so we defensivly copy it out of the underlying buffer. This has the
+	// downside of increasing allocs per read but will provide much greater memory
+	// safety. The allocs can hopefully be improved in the future.
+	// TODO: dont copy into a new slice
+	l := make([]byte, size)
+	copy(l, f.rbuf[:size])
+	f.rbuf = f.rbuf[size:]
+
+	return l
 }
 
-func (op *prepareFrame) String() string {
-	return fmt.Sprintf("[prepare statement=%q]", op.Stmt)
+func (f *framer) readShortBytes() []byte {
+	size := f.readShort()
+
+	l := make([]byte, size)
+	copy(l, f.rbuf[:size])
+	f.rbuf = f.rbuf[size:]
+
+	return l
 }
 
-func (op *prepareFrame) encodeFrame(version uint8, f frame) (frame, error) {
-	if f == nil {
-		f = newFrame(version)
+func (f *framer) readInet() (net.IP, int) {
+	size := f.rbuf[0]
+	f.rbuf = f.rbuf[1:]
+
+	if !(size == 4 || size == 16) {
+		panic(fmt.Sprintf("invalid IP size: %d", size))
 	}
-	f.setHeader(version, 0, 0, opPrepare)
-	f.writeLongString(op.Stmt)
-	return f, nil
+
+	ip := make([]byte, size)
+	copy(ip, f.rbuf[:size])
+	f.rbuf = f.rbuf[size:]
+
+	port := f.readInt()
+	return net.IP(ip), port
+}
+
+func (f *framer) readConsistency() Consistency {
+	return Consistency(f.readShort())
 }
 
-type optionsFrame struct{}
+func (f *framer) readStringMap() map[string]string {
+	size := f.readShort()
+	m := make(map[string]string)
+
+	for i := 0; i < int(size); i++ {
+		k := f.readString()
+		v := f.readString()
+		m[k] = v
+	}
 
-func (op *optionsFrame) String() string {
-	return "[options]"
+	return m
 }
 
-func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
-	if f == nil {
-		f = newFrame(version)
+func (f *framer) readStringMultiMap() map[string][]string {
+	size := f.readShort()
+	m := make(map[string][]string)
+
+	for i := 0; i < int(size); i++ {
+		k := f.readString()
+		v := f.readStringList()
+		m[k] = v
 	}
-	f.setHeader(version, 0, 0, opOptions)
-	return f, nil
+
+	return m
 }
 
-type authenticateFrame struct {
-	Authenticator string
+func (f *framer) writeByte(b byte) {
+	f.wbuf = append(f.wbuf, b)
+}
+
+// these are protocol level binary types
+func (f *framer) writeInt(n int32) {
+	f.wbuf = append(f.wbuf,
+		byte(n>>24),
+		byte(n>>16),
+		byte(n>>8),
+		byte(n),
+	)
+}
+
+func (f *framer) writeShort(n uint16) {
+	f.wbuf = append(f.wbuf,
+		byte(n>>8),
+		byte(n),
+	)
+}
+
+func (f *framer) writeLong(n int64) {
+	f.wbuf = append(f.wbuf,
+		byte(n>>56),
+		byte(n>>48),
+		byte(n>>40),
+		byte(n>>32),
+		byte(n>>24),
+		byte(n>>16),
+		byte(n>>8),
+		byte(n),
+	)
 }
 
-func (op *authenticateFrame) String() string {
-	return fmt.Sprintf("[authenticate authenticator=%q]", op.Authenticator)
+func (f *framer) writeString(s string) {
+	f.writeShort(uint16(len(s)))
+	f.wbuf = append(f.wbuf, s...)
 }
 
-type authResponseFrame struct {
-	Data []byte
+func (f *framer) writeLongString(s string) {
+	f.writeInt(int32(len(s)))
+	f.wbuf = append(f.wbuf, s...)
 }
 
-func (op *authResponseFrame) String() string {
-	return fmt.Sprintf("[auth_response data=%q]", op.Data)
+func (f *framer) writeUUID(u *UUID) {
+	f.wbuf = append(f.wbuf, u[:]...)
 }
 
-func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
-	if f == nil {
-		f = newFrame(version)
+func (f *framer) writeStringList(l []string) {
+	f.writeShort(uint16(len(l)))
+	for _, s := range l {
+		f.writeString(s)
 	}
-	f.setHeader(version, 0, 0, opAuthResponse)
-	f.writeBytes(op.Data)
-	return f, nil
 }
 
-type authSuccessFrame struct {
-	Data []byte
+func (f *framer) writeBytes(p []byte) {
+	// TODO: handle null case correctly,
+	//     [bytes]        A [int] n, followed by n bytes if n >= 0. If n < 0,
+	//					  no byte should follow and the value represented is `null`.
+	if p == nil {
+		f.writeInt(-1)
+	} else {
+		f.writeInt(int32(len(p)))
+		f.wbuf = append(f.wbuf, p...)
+	}
 }
 
-func (op *authSuccessFrame) String() string {
-	return fmt.Sprintf("[auth_success data=%q]", op.Data)
+func (f *framer) writeShortBytes(p []byte) {
+	f.writeShort(uint16(len(p)))
+	f.wbuf = append(f.wbuf, p...)
 }
 
-type authChallengeFrame struct {
-	Data []byte
+func (f *framer) writeInet(ip net.IP, port int) {
+	f.wbuf = append(f.wbuf,
+		byte(len(ip)),
+	)
+
+	f.wbuf = append(f.wbuf,
+		[]byte(ip)...,
+	)
+
+	f.writeInt(int32(port))
 }
 
-func (op *authChallengeFrame) String() string {
-	return fmt.Sprintf("[auth_challenge data=%q]", op.Data)
+func (f *framer) writeConsistency(cons Consistency) {
+	f.writeShort(uint16(cons))
+}
+
+func (f *framer) writeStringMap(m map[string]string) {
+	f.writeShort(uint16(len(m)))
+	for k, v := range m {
+		f.writeString(k)
+		f.writeString(v)
+	}
 }

+ 19 - 19
marshal.go

@@ -1176,24 +1176,24 @@ type Type int
 
 const (
 	TypeCustom    Type = 0x0000
-	TypeAscii     Type = 0x0001
-	TypeBigInt    Type = 0x0002
-	TypeBlob      Type = 0x0003
-	TypeBoolean   Type = 0x0004
-	TypeCounter   Type = 0x0005
-	TypeDecimal   Type = 0x0006
-	TypeDouble    Type = 0x0007
-	TypeFloat     Type = 0x0008
-	TypeInt       Type = 0x0009
-	TypeTimestamp Type = 0x000B
-	TypeUUID      Type = 0x000C
-	TypeVarchar   Type = 0x000D
-	TypeVarint    Type = 0x000E
-	TypeTimeUUID  Type = 0x000F
-	TypeInet      Type = 0x0010
-	TypeList      Type = 0x0020
-	TypeMap       Type = 0x0021
-	TypeSet       Type = 0x0022
+	TypeAscii          = 0x0001
+	TypeBigInt         = 0x0002
+	TypeBlob           = 0x0003
+	TypeBoolean        = 0x0004
+	TypeCounter        = 0x0005
+	TypeDecimal        = 0x0006
+	TypeDouble         = 0x0007
+	TypeFloat          = 0x0008
+	TypeInt            = 0x0009
+	TypeTimestamp      = 0x000B
+	TypeUUID           = 0x000C
+	TypeVarchar        = 0x000D
+	TypeVarint         = 0x000E
+	TypeTimeUUID       = 0x000F
+	TypeInet           = 0x0010
+	TypeList           = 0x0020
+	TypeMap            = 0x0021
+	TypeSet            = 0x0022
 )
 
 // String returns the name of the identifier.
@@ -1238,7 +1238,7 @@ func (t Type) String() string {
 	case TypeVarint:
 		return "varint"
 	default:
-		return "unknown"
+		return fmt.Sprintf("unkown_type_%d", t)
 	}
 }
 

+ 23 - 44
session.go

@@ -101,6 +101,12 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
 	return qry
 }
 
+type QueryInfo struct {
+	Id   []byte
+	Args []ColumnInfo
+	Rval []ColumnInfo
+}
+
 // Bind generates a new query object based on the query statement passed in.
 // The query is automatically prepared if it has not previously been executed.
 // The binding callback allows the application to define which query argument
@@ -225,8 +231,10 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	s.routingKeyInfoCache.lru.Add(cacheKey, inflight)
 	s.routingKeyInfoCache.mu.Unlock()
 
-	var queryInfo *QueryInfo
-	var partitionKey []*ColumnMetadata
+	var (
+		prepared     *resultPreparedFrame
+		partitionKey []*ColumnMetadata
+	)
 
 	// get the query info for the statement
 	conn := s.Pool.Pick(nil)
@@ -238,19 +246,21 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		return nil, inflight.err
 	}
 
-	queryInfo, inflight.err = conn.prepareStatement(stmt, nil)
+	prepared, inflight.err = conn.prepareStatement(stmt, nil)
 	if inflight.err != nil {
 		// don't cache this error
 		s.routingKeyInfoCache.Remove(cacheKey)
 		return nil, inflight.err
 	}
-	if len(queryInfo.Args) == 0 {
+
+	if len(prepared.reqMeta.columns) == 0 {
 		// no arguments, no routing key, and no error
 		return nil, nil
 	}
 
 	// get the table metadata
-	table := queryInfo.Args[0].Table
+	table := prepared.reqMeta.columns[0].Table
+
 	var keyspaceMetadata *KeyspaceMetadata
 	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
 	if inflight.err != nil {
@@ -282,7 +292,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		routingKeyInfo.indexes[keyIndex] = -1
 
 		// find the column in the query info
-		for argIndex, boundColumn := range queryInfo.Args {
+		for argIndex, boundColumn := range prepared.reqMeta.columns {
 			if keyColumn.Name == boundColumn.Name {
 				// there may be many such bound columns, pick the first
 				routingKeyInfo.indexes[keyIndex] = argIndex
@@ -731,12 +741,12 @@ func (b *Batch) Size() int {
 	return len(b.Entries)
 }
 
-type BatchType int
+type BatchType byte
 
 const (
 	LoggedBatch   BatchType = 0
-	UnloggedBatch BatchType = 1
-	CounterBatch  BatchType = 2
+	UnloggedBatch           = 1
+	CounterBatch            = 2
 )
 
 type BatchEntry struct {
@@ -745,41 +755,6 @@ type BatchEntry struct {
 	binding func(q *QueryInfo) ([]interface{}, error)
 }
 
-type Consistency int
-
-const (
-	Any Consistency = 1 + iota
-	One
-	Two
-	Three
-	Quorum
-	All
-	LocalQuorum
-	EachQuorum
-	Serial
-	LocalSerial
-	LocalOne
-)
-
-var ConsistencyNames = []string{
-	0:           "default",
-	Any:         "any",
-	One:         "one",
-	Two:         "two",
-	Three:       "three",
-	Quorum:      "quorum",
-	All:         "all",
-	LocalQuorum: "localquorum",
-	EachQuorum:  "eachquorum",
-	Serial:      "serial",
-	LocalSerial: "localserial",
-	LocalOne:    "localone",
-}
-
-func (c Consistency) String() string {
-	return ConsistencyNames[c]
-}
-
 type ColumnInfo struct {
 	Keyspace string
 	Table    string
@@ -787,6 +762,10 @@ type ColumnInfo struct {
 	TypeInfo *TypeInfo
 }
 
+func (c ColumnInfo) String() string {
+	return fmt.Sprintf("[column keyspace=%s table=%s name=%s type=%v]", c.Keyspace, c.Table, c.Name, c.TypeInfo)
+}
+
 // routing key indexes LRU cache
 type routingKeyInfoLRU struct {
 	lru *lru.Cache

+ 11 - 12
session_test.go

@@ -222,18 +222,17 @@ func TestBatchBasicAPI(t *testing.T) {
 
 func TestConsistencyNames(t *testing.T) {
 	names := map[Consistency]string{
-		0:           "default",
-		Any:         "any",
-		One:         "one",
-		Two:         "two",
-		Three:       "three",
-		Quorum:      "quorum",
-		All:         "all",
-		LocalQuorum: "localquorum",
-		EachQuorum:  "eachquorum",
-		Serial:      "serial",
-		LocalSerial: "localserial",
-		LocalOne:    "localone",
+		Any:         "ANY",
+		One:         "ONE",
+		Two:         "TWO",
+		Three:       "THREE",
+		Quorum:      "QUORUM",
+		All:         "ALL",
+		LocalQuorum: "LOCAL_QUORUM",
+		EachQuorum:  "EACH_QUORUM",
+		Serial:      "SERIAL",
+		LocalSerial: "LOCAL_SERIAL",
+		LocalOne:    "LOCAL_ONE",
 	}
 
 	for k, v := range names {