logger_cassandra.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package middleware
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "git.qianqiusoft.com/qianqiusoft/light-apiengine/config"
  7. "github.com/gin-gonic/gin"
  8. "github.com/gocql/gocql"
  9. "strings"
  10. "time"
  11. )
  12. var __logInfoChan chan *logInfo
  13. // cassandra 配置
  14. var _clusterCfg *gocql.ClusterConfig = nil
  15. // app名称
  16. var __appName string = ""
  17. func init(){
  18. defer func() {
  19. if p := recover(); p != nil {
  20. fmt.Println("ecover", p)
  21. }
  22. }()
  23. // 设置app名称
  24. __appName = config.AppConfig.GetKey("app_name")
  25. __logInfoChan = make(chan *logInfo, 1000)
  26. // 设置cassandar 配置
  27. cassandra := config.AppConfig.GetKey("cassandra")
  28. keyspace := config.AppConfig.GetKey("keyspace")
  29. _clusterCfg = gocql.NewCluster(strings.Split(cassandra, ",")...)
  30. _clusterCfg.Keyspace = keyspace
  31. _clusterCfg.Consistency = gocql.Quorum
  32. //设置连接池的数量,默认是2个(针对每一个host,都建立起NumConns个连接)
  33. _clusterCfg.NumConns = 3
  34. session, err := _clusterCfg.CreateSession()
  35. if err != nil{
  36. fmt.Println("create session err", err.Error())
  37. return
  38. }
  39. time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
  40. defer session.Close()
  41. // 日志处理
  42. logProcess()
  43. }
  44. /**
  45. * 自定义日志响应类,主要是把resp的响应内容写入到body中
  46. */
  47. type loggerRespWriter struct{
  48. gin.ResponseWriter
  49. respBody *bytes.Buffer
  50. }
  51. // 写字节数组
  52. func (w loggerRespWriter) Write(b []byte) (int, error) {
  53. w.respBody.Write(b) // 保存
  54. return w.ResponseWriter.Write(b)
  55. }
  56. // 写字符串
  57. func (w loggerRespWriter) WriteString(s string) (int, error) {
  58. w.respBody.WriteString(s) // 保存
  59. return w.ResponseWriter.WriteString(s)
  60. }
  61. /**
  62. * 日志信息
  63. */
  64. type logInfo struct {
  65. ReqClientIp string
  66. ReqTime time.Time
  67. ReqMethod string
  68. ReqUrl string
  69. ReqProto string
  70. ReqUa string
  71. ReqReferer string
  72. ReqPostData string
  73. RespTime time.Time
  74. RespBody string
  75. RespCode string // 由 resp body 解析
  76. RespMsg string // 由 resp body 解析
  77. RespData string // 由 resp body 解析
  78. CostTime int
  79. }
  80. func LoggerCassandra()gin.HandlerFunc{
  81. return func(c *gin.Context){
  82. respWriter := &loggerRespWriter{
  83. ResponseWriter: c.Writer,
  84. respBody: bytes.NewBuffer([]byte{}),
  85. }
  86. c.Writer = respWriter // 注入自定writer
  87. logInfo := &logInfo{}
  88. logInfo.ReqTime = time.Now()
  89. // 下一个请求
  90. c.Next()
  91. // 设置对象
  92. logInfo.RespBody = respWriter.respBody.String()
  93. logInfo.RespTime = time.Now()
  94. logInfo.ReqMethod = c.Request.Method
  95. logInfo.ReqUrl = c.Request.RequestURI
  96. logInfo.ReqProto = c.Request.Proto
  97. logInfo.ReqUa = c.Request.UserAgent()
  98. logInfo.ReqReferer = c.Request.Referer()
  99. logInfo.ReqPostData = c.Request.PostForm.Encode()
  100. logInfo.ReqClientIp = c.ClientIP()
  101. __logInfoChan <- logInfo
  102. }
  103. }
  104. func logProcess(){
  105. go func(){
  106. for{
  107. select {
  108. case logInfo := <- __logInfoChan:
  109. insertLogInfo(logInfo)
  110. }
  111. }
  112. }()
  113. }
  114. func insertLogInfo(logInfo *logInfo){
  115. session, err := _clusterCfg.CreateSession()
  116. if err != nil{
  117. fmt.Println("创建cassandra session错误", err.Error())
  118. return
  119. }
  120. defer session.Close()
  121. var respBodyObj struct {
  122. Code string
  123. Msg string
  124. Data string
  125. }
  126. if logInfo.RespBody != ""{
  127. err = json.Unmarshal([]byte(logInfo.RespBody), &respBodyObj)
  128. if err != nil{
  129. fmt.Println("json.Unmarshal 错误", err.Error())
  130. }else {
  131. logInfo.RespCode = respBodyObj.Code
  132. logInfo.RespMsg = respBodyObj.Msg
  133. logInfo.RespData = respBodyObj.Data
  134. }
  135. }
  136. logInfo.CostTime = int(logInfo.RespTime.Sub(logInfo.ReqTime))
  137. // 保存轨迹
  138. fmt.Println("保存轨迹")
  139. cqlformat := `insert into
  140. log_info(app_name, req_date, req_time, req_method, req_url, req_proto, req_ua, req_referer, req_post_data, resp_time, resp_body, resp_code, resp_msg, resp_data, cost_time)
  141. values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);`
  142. reqDate := logInfo.ReqTime.Format("20060102")
  143. q := session.Query(cqlformat, __appName, reqDate, logInfo.ReqTime.Format("2006-01-02 15:04:05"), logInfo.ReqMethod, logInfo.ReqUrl, logInfo.ReqProto, logInfo.ReqUa, logInfo.ReqReferer, logInfo.ReqPostData, logInfo.RespTime.Format("2006-01-02 15:04:05"), logInfo.RespBody, logInfo.RespCode, logInfo.RespMsg, logInfo.RespData, logInfo.CostTime)
  144. err = q.Exec() // 应该可以使用批量插入
  145. if err != nil{
  146. fmt.Println("插入日志错误:", err)
  147. return
  148. }
  149. }