Browse Source

Merge branch 'v2' of https://git.qianqiusoft.com/qianqiusoft/light-apiengine into v2

huangyh 5 năm trước cách đây
mục cha
commit
590a7d245c
3 tập tin đã thay đổi với 134 bổ sung59 xóa
  1. 101 43
      engine/auth/auth_client.go
  2. 2 1
      middleware/logger_cassandra.cql
  3. 31 15
      middleware/logger_cassandra.go

+ 101 - 43
engine/auth/auth_client.go

@@ -7,6 +7,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"git.qianqiusoft.com/public/glog"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/entitys"
 	sysutils "git.qianqiusoft.com/qianqiusoft/light-apiengine/utils"
@@ -20,23 +21,26 @@ const (
 
 	CMD_NEW = "new"
 	CMD_REMOVE = "remove"
+	CMD_PINGPONG = "pg"
+	CMD_PINGPONG_RESP = "pg_resp"
 )
 
 var TCPClient *TcpClient
 
 type authPackage struct {
-	Cmd string
-	Content []byte
+	Cmd      string
+	TokenStr string
+	Content  []byte
 }
 
 func (ap *authPackage)toBytes()[]byte{
 	buf := bytes.NewBuffer([]byte{})
 
 	b := []byte(ap.Cmd)
-	buf.Write(uint64ToBytes(len(b)))
+	buf.Write(uint32ToBytes(len(b)))
 	buf.Write(b)
 
-	buf.Write(uint64ToBytes(len(ap.Content)))
+	buf.Write(uint32ToBytes(len(ap.Content)))
 	buf.Write(ap.Content)
 
 	return buf.Bytes()
@@ -68,15 +72,15 @@ func(c *TcpClient)Start() {
 			if p := recover(); p != nil {
 				fmt.Println("ecover", p)
 			}
+			c.restart()
 		}()
 
 		var err error = nil
 		address := config.AppConfig.GetKey("auth_server")
-		fmt.Println("auth client start, dial address is", address)
+		//fmt.Println("auth client start, dial address is", address)
 		c.conn, err = net.Dial("tcp", address)
 		if err != nil {
-			fmt.Println("Error dialing", err.Error())
-			c.restart()
+			//fmt.Println("Error dialing", err.Error())
 			return
 		}
 
@@ -87,7 +91,6 @@ func(c *TcpClient)Start() {
 		vresp, err := readString(c.conn)
 		if err != nil {
 			fmt.Println("Error dialing", err.Error())
-			c.restart()
 			return
 		}
 		if vresp != "ok"{
@@ -95,6 +98,7 @@ func(c *TcpClient)Start() {
 			fmt.Println("verify is not ok", vresp)
 			return
 		}
+		fmt.Println("验证成功")
 		c.verified = true
 
 		// send
@@ -102,6 +106,7 @@ func(c *TcpClient)Start() {
 			for {
 				select {
 				case data := <-c.pchan:
+					c.conn.SetWriteDeadline(time.Now().Add(time.Second * 2))
 					_, err := c.conn.Write(data.toBytes())
 					if err != nil {
 						fmt.Println("写入内容错误", err.Error())
@@ -121,23 +126,22 @@ func(c *TcpClient)Start() {
 				fmt.Println("读取命令错误", err.Error())
 				break
 			}
-			bytess, err := readBytes(c.conn)
-			if err != nil {
+			if cmd == CMD_NEW{
+				err = c.newHandler()
+			} else if cmd == CMD_REMOVE{
+				err = c.removeHandler()
+			} else if cmd == CMD_PINGPONG_RESP{
+
+			} else {
+				fmt.Println("未知cmd", cmd)
+				continue
+			}
+			if err != nil{
 				c.done <- true
-				fmt.Println("读取token内容错误", err.Error())
+				fmt.Println("处理错误", err.Error())
 				break
 			}
-			if cmd == "remove" {
-				// 移除,此时bytess为tokenstring
-				sysutils.GetGlobalTokenStore().Remove(string(bytess))
-			} else if cmd == "new" {
-				// 新建
-				token, _ := bytesToToken(bytess)
-				sysutils.GetGlobalTokenStore().Set(token.AccessToken, token)
-			}
 		}
-
-		c.restart()
 	}()
 }
 
@@ -176,7 +180,40 @@ func(c *TcpClient)Send(cmd string, bytess []byte){
 
 // 发送token
 func(c *TcpClient)SendToken(token *entitys.Token){
-	c.Send(CMD_NEW, tokenToBytes(token))
+	glog.Infoln("发送新建token")
+	bytess := tokenToBytes(token)
+	c.Send(CMD_NEW, bytess)
+}
+
+// 处理创建
+func(c *TcpClient)newHandler()error{
+	fmt.Println("处理新建")
+	bytess, err := readBytes(c.conn)
+	if err != nil {
+		fmt.Println("读取token内容错误", err.Error())
+		return err
+	}
+	// 新建
+	token, err := bytesToToken(bytess)
+	if err != nil{
+		glog.Infoln("bytesToToken 错误", err.Error())
+		return err
+	}
+	sysutils.GetGlobalTokenStore().Set(token.AccessToken, token)
+	return nil
+}
+
+// 处理删除
+func(c *TcpClient)removeHandler()error{
+	fmt.Println("处理删除")
+	bytess, err := readBytes(c.conn)
+	if err != nil {
+		fmt.Println("读取token内容错误", err.Error())
+		return err
+	}
+	// 移除,此时bytess为tokenstring
+	sysutils.GetGlobalTokenStore().Remove(string(bytess))
+	return nil
 }
 
 // 读取字符串
@@ -201,14 +238,11 @@ func readString(conn net.Conn)(string, error){
 func writeString(conn net.Conn, str string)error{
 	if str == ""{
 		return errors.New("字符串为空")
-	}else{
-		fmt.Println("---->1", str)
 	}
 
 	bytess := []byte(str)
 	size := len(bytess)
 
-	fmt.Println("---->2", str)
 	// 发送长度
 	err := writeUInt32(conn, uint32(size))
 	if err != nil{
@@ -216,7 +250,6 @@ func writeString(conn net.Conn, str string)error{
 		return err
 	}
 
-	fmt.Println("---->3", str)
 	// 发送内容
 	n, err := conn.Write(bytess)
 	if err != nil{
@@ -226,7 +259,6 @@ func writeString(conn net.Conn, str string)error{
 	if n != size{
 		return errors.New("发送长度不是" + strconv.Itoa(int(size)))
 	}
-	fmt.Println("---->4", str)
 
 	return nil
 }
@@ -307,12 +339,29 @@ func readStringByBytes(bytess []byte)(string, int, error) {
 	return string(bytess[4 : 4+size]), int(size), nil
 }
 
+// int转bytes
+func uint32ToBytes(v int)[]byte{
+	b := make([]byte, 4)
+	binary.BigEndian.PutUint32(b, uint32(v))
+
+	return b
+}
+
+// int转bytes
+func uint64ToBytes(v int)[]byte{
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint32(b, uint32(v))
+
+	return b
+}
+
 // 转token
 func bytesToToken(content []byte)(*entitys.Token, error){
 	token := &entitys.Token{}
 	var index int = 0
 	var size int
 	var err error = nil
+	fmt.Println("读取userid")
 	token.UserId, size, err = readStringByBytes(content)
 	if err != nil{
 		fmt.Println("读取userid错误")
@@ -320,6 +369,7 @@ func bytesToToken(content []byte)(*entitys.Token, error){
 	}
 	index += 4 + size
 
+	fmt.Println("读取AccessToken")
 	token.AccessToken, size, err = readStringByBytes(content[index:])
 	if err != nil{
 		fmt.Println("读取AccessToken错误")
@@ -327,6 +377,7 @@ func bytesToToken(content []byte)(*entitys.Token, error){
 	}
 	index += 4 + size
 
+	fmt.Println("读取RefreshToken")
 	token.RefreshToken, size, err = readStringByBytes(content[index:])
 	if err != nil{
 		fmt.Println("读取RefreshToken错误")
@@ -334,6 +385,7 @@ func bytesToToken(content []byte)(*entitys.Token, error){
 	}
 	index += 4 + size
 
+	fmt.Println("读取LoginID")
 	token.LoginID, size, err = readStringByBytes(content[index:])
 	if err != nil{
 		fmt.Println("读取LoginID错误")
@@ -341,9 +393,11 @@ func bytesToToken(content []byte)(*entitys.Token, error){
 	}
 	index += 4 + size
 
+	fmt.Println("读取timestamp")
 	token.TimeStamp = binary.BigEndian.Uint64(content[index:])
-	index += 4
+	index += 8
 
+	fmt.Println("读取ServerIp")
 	token.ServerIp, size, err = readStringByBytes(content[index:])
 	if err != nil{
 		fmt.Println("读取ServerIp错误")
@@ -351,6 +405,7 @@ func bytesToToken(content []byte)(*entitys.Token, error){
 	}
 	index += 4 + size
 
+	fmt.Println("读取Domain")
 	token.Domain, size, err = readStringByBytes(content[index:])
 	if err != nil{
 		fmt.Println("读取Domain错误")
@@ -361,44 +416,47 @@ func bytesToToken(content []byte)(*entitys.Token, error){
 	return  token, nil
 }
 
-// int转bytes
-func uint64ToBytes(v int)[]byte{
-	b := make([]byte, 4)
-	binary.BigEndian.PutUint32(b, uint32(v))
-
-	return b
-}
-
-// 转bytes
+// 转bytes,包括token开头
 func tokenToBytes(token *entitys.Token)[]byte{
 	buf := bytes.NewBuffer([]byte{})
 
 	t := []byte(token.UserId)
-	buf.Write(uint64ToBytes(len(t)))
+	buf.Write(uint32ToBytes(len(t)))
 	buf.Write(t)
 
 	t = []byte(token.AccessToken)
-	buf.Write(uint64ToBytes(len(t)))
+	buf.Write(uint32ToBytes(len(t)))
 	buf.Write(t)
 
 	t = []byte(token.RefreshToken)
-	buf.Write(uint64ToBytes(len(t)))
+	buf.Write(uint32ToBytes(len(t)))
 	buf.Write(t)
 
 	t = []byte(token.LoginID)
-	buf.Write(uint64ToBytes(len(t)))
+	buf.Write(uint32ToBytes(len(t)))
 	buf.Write(t)
 
 	buf.Write(uint64ToBytes(int(token.TimeStamp)))
 
+	fmt.Println(token.ServerIp)
 	t = []byte(token.ServerIp)
-	buf.Write(uint64ToBytes(len(t)))
+	buf.Write(uint32ToBytes(len(t)))
 	buf.Write(t)
 
+	fmt.Println(token.Domain)
 	t = []byte(token.Domain)
-	buf.Write(uint64ToBytes(len(t)))
+	buf.Write(uint32ToBytes(len(t)))
 	buf.Write(t)
 
+	bytess := buf.Bytes()
+	buf = bytes.NewBuffer([]byte{})		// 这里用reset是错误的
+
+	tokenstrbytes := []byte(token.AccessToken)
+	buf.Write(uint32ToBytes(len(tokenstrbytes)))
+	buf.Write(tokenstrbytes)
+	buf.Write(uint32ToBytes(len(bytess)))
+	buf.Write(bytess)
+
 	return buf.Bytes()
 }
 

+ 2 - 1
middleware/logger_cassandra.cql

@@ -2,6 +2,7 @@
  * 创建轨迹表
  */
 CREATE TABLE log_info (
+    app_name text,
     req_date text,
     req_time text,
     req_method text,
@@ -16,7 +17,7 @@ CREATE TABLE log_info (
     resp_msg text,
 	resp_data text,
 	cost_time double,
-    primary key(req_date, req_time)
+    primary key((app_name, req_date), req_time)
 ) WITH CLUSTERING ORDER BY (req_time DESC)
        AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
                      'compaction_window_unit': 'DAYS',

+ 31 - 15
middleware/logger_cassandra.go

@@ -4,14 +4,19 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
 	"github.com/gin-gonic/gin"
 	"github.com/gocql/gocql"
+	"strconv"
+	"strings"
 	"time"
 )
 
 var __logInfoChan chan *logInfo
 // cassandra 配置
 var _clusterCfg *gocql.ClusterConfig = nil
+// app名称
+var __appName string = ""
 
 func init(){
 	defer func() {
@@ -20,11 +25,16 @@ func init(){
 		}
 	}()
 
+	// 设置app名称
+	__appName = config.AppConfig.GetKey("app_name")
+
 	__logInfoChan = make(chan *logInfo, 1000)
 
 	// 设置cassandar 配置
-	_clusterCfg = gocql.NewCluster("218.14.81.38")
-	_clusterCfg.Keyspace = "i2_log"
+	cassandra := config.AppConfig.GetKey("cassandra")
+	keyspace := config.AppConfig.GetKey("keyspace")
+	_clusterCfg = gocql.NewCluster(strings.Split(cassandra, ",")...)
+	_clusterCfg.Keyspace = keyspace
 	_clusterCfg.Consistency = gocql.Quorum
 	//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
 	_clusterCfg.NumConns = 3
@@ -80,7 +90,7 @@ type logInfo struct {
 	RespMsg  string // 由 resp body 解析
 	RespData string // 由 resp body 解析
 
-	CostTime int
+	CostTime float64
 }
 
 func LoggerCassandra()gin.HandlerFunc{
@@ -133,32 +143,38 @@ func insertLogInfo(logInfo *logInfo){
 	defer session.Close()
 
 	var respBodyObj struct {
-		Code string
+		Code int
 		Msg  string
-		Data string
+		Data interface{}
 	}
 	if logInfo.RespBody != ""{
+		fmt.Println(logInfo.RespBody)
 		err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
 		if err != nil{
 			fmt.Println("json.Unmarshal 错误", err.Error())
 		}else {
-			logInfo.RespCode = respBodyObj.Code
+			logInfo.RespCode = strconv.Itoa(respBodyObj.Code)
 			logInfo.RespMsg = respBodyObj.Msg
-			logInfo.RespData = respBodyObj.Data
+			if respBodyObj.Data != nil{
+				str, err := json.Marshal(respBodyObj.Data)
+				if err == nil{
+					logInfo.RespData = string(str)
+				}
+			}
 		}
 	}
-	logInfo.CostTime = int(logInfo.RespTime.Sub(logInfo.ReqTime))
+	logInfo.CostTime = float64(logInfo.RespTime.Sub(logInfo.ReqTime).Seconds())
 
 	// 保存轨迹
-	fmt.Println("保存轨迹")
 	cqlformat := `insert into 
-		log_info(req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time) 
-		values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
+		log_info(app_name, req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time) 
+		values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
 	reqDate := logInfo.ReqTime.Format("20060102")
-	q := session.Query(cqlformat, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
-	err = q.Exec()		// 应该可以使用批量插入
-	if err != nil{
-		fmt.Println("插入日志错误:", err)
+	//q := session.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
+	batch := session.NewBatch(gocql.UnloggedBatch)		// 应该批量插入多条,例如20条
+	batch.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
+	if err := session.ExecuteBatch(batch); err != nil {
+		fmt.Println("批量插入轨迹错误:", err)
 		return
 	}
 }