Переглянути джерело

added support for frame compression

Christoph Hack 13 роки тому
батько
коміт
bd2eb29878
3 змінених файлів з 57 додано та 22 видалено
  1. 1 0
      README.md
  2. 55 21
      gocql.go
  3. 1 1
      gocql_test.go

+ 1 - 0
README.md

@@ -37,6 +37,7 @@ Features
 * Modern Cassandra client that is based on Cassandra's new native protocol
 * Compatible with Go's `database/sql` package
 * Built-In support for UUIDs (version 1 and 4)
+* Optional frame compression (using snappy)
 
 License
 -------

+ 55 - 21
gocql.go

@@ -28,11 +28,14 @@
 package gocql
 
 import (
+	"bytes"
+	"code.google.com/p/snappy-go/snappy"
 	"database/sql"
 	"database/sql/driver"
 	"encoding/binary"
 	"fmt"
 	"io"
+	"math/rand"
 	"net"
 	"strings"
 )
@@ -56,6 +59,8 @@ const (
 	flagCompressed byte = 0x01
 )
 
+var rnd = rand.New(rand.NewSource(0))
+
 type drv struct{}
 
 func (d drv) Open(name string) (driver.Conn, error) {
@@ -63,49 +68,71 @@ func (d drv) Open(name string) (driver.Conn, error) {
 }
 
 type connection struct {
-	c net.Conn
+	c           net.Conn
+	compression string
 }
 
 func Open(name string) (*connection, error) {
 	parts := strings.Split(name, " ")
 	address := ""
 	if len(parts) >= 1 {
-		address = parts[0]
+		addresses := strings.Split(parts[0], ",")
+		if len(addresses) > 0 {
+			address = addresses[rnd.Intn(len(addresses))]
+		}
 	}
 	c, err := net.Dial("tcp", address)
 	if err != nil {
 		return nil, err
 	}
-	cn := &connection{c: c}
-
-	version := []byte("3.0.0")
-	body := make([]byte, 4+len(version))
-	binary.BigEndian.PutUint16(body[0:2], uint16(len(version)))
-	copy(body[2:len(body)-2], version)
-	binary.BigEndian.PutUint16(body[len(body)-2:], 0)
-	if err := cn.send(opStartup, body); err != nil {
-		return nil, err
-	}
-
-	opcode, body, err := cn.recv()
-	if err != nil {
-		return nil, err
-	}
-	if opcode != opReady {
-		return nil, fmt.Errorf("connection not ready")
-	}
 
+	version := "3.0.0"
 	keyspace := ""
+	compression := ""
 	for i := 1; i < len(parts); i++ {
 		switch {
 		case parts[i] == "":
 			continue
 		case strings.HasPrefix(parts[i], "keyspace="):
-			keyspace = parts[i][9:]
+			keyspace = strings.TrimSpace(parts[i][9:])
+		case strings.HasPrefix(parts[i], "compression="):
+			compression = strings.TrimSpace(parts[i][12:])
+			if compression != "snappy" {
+				return nil, fmt.Errorf("unknown compression algorithm %q",
+					compression)
+			}
+		case strings.HasPrefix(parts[i], "version="):
+			compression = strings.TrimSpace(parts[i][8:])
 		default:
 			return nil, fmt.Errorf("unsupported option %q", parts[i])
 		}
 	}
+
+	cn := &connection{c: c, compression: compression}
+
+	b := &bytes.Buffer{}
+	binary.Write(b, binary.BigEndian, uint16(len(version)))
+	b.WriteString(version)
+	if compression != "" {
+		binary.Write(b, binary.BigEndian, uint16(1))
+		binary.Write(b, binary.BigEndian, uint16(1))
+		binary.Write(b, binary.BigEndian, uint16(len(compression)))
+		b.WriteString(compression)
+	} else {
+		binary.Write(b, binary.BigEndian, uint16(0))
+	}
+	if err := cn.send(opStartup, b.Bytes()); err != nil {
+		return nil, err
+	}
+
+	opcode, _, err := cn.recv()
+	if err != nil {
+		return nil, err
+	}
+	if opcode != opReady {
+		return nil, fmt.Errorf("connection not ready")
+	}
+
 	if keyspace != "" {
 		st, err := cn.Prepare(fmt.Sprintf("USE %s", keyspace))
 		if err != nil {
@@ -147,6 +174,13 @@ func (cn *connection) recv() (byte, []byte, error) {
 			return 0, nil, err
 		}
 	}
+	if header[1]&flagCompressed != 0 && cn.compression == "snappy" {
+		var err error
+		body, err = snappy.Decode(nil, body)
+		if err != nil {
+			return 0, nil, err
+		}
+	}
 	if opcode == opError {
 		code := binary.BigEndian.Uint32(body[0:4])
 		msglen := binary.BigEndian.Uint16(body[4:6])

+ 1 - 1
gocql_test.go

@@ -53,7 +53,7 @@ var pages = []*Page{
 }
 
 func TestWiki(t *testing.T) {
-	db, err := sql.Open("gocql", "localhost:8000")
+	db, err := sql.Open("gocql", "localhost:8000 compression=snappy")
 	if err != nil {
 		t.Fatal(err)
 	}