Ver código fonte

feat: logs middles

2637309949@qq.com 5 anos atrás
pai
commit
3acb7ad314

+ 5 - 2
engine/apiengine.go

@@ -6,18 +6,20 @@ import (
 	//"github.com/gin-contrib/sessions/memstore"
 	//"github.com/gin-contrib/sessions/cookie"
 
+	"os"
+
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine-client/client"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/entitys"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/env"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/logs"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/middleware/logger"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/models"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/utils/auth"
 	"github.com/gin-contrib/cors"
 	"github.com/gin-contrib/gzip"
 	"github.com/gin-gonic/gin"
 	"github.com/xormplus/xorm"
-	"os"
-	"git.qianqiusoft.com/qianqiusoft/light-apiengine/utils/auth"
 )
 
 type ApiEngine struct {
@@ -57,6 +59,7 @@ func NewApiEngine(driverName, dataSourceName string, addr string) *ApiEngine {
 	engine.GinEngine = gin.Default()
 
 	engine.GinEngine.Use(gzip.Gzip(gzip.DefaultCompression))
+	engine.GinEngine.Use(logger.LoggerMiddleware())
 
 	//启动session
 	//store := sessions.NewCookieStore([]byte("secret"))

+ 5 - 1
godep.yaml

@@ -33,4 +33,8 @@ import:
   - package: github.com/xormplus/core
     repo: https://git.qianqiusoft.com/github.com/xormplus__core.git
   - package: github.com/dchest/captcha
-    repo: https://github.com/dchest/captcha.git
+    repo: https://github.com/dchest/captcha.git
+  - package: git.qianqiusoft.com/public/glog
+    repo: https://git.qianqiusoft.com/public/glog.git
+  - package: github.com/gocql/gocql
+    repo: https://git.qianqiusoft.com/github.com/gocql_gocql.git

+ 129 - 0
middleware/logger/cassandra.go

@@ -0,0 +1,129 @@
+package logger
+
+import (
+	"encoding/json"
+	"strconv"
+	"strings"
+	"time"
+
+	"git.qianqiusoft.com/public/glog"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
+	"github.com/gocql/gocql"
+)
+
+// cassandra 配置
+var _clusterCfg *gocql.ClusterConfig = nil
+
+func init() {
+	// 设置cassandar 配置
+	cassandra := config.AppConfig.GetKey("cassandra")
+	keyspace := config.AppConfig.GetKey("keyspace")
+	_clusterCfg = gocql.NewCluster(strings.Split(cassandra, ",")...)
+	_clusterCfg.Keyspace = keyspace
+	_clusterCfg.Consistency = gocql.Quorum
+	//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
+	_clusterCfg.NumConns = 3
+
+	session, err := _clusterCfg.CreateSession()
+	if err != nil {
+		glog.Errorln("create session err", err.Error())
+		return
+	}
+	time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
+	defer session.Close()
+}
+
+// cassandra 日志 处理接口
+func cassandraLogHandler(logInfo *LogInfo) {
+	session, err := _clusterCfg.CreateSession()
+	if err != nil {
+		glog.Errorln("创建cassandra session错误", err.Error())
+		return
+	}
+	defer session.Close()
+
+	var respBodyObj struct {
+		Code int
+		Msg  string
+		Data interface{}
+	}
+	if logInfo.RespBody != "" {
+		err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
+		if err != nil {
+			glog.Errorln("json.Unmarshal 错误", err.Error())
+		} else {
+			logInfo.RespCode = strconv.Itoa(respBodyObj.Code)
+			logInfo.RespMsg = respBodyObj.Msg
+			if respBodyObj.Data != nil {
+				str, err := json.Marshal(respBodyObj.Data)
+				if err == nil {
+					logInfo.RespData = string(str)
+				}
+			}
+		}
+	}
+	logInfo.CostTime = float64(logInfo.RespTime.Sub(logInfo.ReqTime).Seconds())
+
+	cqlformat := `insert into 
+		log_info(app_name, req_date, req_time, login_id, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time) 
+		values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
+	reqDate := logInfo.ReqTime.Format("20060102")
+	//q := session.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
+	batch := session.NewBatch(gocql.UnloggedBatch) // 应该批量插入多条,例如20条
+	batch.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.LoginId, logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, strings.TrimSpace(logInfo.ReqPostData), logInfo.RespTime.Format("2006-01-02 15:04:05"), strings.TrimSpace(logInfo.RespBody), logInfo.RespCode, strings.TrimSpace(logInfo.RespMsg), strings.TrimSpace(logInfo.RespData), logInfo.CostTime)
+	if err := session.ExecuteBatch(batch); err != nil {
+		glog.Errorln("批量插入日志错误:", err)
+		return
+	}
+}
+
+func GetLogByCassandra(appName, date, beginTime, endTime, loginId string) ([]map[string]interface{}, error) {
+	session, err := _clusterCfg.CreateSession()
+	if err != nil {
+		glog.Errorln("创建cassandra session错误", err.Error())
+		return nil, err
+	}
+	defer session.Close()
+
+	params := []interface{}{appName, date, beginTime, endTime}
+
+	cqlformat := `select app_name, req_date, req_time, login_id, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time
+			from log_info where app_name = ? and req_date = ? and req_time >= ? and req_time <= ?
+			`
+	if loginId != "" {
+		cqlformat += " and login_id = ?"
+		params = append(params, loginId)
+	}
+	var appName1 string
+	var reqDate string
+	var reqTime, loginId1, reqMethod, reqUrl, reqProto, reqUa, reqReferer, reqPostData, respTime, respBody, respCode, respMsg, respData string
+	var costTime float64
+
+	results := make([]map[string]interface{}, 0)
+	iter := session.Query(cqlformat, params...).Iter()
+	for iter.Scan(&appName1, &reqDate, &reqTime, &loginId1, &reqMethod, &reqUrl, &reqProto, &reqUa, &reqReferer, &reqPostData, &respTime, &respBody, &respCode, &respMsg, &respData, &costTime) {
+		result := make(map[string]interface{})
+		result["app_name"] = appName1
+		result["req_date"] = reqDate
+		result["req_time"] = reqTime
+		result["login_id"] = loginId1
+		result["req_method"] = reqMethod
+		result["req_url"] = reqUrl
+		result["req_proto"] = reqProto
+		result["req_ua"] = reqUa
+		result["req_referer"] = reqReferer
+		result["req_post_data"] = reqPostData
+		result["resp_time"] = respTime
+		result["resp_body"] = respBody
+		result["resp_code"] = respCode
+		result["resp_msg"] = respMsg
+		result["resp_data"] = respData
+		result["cost_time"] = costTime
+		results = append(results, result)
+	}
+	if err := iter.Close(); err != nil {
+		glog.Errorln("iter close err", err.Error())
+	}
+
+	return results, nil
+}

+ 141 - 0
middleware/logger/logger.go

@@ -0,0 +1,141 @@
+package logger
+
+import (
+	"bytes"
+	"time"
+
+	"git.qianqiusoft.com/public/glog"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/utils"
+	"github.com/gin-gonic/gin"
+)
+
+/**
+ * 自定义日志响应类,主要是把resp的响应内容写入到body中
+ */
+type loggerRespWriter struct {
+	gin.ResponseWriter
+	respBody *bytes.Buffer
+}
+
+// 写字节数组
+func (w loggerRespWriter) Write(b []byte) (int, error) {
+	w.respBody.Write(b) // 保存
+	return w.ResponseWriter.Write(b)
+}
+
+// 写字符串
+func (w loggerRespWriter) WriteString(s string) (int, error) {
+	w.respBody.WriteString(s) // 保存
+	return w.ResponseWriter.WriteString(s)
+}
+
+/**
+ * 日志信息
+ */
+type LogInfo struct {
+	ReqClientIp string
+	ReqTime     time.Time
+	ReqMethod   string
+	ReqUrl      string
+	ReqProto    string
+	ReqUa       string
+	ReqReferer  string
+	ReqPostData string
+	LoginId     string
+
+	RespTime time.Time
+	RespBody string
+	RespCode string // 由 resp body 解析
+	RespMsg  string // 由 resp body 解析
+	RespData string // 由 resp body 解析
+
+	CostTime float64
+}
+
+// 日志chan
+var __logInfoChan chan *LogInfo
+
+// app名称
+var __appName string = ""
+
+// 日志处理函数
+var __logHandler func(info *LogInfo) = nil
+
+func init() {
+	defer func() {
+		if p := recover(); p != nil {
+			glog.Errorln("ecover", p)
+		}
+	}()
+
+	// 设置app名称
+	__appName = config.AppConfig.GetKey("app_name")
+	__logInfoChan = make(chan *LogInfo, 2000)
+
+	// 日志处理
+	logProcess()
+}
+
+// 设置日志处理接口
+func SetLogHandler(handler func(info *LogInfo)) {
+	__logHandler = handler
+}
+
+// 获取日志中间件
+func LoggerMiddleware() gin.HandlerFunc {
+	return func(c *gin.Context) {
+		respWriter := &loggerRespWriter{
+			ResponseWriter: c.Writer,
+			respBody:       bytes.NewBuffer([]byte{}),
+		}
+		c.Writer = respWriter // 注入自定writer
+
+		logInfo := &LogInfo{}
+		logInfo.ReqTime = time.Now()
+
+		// 下一个请求
+		c.Next()
+
+		// 设置对象
+		logInfo.RespBody = respWriter.respBody.String()
+		logInfo.RespTime = time.Now()
+
+		logInfo.ReqMethod = c.Request.Method
+		logInfo.ReqUrl = c.Request.RequestURI
+		logInfo.ReqProto = c.Request.Proto
+		logInfo.ReqUa = c.Request.UserAgent()
+		logInfo.ReqReferer = c.Request.Referer()
+		logInfo.ReqPostData = c.Request.PostForm.Encode()
+		logInfo.ReqClientIp = c.ClientIP()
+
+		token := c.GetHeader("token")
+		if token != "" {
+			tk := utils.GetGlobalTokenStore().Get(token)
+			if tk != nil {
+				logInfo.LoginId = tk.LoginID
+			}
+		}
+		if logInfo.LoginId == "" {
+			logInfo.LoginId = "__no_login__"
+		}
+
+		__logInfoChan <- logInfo
+	}
+}
+
+// 日志处理进程
+func logProcess() {
+	go func() {
+		for {
+			select {
+			case logInfo := <-__logInfoChan:
+				if __logHandler == nil {
+					cassandraLogHandler(logInfo)
+				} else {
+					__logHandler(logInfo)
+				}
+			}
+		}
+	}()
+}

+ 25 - 0
middleware/logger/logger_cassandra.cql

@@ -0,0 +1,25 @@
+/**
+ * 创建轨迹表
+ */
+CREATE TABLE log_info (
+    app_name text,
+    req_date text,
+    req_time text,
+    login_id text,
+    req_method text,
+	req_url text,
+	req_proto text,
+	req_ua text,
+	req_referer text,
+	req_post_data text,
+	resp_time text,
+	resp_body text,
+	resp_code text,
+    resp_msg text,
+	resp_data text,
+	cost_time double,
+    primary key((app_name, req_date), req_time, login_id)
+) WITH CLUSTERING ORDER BY (req_time DESC)
+       AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
+                     'compaction_window_unit': 'DAYS',
+                     'compaction_window_size': 1};

+ 7 - 7
models/model.go

@@ -1,11 +1,11 @@
-
 package models
 
 import (
 	"fmt"
+	"sync"
+
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
 	"github.com/xormplus/xorm"
-	"sync"
 )
 
 var beans []interface{}
@@ -23,17 +23,17 @@ func RegisterModel(models ...interface{}) {
 
 func AddTableName(tableName string) {
 	beansLock.Lock()
-    defer beansLock.Unlock()
+	defer beansLock.Unlock()
 
-    ModelNameList = append(ModelNameList, tableName)
+	ModelNameList = append(ModelNameList, tableName)
 }
 
 func SyncDb(db *xorm.Engine) {
 	if !config.AppConfig.SyncDb {
 		return
 	}
-	err := db.Sync2(beans...);
-    if err != nil {
+	err := db.Sync2(beans...)
+	if err != nil {
 		fmt.Println(err)
-    }
+	}
 }

+ 1 - 0
utils/page_util.go

@@ -2,6 +2,7 @@ package utils
 
 import (
 	"fmt"
+
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/models"
 	"github.com/xormplus/xorm"
 )