Quellcode durchsuchen

添加cassandra日志收集

huangrf vor 5 Jahren
Ursprung
Commit
2224ec39f5
2 geänderte Dateien mit 176 neuen und 0 gelöschten Zeilen
  1. 23 0
      middleware/logger_cassandra.cql
  2. 153 0
      middleware/logger_cassandra.go

+ 23 - 0
middleware/logger_cassandra.cql

@@ -0,0 +1,23 @@
+/**
+ * 创建轨迹表
+ */
+CREATE TABLE log_info (
+    req_date text,
+    req_time text,
+    req_method text,
+	req_url text,
+	req_proto text,
+	req_ua text,
+	req_referer text,
+	req_post_data text,
+	resp_time text,
+	resp_body text,
+	resp_code text,
+    resp_msg text,
+	resp_data text,
+	cost_time double,
+    primary key(req_date, req_time)
+) WITH CLUSTERING ORDER BY (req_time DESC)
+       AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
+                     'compaction_window_unit': 'DAYS',
+                     'compaction_window_size': 1};

+ 153 - 0
middleware/logger_cassandra.go

@@ -0,0 +1,153 @@
+package middleware
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"github.com/gin-gonic/gin"
+	"github.com/gocql/gocql"
+	"time"
+)
+
+var __logInfoChan chan *logInfo
+// cassandra 配置
+var _clusterCfg *gocql.ClusterConfig = nil
+
+func init(){
+	__logInfoChan = make(chan *logInfo, 1000)
+
+	// 设置cassandar 配置
+	_clusterCfg = gocql.NewCluster("218.14.81.38")
+	_clusterCfg.Keyspace = "i2_log"
+	_clusterCfg.Consistency = gocql.Quorum
+	//设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
+	_clusterCfg.NumConns = 3
+
+	session, _ := _clusterCfg.CreateSession()
+	time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
+	defer session.Close()
+
+	// 日志处理
+	logProcess()
+}
+
+/**
+ * 自定义日志响应类,主要是把resp的响应内容写入到body中
+ */
+type loggerRespWriter struct{
+	gin.ResponseWriter
+	respBody *bytes.Buffer
+}
+
+// 写字节数组
+func (w loggerRespWriter) Write(b []byte) (int, error) {
+	w.respBody.Write(b)	// 保存
+	return w.ResponseWriter.Write(b)
+}
+
+// 写字符串
+func (w loggerRespWriter) WriteString(s string) (int, error) {
+	w.respBody.WriteString(s)	// 保存
+	return w.ResponseWriter.WriteString(s)
+}
+
+/**
+ * 日志信息
+ */
+type logInfo struct {
+	ReqClientIp string
+	ReqTime     time.Time
+	ReqMethod   string
+	ReqUrl      string
+	ReqProto    string
+	ReqUa       string
+	ReqReferer  string
+	ReqPostData string
+
+	RespTime time.Time
+	RespBody string
+	RespCode string // 由 resp body 解析
+	RespMsg  string // 由 resp body 解析
+	RespData string // 由 resp body 解析
+
+	CostTime int
+}
+
+func LoggerCassandra()gin.HandlerFunc{
+	return func(c *gin.Context){
+		respWriter := &loggerRespWriter{
+			ResponseWriter: c.Writer,
+			respBody: bytes.NewBuffer([]byte{}),
+		}
+		c.Writer = respWriter	// 注入自定writer
+
+		logInfo := &logInfo{}
+		logInfo.ReqTime = time.Now()
+
+		// 下一个请求
+		c.Next()
+
+		// 设置对象
+		logInfo.RespBody = respWriter.respBody.String()
+		logInfo.RespTime = time.Now()
+
+		logInfo.ReqMethod = c.Request.Method
+		logInfo.ReqUrl = c.Request.RequestURI
+		logInfo.ReqProto = c.Request.Proto
+		logInfo.ReqUa = c.Request.UserAgent()
+		logInfo.ReqReferer = c.Request.Referer()
+		logInfo.ReqPostData = c.Request.PostForm.Encode()
+		logInfo.ReqClientIp = c.ClientIP()
+
+		__logInfoChan <- logInfo
+	}
+}
+
+func logProcess(){
+	go func(){
+		for{
+			select {
+				case logInfo := <- __logInfoChan:
+					insertLogInfo(logInfo)
+			}
+		}
+	}()
+}
+
+func insertLogInfo(logInfo *logInfo){
+	session, err := _clusterCfg.CreateSession()
+	if err != nil{
+		fmt.Println("创建cassandra session错误", err.Error())
+		return
+	}
+	defer session.Close()
+
+	var respBodyObj struct {
+		Code string
+		Msg  string
+		Data string
+	}
+	if logInfo.RespBody != ""{
+		err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
+		if err != nil{
+			fmt.Println("json.Unmarshal 错误", err.Error())
+		}else {
+			logInfo.RespCode = respBodyObj.Code
+			logInfo.RespMsg = respBodyObj.Msg
+			logInfo.RespData = respBodyObj.Data
+		}
+	}
+
+	// 保存轨迹
+	fmt.Println("保存轨迹")
+	cqlformat := `insert into 
+		log_info(req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time) 
+		values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
+	reqDate := logInfo.ReqTime.Format("200601")
+	q := session.Query(cqlformat, reqDate, logInfo.ReqTime, logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime, logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
+	err = q.Exec()		// 应该可以使用批量插入
+	if err != nil{
+		fmt.Println("插入日志错误:", err)
+		return
+	}
+}