Browse Source

Update frame for proto v4 prepared and events

This adds support for parsing v4 protocol changes, this includes
the new prepared statement metadata and changed event format.
Chris Bannister 10 years ago
parent
commit
6b4a5e5a2c
2 changed files with 148 additions and 14 deletions
  1. 2 2
      conn.go
  2. 146 12
      frame.go

+ 2 - 2
conn.go

@@ -144,7 +144,7 @@ func Connect(addr string, cfg ConnConfig, errorHandler ConnErrorHandler) (*Conn,
 	}
 	}
 
 
 	// going to default to proto 2
 	// going to default to proto 2
-	if cfg.ProtoVersion < protoVersion1 || cfg.ProtoVersion > protoVersion3 {
+	if cfg.ProtoVersion < protoVersion1 || cfg.ProtoVersion > protoVersion4 {
 		log.Printf("unsupported protocol version: %d using 2\n", cfg.ProtoVersion)
 		log.Printf("unsupported protocol version: %d using 2\n", cfg.ProtoVersion)
 		cfg.ProtoVersion = 2
 		cfg.ProtoVersion = 2
 	}
 	}
@@ -646,7 +646,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 		}
 
 
 		return iter
 		return iter
-	case *resultKeyspaceFrame, *resultSchemaChangeFrame:
+	case *resultKeyspaceFrame, *resultSchemaChangeFrame, *schemaChangeKeyspace,*schemaChangeTable:
 		return &Iter{}
 		return &Iter{}
 	case *RequestErrUnprepared:
 	case *RequestErrUnprepared:
 		stmtsLRU.Lock()
 		stmtsLRU.Lock()

+ 146 - 12
frame.go

@@ -21,6 +21,7 @@ const (
 	protoVersion1      = 0x01
 	protoVersion1      = 0x01
 	protoVersion2      = 0x02
 	protoVersion2      = 0x02
 	protoVersion3      = 0x03
 	protoVersion3      = 0x03
+	protoVersion4      = 0x04
 
 
 	maxFrameSize = 256 * 1024 * 1024
 	maxFrameSize = 256 * 1024 * 1024
 )
 )
@@ -315,8 +316,8 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {
 
 
 	version := p[0] & protoVersionMask
 	version := p[0] & protoVersionMask
 
 
-	if version < protoVersion1 || version > protoVersion3 {
-		err = fmt.Errorf("invalid version: %x", version)
+	if version < protoVersion1 || version > protoVersion4 {
+		err = fmt.Errorf("gocql: invalid version: %x", version)
 		return
 		return
 	}
 	}
 
 
@@ -600,6 +601,10 @@ type writeStartupFrame struct {
 	opts map[string]string
 	opts map[string]string
 }
 }
 
 
+func (w writeStartupFrame) String() string {
+	return fmt.Sprintf("[startup opts=%+v]", w.opts)
+}
+
 func (w *writeStartupFrame) writeFrame(framer *framer, streamID int) error {
 func (w *writeStartupFrame) writeFrame(framer *framer, streamID int) error {
 	return framer.writeStartupFrame(streamID, w.opts)
 	return framer.writeStartupFrame(streamID, w.opts)
 }
 }
@@ -689,6 +694,74 @@ func (f *framer) readTypeInfo() TypeInfo {
 	return simple
 	return simple
 }
 }
 
 
+type preparedMetadata struct {
+	resultMetadata
+
+	// proto v4+
+	pkeyColumns []int
+}
+
+func (r preparedMetadata) String() string {
+	return fmt.Sprintf("[paging_metadata flags=0x%x pkey=%q paging_state=% X columns=%v]", r.flags, r.pkeyColumns, r.pagingState, r.columns)
+}
+
+func (f *framer) parsePreparedMetadata() preparedMetadata {
+	// TODO: deduplicate this from parseMetadata
+	meta := preparedMetadata{}
+	meta.flags = f.readInt()
+
+	colCount := f.readInt()
+	if colCount < 0 {
+		panic(fmt.Errorf("received negative column count: %d", colCount))
+	}
+	meta.actualColCount = colCount
+
+	if f.proto >= protoVersion4 {
+		pkeyCount := f.readInt()
+		pkeys := make([]int, pkeyCount)
+		for i := 0; i < pkeyCount; i++ {
+			pkeys[i] = int(f.readShort())
+		}
+		meta.pkeyColumns = pkeys
+	}
+
+	if meta.flags&flagHasMorePages == flagHasMorePages {
+		meta.pagingState = f.readBytes()
+	}
+
+	if meta.flags&flagNoMetaData == flagNoMetaData {
+		return meta
+	}
+
+	var keyspace, table string
+	globalSpec := meta.flags&flagGlobalTableSpec == flagGlobalTableSpec
+	if globalSpec {
+		keyspace = f.readString()
+		table = f.readString()
+	}
+
+	var cols []ColumnInfo
+	if colCount < 1000 {
+		// preallocate columninfo to avoid excess copying
+		cols = make([]ColumnInfo, colCount)
+		for i := 0; i < colCount; i++ {
+			f.readCol(&cols[i], &meta.resultMetadata, globalSpec, keyspace, table)
+		}
+	} else {
+		// use append, huge number of columns usually indicates a corrupt frame or
+		// just a huge row.
+		for i := 0; i < colCount; i++ {
+			var col ColumnInfo
+			f.readCol(&col, &meta.resultMetadata, globalSpec, keyspace, table)
+			cols = append(cols, col)
+		}
+	}
+
+	meta.columns = cols
+
+	return meta
+}
+
 type resultMetadata struct {
 type resultMetadata struct {
 	flags int
 	flags int
 
 
@@ -858,7 +931,7 @@ type resultPreparedFrame struct {
 	frameHeader
 	frameHeader
 
 
 	preparedID []byte
 	preparedID []byte
-	reqMeta    resultMetadata
+	reqMeta    preparedMetadata
 	respMeta   resultMetadata
 	respMeta   resultMetadata
 }
 }
 
 
@@ -866,7 +939,7 @@ func (f *framer) parseResultPrepared() frame {
 	frame := &resultPreparedFrame{
 	frame := &resultPreparedFrame{
 		frameHeader: *f.header,
 		frameHeader: *f.header,
 		preparedID:  f.readShortBytes(),
 		preparedID:  f.readShortBytes(),
-		reqMeta:     f.parseResultMetadata(),
+		reqMeta:     f.parsePreparedMetadata(),
 	}
 	}
 
 
 	if f.proto < protoVersion2 {
 	if f.proto < protoVersion2 {
@@ -890,29 +963,90 @@ func (s *resultSchemaChangeFrame) String() string {
 	return fmt.Sprintf("[result_schema_change change=%s keyspace=%s table=%s]", s.change, s.keyspace, s.table)
 	return fmt.Sprintf("[result_schema_change change=%s keyspace=%s table=%s]", s.change, s.keyspace, s.table)
 }
 }
 
 
+type schemaChangeKeyspace struct {
+	frameHeader
+
+	change   string
+	keyspace string
+}
+
+func (f schemaChangeKeyspace) String() string {
+	return fmt.Sprintf("[event schema_change_keyspace change=%q keyspace=%q]", f.change, f.keyspace)
+}
+
+type schemaChangeTable struct {
+	frameHeader
+
+	change   string
+	keyspace string
+	object   string
+}
+
+func (f schemaChangeTable) String() string {
+	return fmt.Sprintf("[event schema_change change=%q keyspace=%q object=%q]", f.change, f.keyspace, f.object)
+}
+
+type schemaChangeFunction struct {
+	frameHeader
+
+	change   string
+	keyspace string
+	name     string
+	args     []string
+}
+
 func (f *framer) parseResultSchemaChange() frame {
 func (f *framer) parseResultSchemaChange() frame {
-	frame := &resultSchemaChangeFrame{
-		frameHeader: *f.header,
-	}
+	if f.proto <= protoVersion2 {
+		frame := &resultSchemaChangeFrame{
+			frameHeader: *f.header,
+		}
 
 
-	if f.proto < protoVersion3 {
 		frame.change = f.readString()
 		frame.change = f.readString()
 		frame.keyspace = f.readString()
 		frame.keyspace = f.readString()
 		frame.table = f.readString()
 		frame.table = f.readString()
+
+		return frame
 	} else {
 	} else {
-		// TODO: improve type representation of this
-		frame.change = f.readString()
+		change := f.readString()
 		target := f.readString()
 		target := f.readString()
+
+		// TODO: could just use a seperate type for each target
 		switch target {
 		switch target {
 		case "KEYSPACE":
 		case "KEYSPACE":
+			frame := &schemaChangeKeyspace{
+				frameHeader: *f.header,
+				change:      change,
+			}
+
 			frame.keyspace = f.readString()
 			frame.keyspace = f.readString()
+
+			return frame
 		case "TABLE", "TYPE":
 		case "TABLE", "TYPE":
+			frame := &schemaChangeTable{
+				frameHeader: *f.header,
+				change:      change,
+			}
+
 			frame.keyspace = f.readString()
 			frame.keyspace = f.readString()
-			frame.table = f.readString()
+			frame.object = f.readString()
+
+			return frame
+		case "FUNCTION", "AGGREGATE":
+			frame := &schemaChangeFunction{
+				frameHeader: *f.header,
+				change:      change,
+			}
+
+			frame.keyspace = f.readString()
+			frame.name = f.readString()
+			frame.args = f.readStringList()
+
+			return frame
+		default:
+			panic(fmt.Errorf("gocql: unknown SCHEMA_CHANGE target: %q change: %q", target, change))
 		}
 		}
 	}
 	}
 
 
-	return frame
 }
 }
 
 
 type authenticateFrame struct {
 type authenticateFrame struct {