http_server.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package main
  2. import (
  3. "github.com/Shopify/sarama"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "io/ioutil"
  10. "log"
  11. "net/http"
  12. "os"
  13. "strings"
  14. "time"
  15. )
  16. var (
  17. addr = flag.String("addr", ":8080", "The address to bind to")
  18. brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
  19. verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
  20. certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
  21. keyFile = flag.String("key", "", "The optional key file for client authentication")
  22. caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
  23. verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
  24. )
  25. func main() {
  26. flag.Parse()
  27. if *verbose {
  28. sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  29. }
  30. if *brokers == "" {
  31. flag.PrintDefaults()
  32. os.Exit(1)
  33. }
  34. brokerList := strings.Split(*brokers, ",")
  35. log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
  36. server := &Server{
  37. DataCollector: newDataCollector(brokerList),
  38. AccessLogProducer: newAccessLogProducer(brokerList),
  39. }
  40. defer func() {
  41. if err := server.Close(); err != nil {
  42. log.Println("Failed to close server", err)
  43. }
  44. }()
  45. log.Fatal(server.Run(*addr))
  46. }
  47. func createTlsConfiguration() (t *tls.Config) {
  48. if *certFile != "" && *keyFile != "" && *caFile != "" {
  49. cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
  50. if err != nil {
  51. log.Fatal(err)
  52. }
  53. caCert, err := ioutil.ReadFile(*caFile)
  54. if err != nil {
  55. log.Fatal(err)
  56. }
  57. caCertPool := x509.NewCertPool()
  58. caCertPool.AppendCertsFromPEM(caCert)
  59. t = &tls.Config{
  60. Certificates: []tls.Certificate{cert},
  61. RootCAs: caCertPool,
  62. InsecureSkipVerify: *verifySsl,
  63. }
  64. }
  65. // will be nil by default if nothing is provided
  66. return t
  67. }
  68. type Server struct {
  69. DataCollector sarama.SyncProducer
  70. AccessLogProducer sarama.AsyncProducer
  71. }
  72. func (s *Server) Close() error {
  73. if err := s.DataCollector.Close(); err != nil {
  74. log.Println("Failed to shut down data collector cleanly", err)
  75. }
  76. if err := s.AccessLogProducer.Close(); err != nil {
  77. log.Println("Failed to shut down access log producer cleanly", err)
  78. }
  79. return nil
  80. }
  81. func (s *Server) Handler() http.Handler {
  82. return s.withAccessLog(s.collectQueryStringData())
  83. }
  84. func (s *Server) Run(addr string) error {
  85. httpServer := &http.Server{
  86. Addr: addr,
  87. Handler: s.Handler(),
  88. }
  89. log.Printf("Listening for requests on %s...\n", addr)
  90. return httpServer.ListenAndServe()
  91. }
  92. func (s *Server) collectQueryStringData() http.Handler {
  93. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  94. if r.URL.Path != "/" {
  95. http.NotFound(w, r)
  96. return
  97. }
  98. // We are not setting a message key, which means that all messages will
  99. // be distributed randomly over the different partitions.
  100. partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
  101. Topic: "important",
  102. Value: sarama.StringEncoder(r.URL.RawQuery),
  103. })
  104. if err != nil {
  105. w.WriteHeader(http.StatusInternalServerError)
  106. fmt.Fprintf(w, "Failed to store your data:, %s", err)
  107. } else {
  108. // The tuple (topic, partition, offset) can be used as a unique identifier
  109. // for a message in a Kafka cluster.
  110. fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
  111. }
  112. })
  113. }
  114. type accessLogEntry struct {
  115. Method string `json:"method"`
  116. Host string `json:"host"`
  117. Path string `json:"path"`
  118. IP string `json:"ip"`
  119. ResponseTime float64 `json:"response_time"`
  120. encoded []byte
  121. err error
  122. }
  123. func (ale *accessLogEntry) ensureEncoded() {
  124. if ale.encoded == nil && ale.err == nil {
  125. ale.encoded, ale.err = json.Marshal(ale)
  126. }
  127. }
  128. func (ale *accessLogEntry) Length() int {
  129. ale.ensureEncoded()
  130. return len(ale.encoded)
  131. }
  132. func (ale *accessLogEntry) Encode() ([]byte, error) {
  133. ale.ensureEncoded()
  134. return ale.encoded, ale.err
  135. }
  136. func (s *Server) withAccessLog(next http.Handler) http.Handler {
  137. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  138. started := time.Now()
  139. next.ServeHTTP(w, r)
  140. entry := &accessLogEntry{
  141. Method: r.Method,
  142. Host: r.Host,
  143. Path: r.RequestURI,
  144. IP: r.RemoteAddr,
  145. ResponseTime: float64(time.Since(started)) / float64(time.Second),
  146. }
  147. // We will use the client's IP address as key. This will cause
  148. // all the access log entries of the same IP address to end up
  149. // on the same partition.
  150. s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
  151. Topic: "access_log",
  152. Key: sarama.StringEncoder(r.RemoteAddr),
  153. Value: entry,
  154. }
  155. })
  156. }
  157. func newDataCollector(brokerList []string) sarama.SyncProducer {
  158. // For the data collector, we are looking for strong consistency semantics.
  159. // Because we don't change the flush settings, sarama will try to produce messages
  160. // as fast as possible to keep latency low.
  161. config := sarama.NewConfig()
  162. config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
  163. config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
  164. tlsConfig := createTlsConfiguration()
  165. if tlsConfig != nil {
  166. config.Net.TLS.Config = tlsConfig
  167. config.Net.TLS.Enable = true
  168. }
  169. // On the broker side, you may want to change the following settings to get
  170. // stronger consistency guarantees:
  171. // - For your broker, set `unclean.leader.election.enable` to false
  172. // - For the topic, you could increase `min.insync.replicas`.
  173. producer, err := sarama.NewSyncProducer(brokerList, config)
  174. if err != nil {
  175. log.Fatalln("Failed to start Sarama producer:", err)
  176. }
  177. return producer
  178. }
  179. func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
  180. // For the access log, we are looking for AP semantics, with high throughput.
  181. // By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
  182. config := sarama.NewConfig()
  183. tlsConfig := createTlsConfiguration()
  184. if tlsConfig != nil {
  185. config.Net.TLS.Enable = true
  186. config.Net.TLS.Config = tlsConfig
  187. }
  188. config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
  189. config.Producer.Compression = sarama.CompressionSnappy // Compress messages
  190. config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms
  191. producer, err := sarama.NewAsyncProducer(brokerList, config)
  192. if err != nil {
  193. log.Fatalln("Failed to start Sarama producer:", err)
  194. }
  195. // We will just log to STDOUT if we're not able to produce messages.
  196. // Note: messages will only be returned here after all retry attempts are exhausted.
  197. go func() {
  198. for err := range producer.Errors() {
  199. log.Println("Failed to write access log entry:", err)
  200. }
  201. }()
  202. return producer
  203. }