Browse Source

Dont store the prepared frame in the LRU

Instead of storing the prepared frame in the LRU store
the values required, defensivly copy []byte values which
we need.
Chris Bannister 10 years ago
parent
commit
23252e61b3
3 changed files with 44 additions and 50 deletions
  1. 15 18
      cassandra_test.go
  2. 23 27
      conn.go
  3. 6 5
      session.go

+ 15 - 18
cassandra_test.go

@@ -1156,21 +1156,18 @@ func injectInvalidPreparedStatement(t *testing.T, session *Session, table string
 	stmtsLRU.Lock()
 	stmtsLRU.Lock()
 	stmtsLRU.lru.Add(conn.addr+stmt, flight)
 	stmtsLRU.lru.Add(conn.addr+stmt, flight)
 	stmtsLRU.Unlock()
 	stmtsLRU.Unlock()
-	flight.info = &resultPreparedFrame{
-		preparedID: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
-		reqMeta: preparedMetadata{
-			resultMetadata: resultMetadata{
-				columns: []ColumnInfo{
-					{
-						Keyspace: "gocql_test",
-						Table:    table,
-						Name:     "foo",
-						TypeInfo: NativeType{
-							typ: TypeVarchar,
-						},
-					},
+	flight.info = QueryInfo{
+		Id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
+		Args: []ColumnInfo{
+			{
+				Keyspace: "gocql_test",
+				Table:    table,
+				Name:     "foo",
+				TypeInfo: NativeType{
+					typ: TypeVarchar,
 				},
 				},
-			}},
+			},
+		},
 	}
 	}
 	return stmt, conn
 	return stmt, conn
 }
 }
@@ -1233,13 +1230,13 @@ func TestQueryInfo(t *testing.T) {
 		t.Fatalf("Failed to execute query for preparing statement: %v", err)
 		t.Fatalf("Failed to execute query for preparing statement: %v", err)
 	}
 	}
 
 
-	if len(info.reqMeta.columns) != 1 {
-		t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, len(info.reqMeta.columns))
+	if x := len(info.Args); x != 1 {
+		t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, x)
 	}
 	}
 
 
 	if *flagProto > 1 {
 	if *flagProto > 1 {
-		if len(info.respMeta.columns) != 2 {
-			t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, len(info.respMeta.columns))
+		if x := len(info.Rval); x != 2 {
+			t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, x)
 		}
 		}
 	}
 	}
 }
 }

+ 23 - 27
conn.go

@@ -524,7 +524,7 @@ func (c *Conn) exec(req frameWriter, tracer Tracer) (frame, error) {
 	return frame, nil
 	return frame, nil
 }
 }
 
 
-func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame, error) {
+func (c *Conn) prepareStatement(stmt string, trace Tracer) (*QueryInfo, error) {
 	stmtsLRU.Lock()
 	stmtsLRU.Lock()
 	if stmtsLRU.lru == nil {
 	if stmtsLRU.lru == nil {
 		initStmtsLRU(defaultMaxPreparedStmts)
 		initStmtsLRU(defaultMaxPreparedStmts)
@@ -536,7 +536,7 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame
 		stmtsLRU.Unlock()
 		stmtsLRU.Unlock()
 		flight := val.(*inflightPrepare)
 		flight := val.(*inflightPrepare)
 		flight.wg.Wait()
 		flight.wg.Wait()
-		return flight.info, flight.err
+		return &flight.info, flight.err
 	}
 	}
 
 
 	flight := new(inflightPrepare)
 	flight := new(inflightPrepare)
@@ -557,7 +557,13 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame
 
 
 	switch x := resp.(type) {
 	switch x := resp.(type) {
 	case *resultPreparedFrame:
 	case *resultPreparedFrame:
-		flight.info = x
+		flight.info.Id = make([]byte, len(x.preparedID))
+		copy(flight.info.Id, x.preparedID)
+		// the type info's should _not_ have a reference to the framers read buffer,
+		// therefore we can just copy them directly.
+		flight.info.Args = x.reqMeta.columns
+		flight.info.PKeyColumns = x.reqMeta.pkeyColumns
+		flight.info.Rval = x.respMeta.columns
 	case error:
 	case error:
 		flight.err = x
 		flight.err = x
 	default:
 	default:
@@ -571,7 +577,7 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*resultPreparedFrame
 		stmtsLRU.Unlock()
 		stmtsLRU.Unlock()
 	}
 	}
 
 
-	return flight.info, flight.err
+	return &flight.info, flight.err
 }
 }
 
 
 func (c *Conn) executeQuery(qry *Query) *Iter {
 func (c *Conn) executeQuery(qry *Query) *Iter {
@@ -603,24 +609,19 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		if qry.binding == nil {
 		if qry.binding == nil {
 			values = qry.values
 			values = qry.values
 		} else {
 		} else {
-			binding := &QueryInfo{
-				Id:   info.preparedID,
-				Args: info.reqMeta.columns,
-				Rval: info.respMeta.columns,
-			}
-
-			values, err = qry.binding(binding)
+			values, err = qry.binding(info)
 			if err != nil {
 			if err != nil {
 				return &Iter{err: err}
 				return &Iter{err: err}
 			}
 			}
 		}
 		}
 
 
-		if len(values) != len(info.reqMeta.columns) {
+		if len(values) != len(info.Args) {
 			return &Iter{err: ErrQueryArgLength}
 			return &Iter{err: ErrQueryArgLength}
 		}
 		}
+
 		params.values = make([]queryValues, len(values))
 		params.values = make([]queryValues, len(values))
 		for i := 0; i < len(values); i++ {
 		for i := 0; i < len(values); i++ {
-			val, err := Marshal(info.reqMeta.columns[i].TypeInfo, values[i])
+			val, err := Marshal(info.Args[i].TypeInfo, values[i])
 			if err != nil {
 			if err != nil {
 				return &Iter{err: err}
 				return &Iter{err: err}
 			}
 			}
@@ -631,7 +632,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		}
 		}
 
 
 		frame = &writeExecuteFrame{
 		frame = &writeExecuteFrame{
-			preparedID: info.preparedID,
+			preparedID: info.Id,
 			params:     params,
 			params:     params,
 		}
 		}
 	} else {
 	} else {
@@ -757,28 +758,23 @@ func (c *Conn) executeBatch(batch *Batch) (*Iter, error) {
 			if entry.binding == nil {
 			if entry.binding == nil {
 				args = entry.Args
 				args = entry.Args
 			} else {
 			} else {
-				binding := &QueryInfo{
-					Id:   info.preparedID,
-					Args: info.reqMeta.columns,
-					Rval: info.respMeta.columns,
-				}
-				args, err = entry.binding(binding)
+				args, err = entry.binding(info)
 				if err != nil {
 				if err != nil {
 					return nil, err
 					return nil, err
 				}
 				}
 			}
 			}
 
 
-			if len(args) != len(info.reqMeta.columns) {
+			if len(args) != len(info.Args) {
 				return nil, ErrQueryArgLength
 				return nil, ErrQueryArgLength
 			}
 			}
 
 
-			b.preparedID = info.preparedID
-			stmts[string(info.preparedID)] = entry.Stmt
+			b.preparedID = info.Id
+			stmts[string(info.Id)] = entry.Stmt
 
 
-			b.values = make([]queryValues, len(info.reqMeta.columns))
+			b.values = make([]queryValues, len(info.Args))
 
 
-			for j := 0; j < len(info.reqMeta.columns); j++ {
-				val, err := Marshal(info.reqMeta.columns[j].TypeInfo, args[j])
+			for j := 0; j < len(info.Args); j++ {
+				val, err := Marshal(info.Args[j].TypeInfo, args[j])
 				if err != nil {
 				if err != nil {
 					return nil, err
 					return nil, err
 				}
 				}
@@ -900,7 +896,7 @@ func (c *Conn) awaitSchemaAgreement() (err error) {
 }
 }
 
 
 type inflightPrepare struct {
 type inflightPrepare struct {
-	info *resultPreparedFrame
+	info QueryInfo
 	err  error
 	err  error
 	wg   sync.WaitGroup
 	wg   sync.WaitGroup
 }
 }

+ 6 - 5
session.go

@@ -154,6 +154,7 @@ type QueryInfo struct {
 	Id   []byte
 	Id   []byte
 	Args []ColumnInfo
 	Args []ColumnInfo
 	Rval []ColumnInfo
 	Rval []ColumnInfo
+	PKeyColumns []int
 }
 }
 
 
 // Bind generates a new query object based on the query statement passed in.
 // Bind generates a new query object based on the query statement passed in.
@@ -286,7 +287,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 	s.routingKeyInfoCache.mu.Unlock()
 	s.routingKeyInfoCache.mu.Unlock()
 
 
 	var (
 	var (
-		prepared     *resultPreparedFrame
+		info     *QueryInfo
 		partitionKey []*ColumnMetadata
 		partitionKey []*ColumnMetadata
 	)
 	)
 
 
@@ -300,20 +301,20 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		return nil, inflight.err
 		return nil, inflight.err
 	}
 	}
 
 
-	prepared, inflight.err = conn.prepareStatement(stmt, nil)
+	info, inflight.err = conn.prepareStatement(stmt, nil)
 	if inflight.err != nil {
 	if inflight.err != nil {
 		// don't cache this error
 		// don't cache this error
 		s.routingKeyInfoCache.Remove(stmt)
 		s.routingKeyInfoCache.Remove(stmt)
 		return nil, inflight.err
 		return nil, inflight.err
 	}
 	}
 
 
-	if len(prepared.reqMeta.columns) == 0 {
+	if len(info.Args) == 0 {
 		// no arguments, no routing key, and no error
 		// no arguments, no routing key, and no error
 		return nil, nil
 		return nil, nil
 	}
 	}
 
 
 	// get the table metadata
 	// get the table metadata
-	table := prepared.reqMeta.columns[0].Table
+	table := info.Args[0].Table
 
 
 	var keyspaceMetadata *KeyspaceMetadata
 	var keyspaceMetadata *KeyspaceMetadata
 	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
 	keyspaceMetadata, inflight.err = s.KeyspaceMetadata(s.cfg.Keyspace)
@@ -346,7 +347,7 @@ func (s *Session) routingKeyInfo(stmt string) (*routingKeyInfo, error) {
 		routingKeyInfo.indexes[keyIndex] = -1
 		routingKeyInfo.indexes[keyIndex] = -1
 
 
 		// find the column in the query info
 		// find the column in the query info
-		for argIndex, boundColumn := range prepared.reqMeta.columns {
+		for argIndex, boundColumn := range info.Args {
 			if keyColumn.Name == boundColumn.Name {
 			if keyColumn.Name == boundColumn.Name {
 				// there may be many such bound columns, pick the first
 				// there may be many such bound columns, pick the first
 				routingKeyInfo.indexes[keyIndex] = argIndex
 				routingKeyInfo.indexes[keyIndex] = argIndex