Browse Source

readded batches

Christoph Hack 12 years ago
parent
commit
3fadac3f58
3 changed files with 60 additions and 95 deletions
  1. 48 84
      conn.go
  2. 4 0
      frame.go
  3. 8 11
      session.go

+ 48 - 84
conn.go

@@ -275,90 +275,6 @@ func (c *Conn) prepareStatement(stmt string) (*queryInfo, error) {
 	return info, nil
 }
 
-func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
-	op := &queryFrame{Stmt: qry.Stmt, Cons: qry.Cons, PageSize: qry.PageSize}
-	if len(qry.Args) > 0 {
-		info, err := c.prepareStatement(qry.Stmt)
-		if err != nil {
-			return nil, err
-		}
-		op.Prepared = info.id
-		op.Values = make([][]byte, len(qry.Args))
-		for i := 0; i < len(qry.Args); i++ {
-			val, err := Marshal(info.args[i].TypeInfo, qry.Args[i])
-			if err != nil {
-				return nil, err
-			}
-			op.Values[i] = val
-		}
-	}
-	resp, err := c.exec(op)
-	if err != nil {
-		return nil, err
-	}
-	switch x := resp.(type) {
-	case resultVoidFrame:
-		return &Iter{}, nil
-	case resultRowsFrame:
-		return &Iter{columns: x.Columns, rows: x.Rows}, nil
-	case resultKeyspaceFrame:
-		c.cluster.HandleKeyspace(c, x.Keyspace)
-		return &Iter{}, nil
-	case error:
-		return &Iter{err: x}, nil
-	}
-	return nil, ErrProtocol
-}
-
-func (c *Conn) ExecuteBatch(batch *Batch) error {
-	/*
-		if c.version == 1 {
-			return ErrUnsupported
-		}
-		frame := make(frame, headerSize, defaultFrameSize)
-		frame.setHeader(c.version, 0, 0, opBatch)
-		frame.writeByte(byte(batch.Type))
-		frame.writeShort(uint16(len(batch.Entries)))
-		for i := 0; i < len(batch.Entries); i++ {
-			entry := &batch.Entries[i]
-			var info *queryInfo
-			if len(entry.Args) > 0 {
-				var err error
-				info, err = c.prepareStatement(entry.Stmt)
-				if err != nil {
-					return err
-				}
-				frame.writeByte(1)
-				frame.writeShortBytes(info.id)
-			} else {
-				frame.writeByte(0)
-				frame.writeLongString(entry.Stmt)
-			}
-			frame.writeShort(uint16(len(entry.Args)))
-			for j := 0; j < len(entry.Args); j++ {
-				val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
-				if err != nil {
-					return err
-				}
-				frame.writeBytes(val)
-			}
-		}
-		frame.writeConsistency(batch.Cons)
-
-		resp, err := c.call(frame)
-		if err != nil {
-			return err
-		}
-		switch x := resp.(type) {
-		case resultVoidFrame:
-		case error:
-			return x
-		default:
-			return ErrProtocol
-		}*/
-	return nil
-}
-
 func (c *Conn) Pick(qry *Query) *Conn {
 	return c
 }
@@ -386,6 +302,54 @@ func (c *Conn) UseKeyspace(keyspace string) error {
 	return nil
 }
 
+func (c *Conn) executeBatch(batch *Batch) error {
+	if c.version == 1 {
+		return ErrUnsupported
+	}
+	f := make(frame, headerSize, defaultFrameSize)
+	f.setHeader(c.version, 0, 0, opBatch)
+	f.writeByte(byte(batch.Type))
+	f.writeShort(uint16(len(batch.Entries)))
+	for i := 0; i < len(batch.Entries); i++ {
+		entry := &batch.Entries[i]
+		var info *queryInfo
+		if len(entry.Args) > 0 {
+			var err error
+			info, err = c.prepareStatement(entry.Stmt)
+			if err != nil {
+				return err
+			}
+			f.writeByte(1)
+			f.writeShortBytes(info.id)
+		} else {
+			f.writeByte(0)
+			f.writeLongString(entry.Stmt)
+		}
+		f.writeShort(uint16(len(entry.Args)))
+		for j := 0; j < len(entry.Args); j++ {
+			val, err := Marshal(info.args[j].TypeInfo, entry.Args[j])
+			if err != nil {
+				return err
+			}
+			f.writeBytes(val)
+		}
+	}
+	f.writeConsistency(batch.Cons)
+
+	resp, err := c.exec(f)
+	if err != nil {
+		return err
+	}
+	switch x := resp.(type) {
+	case resultVoidFrame:
+		return nil
+	case error:
+		return x
+	default:
+		return ErrProtocol
+	}
+}
+
 func (c *Conn) decodeFrame(f frame) (rval interface{}, err error) {
 	defer func() {
 		if r := recover(); r != nil {

+ 4 - 0
frame.go

@@ -266,6 +266,10 @@ func (f *frame) writeConsistency(c Consistency) {
 	f.writeShort(consistencyCodes[c])
 }
 
+func (f frame) encodeFrame(version uint8, dst frame) (frame, error) {
+	return f, nil
+}
+
 var consistencyCodes = []uint16{
 	Any:         0x0000,
 	One:         0x0001,

+ 8 - 11
session.go

@@ -42,17 +42,6 @@ func (s *Session) Close() {
 	s.Node.Close()
 }
 
-// ExecuteBatch executes a Batch on the underlying Node.
-func (s *Session) ExecuteBatch(batch *Batch) error {
-	/*
-		if batch.Cons == 0 {
-			batch.Cons = s.Cons
-		}
-		return s.Node.ExecuteBatch(batch)
-	*/
-	return nil
-}
-
 // ExecuteQuery executes a Query on the underlying Node.
 func (s *Session) ExecuteQuery(qry *Query) *Iter {
 	return s.executeQuery(qry, nil)
@@ -113,6 +102,14 @@ func (s *Session) executeQuery(qry *Query, pageState []byte) *Iter {
 	}
 }
 
+func (s *Session) ExecuteBatch(batch *Batch) error {
+	conn := s.Node.Pick(nil)
+	if conn == nil {
+		return ErrUnavailable
+	}
+	return conn.executeBatch(batch)
+}
+
 type Query struct {
 	Stmt     string
 	Args     []interface{}