http_server.go

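// This example program runs an HTTP server backed by two Kafka producers:
// the raw query string of every request to "/" is written synchronously to
// the "important" topic, and an access-log entry for each request is written
// asynchronously to the "access_log" topic.
//
// A typical invocation (assuming a Kafka broker reachable on localhost:9092)
// might look like:
//
//	KAFKA_PEERS=localhost:9092 go run http_server.go -addr=:8080
//	curl 'http://localhost:8080/?foo=bar'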
package main

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
)
var (
	addr     = flag.String("addr", ":8080", "The address to bind to")
	brokers  = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
	verbose  = flag.Bool("verbose", false, "Turn on Sarama logging")
	certFile = flag.String("certificate", "", "The certificate file for client authentication")
	keyFile  = flag.String("key", "", "The key file for client authentication")
	caFile   = flag.String("ca", "", "The certificate authority file for client authentication")

	tlsConfig *tls.Config
)
func main() {
	flag.Parse()

	if *certFile != "" && *keyFile != "" && *caFile != "" {
		cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
		if err != nil {
			log.Fatal(err)
		}

		caCert, err := ioutil.ReadFile(*caFile)
		if err != nil {
			log.Fatal(err)
		}

		caCertPool := x509.NewCertPool()
		caCertPool.AppendCertsFromPEM(caCert)

		// Verify the brokers' certificates against the provided CA instead of
		// skipping verification.
		tlsConfig = &tls.Config{
			Certificates: []tls.Certificate{cert},
			RootCAs:      caCertPool,
		}
	}

	if *verbose {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}

	if *brokers == "" {
		flag.PrintDefaults()
		os.Exit(1)
	}
	brokerList := strings.Split(*brokers, ",")
	log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))

	server := &Server{
		DataCollector:     newDataCollector(brokerList),
		AccessLogProducer: newAccessLogProducer(brokerList),
	}
	defer func() {
		if err := server.Close(); err != nil {
			log.Println("Failed to close server", err)
		}
	}()

	log.Fatal(server.Run(*addr))
}
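// Server bundles the two producers and exposes the HTTP handler chain that feeds them.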
type Server struct {
	DataCollector     sarama.SyncProducer
	AccessLogProducer sarama.AsyncProducer
}

func (s *Server) Close() error {
	if err := s.DataCollector.Close(); err != nil {
		log.Println("Failed to shut down data collector cleanly", err)
	}

	if err := s.AccessLogProducer.Close(); err != nil {
		log.Println("Failed to shut down access log producer cleanly", err)
	}

	return nil
}

func (s *Server) Handler() http.Handler {
	return s.withAccessLog(s.collectQueryStringData())
}

func (s *Server) Run(addr string) error {
	httpServer := &http.Server{
		Addr:    addr,
		Handler: s.Handler(),
	}

	log.Printf("Listening for requests on %s...\n", addr)
	return httpServer.ListenAndServe()
}
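// collectQueryStringData returns the handler for "/": it publishes the request's
// raw query string to the "important" topic and reports the resulting
// partition/offset back to the client.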
func (s *Server) collectQueryStringData() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}

		// We are not setting a message key, which means that all messages will
		// be distributed randomly over the different partitions.
		partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
			Topic: "important",
			Value: sarama.StringEncoder(r.URL.RawQuery),
		})

		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "Failed to store your data: %s", err)
		} else {
			// The tuple (topic, partition, offset) can be used as a unique identifier
			// for a message in a Kafka cluster.
			fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
		}
	})
}
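// accessLogEntry satisfies the sarama.Encoder interface through its Length and
// Encode methods below, so it can be used directly as a ProducerMessage value;
// the JSON encoding happens lazily, when the producer serializes the message.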
type accessLogEntry struct {
	Method       string  `json:"method"`
	Host         string  `json:"host"`
	Path         string  `json:"path"`
	IP           string  `json:"ip"`
	ResponseTime float64 `json:"response_time"`

	encoded []byte
	err     error
}

func (ale *accessLogEntry) ensureEncoded() {
	if ale.encoded == nil && ale.err == nil {
		ale.encoded, ale.err = json.Marshal(ale)
	}
}

func (ale *accessLogEntry) Length() int {
	ale.ensureEncoded()
	return len(ale.encoded)
}

func (ale *accessLogEntry) Encode() ([]byte, error) {
	ale.ensureEncoded()
	return ale.encoded, ale.err
}
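// withAccessLog is middleware: it serves the request with next, then publishes
// a timing entry for it on the asynchronous producer.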
func (s *Server) withAccessLog(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started := time.Now()

		next.ServeHTTP(w, r)

		entry := &accessLogEntry{
			Method:       r.Method,
			Host:         r.Host,
			Path:         r.RequestURI,
			IP:           r.RemoteAddr,
			ResponseTime: float64(time.Since(started)) / float64(time.Second),
		}

		// We will use the client's IP address as key. This will cause
		// all the access log entries of the same IP address to end up
		// on the same partition.
		s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
			Topic: "access_log",
			Key:   sarama.StringEncoder(r.RemoteAddr),
			Value: entry,
		}
	})
}
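// newDataCollector builds the synchronous producer used for the "important"
// topic; the process exits if the producer cannot be created.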
func newDataCollector(brokerList []string) sarama.SyncProducer {
	// For the data collector, we are looking for strong consistency semantics.
	// Because we don't change the flush settings, sarama will try to produce messages
	// as fast as possible to keep latency low.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Return.Successes = true          // Required by the SyncProducer so it can report the partition/offset
	if tlsConfig != nil {
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Enable = true
	}

	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.

	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	return producer
}
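// newAccessLogProducer builds the asynchronous producer used for the
// "access_log" topic; produce errors are drained and logged in a background
// goroutine.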
func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
	// For the access log, we are looking for AP semantics, with high throughput.
	// By creating batches of compressed messages, we reduce network I/O at a cost of more latency.
	config := sarama.NewConfig()
	if tlsConfig != nil {
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
	}
	config.Producer.RequiredAcks = sarama.WaitForLocal       // Only wait for the leader to ack
	config.Producer.Compression = sarama.CompressionSnappy   // Compress messages
	config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

	producer, err := sarama.NewAsyncProducer(brokerList, config)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}

	// We will just log to STDOUT if we're not able to produce messages.
	// Note: messages will only be returned here after all retry attempts are exhausted.
	go func() {
		for err := range producer.Errors() {
			log.Println("Failed to write access log entry:", err)
		}
	}()

	return producer
}