Selaa lähdekoodia

feat: logs middles

2637309949@qq.com 5 vuotta sitten
vanhempi
commit
015228cfd3

+ 2 - 2
engine/apiengine.go

@@ -13,7 +13,7 @@ import (
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/entitys"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/env"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/logs"
-	"git.qianqiusoft.com/qianqiusoft/light-apiengine/middleware/logger"
+	"git.qianqiusoft.com/qianqiusoft/light-apiengine/middleware"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/models"
 	"git.qianqiusoft.com/qianqiusoft/light-apiengine/utils/auth"
 	"github.com/gin-contrib/cors"
@@ -59,7 +59,7 @@ func NewApiEngine(driverName, dataSourceName string, addr string) *ApiEngine {
 	engine.GinEngine = gin.Default()
 
 	engine.GinEngine.Use(gzip.Gzip(gzip.DefaultCompression))
-	engine.GinEngine.Use(logger.LoggerMiddleware())
+	engine.GinEngine.Use(middleware.LoggerMiddleware())
 
 	//启动session
 	//store := sessions.NewCookieStore([]byte("secret"))

+ 36 - 17
middleware/logger/logger.go → middleware/logger.go

@@ -1,7 +1,8 @@
-package logger
+package middleware
 
 import (
 	"bytes"
+	"fmt"
 	"time"
 
 	"git.qianqiusoft.com/public/glog"
@@ -41,18 +42,18 @@ type LogInfo struct {
 	ReqProto    string
 	ReqUa       string
 	ReqReferer  string
-	ReqPostData string
+	Header      []byte
 	LoginId     string
-
-	RespTime time.Time
-	RespBody string
-	RespCode string // 由 resp body 解析
-	RespMsg  string // 由 resp body 解析
-	RespData string // 由 resp body 解析
+	ReqHeader   []byte
+	ReqBody     []byte
+	RespTime    time.Time
+	RespBody    []byte
 
 	CostTime float64
 }
 
+var bucket utils.SyncSlice
+
 // 日志chan
 var __logInfoChan chan *LogInfo
 
@@ -60,7 +61,7 @@ var __logInfoChan chan *LogInfo
 var __appName string = ""
 
 // 日志处理函数
-var __logHandler func(info *LogInfo) = nil
+var __logHandler func(info []LogInfo) = nil
 
 func init() {
 	defer func() {
@@ -78,7 +79,7 @@ func init() {
 }
 
 // 设置日志处理接口
-func SetLogHandler(handler func(info *LogInfo)) {
+func SetLogHandler(handler func(info []LogInfo)) {
 	__logHandler = handler
 }
 
@@ -96,18 +97,25 @@ func LoggerMiddleware() gin.HandlerFunc {
 
 		// 下一个请求
 		c.Next()
+		hByte := bytes.Buffer{}
+		bByte := make([]byte, 1024)
+
+		cReq := c.Request.Clone(c)
+		cReq.Header.Write(&hByte)
+		rbn, _ := cReq.Body.Read(bByte)
 
 		// 设置对象
-		logInfo.RespBody = respWriter.respBody.String()
+		logInfo.RespBody = respWriter.respBody.Bytes()
 		logInfo.RespTime = time.Now()
-
 		logInfo.ReqMethod = c.Request.Method
 		logInfo.ReqUrl = c.Request.RequestURI
 		logInfo.ReqProto = c.Request.Proto
 		logInfo.ReqUa = c.Request.UserAgent()
 		logInfo.ReqReferer = c.Request.Referer()
-		logInfo.ReqPostData = c.Request.PostForm.Encode()
+		logInfo.ReqBody = bByte[0:rbn]
+		logInfo.ReqHeader = hByte.Bytes()
 		logInfo.ReqClientIp = c.ClientIP()
+		logInfo.CostTime = float64(logInfo.RespTime.Sub(logInfo.ReqTime).Seconds())
 
 		token := c.GetHeader("token")
 		if token != "" {
@@ -127,13 +135,24 @@ func LoggerMiddleware() gin.HandlerFunc {
 // 日志处理进程
 func logProcess() {
 	go func() {
+		defer func() {
+			if rec := recover(); rec != nil {
+				fmt.Println(rec)
+			}
+		}()
 		for {
 			select {
+			case <-time.Tick(6 * time.Second):
+				if __logHandler != nil {
+					logs := []LogInfo{}
+					for _, v := range bucket.Reset() {
+						logs = append(logs, *v.(*LogInfo))
+					}
+					__logHandler(logs)
+				}
 			case logInfo := <-__logInfoChan:
-				if __logHandler == nil {
-					cassandraLogHandler(logInfo)
-				} else {
-					__logHandler(logInfo)
+				if __logHandler != nil {
+					bucket.Append(logInfo)
 				}
 			}
 		}

+ 0 - 129
middleware/logger/cassandra.go

@@ -1,129 +0,0 @@
-package logger
-
-import (
-	"encoding/json"
-	"strconv"
-	"strings"
-	"time"
-
-	"git.qianqiusoft.com/public/glog"
-	"git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
-	"github.com/gocql/gocql"
-)
-
-// cassandra 配置
-var _clusterCfg *gocql.ClusterConfig = nil
-
-func init() {
-	// 设置cassandar 配置
-	cassandra := config.AppConfig.GetKey("cassandra")
-	keyspace := config.AppConfig.GetKey("keyspace")
-	_clusterCfg = gocql.NewCluster(strings.Split(cassandra, ",")...)
-	_clusterCfg.Keyspace = keyspace
-	_clusterCfg.Consistency = gocql.Quorum
-	//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
-	_clusterCfg.NumConns = 3
-
-	session, err := _clusterCfg.CreateSession()
-	if err != nil {
-		glog.Errorln("create session err", err.Error())
-		return
-	}
-	time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
-	defer session.Close()
-}
-
-// cassandra 日志 处理接口
-func cassandraLogHandler(logInfo *LogInfo) {
-	session, err := _clusterCfg.CreateSession()
-	if err != nil {
-		glog.Errorln("创建cassandra session错误", err.Error())
-		return
-	}
-	defer session.Close()
-
-	var respBodyObj struct {
-		Code int
-		Msg  string
-		Data interface{}
-	}
-	if logInfo.RespBody != "" {
-		err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
-		if err != nil {
-			glog.Errorln("json.Unmarshal 错误", err.Error())
-		} else {
-			logInfo.RespCode = strconv.Itoa(respBodyObj.Code)
-			logInfo.RespMsg = respBodyObj.Msg
-			if respBodyObj.Data != nil {
-				str, err := json.Marshal(respBodyObj.Data)
-				if err == nil {
-					logInfo.RespData = string(str)
-				}
-			}
-		}
-	}
-	logInfo.CostTime = float64(logInfo.RespTime.Sub(logInfo.ReqTime).Seconds())
-
-	cqlformat := `insert into 
-		log_info(app_name, req_date, req_time, login_id, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time) 
-		values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
-	reqDate := logInfo.ReqTime.Format("20060102")
-	//q := session.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
-	batch := session.NewBatch(gocql.UnloggedBatch) // 应该批量插入多条,例如20条
-	batch.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.LoginId, logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, strings.TrimSpace(logInfo.ReqPostData), logInfo.RespTime.Format("2006-01-02 15:04:05"), strings.TrimSpace(logInfo.RespBody), logInfo.RespCode, strings.TrimSpace(logInfo.RespMsg), strings.TrimSpace(logInfo.RespData), logInfo.CostTime)
-	if err := session.ExecuteBatch(batch); err != nil {
-		glog.Errorln("批量插入日志错误:", err)
-		return
-	}
-}
-
-func GetLogByCassandra(appName, date, beginTime, endTime, loginId string) ([]map[string]interface{}, error) {
-	session, err := _clusterCfg.CreateSession()
-	if err != nil {
-		glog.Errorln("创建cassandra session错误", err.Error())
-		return nil, err
-	}
-	defer session.Close()
-
-	params := []interface{}{appName, date, beginTime, endTime}
-
-	cqlformat := `select app_name, req_date, req_time, login_id, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time
-			from log_info where app_name = ? and req_date = ? and req_time >= ? and req_time <= ?
-			`
-	if loginId != "" {
-		cqlformat += " and login_id = ?"
-		params = append(params, loginId)
-	}
-	var appName1 string
-	var reqDate string
-	var reqTime, loginId1, reqMethod, reqUrl, reqProto, reqUa, reqReferer, reqPostData, respTime, respBody, respCode, respMsg, respData string
-	var costTime float64
-
-	results := make([]map[string]interface{}, 0)
-	iter := session.Query(cqlformat, params...).Iter()
-	for iter.Scan(&appName1, &reqDate, &reqTime, &loginId1, &reqMethod, &reqUrl, &reqProto, &reqUa, &reqReferer, &reqPostData, &respTime, &respBody, &respCode, &respMsg, &respData, &costTime) {
-		result := make(map[string]interface{})
-		result["app_name"] = appName1
-		result["req_date"] = reqDate
-		result["req_time"] = reqTime
-		result["login_id"] = loginId1
-		result["req_method"] = reqMethod
-		result["req_url"] = reqUrl
-		result["req_proto"] = reqProto
-		result["req_ua"] = reqUa
-		result["req_referer"] = reqReferer
-		result["req_post_data"] = reqPostData
-		result["resp_time"] = respTime
-		result["resp_body"] = respBody
-		result["resp_code"] = respCode
-		result["resp_msg"] = respMsg
-		result["resp_data"] = respData
-		result["cost_time"] = costTime
-		results = append(results, result)
-	}
-	if err := iter.Close(); err != nil {
-		glog.Errorln("iter close err", err.Error())
-	}
-
-	return results, nil
-}

+ 0 - 25
middleware/logger/logger_cassandra.cql

@@ -1,25 +0,0 @@
-/**
- * 创建轨迹表
- */
-CREATE TABLE log_info (
-    app_name text,
-    req_date text,
-    req_time text,
-    login_id text,
-    req_method text,
-	req_url text,
-	req_proto text,
-	req_ua text,
-	req_referer text,
-	req_post_data text,
-	resp_time text,
-	resp_body text,
-	resp_code text,
-    resp_msg text,
-	resp_data text,
-	cost_time double,
-    primary key((app_name, req_date), req_time, login_id)
-) WITH CLUSTERING ORDER BY (req_time DESC)
-       AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
-                     'compaction_window_unit': 'DAYS',
-                     'compaction_window_size': 1};

+ 58 - 0
utils/safeslice.go

@@ -0,0 +1,58 @@
+package utils
+
+import "sync"
+
+// SyncSlice type that can be safely shared between goroutines
+type SyncSlice struct {
+	sync.RWMutex
+	items []interface{}
+}
+
+// SyncSliceItem slice item
+type SyncSliceItem struct {
+	Index int
+	Value interface{}
+}
+
+// Reset slice item to the concurrent slice
+func (cs *SyncSlice) Reset() []interface{} {
+	slice := cs.items
+	cs.Clear()
+	return slice
+}
+
+// Values an item to the concurrent slice
+func (cs *SyncSlice) Values() []interface{} {
+	return cs.items
+}
+
+// Clear an item to the concurrent slice
+func (cs *SyncSlice) Clear() {
+	cs.Lock()
+	defer cs.Unlock()
+	cs.items = []interface{}{}
+}
+
+// Append pends an item to the concurrent slice
+func (cs *SyncSlice) Append(item interface{}) {
+	cs.Lock()
+	defer cs.Unlock()
+	cs.items = append(cs.items, item)
+}
+
+// Iter over the items in the concurrent slice
+// Each item is sent over a channel, so that
+// we can iterate over the slice using the builin range keyword
+func (cs *SyncSlice) Iter() <-chan SyncSliceItem {
+	c := make(chan SyncSliceItem)
+	f := func() {
+		cs.Lock()
+		defer cs.Unlock()
+		for index, value := range cs.items {
+			c <- SyncSliceItem{index, value}
+		}
+		close(c)
+	}
+	go f()
+	return c
+}