瀏覽代碼

Merge branch 'master' into more-hostinfo

Chris Bannister 8 年之前
父節點
當前提交
b38f2a2fa9
共有 12 個文件被更改,包括 100 次插入13 次删除
  1. 49 2
      cassandra_test.go
  2. 2 2
      cluster.go
  3. 1 1
      conn.go
  4. 1 1
      conn_test.go
  5. 1 1
      connectionpool.go
  6. 3 0
      events.go
  7. 1 1
      frame.go
  8. 36 0
      helpers.go
  9. 1 1
      internal/ccm/ccm.go
  10. 1 1
      internal/lru/lru.go
  11. 1 1
      metadata.go
  12. 3 2
      session.go

+ 49 - 2
cassandra_test.go

@@ -620,7 +620,8 @@ func TestMapScanWithRefMap(t *testing.T) {
 	m["testfullname"] = FullName{"John", "Doe"}
 	m["testint"] = 100
 
-	if err := session.Query(`INSERT INTO scan_map_ref_table (testtext, testfullname, testint) values (?,?,?)`, m["testtext"], m["testfullname"], m["testint"]).Exec(); err != nil {
+	if err := session.Query(`INSERT INTO scan_map_ref_table (testtext, testfullname, testint) values (?,?,?)`,
+		m["testtext"], m["testfullname"], m["testint"]).Exec(); err != nil {
 		t.Fatal("insert:", err)
 	}
 
@@ -646,7 +647,53 @@ func TestMapScanWithRefMap(t *testing.T) {
 			t.Fatal("returned testinit did not match")
 		}
 	}
+	if testText != "testtext" {
+		t.Fatal("returned testtext did not match")
+	}
+	if testFullName.FirstName != "John" || testFullName.LastName != "Doe" {
+		t.Fatal("returned testfullname did not match")
+	}
+}
+
+func TestMapScan(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+	if err := createTable(session, `CREATE TABLE gocql_test.scan_map_table (
+			fullname       text PRIMARY KEY,
+			age            int,
+			address        inet,
+		)`); err != nil {
+		t.Fatal("create table:", err)
+	}
+
+	if err := session.Query(`INSERT INTO scan_map_table (fullname, age, address) values (?,?,?)`,
+		"Grace Hopper", 31, net.ParseIP("10.0.0.1")).Exec(); err != nil {
+		t.Fatal("insert:", err)
+	}
+	if err := session.Query(`INSERT INTO scan_map_table (fullname, age, address) values (?,?,?)`,
+		"Ada Lovelace", 30, net.ParseIP("10.0.0.2")).Exec(); err != nil {
+		t.Fatal("insert:", err)
+	}
+
+	iter := session.Query(`SELECT * FROM scan_map_table`).Iter()
 
+	// First iteration
+	row := make(map[string]interface{})
+	if !iter.MapScan(row) {
+		t.Fatal("select:", iter.Close())
+	}
+	assertEqual(t, "fullname", "Ada Lovelace", row["fullname"])
+	assertEqual(t, "age", 30, row["age"])
+	assertEqual(t, "address", "10.0.0.2", row["address"])
+
+	// Second iteration using a new map
+	row = make(map[string]interface{})
+	if !iter.MapScan(row) {
+		t.Fatal("select:", iter.Close())
+	}
+	assertEqual(t, "fullname", "Grace Hopper", row["fullname"])
+	assertEqual(t, "age", 31, row["age"])
+	assertEqual(t, "address", "10.0.0.1", row["address"])
 }
 
 func TestSliceMap(t *testing.T) {
@@ -2108,7 +2155,7 @@ func TestManualQueryPaging(t *testing.T) {
 	var id, count, fetched int
 
 	iter := query.Iter()
-	// NOTE: this isnt very indicitive of how it should be used, the idea is that
+	// NOTE: this isnt very indicative of how it should be used, the idea is that
 	// the page state is returned to some client who will send it back to manually
 	// page through the results.
 	for {

+ 2 - 2
cluster.go

@@ -28,7 +28,7 @@ func (p PoolConfig) buildPool(session *Session) *policyConnPool {
 // behavior to fit the most common use cases. Applications that require a
 // different setup must implement their own cluster.
 type ClusterConfig struct {
-	// addresses for the initial connections. It is recomended to use the value set in
+	// addresses for the initial connections. It is recommended to use the value set in
 	// the Cassandra config for broadcast_address or listen_address, an IP address not
 	// a domain name. This is because events from Cassandra will use the configured IP
 	// address, which is used to index connected hosts. If the domain name specified
@@ -122,7 +122,7 @@ type ClusterConfig struct {
 // NewCluster generates a new config for the default cluster implementation.
 //
 // The supplied hosts are used to initially connect to the cluster then the rest of
-// the ring will be automatically discovered. It is recomended to use the value set in
+// the ring will be automatically discovered. It is recommended to use the value set in
 // the Cassandra config for broadcast_address or listen_address, an IP address not
 // a domain name. This is because events from Cassandra will use the configured IP
 // address, which is used to index connected hosts. If the domain name specified

+ 1 - 1
conn.go

@@ -731,7 +731,7 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer)
 	switch x := frame.(type) {
 	case *resultPreparedFrame:
 		flight.preparedStatment = &preparedStatment{
-			// defensivly copy as we will recycle the underlying buffer after we
+			// defensively copy as we will recycle the underlying buffer after we
 			// return.
 			id: copyBytes(x.preparedID),
 			// the type info's should _not_ have a reference to the framers read buffer,

+ 1 - 1
conn_test.go

@@ -251,7 +251,7 @@ func TestQueryRetry(t *testing.T) {
 	requests := atomic.LoadInt64(&srv.nKillReq)
 	attempts := qry.Attempts()
 	if requests != int64(attempts) {
-		t.Fatalf("expected requests %v to match query attemps %v", requests, attempts)
+		t.Fatalf("expected requests %v to match query attempts %v", requests, attempts)
 	}
 
 	// the query will only be attempted once, but is being retried

+ 1 - 1
connectionpool.go

@@ -153,7 +153,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
 		pool := <-pools
 		createCount--
 		if pool.Size() > 0 {
-			// add pool onyl if there a connections available
+			// add pool only if there a connections available
 			p.hostConnPools[string(pool.host.ConnectAddress())] = pool
 		}
 	}

+ 3 - 0
events.go

@@ -106,6 +106,9 @@ func (s *Session) handleEvent(framer *framer) {
 }
 
 func (s *Session) handleSchemaEvent(frames []frame) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
 	if s.schemaDescriber == nil {
 		return
 	}

+ 1 - 1
frame.go

@@ -558,7 +558,7 @@ func (f *framer) parseErrorFrame() frame {
 		stmtId := f.readShortBytes()
 		return &RequestErrUnprepared{
 			errorFrame:  errD,
-			StatementId: copyBytes(stmtId), // defensivly copy
+			StatementId: copyBytes(stmtId), // defensively copy
 		}
 	case errReadFailure:
 		res := &RequestErrReadFailure{

+ 36 - 0
helpers.go

@@ -250,6 +250,42 @@ func (iter *Iter) SliceMap() ([]map[string]interface{}, error) {
 
 // MapScan takes a map[string]interface{} and populates it with a row
 // that is returned from cassandra.
+//
+// Each call to MapScan() must be called with a new map object.
+// During the call to MapScan() any pointers in the existing map
+// are replaced with non pointer types before the call returns
+//
+//	iter := session.Query(`SELECT * FROM mytable`).Iter()
+//	for {
+//		// New map each iteration
+//		row = make(map[string]interface{})
+//		if !iter.MapScan(row) {
+//			break
+//		}
+//		// Do things with row
+//		if fullname, ok := row["fullname"]; ok {
+//			fmt.Printf("Full Name: %s\n", fullname)
+//		}
+//	}
+//
+// You can also pass pointers in the map before each call
+//
+//	var fullName FullName // Implements gocql.Unmarshaler and gocql.Marshaler interfaces
+//	var address net.IP
+//	var age int
+//	iter := session.Query(`SELECT * FROM scan_map_table`).Iter()
+//	for {
+//		// New map each iteration
+//		row := map[string]interface{}{
+//			"fullname": &fullName,
+//			"age":      &age,
+//			"address":  &address,
+//		}
+//		if !iter.MapScan(row) {
+//			break
+//		}
+//		fmt.Printf("First: %s Age: %d Address: %q\n", fullName.FirstName, age, address)
+//	}
 func (iter *Iter) MapScan(m map[string]interface{}) bool {
 	if iter.err != nil {
 		return false

+ 1 - 1
internal/ccm/ccm.go

@@ -78,7 +78,7 @@ const (
 )
 
 func Status() (map[string]Host, error) {
-	// TODO: parse into struct o maniuplate
+	// TODO: parse into struct to manipulate
 	out, err := execCmd("status", "-v")
 	if err != nil {
 		return nil, err

+ 1 - 1
internal/lru/lru.go

@@ -30,7 +30,7 @@ type Cache struct {
 	// an item is evicted. Zero means no limit.
 	MaxEntries int
 
-	// OnEvicted optionally specificies a callback function to be
+	// OnEvicted optionally specifies a callback function to be
 	// executed when an entry is purged from the cache.
 	OnEvicted func(key string, value interface{})
 

+ 1 - 1
metadata.go

@@ -90,7 +90,7 @@ func (c ColumnKind) String() string {
 	case ColumnStatic:
 		return "static"
 	default:
-		return fmt.Sprintf("unkown_column_%d", c)
+		return fmt.Sprintf("unknown_column_%d", c)
 	}
 }
 

+ 3 - 2
session.go

@@ -428,7 +428,7 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI
 	if cached {
 		// done accessing the cache
 		s.routingKeyInfoCache.mu.Unlock()
-		// the entry is an inflight struct similiar to that used by
+		// the entry is an inflight struct similar to that used by
 		// Conn to prepare statements
 		inflight := entry.(*inflightCachedEntry)
 
@@ -459,7 +459,7 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string) (*routingKeyI
 	conn := s.getConn()
 	if conn == nil {
 		// TODO: better error?
-		inflight.err = errors.New("gocql: unable to fetch preapred info: no connection avilable")
+		inflight.err = errors.New("gocql: unable to fetch prepared info: no connection available")
 		return nil, inflight.err
 	}
 
@@ -1010,6 +1010,7 @@ func (q *Query) reset() {
 	q.defaultTimestamp = false
 	q.disableSkipMetadata = false
 	q.disableAutoPage = false
+	q.context = nil
 }
 
 // Iter represents an iterator that can be used to iterate over all rows that