瀏覽代碼

add support engine group

xormplus 8 年之前
父節點
當前提交
3f81e18373
共有 6 個文件被更改,包括 350 次插入5 次删除
  1. 2 0
      engine.go
  2. 180 0
      engine_group.go
  3. 148 0
      engine_group_policy.go
  4. 8 0
      engine_maxlife.go
  5. 2 2
      session.go
  6. 10 3
      session_raw.go

+ 2 - 0
engine.go

@@ -50,6 +50,8 @@ type Engine struct {
 	disableGlobalCache bool
 
 	tagHandlers map[string]tagHandler
+
+	engineGroup *EngineGroup
 }
 
 // ShowSQL show SQL statement or not on logger if log level is great than INFO

+ 180 - 0
engine_group.go

@@ -0,0 +1,180 @@
+// Copyright 2017 The Xorm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xorm
+
+import (
+	"strings"
+
+	"github.com/xormplus/core"
+)
+
+type EngineGroup struct {
+	*Engine
+	slaves []*Engine
+	policy GroupPolicy
+}
+
+func NewEngineGroup(args1 interface{}, args2 interface{}, policies ...GroupPolicy) (*EngineGroup, error) {
+	var eg EngineGroup
+	if len(policies) > 0 {
+		eg.policy = policies[0]
+	} else {
+		eg.policy = NewRandomPolicy()
+	}
+
+	driverName, ok1 := args1.(string)
+	dataSourceNames, ok2 := args2.(string)
+	if ok1 && ok2 {
+		conns := strings.Split(dataSourceNames, ";")
+		engines := make([]*Engine, len(conns))
+		for i, conn := range conns {
+			engine, err := NewEngine(driverName, conn)
+			if err != nil {
+				return nil, err
+			}
+			engine.engineGroup = &eg
+			engines[i] = engine
+		}
+
+		eg.Engine = engines[0]
+		eg.slaves = engines[1:]
+		return &eg, nil
+	}
+
+	master, ok3 := args1.(*Engine)
+	slaves, ok4 := args2.([]*Engine)
+	if ok3 && ok4 {
+		master.engineGroup = &eg
+		for i := 0; i < len(slaves); i++ {
+			slaves[i].engineGroup = &eg
+		}
+		eg.Engine = master
+		eg.slaves = slaves
+		return &eg, nil
+	}
+	return nil, ErrParamsType
+}
+
+func (eg *EngineGroup) SetPolicy(policy GroupPolicy) *EngineGroup {
+	eg.policy = policy
+	return eg
+}
+
+func (eg *EngineGroup) Master() *Engine {
+	return eg.Engine
+}
+
+// Slave returns one of the physical databases which is a slave according the policy
+func (eg *EngineGroup) Slave() *Engine {
+	switch len(eg.slaves) {
+	case 0:
+		return eg.Engine
+	case 1:
+		return eg.slaves[0]
+	}
+	return eg.policy.Slave(eg)
+}
+
+func (eg *EngineGroup) Slaves() []*Engine {
+	return eg.slaves
+}
+
+func (eg *EngineGroup) GetSlave(i int) *Engine {
+	return eg.slaves[i]
+}
+
+// ShowSQL show SQL statement or not on logger if log level is great than INFO
+func (eg *EngineGroup) ShowSQL(show ...bool) {
+	eg.Engine.ShowSQL(show...)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].ShowSQL(show...)
+	}
+}
+
+// ShowExecTime show SQL statement and execute time or not on logger if log level is great than INFO
+func (eg *EngineGroup) ShowExecTime(show ...bool) {
+	eg.Engine.ShowExecTime(show...)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].ShowExecTime(show...)
+	}
+}
+
+// SetMapper set the name mapping rules
+func (eg *EngineGroup) SetMapper(mapper core.IMapper) {
+	eg.Engine.SetMapper(mapper)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].SetMapper(mapper)
+	}
+}
+
+// SetLogger set the new logger
+func (eg *EngineGroup) SetLogger(logger core.ILogger) {
+	eg.Engine.SetLogger(logger)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].SetLogger(logger)
+	}
+}
+
+// SetTableMapper set the table name mapping rule
+func (eg *EngineGroup) SetTableMapper(mapper core.IMapper) {
+	eg.Engine.TableMapper = mapper
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].TableMapper = mapper
+	}
+}
+
+// SetColumnMapper set the column name mapping rule
+func (eg *EngineGroup) SetColumnMapper(mapper core.IMapper) {
+	eg.Engine.ColumnMapper = mapper
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].ColumnMapper = mapper
+	}
+}
+
+// SetMaxOpenConns is only available for go 1.2+
+func (eg *EngineGroup) SetMaxOpenConns(conns int) {
+	eg.Engine.db.SetMaxOpenConns(conns)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].db.SetMaxOpenConns(conns)
+	}
+}
+
+// SetMaxIdleConns set the max idle connections on pool, default is 2
+func (eg *EngineGroup) SetMaxIdleConns(conns int) {
+	eg.Engine.db.SetMaxIdleConns(conns)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].db.SetMaxIdleConns(conns)
+	}
+}
+
+// Close the engine
+func (eg *EngineGroup) Close() error {
+	err := eg.Engine.Close()
+	if err != nil {
+		return err
+	}
+
+	for i := 0; i < len(eg.slaves); i++ {
+		err := eg.slaves[i].Close()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Ping tests if database is alive
+func (eg *EngineGroup) Ping() error {
+	if err := eg.Engine.Ping(); err != nil {
+		return err
+	}
+
+	for _, slave := range eg.slaves {
+		if err := slave.Ping(); err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 148 - 0
engine_group_policy.go

@@ -0,0 +1,148 @@
+// Copyright 2017 The Xorm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package xorm
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+)
+
+type GroupPolicy interface {
+	Slave(*EngineGroup) *Engine
+}
+
+type RandomPolicy struct {
+	r *rand.Rand
+}
+
+func NewRandomPolicy() *RandomPolicy {
+	return &RandomPolicy{
+		r: rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+}
+
+func (policy *RandomPolicy) Slave(g *EngineGroup) *Engine {
+	return g.Slaves()[policy.r.Intn(len(g.Slaves()))]
+}
+
+type WeightRandomPolicy struct {
+	weights []int
+	rands   []int
+	r       *rand.Rand
+}
+
+func NewWeightRandomPolicy(weights []int) *WeightRandomPolicy {
+	var rands = make([]int, 0, len(weights))
+	for i := 0; i < len(weights); i++ {
+		for n := 0; n < weights[i]; n++ {
+			rands = append(rands, i)
+		}
+	}
+
+	return &WeightRandomPolicy{
+		weights: weights,
+		rands:   rands,
+		r:       rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+}
+
+func (policy *WeightRandomPolicy) Slave(g *EngineGroup) *Engine {
+	var slaves = g.Slaves()
+	idx := policy.rands[policy.r.Intn(len(policy.rands))]
+	if idx >= len(slaves) {
+		idx = len(slaves) - 1
+	}
+	return slaves[idx]
+}
+
+type RoundRobinPolicy struct {
+	pos  int
+	lock sync.Mutex
+}
+
+func NewRoundRobinPolicy() *RoundRobinPolicy {
+	return &RoundRobinPolicy{pos: -1}
+}
+
+func (policy *RoundRobinPolicy) Slave(g *EngineGroup) *Engine {
+	var slaves = g.Slaves()
+	var pos int
+	policy.lock.Lock()
+	policy.pos++
+	if policy.pos >= len(slaves) {
+		policy.pos = 0
+	}
+	pos = policy.pos
+	policy.lock.Unlock()
+
+	return slaves[pos]
+}
+
+type WeightRoundRobinPolicy struct {
+	weights []int
+	rands   []int
+	r       *rand.Rand
+	lock    sync.Mutex
+	pos     int
+}
+
+func NewWeightRoundRobinPolicy(weights []int) *WeightRoundRobinPolicy {
+	var rands = make([]int, 0, len(weights))
+	for i := 0; i < len(weights); i++ {
+		for n := 0; n < weights[i]; n++ {
+			rands = append(rands, i)
+		}
+	}
+
+	return &WeightRoundRobinPolicy{
+		weights: weights,
+		rands:   rands,
+		r:       rand.New(rand.NewSource(time.Now().UnixNano())),
+		pos:     -1,
+	}
+}
+
+func (policy *WeightRoundRobinPolicy) Slave(g *EngineGroup) *Engine {
+	var slaves = g.Slaves()
+	var pos int
+	policy.lock.Lock()
+	policy.pos++
+	if policy.pos >= len(policy.rands) {
+		policy.pos = 0
+	}
+	pos = policy.pos
+	policy.lock.Unlock()
+
+	idx := policy.rands[pos]
+	if idx >= len(slaves) {
+		idx = len(slaves) - 1
+	}
+	return slaves[idx]
+}
+
+type LeastConnPolicy struct {
+}
+
+func NewLeastConnPolicy() *LeastConnPolicy {
+	return &LeastConnPolicy{}
+}
+
+func (policy *LeastConnPolicy) Slave(g *EngineGroup) *Engine {
+	var slaves = g.Slaves()
+	connections := 0
+	idx := 0
+	for i, _ := range slaves {
+		open_connections := slaves[i].DB().Stats().OpenConnections
+		if i == 0 {
+			connections = open_connections
+			idx = i
+		} else if open_connections <= connections {
+			connections = open_connections
+			idx = i
+		}
+	}
+	return slaves[idx]
+}

+ 8 - 0
engine_maxlife.go

@@ -12,3 +12,11 @@ import "time"
 func (engine *Engine) SetConnMaxLifetime(d time.Duration) {
 	engine.db.SetConnMaxLifetime(d)
 }
+
+// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
+func (eg *EngineGroup) SetConnMaxLifetime(d time.Duration) {
+	eg.Engine.SetConnMaxLifetime(d)
+	for i := 0; i < len(eg.slaves); i++ {
+		eg.slaves[i].SetConnMaxLifetime(d)
+	}
+}

+ 2 - 2
session.go

@@ -264,13 +264,13 @@ func (session *Session) canCache() bool {
 	return true
 }
 
-func (session *Session) doPrepare(sqlStr string) (stmt *core.Stmt, err error) {
+func (session *Session) doPrepare(db *core.DB, sqlStr string) (stmt *core.Stmt, err error) {
 	crc := crc32.ChecksumIEEE([]byte(sqlStr))
 	// TODO try hash(sqlStr+len(sqlStr))
 	var has bool
 	stmt, has = session.stmtCache[crc]
 	if !has {
-		stmt, err = session.DB().Prepare(sqlStr)
+		stmt, err = db.Prepare(sqlStr)
 		if err != nil {
 			return nil, err
 		}

+ 10 - 3
session_raw.go

@@ -47,9 +47,16 @@ func (session *Session) queryRows(sqlStr string, args ...interface{}) (*core.Row
 	}
 
 	if session.isAutoCommit {
+		var db *core.DB
+		if session.engine.engineGroup != nil {
+			db = session.engine.engineGroup.Slave().DB()
+		} else {
+			db = session.DB()
+		}
+
 		if session.prepareStmt {
 			// don't clear stmt since session will cache them
-			stmt, err := session.doPrepare(sqlStr)
+			stmt, err := session.doPrepare(db, sqlStr)
 			if err != nil {
 				return nil, err
 			}
@@ -61,7 +68,7 @@ func (session *Session) queryRows(sqlStr string, args ...interface{}) (*core.Row
 			return rows, nil
 		}
 
-		rows, err := session.DB().Query(sqlStr, args...)
+		rows, err := db.Query(sqlStr, args...)
 		if err != nil {
 			return nil, err
 		}
@@ -171,7 +178,7 @@ func (session *Session) exec(sqlStr string, args ...interface{}) (sql.Result, er
 	}
 
 	if session.prepareStmt {
-		stmt, err := session.doPrepare(sqlStr)
+		stmt, err := session.doPrepare(session.DB(), sqlStr)
 		if err != nil {
 			return nil, err
 		}