Christoph Hack 12 лет назад
Родитель
Сommit
d2fcc2dc4a
4 измененных файлов с 67 добавлено и 22 удалено
  1. 50 5
      conn.go
  2. 10 11
      gocql.go
  3. 4 4
      gocql_test.go
  4. 3 2
      session.go

+ 50 - 5
conn.go

@@ -191,13 +191,13 @@ func (c *Conn) ping() error {
 	return err
 }
 
-func (c *Conn) prepareQuery(stmt string) *queryInfo {
+func (c *Conn) prepareStatement(stmt string) (*queryInfo, error) {
 	c.prepMu.Lock()
 	info := c.prep[stmt]
 	if info != nil {
 		c.prepMu.Unlock()
 		info.wg.Wait()
-		return info
+		return info, nil
 	}
 	info = new(queryInfo)
 	info.wg.Add(1)
@@ -211,7 +211,10 @@ func (c *Conn) prepareQuery(stmt string) *queryInfo {
 
 	frame, err := c.call(frame)
 	if err != nil {
-		return nil
+		return nil, err
+	}
+	if frame[3] == opError {
+		return nil, frame.readErrorFrame()
 	}
 	frame.skipHeader()
 	frame.readInt() // kind
@@ -219,7 +222,7 @@ func (c *Conn) prepareQuery(stmt string) *queryInfo {
 	info.args = frame.readMetaData()
 	info.rval = frame.readMetaData()
 	info.wg.Done()
-	return info
+	return info, nil
 }
 
 func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
@@ -238,6 +241,44 @@ func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
 }
 
 func (c *Conn) ExecuteBatch(batch *Batch) error {
+	frame := make(frame, headerSize, defaultFrameSize)
+	frame.setHeader(protoRequest, 0, 0, opBatch)
+	frame.writeByte(byte(batch.Type))
+	frame.writeShort(uint16(len(batch.Entries)))
+	for i := 0; i < len(batch.Entries); i++ {
+		entry := &batch.Entries[i]
+		var info *queryInfo
+		if len(entry.Args) > 0 {
+			info, err := c.prepareStatement(entry.Stmt)
+			if err != nil {
+				return err
+			}
+			frame.writeByte(1)
+			frame.writeShortBytes(info.id)
+		} else {
+			frame.writeByte(0)
+			frame.writeLongString(entry.Stmt)
+		}
+		frame.writeShort(uint16(len(entry.Args)))
+		for j := 0; j < len(entry.Args); j++ {
+			val, err := Marshal(info.args[j].TypeInfo, entry.Args[i])
+			if err != nil {
+				return err
+			}
+			frame.writeBytes(val)
+		}
+	}
+	frame.writeShort(uint16(batch.Cons))
+
+	frame, err := c.call(frame)
+	if err != nil {
+		return err
+	}
+
+	if frame[3] == opError {
+		return frame.readErrorFrame()
+	}
+
 	return nil
 }
 
@@ -248,7 +289,11 @@ func (c *Conn) Close() {
 func (c *Conn) executeQuery(query *Query) (frame, error) {
 	var info *queryInfo
 	if len(query.Args) > 0 {
-		info = c.prepareQuery(query.Stmt)
+		var err error
+		info, err = c.prepareStatement(query.Stmt)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	frame := make(frame, headerSize, headerSize+512)

+ 10 - 11
gocql.go

@@ -50,16 +50,16 @@ func (b *Batch) Apply() error {
 type Consistency uint16
 
 const (
-	ConAny         Consistency = 0x0000
-	ConOne         Consistency = 0x0001
-	ConTwo         Consistency = 0x0002
-	ConThree       Consistency = 0x0003
-	ConQuorum      Consistency = 0x0004
-	ConAll         Consistency = 0x0005
-	ConLocalQuorum Consistency = 0x0006
-	ConEachQuorum  Consistency = 0x0007
-	ConSerial      Consistency = 0x0008
-	ConLocalSerial Consistency = 0x0009
+	Any Consistency = iota
+	One
+	Two
+	Three
+	Quorum
+	All
+	LocalQuorum
+	EachQuorum
+	Serial
+	LocalSerial
 )
 
 type Error struct {
@@ -74,6 +74,5 @@ func (e Error) Error() string {
 var (
 	ErrNotFound        = errors.New("not found")
 	ErrNoHostAvailable = errors.New("no host available")
-	ErrQueryUnbound    = errors.New("can not execute unbound query")
 	ErrProtocol        = errors.New("protocol error")
 )

+ 4 - 4
gocql_test.go

@@ -125,7 +125,7 @@ func TestSimple(t *testing.T) {
 
 	db := NewSession(Config{
 		Nodes:       []string{srv.Address},
-		Consistency: ConQuorum,
+		Consistency: Quorum,
 	})
 	if err := db.Query("void").Exec(); err != nil {
 		t.Error(err)
@@ -138,7 +138,7 @@ func TestTimeout(t *testing.T) {
 
 	db := NewSession(Config{
 		Nodes:       []string{srv.Address},
-		Consistency: ConQuorum,
+		Consistency: Quorum,
 	})
 
 	go func() {
@@ -157,7 +157,7 @@ func TestSlowQuery(t *testing.T) {
 
 	db := NewSession(Config{
 		Nodes:       []string{srv.Address},
-		Consistency: ConQuorum,
+		Consistency: Quorum,
 	})
 
 	if err := db.Query("slow").Exec(); err != nil {
@@ -175,7 +175,7 @@ func TestRoundRobin(t *testing.T) {
 	}
 	db := NewSession(Config{
 		Nodes:       addrs,
-		Consistency: ConQuorum,
+		Consistency: Quorum,
 	})
 	time.Sleep(1 * time.Second)
 

+ 3 - 2
session.go

@@ -217,10 +217,11 @@ func (iter *Iter) Close() error {
 type Batch struct {
 	Type    BatchType
 	Entries []BatchEntry
+	Cons    Consistency
 }
 
-func NewBatch(typ BatchType) *Batch {
-	return &Batch{Type: typ}
+func NewBatch(typ BatchType, cons Consistency) *Batch {
+	return &Batch{Type: typ, Cons: cons}
 }
 
 func (b *Batch) Query(stmt string, args ...interface{}) {