|
|
@@ -63,6 +63,9 @@ const (
|
|
|
keyCompression string = "COMPRESSION"
|
|
|
)
|
|
|
|
|
|
+var consistencyLevels = map[string]byte{"any": 0x00, "one": 0x01, "two": 0x02,
|
|
|
+ "three": 0x03, "quorum": 0x04, "all": 0x05, "local_quorum": 0x06, "each_quorum": 0x07}
|
|
|
+
|
|
|
var rnd = rand.New(rand.NewSource(0))
|
|
|
|
|
|
type drv struct{}
|
|
|
@@ -74,6 +77,7 @@ func (d drv) Open(name string) (driver.Conn, error) {
|
|
|
type connection struct {
|
|
|
c net.Conn
|
|
|
compression string
|
|
|
+ consistency byte
|
|
|
}
|
|
|
|
|
|
func Open(name string) (*connection, error) {
|
|
|
@@ -91,8 +95,12 @@ func Open(name string) (*connection, error) {
|
|
|
}
|
|
|
|
|
|
version := "3.0.0"
|
|
|
- keyspace := ""
|
|
|
- compression := ""
|
|
|
+ var (
|
|
|
+ keyspace string
|
|
|
+ compression string
|
|
|
+ consistency byte = 0x01
|
|
|
+ ok bool
|
|
|
+ )
|
|
|
for i := 1; i < len(parts); i++ {
|
|
|
switch {
|
|
|
case parts[i] == "":
|
|
|
@@ -106,13 +114,18 @@ func Open(name string) (*connection, error) {
|
|
|
compression)
|
|
|
}
|
|
|
case strings.HasPrefix(parts[i], "version="):
|
|
|
- compression = strings.TrimSpace(parts[i][8:])
|
|
|
+ version = strings.TrimSpace(parts[i][8:])
|
|
|
+ case strings.HasPrefix(parts[i], "consistency="):
|
|
|
+ cs := strings.TrimSpace(parts[i][12:])
|
|
|
+ if consistency, ok = consistencyLevels[cs]; !ok {
|
|
|
+ return nil, fmt.Errorf("unknown consistency level %q", cs)
|
|
|
+ }
|
|
|
default:
|
|
|
return nil, fmt.Errorf("unsupported option %q", parts[i])
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- cn := &connection{c: c, compression: compression}
|
|
|
+ cn := &connection{c: c, compression: compression, consistency: consistency}
|
|
|
|
|
|
b := &bytes.Buffer{}
|
|
|
|
|
|
@@ -306,6 +319,7 @@ func (st *statement) exec(v []driver.Value) error {
|
|
|
copy(body[p+4:], b)
|
|
|
p += 4 + len(b)
|
|
|
}
|
|
|
+ binary.BigEndian.PutUint16(body[p:], uint16(st.cn.consistency))
|
|
|
if err := st.cn.send(opExecute, body); err != nil {
|
|
|
return err
|
|
|
}
|