control_record.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package sarama
  2. type ControlRecordType int
  3. const (
  4. ControlRecordAbort ControlRecordType = iota
  5. ControlRecordCommit
  6. ControlRecordUnknown
  7. )
  8. // Control records are returned as a record by fetchRequest
  9. // However unlike "normal" records, they mean nothing application wise.
  10. // They only serve internal logic for supporting transactions.
  11. type ControlRecord struct {
  12. Version int16
  13. CoordinatorEpoch int32
  14. Type ControlRecordType
  15. }
  16. func (cr *ControlRecord) decode(key, value packetDecoder) error {
  17. {
  18. var err error
  19. cr.Version, err = value.getInt16()
  20. if err != nil {
  21. return err
  22. }
  23. cr.CoordinatorEpoch, err = value.getInt32()
  24. if err != nil {
  25. return err
  26. }
  27. }
  28. {
  29. var err error
  30. // There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
  31. // Either way, all these version can only be 0 for now
  32. cr.Version, err = key.getInt16()
  33. if err != nil {
  34. return err
  35. }
  36. recordType, err := key.getInt16()
  37. if err != nil {
  38. return err
  39. }
  40. switch recordType {
  41. case 0:
  42. cr.Type = ControlRecordAbort
  43. case 1:
  44. cr.Type = ControlRecordCommit
  45. default:
  46. // from JAVA implementation:
  47. // UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
  48. cr.Type = ControlRecordUnknown
  49. }
  50. }
  51. return nil
  52. }
  53. func (cr *ControlRecord) encode(key, value packetEncoder) {
  54. value.putInt16(cr.Version)
  55. value.putInt32(cr.CoordinatorEpoch)
  56. key.putInt16(cr.Version)
  57. switch cr.Type {
  58. case ControlRecordAbort:
  59. key.putInt16(0)
  60. case ControlRecordCommit:
  61. key.putInt16(1)
  62. }
  63. }