Prechádzať zdrojové kódy

improved the API a bit

Christoph Hack 12 rokov pred
rodič
commit
a36312c89e
5 zmenil súbory, kde vykonal 372 pridanie a 282 odobranie
  1. 34 11
      conn.go
  2. 2 2
      frame.go
  3. 24 212
      gocql.go
  4. 202 57
      session.go
  5. 110 0
      topology.go

+ 34 - 11
conn.go

@@ -222,30 +222,53 @@ func (c *Conn) prepareQuery(stmt string) *queryInfo {
 	return info
 }
 
+func (c *Conn) ExecuteQuery(qry *Query) (*Iter, error) {
+	frame, err := c.executeQuery(qry)
+	if err != nil {
+		return nil, err
+	}
+	if frame[3] == opError {
+		return nil, frame.readErrorFrame()
+	} else if frame[3] == opResult {
+		iter := new(Iter)
+		iter.readFrame(frame)
+		return iter, nil
+	}
+	return nil, nil
+}
+
+func (c *Conn) ExecuteBatch(batch *Batch) error {
+	return nil
+}
+
+func (c *Conn) Close() {
+	c.conn.Close()
+}
+
 func (c *Conn) executeQuery(query *Query) (frame, error) {
 	var info *queryInfo
-	if len(query.args) > 0 {
-		info = c.prepareQuery(query.stmt)
+	if len(query.Args) > 0 {
+		info = c.prepareQuery(query.Stmt)
 	}
 
 	frame := make(frame, headerSize, headerSize+512)
 	if info == nil {
 		frame.setHeader(protoRequest, 0, 0, opQuery)
-		frame.writeLongString(query.stmt)
+		frame.writeLongString(query.Stmt)
 	} else {
 		frame.setHeader(protoRequest, 0, 0, opExecute)
 		frame.writeShortBytes(info.id)
 	}
-	frame.writeShort(uint16(query.cons))
+	frame.writeShort(uint16(query.Cons))
 	flags := uint8(0)
-	if len(query.args) > 0 {
+	if len(query.Args) > 0 {
 		flags |= flagQueryValues
 	}
 	frame.writeByte(flags)
-	if len(query.args) > 0 {
-		frame.writeShort(uint16(len(query.args)))
-		for i := 0; i < len(query.args); i++ {
-			val, err := Marshal(info.args[i].TypeInfo, query.args[i])
+	if len(query.Args) > 0 {
+		frame.writeShort(uint16(len(query.Args)))
+		for i := 0; i < len(query.Args); i++ {
+			val, err := Marshal(info.args[i].TypeInfo, query.Args[i])
 			if err != nil {
 				return nil, err
 			}
@@ -270,8 +293,8 @@ func (c *Conn) executeQuery(query *Query) (frame, error) {
 
 type queryInfo struct {
 	id   []byte
-	args []columnInfo
-	rval []columnInfo
+	args []ColumnInfo
+	rval []ColumnInfo
 	wg   sync.WaitGroup
 }
 

+ 2 - 2
frame.go

@@ -231,7 +231,7 @@ func (f *frame) readTypeInfo() *TypeInfo {
 	return typ
 }
 
-func (f *frame) readMetaData() []columnInfo {
+func (f *frame) readMetaData() []ColumnInfo {
 	flags := f.readInt()
 	numColumns := f.readInt()
 	globalKeyspace := ""
@@ -240,7 +240,7 @@ func (f *frame) readMetaData() []columnInfo {
 		globalKeyspace = f.readString()
 		globalTable = f.readString()
 	}
-	info := make([]columnInfo, numColumns)
+	info := make([]ColumnInfo, numColumns)
 	for i := 0; i < numColumns; i++ {
 		info[i].Keyspace = globalKeyspace
 		info[i].Table = globalTable

+ 24 - 212
gocql.go

@@ -6,230 +6,46 @@ package gocql
 
 import (
 	"errors"
-	"fmt"
-	"strings"
-	"time"
 )
 
-type Config struct {
-	Nodes       []string
-	CQLVersion  string
-	Keyspace    string
-	Consistency Consistency
-	DefaultPort int
-	Timeout     time.Duration
-	NodePicker  NodePicker
-	Reconnector Reconnector
+type queryContext interface {
+	executeQuery(query *Query) (frame, error)
 }
 
-func (c *Config) normalize() {
-	if c.CQLVersion == "" {
-		c.CQLVersion = "3.0.0"
-	}
-	if c.DefaultPort == 0 {
-		c.DefaultPort = 9042
-	}
-	if c.Timeout <= 0 {
-		c.Timeout = 200 * time.Millisecond
-	}
-	if c.NodePicker == nil {
-		c.NodePicker = NewRoundRobinPicker()
-	}
-	if c.Reconnector == nil {
-		c.Reconnector = NewExponentialReconnector(1*time.Second, 10*time.Minute)
-	}
-	for i := 0; i < len(c.Nodes); i++ {
-		c.Nodes[i] = strings.TrimSpace(c.Nodes[i])
-		if strings.IndexByte(c.Nodes[i], ':') < 0 {
-			c.Nodes[i] = fmt.Sprintf("%s:%d", c.Nodes[i], c.DefaultPort)
-		}
-	}
+type ColumnInfo struct {
+	Keyspace string
+	Table    string
+	Name     string
+	TypeInfo *TypeInfo
 }
 
-type Session struct {
-	cfg         *Config
-	pool        NodePicker
-	reconnector Reconnector
-	keyspace    string
-	nohosts     chan bool
-}
+type BatchType int
 
-func NewSession(cfg Config) *Session {
-	cfg.normalize()
-	s := &Session{
-		cfg:         &cfg,
-		nohosts:     make(chan bool),
-		reconnector: cfg.Reconnector,
-		pool:        cfg.NodePicker,
-	}
-	for _, address := range cfg.Nodes {
-		go s.reconnector.Reconnect(s, address)
-	}
-	return s
+const (
+	LoggedBatch   BatchType = 0
+	UnloggedBatch BatchType = 1
+	CounterBatch  BatchType = 2
+)
+
+/*
+type Batch struct {
+	queries []*Query
+	ctx     queryContext
+	cons    Consistency
 }
 
-func (s *Session) Query(stmt string, args ...interface{}) *Query {
+func (b *Batch) Query(stmt string, args ...interface{}) *Query {
 	return &Query{
 		stmt: stmt,
 		args: args,
-		cons: s.cfg.Consistency,
-		ctx:  s,
+		cons: b.cons,
+		//ctx:  b,
 	}
 }
 
-func (s *Session) Do(query *Query) *Query {
-	q := *query
-	q.ctx = s
-	return &q
-}
-
-func (s *Session) Close() {
-	return
-}
-
-func (s *Session) executeQuery(query *Query) (frame, error) {
-	node := s.pool.Pick(query)
-	if node == nil {
-		<-time.After(s.cfg.Timeout)
-		node = s.pool.Pick(query)
-	}
-	if node == nil {
-		return nil, ErrNoHostAvailable
-	}
-	return node.conn.executeQuery(query)
-}
-
-type Node struct {
-	conn *Conn
-}
-
-type Query struct {
-	stmt string
-	args []interface{}
-	cons Consistency
-	ctx  queryContext
-}
-
-func NewQuery(stmt string) *Query {
-	return &Query{stmt: stmt, cons: ConQuorum}
-}
-
-func (q *Query) Exec() error {
-	if q.ctx == nil {
-		return ErrQueryUnbound
-	}
-	frame, err := q.ctx.executeQuery(q)
-	if err != nil {
-		return err
-	} else if frame[3] == opError {
-		return frame.readErrorFrame()
-	} else if frame[3] != opResult {
-		return ErrProtocol
-	}
+func (b *Batch) Apply() error {
 	return nil
-}
-
-func (q *Query) Iter() *Iter {
-	if q.ctx == nil {
-		return &Iter{err: ErrQueryUnbound}
-	}
-	frame, err := q.ctx.executeQuery(q)
-	if err != nil {
-		return &Iter{err: err}
-	} else if frame[3] == opError {
-		return &Iter{err: frame.readErrorFrame()}
-	} else if frame[3] != opResult {
-		return &Iter{err: ErrProtocol}
-	}
-	iter := new(Iter)
-	iter.readFrame(frame)
-	return iter
-}
-
-func (q *Query) Scan(values ...interface{}) error {
-	found := false
-	iter := q.Iter()
-	if iter.Scan(values...) {
-		found = true
-	}
-	if err := iter.Close(); err != nil {
-		return err
-	} else if !found {
-		return ErrNotFound
-	}
-	return nil
-}
-
-func (q *Query) Consistency(cons Consistency) *Query {
-	q.cons = cons
-	return q
-}
-
-type Iter struct {
-	err    error
-	pos    int
-	values [][]byte
-	info   []columnInfo
-}
-
-func (iter *Iter) readFrame(frame frame) {
-	defer func() {
-		if r := recover(); r != nil {
-			if e, ok := r.(error); ok && e == ErrProtocol {
-				iter.err = e
-				return
-			}
-			panic(r)
-		}
-	}()
-	frame.skipHeader()
-	iter.pos = 0
-	iter.err = nil
-	iter.values = nil
-	if frame.readInt() != resultKindRows {
-		return
-	}
-	iter.info = frame.readMetaData()
-	numRows := frame.readInt()
-	iter.values = make([][]byte, numRows*len(iter.info))
-	for i := 0; i < len(iter.values); i++ {
-		iter.values[i] = frame.readBytes()
-	}
-}
-
-func (iter *Iter) Scan(values ...interface{}) bool {
-	if iter.err != nil || iter.pos >= len(iter.values) {
-		return false
-	}
-	if len(values) != len(iter.info) {
-		iter.err = errors.New("count mismatch")
-		return false
-	}
-	for i := 0; i < len(values); i++ {
-		err := Unmarshal(iter.info[i].TypeInfo, iter.values[i+iter.pos], values[i])
-		if err != nil {
-			iter.err = err
-			return false
-		}
-	}
-	iter.pos += len(values)
-	return true
-}
-
-func (iter *Iter) Close() error {
-	return iter.err
-}
-
-type queryContext interface {
-	executeQuery(query *Query) (frame, error)
-}
-
-type columnInfo struct {
-	Keyspace string
-	Table    string
-	Name     string
-	TypeInfo *TypeInfo
-}
+} */
 
 type Consistency uint16
 
@@ -261,7 +77,3 @@ var (
 	ErrQueryUnbound    = errors.New("can not execute unbound query")
 	ErrProtocol        = errors.New("protocol error")
 )
-
-type node struct {
-	conn *Conn
-}

+ 202 - 57
session.go

@@ -1,88 +1,233 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
 package gocql
 
 import (
-	"sync"
-	"sync/atomic"
+	"errors"
+	"fmt"
+	"strings"
 	"time"
 )
 
-type NodePicker interface {
-	AddNode(node *Node)
-	RemoveNode(node *Node)
-	Pick(query *Query) *Node
+type Config struct {
+	Nodes       []string
+	CQLVersion  string
+	Keyspace    string
+	Consistency Consistency
+	DefaultPort int
+	Timeout     time.Duration
+	NodePicker  NodePicker
+	Reconnector Reconnector
 }
 
-type RoundRobinPicker struct {
-	pool []*Node
-	pos  uint32
-	mu   sync.RWMutex
+func (c *Config) normalize() {
+	if c.CQLVersion == "" {
+		c.CQLVersion = "3.0.0"
+	}
+	if c.DefaultPort == 0 {
+		c.DefaultPort = 9042
+	}
+	if c.Timeout <= 0 {
+		c.Timeout = 200 * time.Millisecond
+	}
+	if c.NodePicker == nil {
+		c.NodePicker = NewRoundRobinPicker()
+	}
+	if c.Reconnector == nil {
+		c.Reconnector = NewExponentialReconnector(1*time.Second, 10*time.Minute)
+	}
+	for i := 0; i < len(c.Nodes); i++ {
+		c.Nodes[i] = strings.TrimSpace(c.Nodes[i])
+		if strings.IndexByte(c.Nodes[i], ':') < 0 {
+			c.Nodes[i] = fmt.Sprintf("%s:%d", c.Nodes[i], c.DefaultPort)
+		}
+	}
 }
 
-func NewRoundRobinPicker() *RoundRobinPicker {
-	return &RoundRobinPicker{}
+type Session struct {
+	cfg         *Config
+	pool        NodePicker
+	reconnector Reconnector
+	keyspace    string
+	nohosts     chan bool
 }
 
-func (r *RoundRobinPicker) AddNode(node *Node) {
-	r.mu.Lock()
-	r.pool = append(r.pool, node)
-	r.mu.Unlock()
+func NewSession(cfg Config) *Session {
+	cfg.normalize()
+	s := &Session{
+		cfg:         &cfg,
+		nohosts:     make(chan bool),
+		reconnector: cfg.Reconnector,
+		pool:        cfg.NodePicker,
+	}
+	for _, address := range cfg.Nodes {
+		go s.reconnector.Reconnect(s, address)
+	}
+	return s
 }
 
-func (r *RoundRobinPicker) RemoveNode(node *Node) {
-	r.mu.Lock()
-	n := len(r.pool)
-	for i := 0; i < n; i++ {
-		if r.pool[i] == node {
-			r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
-			r.pool = r.pool[:n-1]
-			break
-		}
+func (s *Session) Query(stmt string, args ...interface{}) QueryBuilder {
+	return QueryBuilder{
+		&Query{
+			Stmt: stmt,
+			Args: args,
+			Cons: s.cfg.Consistency,
+		},
+		s,
 	}
-	r.mu.Unlock()
 }
 
-func (r *RoundRobinPicker) Pick(query *Query) *Node {
-	pos := atomic.AddUint32(&r.pos, 1)
-	var node *Node
-	r.mu.RLock()
-	if len(r.pool) > 0 {
-		node = r.pool[pos%uint32(len(r.pool))]
+func (s *Session) Do(qry *Query) QueryBuilder {
+	q := *qry
+	return QueryBuilder{&q, s}
+}
+
+func (s *Session) Close() {
+	return
+}
+
+func (s *Session) ExecuteBatch(batch *Batch) error {
+	return nil
+}
+
+func (s *Session) ExecuteQuery(qry *Query) (*Iter, error) {
+	node := s.pool.Pick(qry)
+	if node == nil {
+		<-time.After(s.cfg.Timeout)
+		node = s.pool.Pick(qry)
+	}
+	if node == nil {
+		return nil, ErrNoHostAvailable
 	}
-	r.mu.RUnlock()
-	return node
+	return node.ExecuteQuery(qry)
 }
 
-type Reconnector interface {
-	Reconnect(session *Session, address string)
+type Query struct {
+	Stmt     string
+	Args     []interface{}
+	Cons     Consistency
+	PageSize int
+	Trace    bool
 }
 
-type ExponentialReconnector struct {
-	baseDelay time.Duration
-	maxDelay  time.Duration
+type QueryBuilder struct {
+	qry *Query
+	ctx Node
 }
 
-func NewExponentialReconnector(baseDelay, maxDelay time.Duration) *ExponentialReconnector {
-	return &ExponentialReconnector{baseDelay, maxDelay}
+func (b QueryBuilder) Args(args ...interface{}) {
+	b.qry.Args = args
 }
 
-func (e *ExponentialReconnector) Reconnect(session *Session, address string) {
-	delay := e.baseDelay
-	for {
-		conn, err := Connect(address, session.cfg)
-		if err != nil {
-			<-time.After(delay)
-			if delay *= 2; delay > e.maxDelay {
-				delay = e.maxDelay
+func (b QueryBuilder) Consistency(cons Consistency) QueryBuilder {
+	b.qry.Cons = cons
+	return b
+}
+
+func (b QueryBuilder) Trace(trace bool) QueryBuilder {
+	b.qry.Trace = trace
+	return b
+}
+
+func (b QueryBuilder) PageSize(size int) QueryBuilder {
+	b.qry.PageSize = size
+	return b
+}
+
+func (b QueryBuilder) Exec() error {
+	_, err := b.ctx.ExecuteQuery(b.qry)
+	return err
+}
+
+func (b QueryBuilder) Iter() *Iter {
+	iter, err := b.ctx.ExecuteQuery(b.qry)
+	if err != nil {
+		return &Iter{err: err}
+	}
+	return iter
+}
+
+func (b QueryBuilder) Scan(values ...interface{}) error {
+	iter := b.Iter()
+	iter.Scan(values...)
+	return iter.Close()
+}
+
+type Iter struct {
+	err    error
+	pos    int
+	values [][]byte
+	info   []ColumnInfo
+}
+
+func (iter *Iter) readFrame(frame frame) {
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok && e == ErrProtocol {
+				iter.err = e
+				return
 			}
-			continue
+			panic(r)
 		}
-		node := &Node{conn}
-		go func() {
-			conn.Serve()
-			session.pool.RemoveNode(node)
-			e.Reconnect(session, address)
-		}()
-		session.pool.AddNode(node)
+	}()
+	frame.skipHeader()
+	iter.pos = 0
+	iter.err = nil
+	iter.values = nil
+	if frame.readInt() != resultKindRows {
 		return
 	}
+	iter.info = frame.readMetaData()
+	numRows := frame.readInt()
+	iter.values = make([][]byte, numRows*len(iter.info))
+	for i := 0; i < len(iter.values); i++ {
+		iter.values[i] = frame.readBytes()
+	}
+}
+
+func (iter *Iter) Columns() []ColumnInfo {
+	return iter.info
+}
+
+func (iter *Iter) Scan(values ...interface{}) bool {
+	if iter.err != nil || iter.pos >= len(iter.values) {
+		return false
+	}
+	if len(values) != len(iter.info) {
+		iter.err = errors.New("count mismatch")
+		return false
+	}
+	for i := 0; i < len(values); i++ {
+		err := Unmarshal(iter.info[i].TypeInfo, iter.values[i+iter.pos], values[i])
+		if err != nil {
+			iter.err = err
+			return false
+		}
+	}
+	iter.pos += len(values)
+	return true
+}
+
+func (iter *Iter) Close() error {
+	return iter.err
+}
+
+type Batch struct {
+	Type    BatchType
+	Entries []BatchEntry
+}
+
+func NewBatch(typ BatchType) *Batch {
+	return &Batch{Type: typ}
+}
+
+func (b *Batch) Query(stmt string, args ...interface{}) {
+	b.Entries = append(b.Entries, BatchEntry{Stmt: stmt, Args: args})
+}
+
+type BatchEntry struct {
+	Stmt string
+	Args []interface{}
 }

+ 110 - 0
topology.go

@@ -0,0 +1,110 @@
+package gocql
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type Node interface {
+	ExecuteQuery(qry *Query) (*Iter, error)
+	ExecuteBatch(batch *Batch) error
+	Close()
+}
+
+type NodePicker interface {
+	AddNode(node Node)
+	RemoveNode(node Node)
+	Pick(qry *Query) Node
+}
+
+type RoundRobinPicker struct {
+	pool []Node
+	pos  uint32
+	mu   sync.RWMutex
+}
+
+func NewRoundRobinPicker() *RoundRobinPicker {
+	return &RoundRobinPicker{}
+}
+
+func (r *RoundRobinPicker) AddNode(node Node) {
+	r.mu.Lock()
+	r.pool = append(r.pool, node)
+	r.mu.Unlock()
+}
+
+func (r *RoundRobinPicker) RemoveNode(node Node) {
+	r.mu.Lock()
+	n := len(r.pool)
+	for i := 0; i < n; i++ {
+		if r.pool[i] == node {
+			r.pool[i], r.pool[n-1] = r.pool[n-1], r.pool[i]
+			r.pool = r.pool[:n-1]
+			break
+		}
+	}
+	r.mu.Unlock()
+}
+
+func (r *RoundRobinPicker) Pick(query *Query) Node {
+	pos := atomic.AddUint32(&r.pos, 1)
+	var node Node
+	r.mu.RLock()
+	if len(r.pool) > 0 {
+		node = r.pool[pos%uint32(len(r.pool))]
+	}
+	r.mu.RUnlock()
+	return node
+}
+
+type Reconnector interface {
+	Reconnect(session *Session, address string)
+}
+
+type ExponentialReconnector struct {
+	baseDelay time.Duration
+	maxDelay  time.Duration
+}
+
+func NewExponentialReconnector(baseDelay, maxDelay time.Duration) *ExponentialReconnector {
+	return &ExponentialReconnector{baseDelay, maxDelay}
+}
+
+func (e *ExponentialReconnector) Reconnect(session *Session, address string) {
+	delay := e.baseDelay
+	for {
+		conn, err := Connect(address, session.cfg)
+		if err != nil {
+			<-time.After(delay)
+			if delay *= 2; delay > e.maxDelay {
+				delay = e.maxDelay
+			}
+			continue
+		}
+		node := &Host{conn}
+		go func() {
+			conn.Serve()
+			session.pool.RemoveNode(node)
+			e.Reconnect(session, address)
+		}()
+		session.pool.AddNode(node)
+		return
+	}
+}
+
+type Host struct {
+	conn *Conn
+}
+
+func (h *Host) ExecuteQuery(qry *Query) (*Iter, error) {
+	return h.conn.ExecuteQuery(qry)
+}
+
+func (h *Host) ExecuteBatch(batch *Batch) error {
+	return nil
+}
+
+func (h *Host) Close() {
+	h.conn.conn.Close()
+}