| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package middleware
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "github.com/gin-gonic/gin"
- "github.com/gocql/gocql"
- "time"
- )
- var __logInfoChan chan *logInfo
- // cassandra 配置
- var _clusterCfg *gocql.ClusterConfig = nil
- func init(){
- defer func() {
- if p := recover(); p != nil {
- fmt.Println("ecover", p)
- }
- }()
- __logInfoChan = make(chan *logInfo, 1000)
- // 设置cassandar 配置
- _clusterCfg = gocql.NewCluster("218.14.81.38")
- _clusterCfg.Keyspace = "i2_log"
- _clusterCfg.Consistency = gocql.Quorum
- //设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
- _clusterCfg.NumConns = 3
- session, err := _clusterCfg.CreateSession()
- if err != nil{
- fmt.Println("create session err", err.Error())
- return
- }
- time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
- defer session.Close()
- // 日志处理
- logProcess()
- }
- /**
- * 自定义日志响应类,主要是把resp的响应内容写入到body中
- */
- type loggerRespWriter struct{
- gin.ResponseWriter
- respBody *bytes.Buffer
- }
- // 写字节数组
- func (w loggerRespWriter) Write(b []byte) (int, error) {
- w.respBody.Write(b) // 保存
- return w.ResponseWriter.Write(b)
- }
- // 写字符串
- func (w loggerRespWriter) WriteString(s string) (int, error) {
- w.respBody.WriteString(s) // 保存
- return w.ResponseWriter.WriteString(s)
- }
- /**
- * 日志信息
- */
- type logInfo struct {
- ReqClientIp string
- ReqTime time.Time
- ReqMethod string
- ReqUrl string
- ReqProto string
- ReqUa string
- ReqReferer string
- ReqPostData string
- RespTime time.Time
- RespBody string
- RespCode string // 由 resp body 解析
- RespMsg string // 由 resp body 解析
- RespData string // 由 resp body 解析
- CostTime int
- }
- func LoggerCassandra()gin.HandlerFunc{
- return func(c *gin.Context){
- respWriter := &loggerRespWriter{
- ResponseWriter: c.Writer,
- respBody: bytes.NewBuffer([]byte{}),
- }
- c.Writer = respWriter // 注入自定writer
- logInfo := &logInfo{}
- logInfo.ReqTime = time.Now()
- // 下一个请求
- c.Next()
- // 设置对象
- logInfo.RespBody = respWriter.respBody.String()
- logInfo.RespTime = time.Now()
- logInfo.ReqMethod = c.Request.Method
- logInfo.ReqUrl = c.Request.RequestURI
- logInfo.ReqProto = c.Request.Proto
- logInfo.ReqUa = c.Request.UserAgent()
- logInfo.ReqReferer = c.Request.Referer()
- logInfo.ReqPostData = c.Request.PostForm.Encode()
- logInfo.ReqClientIp = c.ClientIP()
- __logInfoChan <- logInfo
- }
- }
- func logProcess(){
- go func(){
- for{
- select {
- case logInfo := <- __logInfoChan:
- insertLogInfo(logInfo)
- }
- }
- }()
- }
- func insertLogInfo(logInfo *logInfo){
- session, err := _clusterCfg.CreateSession()
- if err != nil{
- fmt.Println("创建cassandra session错误", err.Error())
- return
- }
- defer session.Close()
- var respBodyObj struct {
- Code string
- Msg string
- Data string
- }
- if logInfo.RespBody != ""{
- err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
- if err != nil{
- fmt.Println("json.Unmarshal 错误", err.Error())
- }else {
- logInfo.RespCode = respBodyObj.Code
- logInfo.RespMsg = respBodyObj.Msg
- logInfo.RespData = respBodyObj.Data
- }
- }
- logInfo.CostTime = int(logInfo.RespTime.Sub(logInfo.ReqTime))
- // 保存轨迹
- fmt.Println("保存轨迹")
- cqlformat := `insert into
- log_info(req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time)
- values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
- reqDate := logInfo.ReqTime.Format("20060102")
- q := session.Query(cqlformat, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
- err = q.Exec() // 应该可以使用批量插入
- if err != nil{
- fmt.Println("插入日志错误:", err)
- return
- }
- }
|