// logger_cassandra.go (4.7 KB)

  1. package middleware
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gocql/gocql"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. var __logInfoChan chan *logInfo
  14. // cassandra 配置
  15. var _clusterCfg *gocql.ClusterConfig = nil
  16. // app名称
  17. var __appName string = ""
  18. func init(){
  19. defer func() {
  20. if p := recover(); p != nil {
  21. fmt.Println("ecover", p)
  22. }
  23. }()
  24. // 设置app名称
  25. __appName = config.AppConfig.GetKey("app_name")
  26. __logInfoChan = make(chan *logInfo, 1000)
  27. // 设置cassandar 配置
  28. cassandra := config.AppConfig.GetKey("cassandra")
  29. keyspace := config.AppConfig.GetKey("keyspace")
  30. _clusterCfg = gocql.NewCluster(strings.Split(cassandra, ",")...)
  31. _clusterCfg.Keyspace = keyspace
  32. _clusterCfg.Consistency = gocql.Quorum
  33. //设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
  34. _clusterCfg.NumConns = 3
  35. session, err := _clusterCfg.CreateSession()
  36. if err != nil{
  37. fmt.Println("create session err", err.Error())
  38. return
  39. }
  40. time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
  41. defer session.Close()
  42. // 日志处理
  43. logProcess()
  44. }
  45. /**
  46. * 自定义日志响应类,主要是把resp的响应内容写入到body中
  47. */
  48. type loggerRespWriter struct{
  49. gin.ResponseWriter
  50. respBody *bytes.Buffer
  51. }
  52. // 写字节数组
  53. func (w loggerRespWriter) Write(b []byte) (int, error) {
  54. w.respBody.Write(b) // 保存
  55. return w.ResponseWriter.Write(b)
  56. }
  57. // 写字符串
  58. func (w loggerRespWriter) WriteString(s string) (int, error) {
  59. w.respBody.WriteString(s) // 保存
  60. return w.ResponseWriter.WriteString(s)
  61. }
  62. /**
  63. * 日志信息
  64. */
  65. type logInfo struct {
  66. ReqClientIp string
  67. ReqTime time.Time
  68. ReqMethod string
  69. ReqUrl string
  70. ReqProto string
  71. ReqUa string
  72. ReqReferer string
  73. ReqPostData string
  74. RespTime time.Time
  75. RespBody string
  76. RespCode string // 由 resp body 解析
  77. RespMsg string // 由 resp body 解析
  78. RespData string // 由 resp body 解析
  79. CostTime float64
  80. }
  81. func LoggerCassandra()gin.HandlerFunc{
  82. return func(c *gin.Context){
  83. respWriter := &loggerRespWriter{
  84. ResponseWriter: c.Writer,
  85. respBody: bytes.NewBuffer([]byte{}),
  86. }
  87. c.Writer = respWriter // 注入自定writer
  88. logInfo := &logInfo{}
  89. logInfo.ReqTime = time.Now()
  90. // 下一个请求
  91. c.Next()
  92. // 设置对象
  93. logInfo.RespBody = respWriter.respBody.String()
  94. logInfo.RespTime = time.Now()
  95. logInfo.ReqMethod = c.Request.Method
  96. logInfo.ReqUrl = c.Request.RequestURI
  97. logInfo.ReqProto = c.Request.Proto
  98. logInfo.ReqUa = c.Request.UserAgent()
  99. logInfo.ReqReferer = c.Request.Referer()
  100. logInfo.ReqPostData = c.Request.PostForm.Encode()
  101. logInfo.ReqClientIp = c.ClientIP()
  102. __logInfoChan <- logInfo
  103. }
  104. }
  105. func logProcess(){
  106. go func(){
  107. for{
  108. select {
  109. case logInfo := <- __logInfoChan:
  110. insertLogInfo(logInfo)
  111. }
  112. }
  113. }()
  114. }
  115. func insertLogInfo(logInfo *logInfo){
  116. session, err := _clusterCfg.CreateSession()
  117. if err != nil{
  118. fmt.Println("创建cassandra session错误", err.Error())
  119. return
  120. }
  121. defer session.Close()
  122. var respBodyObj struct {
  123. Code int
  124. Msg string
  125. Data interface{}
  126. }
  127. if logInfo.RespBody != ""{
  128. err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
  129. if err != nil{
  130. fmt.Println("json.Unmarshal 错误", err.Error())
  131. }else {
  132. logInfo.RespCode = strconv.Itoa(respBodyObj.Code)
  133. logInfo.RespMsg = respBodyObj.Msg
  134. if respBodyObj.Data != nil{
  135. str, err := json.Marshal(respBodyObj.Data)
  136. if err == nil{
  137. logInfo.RespData = string(str)
  138. }
  139. }
  140. }
  141. }
  142. logInfo.CostTime = float64(logInfo.RespTime.Sub(logInfo.ReqTime).Seconds())
  143. cqlformat := `insert into
  144. log_info(app_name, req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time)
  145. values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
  146. reqDate := logInfo.ReqTime.Format("20060102")
  147. //q := session.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
  148. batch := session.NewBatch(gocql.UnloggedBatch) // 应该批量插入多条,例如20条
  149. batch.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
  150. if err := session.ExecuteBatch(batch); err != nil {
  151. fmt.Println("批量插入日志错误:", err)
  152. return
  153. }
  154. }