control_record.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package sarama
  2. //ControlRecordType ...
  3. type ControlRecordType int
  4. const (
  5. //ControlRecordAbort is a control record for abort
  6. ControlRecordAbort ControlRecordType = iota
  7. //ControlRecordCommit is a control record for commit
  8. ControlRecordCommit
  9. //ControlRecordUnknown is a control record of unknown type
  10. ControlRecordUnknown
  11. )
  12. // Control records are returned as a record by fetchRequest
  13. // However unlike "normal" records, they mean nothing application wise.
  14. // They only serve internal logic for supporting transactions.
  15. type ControlRecord struct {
  16. Version int16
  17. CoordinatorEpoch int32
  18. Type ControlRecordType
  19. }
  20. func (cr *ControlRecord) decode(key, value packetDecoder) error {
  21. var err error
  22. cr.Version, err = value.getInt16()
  23. if err != nil {
  24. return err
  25. }
  26. cr.CoordinatorEpoch, err = value.getInt32()
  27. if err != nil {
  28. return err
  29. }
  30. // There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
  31. // Either way, all these version can only be 0 for now
  32. cr.Version, err = key.getInt16()
  33. if err != nil {
  34. return err
  35. }
  36. recordType, err := key.getInt16()
  37. if err != nil {
  38. return err
  39. }
  40. switch recordType {
  41. case 0:
  42. cr.Type = ControlRecordAbort
  43. case 1:
  44. cr.Type = ControlRecordCommit
  45. default:
  46. // from JAVA implementation:
  47. // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
  48. cr.Type = ControlRecordUnknown
  49. }
  50. return nil
  51. }
  52. func (cr *ControlRecord) encode(key, value packetEncoder) {
  53. value.putInt16(cr.Version)
  54. value.putInt32(cr.CoordinatorEpoch)
  55. key.putInt16(cr.Version)
  56. switch cr.Type {
  57. case ControlRecordAbort:
  58. key.putInt16(0)
  59. case ControlRecordCommit:
  60. key.putInt16(1)
  61. }
  62. }