瀏覽代碼

Merge remote-tracking branch 'upstream/master'

Staņislavs Koikovs 11 年之前
父節點
當前提交
e4b8bf756e
共有 11 個文件被更改,包括 522 次插入79 次删除
  1. 2 0
      AUTHORS
  2. 13 13
      README.md
  3. 136 0
      cassandra_test.go
  4. 22 19
      cluster.go
  5. 95 26
      conn.go
  6. 33 5
      conn_test.go
  7. 25 0
      frame.go
  8. 111 0
      helpers.go
  9. 5 1
      marshal.go
  10. 10 0
      policies.go
  11. 70 15
      session.go

+ 2 - 0
AUTHORS

@@ -16,3 +16,5 @@ Chris Bannister <c.bannister@gmail.com>
 Maxim Bublis <b@codemonkey.ru>
 Alex Zorin <git@zor.io>
 Kasper Middelboe Petersen <me@phant.dk>
+Harpreet Sawhney <harpreet.sawhney@gmail.com>
+Charlie Andrews <charlieandrews.cwa@gmail.com>

+ 13 - 13
README.md

@@ -1,28 +1,22 @@
 gocql
 =====
 
-[![Build Status](https://travis-ci.org/tux21b/gocql.png?branch=master)](https://travis-ci.org/tux21b/gocql)
-[![GoDoc](http://godoc.org/tux21b.org/v1/gocql?status.png)](http://godoc.org/tux21b.org/v1/gocql)
+[![Build Status](https://travis-ci.org/gocql/gocql.png?branch=master)](https://travis-ci.org/gocql/gocql)
+[![GoDoc](http://godoc.org/github.com/gocql/gocql?status.png)](http://godoc.org/github.com/gocql/gocql)
 
 **Package Status:** Alpha 
 
 Package gocql implements a fast and robust Cassandra client for the
 Go programming language.
 
-
-**Attention:** This package is currently actively developed and the API may
-change in the future. The old "datbase/sql" based package is now called
-[gocqldriver](https://github.com/tux21b/gocqldriver) and is no longer
-maintained.
-
-Project Website: http://tux21b.org/gocql/<br>
-API documentation: http://godoc.org/tux21b.org/v1/gocql<br>
+Project Website: http://gocql.github.io/<br>
+API documentation: http://godoc.org/github.com/gocql/gocql<br>
 Discussions: https://groups.google.com/forum/#!forum/gocql
 
 Installation
 ------------
 
-    go get tux21b.org/v1/gocql
+    go get github.com/gocql/gocql
 
 
 Features
@@ -45,7 +39,7 @@ Features
 * Automatic query preparation
 * Support for query tracing
 
-Please visit the [Roadmap](https://github.com/tux21b/gocql/wiki/Roadmap) page to see what is on the horizion.
+Please visit the [Roadmap](https://github.com/gocql/gocql/wiki/Roadmap) page to see what is on the horizion.
 
 Example
 -------
@@ -57,7 +51,7 @@ import (
 	"fmt"
 	"log"
 
-	"tux21b.org/v1/gocql"
+	"github.com/gocql/gocql"
 )
 
 func main() {
@@ -95,6 +89,12 @@ func main() {
 }
 ```
 
+Other Projects
+--------------
+
+* [cqlc](http://relops.com/cqlc) generates gocql compliant code from your Cassandra schema so that you can write type safe CQL statements in Go with a natural query syntax.
+* [gocqldriver](https://github.com/tux21b/gocqldriver) is the predecessor of gocql based on Go's "database/sql" package. This project isn't maintained anymore, because Cassandra wasn't a good fit for the traditional "database/sql" API. Use this package instead.
+
 License
 -------
 

+ 136 - 0
cassandra_test.go

@@ -27,6 +27,10 @@ func createSession(t *testing.T) *Session {
 	cluster := NewCluster(strings.Split(*flagCluster, ",")...)
 	cluster.ProtoVersion = *flagProto
 	cluster.CQLVersion = *flagCQL
+	cluster.Authenticator = PasswordAuthenticator{
+		Username: "cassandra",
+		Password: "cassandra",
+	}
 
 	session, err := cluster.CreateSession()
 	if err != nil {
@@ -148,6 +152,9 @@ func TestTracing(t *testing.T) {
 }
 
 func TestPaging(t *testing.T) {
+
+	t.Skip("Skip until https://github.com/gocql/gocql/issues/110 is resolved")
+
 	if *flagProto == 1 {
 		t.Skip("Paging not supported. Please use Cassandra >= 2.0")
 	}
@@ -245,6 +252,29 @@ func TestBatch(t *testing.T) {
 	}
 }
 
+// TestBatchLimit tests gocql to make sure batch operations larger than the maximum
+// statement limit are not submitted to a cassandra node.
+func TestBatchLimit(t *testing.T) {
+	if *flagProto == 1 {
+		t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
+	}
+	session := createSession(t)
+	defer session.Close()
+
+	if err := session.Query(`CREATE TABLE batch_table2 (id int primary key)`).Exec(); err != nil {
+		t.Fatal("create table:", err)
+	}
+
+	batch := NewBatch(LoggedBatch)
+	for i := 0; i < 65537; i++ {
+		batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
+	}
+	if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
+		t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
+	}
+
+}
+
 type Page struct {
 	Title       string
 	RevId       UUID
@@ -277,3 +307,109 @@ var pageTestData = []*Page{
 		Modified: time.Date(2013, time.August, 13, 9, 52, 3, 0, time.UTC),
 	},
 }
+
+func TestSliceMap(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+	if err := session.Query(`CREATE TABLE slice_map_table (
+			testuuid       timeuuid PRIMARY KEY,
+			testvarchar    varchar,
+			testbigint     bigint,
+			testblob       blob,
+			testbool       boolean,
+			testfloat	   float,
+			testdouble	   double,
+			testint        int,
+			testset        set<int>,
+			testmap        map<varchar, varchar>
+		)`).Exec(); err != nil {
+		t.Fatal("create table:", err)
+	}
+	m := make(map[string]interface{})
+	m["testuuid"] = TimeUUID()
+	m["testvarchar"] = "Test VarChar"
+	m["testbigint"] = time.Now().Unix()
+	m["testblob"] = []byte("test blob")
+	m["testbool"] = true
+	m["testfloat"] = float32(4.564)
+	m["testdouble"] = float64(4.815162342)
+	m["testint"] = 2343
+	m["testset"] = []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
+	m["testmap"] = map[string]string{"field1": "val1", "field2": "val2", "field3": "val3"}
+	sliceMap := []map[string]interface{}{m}
+	if err := session.Query(`INSERT INTO slice_map_table (testuuid, testvarchar, testbigint, testblob, testbool, testfloat, testdouble, testint, testset, testmap) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+		m["testuuid"], m["testvarchar"], m["testbigint"], m["testblob"], m["testbool"], m["testfloat"], m["testdouble"], m["testint"], m["testset"], m["testmap"]).Exec(); err != nil {
+		t.Fatal("insert:", err)
+	}
+	if returned, retErr := session.Query(`SELECT * FROM slice_map_table`).Iter().SliceMap(); retErr != nil {
+		t.Fatal("select:", retErr)
+	} else {
+		if sliceMap[0]["testuuid"] != returned[0]["testuuid"] {
+			t.Fatal("returned testuuid did not match")
+		}
+		if sliceMap[0]["testvarchar"] != returned[0]["testvarchar"] {
+			t.Fatal("returned testvarchar did not match")
+		}
+		if sliceMap[0]["testbigint"] != returned[0]["testbigint"] {
+			t.Fatal("returned testbigint did not match")
+		}
+		if !reflect.DeepEqual(sliceMap[0]["testblob"], returned[0]["testblob"]) {
+			t.Fatal("returned testblob did not match")
+		}
+		if sliceMap[0]["testbool"] != returned[0]["testbool"] {
+			t.Fatal("returned testbool did not match")
+		}
+		if sliceMap[0]["testfloat"] != returned[0]["testfloat"] {
+			t.Fatal("returned testfloat did not match")
+		}
+		if sliceMap[0]["testdouble"] != returned[0]["testdouble"] {
+			t.Fatal("returned testdouble did not match")
+		}
+		if sliceMap[0]["testint"] != returned[0]["testint"] {
+			t.Fatal("returned testint did not match")
+		}
+		if !reflect.DeepEqual(sliceMap[0]["testset"], returned[0]["testset"]) {
+			t.Fatal("returned testset did not match")
+		}
+		if !reflect.DeepEqual(sliceMap[0]["testmap"], returned[0]["testmap"]) {
+			t.Fatal("returned testmap did not match")
+		}
+	}
+
+	// Test for MapScan()
+	testMap := make(map[string]interface{})
+	if !session.Query(`SELECT * FROM slice_map_table`).Iter().MapScan(testMap) {
+		t.Fatal("MapScan failed to work with one row")
+	}
+	if sliceMap[0]["testuuid"] != testMap["testuuid"] {
+		t.Fatal("returned testuuid did not match")
+	}
+	if sliceMap[0]["testvarchar"] != testMap["testvarchar"] {
+		t.Fatal("returned testvarchar did not match")
+	}
+	if sliceMap[0]["testbigint"] != testMap["testbigint"] {
+		t.Fatal("returned testbigint did not match")
+	}
+	if !reflect.DeepEqual(sliceMap[0]["testblob"], testMap["testblob"]) {
+		t.Fatal("returned testblob did not match")
+	}
+	if sliceMap[0]["testbool"] != testMap["testbool"] {
+		t.Fatal("returned testbool did not match")
+	}
+	if sliceMap[0]["testfloat"] != testMap["testfloat"] {
+		t.Fatal("returned testfloat did not match")
+	}
+	if sliceMap[0]["testdouble"] != testMap["testdouble"] {
+		t.Fatal("returned testdouble did not match")
+	}
+	if sliceMap[0]["testint"] != testMap["testint"] {
+		t.Fatal("returned testint did not match")
+	}
+	if !reflect.DeepEqual(sliceMap[0]["testset"], testMap["testset"]) {
+		t.Fatal("returned testset did not match")
+	}
+	if !reflect.DeepEqual(sliceMap[0]["testmap"], testMap["testmap"]) {
+		t.Fatal("returned testmap did not match")
+	}
+
+}

+ 22 - 19
cluster.go

@@ -18,19 +18,21 @@ import (
 // behavior to fit the most common use cases. Applications that requre a
 // different setup must implement their own cluster.
 type ClusterConfig struct {
-	Hosts        []string      // addresses for the initial connections
-	CQLVersion   string        // CQL version (default: 3.0.0)
-	ProtoVersion int           // version of the native protocol (default: 2)
-	Timeout      time.Duration // connection timeout (default: 200ms)
-	DefaultPort  int           // default port (default: 9042)
-	Keyspace     string        // initial keyspace (optional)
-	NumConns     int           // number of connections per host (default: 2)
-	NumStreams   int           // number of streams per connection (default: 128)
-	DelayMin     time.Duration // minimum reconnection delay (default: 1s)
-	DelayMax     time.Duration // maximum reconnection delay (default: 10min)
-	StartupMin   int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
-	Consistency  Consistency   // default consistency level (default: Quorum)
-	Compressor   Compressor    // compression algorithm (default: nil)
+	Hosts         []string      // addresses for the initial connections
+	CQLVersion    string        // CQL version (default: 3.0.0)
+	ProtoVersion  int           // version of the native protocol (default: 2)
+	Timeout       time.Duration // connection timeout (default: 600ms)
+	DefaultPort   int           // default port (default: 9042)
+	Keyspace      string        // initial keyspace (optional)
+	NumConns      int           // number of connections per host (default: 2)
+	NumStreams    int           // number of streams per connection (default: 128)
+	DelayMin      time.Duration // minimum reconnection delay (default: 1s)
+	DelayMax      time.Duration // maximum reconnection delay (default: 10min)
+	StartupMin    int           // wait for StartupMin hosts (default: len(Hosts)/2+1)
+	Consistency   Consistency   // default consistency level (default: Quorum)
+	Compressor    Compressor    // compression algorithm (default: nil)
+	Authenticator Authenticator // authenticator (default: nil)
+	RetryPolicy   RetryPolicy   // Default retry policy to use for queries(default:0)
 }
 
 // NewCluster generates a new config for the default cluster implementation.
@@ -39,7 +41,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
 		Hosts:        hosts,
 		CQLVersion:   "3.0.0",
 		ProtoVersion: 2,
-		Timeout:      200 * time.Millisecond,
+		Timeout:      600 * time.Millisecond,
 		DefaultPort:  9042,
 		NumConns:     2,
 		NumStreams:   128,
@@ -102,11 +104,12 @@ type clusterImpl struct {
 
 func (c *clusterImpl) connect(addr string) {
 	cfg := ConnConfig{
-		ProtoVersion: c.cfg.ProtoVersion,
-		CQLVersion:   c.cfg.CQLVersion,
-		Timeout:      c.cfg.Timeout,
-		NumStreams:   c.cfg.NumStreams,
-		Compressor:   c.cfg.Compressor,
+		ProtoVersion:  c.cfg.ProtoVersion,
+		CQLVersion:    c.cfg.CQLVersion,
+		Timeout:       c.cfg.Timeout,
+		NumStreams:    c.cfg.NumStreams,
+		Compressor:    c.cfg.Compressor,
+		Authenticator: c.cfg.Authenticator,
 	}
 	delay := c.cfg.DelayMin
 	for {

+ 95 - 26
conn.go

@@ -6,10 +6,13 @@ package gocql
 
 import (
 	"bufio"
+	"fmt"
 	"net"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
+	"unicode"
 
 	"code.google.com/p/snappy-go/snappy"
 )
@@ -19,22 +22,43 @@ const flagResponse = 0x80
 const maskVersion = 0x7F
 
 type Cluster interface {
-	//HandleAuth(addr, method string) ([]byte, Challenger, error)
 	HandleError(conn *Conn, err error, closed bool)
 	HandleKeyspace(conn *Conn, keyspace string)
-	// Authenticate(addr string)
 }
 
-/* type Challenger interface {
-	Challenge(data []byte) ([]byte, error)
-} */
+type Authenticator interface {
+	Challenge(req []byte) (resp []byte, auth Authenticator, err error)
+	Success(data []byte) error
+}
+
+type PasswordAuthenticator struct {
+	Username string
+	Password string
+}
+
+func (p PasswordAuthenticator) Challenge(req []byte) ([]byte, Authenticator, error) {
+	if string(req) != "org.apache.cassandra.auth.PasswordAuthenticator" {
+		return nil, nil, fmt.Errorf("unexpected authenticator %q", req)
+	}
+	resp := make([]byte, 2+len(p.Username)+len(p.Password))
+	resp[0] = 0
+	copy(resp[1:], p.Username)
+	resp[len(p.Username)+1] = 0
+	copy(resp[2+len(p.Username):], p.Password)
+	return resp, nil, nil
+}
+
+func (p PasswordAuthenticator) Success(data []byte) error {
+	return nil
+}
 
 type ConnConfig struct {
-	ProtoVersion int
-	CQLVersion   string
-	Timeout      time.Duration
-	NumStreams   int
-	Compressor   Compressor
+	ProtoVersion  int
+	CQLVersion    string
+	Timeout       time.Duration
+	NumStreams    int
+	Compressor    Compressor
+	Authenticator Authenticator
 }
 
 // Conn is a single connection to a Cassandra node. It can be used to execute
@@ -54,6 +78,7 @@ type Conn struct {
 
 	cluster    Cluster
 	compressor Compressor
+	auth       Authenticator
 	addr       string
 	version    uint8
 }
@@ -82,6 +107,7 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 		addr:       conn.RemoteAddr().String(),
 		cluster:    cluster,
 		compressor: cfg.Compressor,
+		auth:       cfg.Authenticator,
 	}
 	for i := 0; i < cap(c.uniq); i++ {
 		c.uniq <- uint8(i)
@@ -97,24 +123,54 @@ func Connect(addr string, cfg ConnConfig, cluster Cluster) (*Conn, error) {
 }
 
 func (c *Conn) startup(cfg *ConnConfig) error {
-	req := &startupFrame{
-		CQLVersion: cfg.CQLVersion,
-	}
+	compression := ""
 	if c.compressor != nil {
-		req.Compression = c.compressor.Name()
+		compression = c.compressor.Name()
 	}
-	resp, err := c.execSimple(req)
-	if err != nil {
-		return err
+	var req operation = &startupFrame{
+		CQLVersion:  cfg.CQLVersion,
+		Compression: compression,
 	}
-	switch x := resp.(type) {
-	case readyFrame:
-	case error:
-		return x
-	default:
-		return ErrProtocol
+	var challenger Authenticator
+	for {
+		resp, err := c.execSimple(req)
+		if err != nil {
+			return err
+		}
+		switch x := resp.(type) {
+		case readyFrame:
+			return nil
+		case error:
+			return x
+		case authenticateFrame:
+			if c.auth == nil {
+				return fmt.Errorf("authentication required (using %q)", x.Authenticator)
+			}
+			var resp []byte
+			resp, challenger, err = c.auth.Challenge([]byte(x.Authenticator))
+			if err != nil {
+				return err
+			}
+			req = &authResponseFrame{resp}
+		case authChallengeFrame:
+			if challenger == nil {
+				return fmt.Errorf("authentication error (invalid challenge)")
+			}
+			var resp []byte
+			resp, challenger, err = challenger.Challenge(x.Data)
+			if err != nil {
+				return err
+			}
+			req = &authResponseFrame{resp}
+		case authSuccessFrame:
+			if challenger != nil {
+				return challenger.Success(x.Data)
+			}
+			return nil
+		default:
+			return ErrProtocol
+		}
 	}
-	return nil
 }
 
 // Serve starts the stream multiplexer for this connection, which is required
@@ -216,6 +272,7 @@ func (c *Conn) exec(op operation, trace Tracer) (interface{}, error) {
 
 	if n, err := c.conn.Write(req); err != nil {
 		c.conn.Close()
+		c.uniq <- id
 		if n > 0 {
 			return nil, ErrProtocol
 		}
@@ -294,12 +351,18 @@ func (c *Conn) prepareStatement(stmt string, trace Tracer) (*queryInfo, error) {
 
 func (c *Conn) executeQuery(qry *Query) *Iter {
 	op := &queryFrame{
-		Stmt:      qry.stmt,
+		Stmt:      strings.TrimSpace(qry.stmt),
 		Cons:      qry.cons,
 		PageSize:  qry.pageSize,
 		PageState: qry.pageState,
 	}
-	if len(qry.values) > 0 {
+	stmtType := op.Stmt
+	if n := strings.IndexFunc(stmtType, unicode.IsSpace); n >= 0 {
+		stmtType = strings.ToLower(stmtType[:n])
+	}
+	switch stmtType {
+	case "select", "insert", "update", "delete":
+		// Prepare all DML queries. Other queries can not be prepared.
 		info, err := c.prepareStatement(qry.stmt, qry.trace)
 		if err != nil {
 			return &Iter{err: err}
@@ -493,6 +556,12 @@ func (c *Conn) decodeFrame(f frame, trace Tracer) (rval interface{}, err error)
 		default:
 			return nil, ErrProtocol
 		}
+	case opAuthenticate:
+		return authenticateFrame{f.readString()}, nil
+	case opAuthChallenge:
+		return authChallengeFrame{f.readBytes()}, nil
+	case opAuthSuccess:
+		return authSuccessFrame{f.readBytes()}, nil
 	case opSupported:
 		return supportedFrame{}, nil
 	case opError:

+ 33 - 5
conn_test.go

@@ -15,10 +15,11 @@ import (
 )
 
 type TestServer struct {
-	Address string
-	t       *testing.T
-	nreq    uint64
-	listen  net.Listener
+	Address  string
+	t        *testing.T
+	nreq     uint64
+	listen   net.Listener
+	nKillReq uint64
 }
 
 func TestSimple(t *testing.T) {
@@ -60,7 +61,7 @@ func TestTimeout(t *testing.T) {
 	}
 
 	go func() {
-		<-time.After(1 * time.Second)
+		<-time.After(2 * time.Second)
 		t.Fatal("no timeout")
 	}()
 
@@ -69,6 +70,32 @@ func TestTimeout(t *testing.T) {
 	}
 }
 
+// TestQueryRetry will test to make sure that gocql will execute
+// the exact amount of retry queries designated by the user.
+func TestQueryRetry(t *testing.T) {
+	srv := NewTestServer(t)
+	defer srv.Stop()
+
+	db, err := NewCluster(srv.Address).CreateSession()
+	if err != nil {
+		t.Errorf("NewCluster: %v", err)
+	}
+
+	go func() {
+		<-time.After(5 * time.Second)
+		t.Fatal("no timeout")
+	}()
+	rt := RetryPolicy{NumRetries: 1}
+
+	if err := db.Query("kill").RetryPolicy(rt).Exec(); err == nil {
+		t.Fatal("expected error")
+	}
+	//Minus 1 from the nKillReq variable since there is the initial query attempt
+	if srv.nKillReq-1 != uint64(rt.NumRetries) {
+		t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, srv.nKillReq-1)
+	}
+}
+
 func TestSlowQuery(t *testing.T) {
 	srv := NewTestServer(t)
 	defer srv.Stop()
@@ -183,6 +210,7 @@ func (srv *TestServer) process(frame frame, conn net.Conn) {
 		}
 		switch strings.ToLower(first) {
 		case "kill":
+			atomic.AddUint64(&srv.nKillReq, 1)
 			select {}
 		case "slow":
 			go func() {

+ 25 - 0
frame.go

@@ -432,3 +432,28 @@ func (op *optionsFrame) encodeFrame(version uint8, f frame) (frame, error) {
 	f.setHeader(version, 0, 0, opOptions)
 	return f, nil
 }
+
+type authenticateFrame struct {
+	Authenticator string
+}
+
+type authResponseFrame struct {
+	Data []byte
+}
+
+func (op *authResponseFrame) encodeFrame(version uint8, f frame) (frame, error) {
+	if f == nil {
+		f = make(frame, headerSize, defaultFrameSize)
+	}
+	f.setHeader(version, 0, 0, opAuthResponse)
+	f.writeBytes(op.Data)
+	return f, nil
+}
+
+type authSuccessFrame struct {
+	Data []byte
+}
+
+type authChallengeFrame struct {
+	Data []byte
+}

+ 111 - 0
helpers.go

@@ -0,0 +1,111 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gocql
+
+import "reflect"
+
+type rowData struct {
+	Columns []string
+	Values []interface{}
+}
+
+// New creates a pointer to an empty version of whatever type 
+// is referenced by the TypeInfo receiver
+func (t *TypeInfo) New() interface{} {
+	return reflect.New(goType(t)).Interface()
+}
+
+func goType(t *TypeInfo) reflect.Type {
+	switch t.Type {
+	case TypeVarchar, TypeAscii:
+		return reflect.TypeOf(*new(string))
+	case TypeBigInt, TypeCounter, TypeTimestamp:
+		return reflect.TypeOf(*new(int64))
+	case TypeBlob:
+		return reflect.TypeOf(*new([]byte))
+	case TypeBoolean:
+		return reflect.TypeOf(*new(bool))
+	case TypeFloat:
+		return reflect.TypeOf(*new(float32))
+	case TypeDouble:
+		return reflect.TypeOf(*new(float64))
+	case TypeInt:
+		return reflect.TypeOf(*new(int))
+	case TypeUUID, TypeTimeUUID:
+		return reflect.TypeOf(*new(UUID))
+	case TypeList, TypeSet:
+		return reflect.SliceOf(goType(t.Elem))
+	case TypeMap:
+		return reflect.MapOf(goType(t.Key), goType(t.Elem))
+	default:
+		return nil
+	}
+}
+
+func dereference(i interface{}) interface{} {
+	return reflect.Indirect(reflect.ValueOf(i)).Interface()
+}
+
+func (r *rowData) rowMap(m map[string]interface{}) {
+	for i, column := range r.Columns {
+		m[column] = dereference(r.Values[i])
+	}
+}
+
+func (iter *Iter) rowData() (rowData, error) {
+	if iter.err != nil {
+		return rowData{}, iter.err
+	}
+	columns := make([]string, 0)
+	values := make([]interface{}, 0)
+	for _, column := range iter.Columns() {
+		val := column.TypeInfo.New()
+		columns = append(columns, column.Name)
+		values = append(values, val)
+	}
+	rowData := rowData{
+		Columns: columns,
+		Values: values,
+	}
+	return rowData, nil
+}
+
+// SliceMap is a helper function to make the API easier to use
+// returns the data from the query in the form of []map[string]interface{}
+func (iter *Iter) SliceMap() ([]map[string]interface{}, error) {
+	if iter.err != nil {
+		return nil, iter.err
+	}
+
+	// Not checking for the error because we just did
+	rowData, _ := iter.rowData()
+	dataToReturn := make([]map[string]interface{}, 0)
+	for iter.Scan(rowData.Values...) {
+		m := make(map[string]interface{})
+		rowData.rowMap(m)
+		dataToReturn = append(dataToReturn, m)
+	}
+	if iter.err != nil {
+		return nil, iter.err
+	}
+	return dataToReturn, nil
+}
+
+// MapScan takes a map[string]interface{} and populates it with a row
+// That is returned from cassandra.
+func (iter *Iter) MapScan(m map[string]interface{}) bool {
+	if iter.err != nil {
+		return false
+	}
+
+	// Not checking for the error because we just did
+	rowData, _ := iter.rowData()
+
+	if iter.Scan(rowData.Values...) {
+		rowData.rowMap(m)	
+		return true
+	}
+	return false
+}

+ 5 - 1
marshal.go

@@ -921,7 +921,7 @@ func marshalUUID(info *TypeInfo, value interface{}) ([]byte, error) {
 }
 
 func unmarshalUUID(info *TypeInfo, data []byte, value interface{}) error {
-	if data == nil {
+	if data == nil || len(data) == 0 {
 		switch v := value.(type) {
 		case *string:
 			*v = ""
@@ -932,11 +932,15 @@ func unmarshalUUID(info *TypeInfo, data []byte, value interface{}) error {
 		default:
 			return unmarshalErrorf("can not unmarshal X %s into %T", info, value)
 		}
+
+		return nil
 	}
+
 	u, err := UUIDFromBytes(data)
 	if err != nil {
 		return unmarshalErrorf("Unable to parse UUID: %s", err)
 	}
+
 	switch v := value.(type) {
 	case *string:
 		*v = u.String()

+ 10 - 0
policies.go

@@ -0,0 +1,10 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+//This file will be the future home for more policies
+package gocql
+
+// RetryPolicy represents the retry behavour for a query.
+type RetryPolicy struct {
+	NumRetries int //Number of times to retry a query
+}

+ 70 - 15
session.go

@@ -28,11 +28,12 @@ type Session struct {
 	prefetch float64
 	trace    Tracer
 	mu       sync.RWMutex
+	cfg      ClusterConfig
 }
 
 // NewSession wraps an existing Node.
-func NewSession(node Node) *Session {
-	return &Session{Node: node, cons: Quorum, prefetch: 0.25}
+func NewSession(c *clusterImpl) *Session {
+	return &Session{Node: c, cons: Quorum, prefetch: 0.25, cfg: c.cfg}
 }
 
 // SetConsistency sets the default consistency level for this session. This
@@ -77,7 +78,7 @@ func (s *Session) Query(stmt string, values ...interface{}) *Query {
 	s.mu.RLock()
 	qry := &Query{stmt: stmt, values: values, cons: s.cons,
 		session: s, pageSize: s.pageSize, trace: s.trace,
-		prefetch: s.prefetch}
+		prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
 	s.mu.RUnlock()
 	return qry
 }
@@ -89,19 +90,51 @@ func (s *Session) Close() {
 }
 
 func (s *Session) executeQuery(qry *Query) *Iter {
-	conn := s.Node.Pick(nil)
-	if conn == nil {
-		return &Iter{err: ErrUnavailable}
+	var itr *Iter
+	count := 0
+	for count <= qry.rt.NumRetries {
+		conn := s.Node.Pick(nil)
+		//Assign the error unavailable to the iterator
+		if conn == nil {
+			itr = &Iter{err: ErrUnavailable}
+			break
+		}
+		itr = conn.executeQuery(qry)
+		//Exit for loop if the query was successful
+		if itr.err == nil {
+			break
+		}
+		count++
 	}
-	return conn.executeQuery(qry)
+	return itr
 }
 
+// ExecuteBatch executes a batch operation and returns nil if successful
+// otherwise an error is returned describing the failure.
 func (s *Session) ExecuteBatch(batch *Batch) error {
-	conn := s.Node.Pick(nil)
-	if conn == nil {
-		return ErrUnavailable
+	// Prevent the execution of the batch if greater than the limit
+	// Currently batches have a limit of 65536 queries.
+	// https://datastax-oss.atlassian.net/browse/JAVA-229
+	if len(batch.Entries) > 65536 {
+		return ErrTooManyStmts
+	}
+	var err error
+	count := 0
+	for count <= batch.rt.NumRetries {
+		conn := s.Node.Pick(nil)
+		//Assign the error unavailable and break loop
+		if conn == nil {
+			err = ErrUnavailable
+			break
+		}
+		err = conn.executeBatch(batch)
+		//Exit loop if operation executed correctly
+		if err == nil {
+			break
+		}
+		count++
 	}
-	return conn.executeBatch(batch)
+	return err
 }
 
 // Query represents a CQL statement that can be executed.
@@ -114,6 +147,7 @@ type Query struct {
 	prefetch  float64
 	trace     Tracer
 	session   *Session
+	rt        RetryPolicy
 }
 
 // Consistency sets the consistency level for this query. If no consistency
@@ -148,6 +182,12 @@ func (q *Query) Prefetch(p float64) *Query {
 	return q
 }
 
+// RetryPolicy sets the policy to use when retrying the query.
+func (q *Query) RetryPolicy(r RetryPolicy) *Query {
+	q.rt = r
+	return q
+}
+
 // Exec executes the query without returning any rows.
 func (q *Query) Exec() error {
 	iter := q.session.executeQuery(q)
@@ -276,16 +316,30 @@ type Batch struct {
 	Type    BatchType
 	Entries []BatchEntry
 	Cons    Consistency
+	rt      RetryPolicy
 }
 
+// NewBatch creates a new batch operation without defaults from the cluster
 func NewBatch(typ BatchType) *Batch {
 	return &Batch{Type: typ}
 }
 
+// NewBatch creates a new batch operation using defaults defined in the cluster
+func (s *Session) NewBatch(typ BatchType) *Batch {
+	return &Batch{Type: typ, rt: s.cfg.RetryPolicy}
+}
+
+// Query adds the query to the batch operation
 func (b *Batch) Query(stmt string, args ...interface{}) {
 	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
 }
 
+// RetryPolicy sets the retry policy to use when executing the batch operation
+func (b *Batch) RetryPolicy(r RetryPolicy) *Batch {
+	b.rt = r
+	return b
+}
+
 type BatchType int
 
 const (
@@ -403,8 +457,9 @@ func (e Error) Error() string {
 }
 
 var (
-	ErrNotFound    = errors.New("not found")
-	ErrUnavailable = errors.New("unavailable")
-	ErrProtocol    = errors.New("protocol error")
-	ErrUnsupported = errors.New("feature not supported")
+	ErrNotFound     = errors.New("not found")
+	ErrUnavailable  = errors.New("unavailable")
+	ErrProtocol     = errors.New("protocol error")
+	ErrUnsupported  = errors.New("feature not supported")
+	ErrTooManyStmts = errors.New("too many statements")
 )