gssapi_kerberos.go 6.7 KB


  1. package sarama
  2. import (
  3. "encoding/asn1"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "time"
  9. "gopkg.in/jcmturner/gokrb5.v7/asn1tools"
  10. "gopkg.in/jcmturner/gokrb5.v7/gssapi"
  11. "gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
  12. "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
  13. "gopkg.in/jcmturner/gokrb5.v7/messages"
  14. "gopkg.in/jcmturner/gokrb5.v7/types"
  15. )
  // Wire-format markers used when building the initial GSS-API token.
  const (
  	// TOK_ID_KRB_AP_REQ is the token ID written as a big-endian uint16
  	// (bytes 0x01 0x00) in front of the marshalled AP_REQ.
  	TOK_ID_KRB_AP_REQ = 0x0100
  	// GSS_API_GENERIC_TAG is the first byte of the GSS-API header.
  	GSS_API_GENERIC_TAG = 0x60
  )

  // Authentication modes.
  // NOTE(review): presumably the accepted values of GSSAPIConfig.AuthType;
  // they are not referenced in this file, so confirm against the client
  // constructor.
  const (
  	KRB5_USER_AUTH = iota + 1
  	KRB5_KEYTAB_AUTH
  )

  // Handshake states stored in GSSAPIKerberosAuth.step.
  const (
  	// GSS_API_INITIAL: the AP_REQ token has not been sent yet.
  	GSS_API_INITIAL = iota + 1
  	// GSS_API_VERIFY: the AP_REQ was sent; the broker's wrap token is awaited.
  	GSS_API_VERIFY
  	// GSS_API_FINISH: the final wrap token has been produced.
  	GSS_API_FINISH
  )
  // GSSAPIConfig holds the Kerberos settings handed to
  // GSSAPIKerberosAuth.NewKerberosClientFunc when a client is created.
  type GSSAPIConfig struct {
  	// AuthType selects how credentials are obtained.
  	// NOTE(review): presumably KRB5_USER_AUTH or KRB5_KEYTAB_AUTH — confirm.
  	AuthType int
  	// File locations of the keytab and of the Kerberos configuration.
  	KeyTabPath, KerberosConfigPath string
  	// ServiceName is the service part of the SPN "<SERVICE>/<FQDN>".
  	ServiceName string
  	// Principal credentials.
  	Username, Password string
  	// Realm is the Kerberos realm of the principal.
  	Realm string
  }
  34. type GSSAPIKerberosAuth struct {
  35. Config *GSSAPIConfig
  36. ticket messages.Ticket
  37. encKey types.EncryptionKey
  38. NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
  39. step int
  40. }
  41. type KerberosClient interface {
  42. Login() error
  43. GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
  44. Domain() string
  45. CName() types.PrincipalName
  46. Destroy()
  47. }
  48. /*
  49. *
  50. * Appends length in big endian before payload, and send it to kafka
  51. *
  52. */
  53. func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) {
  54. length := len(payload)
  55. finalPackage := make([]byte, length+4) //4 byte length header + payload
  56. copy(finalPackage[4:], payload)
  57. binary.BigEndian.PutUint32(finalPackage, uint32(length))
  58. bytes, err := broker.conn.Write(finalPackage)
  59. if err != nil {
  60. return bytes, err
  61. }
  62. return bytes, nil
  63. }
  64. /*
  65. *
  66. * Read length (4 bytes) and then read the payload
  67. *
  68. */
  69. func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) {
  70. bytesRead := 0
  71. lengthInBytes := make([]byte, 4)
  72. bytes, err := io.ReadFull(broker.conn, lengthInBytes)
  73. if err != nil {
  74. return nil, bytesRead, err
  75. }
  76. bytesRead += bytes
  77. payloadLength := binary.BigEndian.Uint32(lengthInBytes)
  78. payloadBytes := make([]byte, payloadLength) // buffer for read..
  79. bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
  80. if err != nil {
  81. return payloadBytes, bytesRead, err
  82. }
  83. bytesRead += bytes
  84. return payloadBytes, bytesRead, nil
  85. }
  86. func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum() []byte {
  87. a := make([]byte, 24)
  88. flags := []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
  89. binary.LittleEndian.PutUint32(a[:4], 16)
  90. for _, i := range flags {
  91. f := binary.LittleEndian.Uint32(a[20:24])
  92. f |= uint32(i)
  93. binary.LittleEndian.PutUint32(a[20:24], f)
  94. }
  95. return a
  96. }
  97. /*
  98. *
  99. * Construct Kerberos AP_REQ package, conforming to RFC-4120
  100. * https://tools.ietf.org/html/rfc4120#page-84
  101. *
  102. */
  103. func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(
  104. domain string, cname types.PrincipalName,
  105. ticket messages.Ticket,
  106. sessionKey types.EncryptionKey) ([]byte, error) {
  107. auth, err := types.NewAuthenticator(domain, cname)
  108. if err != nil {
  109. return nil, err
  110. }
  111. auth.Cksum = types.Checksum{
  112. CksumType: chksumtype.GSSAPI,
  113. Checksum: krbAuth.newAuthenticatorChecksum(),
  114. }
  115. APReq, err := messages.NewAPReq(
  116. ticket,
  117. sessionKey,
  118. auth,
  119. )
  120. if err != nil {
  121. return nil, err
  122. }
  123. aprBytes := make([]byte, 2)
  124. binary.BigEndian.PutUint16(aprBytes, TOK_ID_KRB_AP_REQ)
  125. tb, err := APReq.Marshal()
  126. if err != nil {
  127. return nil, err
  128. }
  129. aprBytes = append(aprBytes, tb...)
  130. return aprBytes, nil
  131. }
  132. /*
  133. *
  134. * Append the GSS-API header to the payload, conforming to RFC-2743
  135. * Section 3.1, Mechanism-Independent Token Format
  136. *
  137. * https://tools.ietf.org/html/rfc2743#page-81
  138. *
  139. * GSSAPIHeader + <specific mechanism payload>
  140. *
  141. */
  142. func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) ([]byte, error) {
  143. oidBytes, err := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
  144. if err != nil {
  145. return nil, err
  146. }
  147. tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
  148. GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
  149. GSSHeader = append(GSSHeader, oidBytes...)
  150. GSSPackage := append(GSSHeader, payload...)
  151. return GSSPackage, nil
  152. }
  153. func (krbAuth *GSSAPIKerberosAuth) initSecContext(bytes []byte, kerberosClient KerberosClient) ([]byte, error) {
  154. switch krbAuth.step {
  155. case GSS_API_INITIAL:
  156. aprBytes, err := krbAuth.createKrb5Token(
  157. kerberosClient.Domain(),
  158. kerberosClient.CName(),
  159. krbAuth.ticket,
  160. krbAuth.encKey)
  161. if err != nil {
  162. return nil, err
  163. }
  164. krbAuth.step = GSS_API_VERIFY
  165. return krbAuth.appendGSSAPIHeader(aprBytes)
  166. case GSS_API_VERIFY:
  167. wrapTokenReq := gssapi.WrapToken{}
  168. if err := wrapTokenReq.Unmarshal(bytes, true); err != nil {
  169. return nil, err
  170. }
  171. // Validate response.
  172. isValid, err := wrapTokenReq.Verify(krbAuth.encKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
  173. if !isValid {
  174. return nil, err
  175. }
  176. wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encKey)
  177. if err != nil {
  178. return nil, err
  179. }
  180. krbAuth.step = GSS_API_FINISH
  181. return wrapTokenResponse.Marshal()
  182. }
  183. return nil, nil
  184. }
  185. /* This does the handshake for authorization */
  186. func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
  187. kerberosClient, err := krbAuth.NewKerberosClientFunc(krbAuth.Config)
  188. if err != nil {
  189. Logger.Printf("Kerberos client error: %s", err)
  190. return err
  191. }
  192. err = kerberosClient.Login()
  193. if err != nil {
  194. Logger.Printf("Kerberos client error: %s", err)
  195. return err
  196. }
  197. // Construct SPN using serviceName and host
  198. // SPN format: <SERVICE>/<FQDN>
  199. host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
  200. spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
  201. ticket, encKey, err := kerberosClient.GetServiceTicket(spn)
  202. if err != nil {
  203. Logger.Printf("Error getting Kerberos service ticket : %s", err)
  204. return err
  205. }
  206. krbAuth.ticket = ticket
  207. krbAuth.encKey = encKey
  208. krbAuth.step = GSS_API_INITIAL
  209. var receivedBytes []byte = nil
  210. defer kerberosClient.Destroy()
  211. for {
  212. packBytes, err := krbAuth.initSecContext(receivedBytes, kerberosClient)
  213. if err != nil {
  214. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  215. return err
  216. }
  217. requestTime := time.Now()
  218. bytesWritten, err := krbAuth.writePackage(broker, packBytes)
  219. if err != nil {
  220. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  221. return err
  222. }
  223. broker.updateOutgoingCommunicationMetrics(bytesWritten)
  224. if krbAuth.step == GSS_API_VERIFY {
  225. var bytesRead = 0
  226. receivedBytes, bytesRead, err = krbAuth.readPackage(broker)
  227. requestLatency := time.Since(requestTime)
  228. broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency)
  229. if err != nil {
  230. Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err)
  231. return err
  232. }
  233. } else if krbAuth.step == GSS_API_FINISH {
  234. return nil
  235. }
  236. }
  237. }