|
@@ -45,7 +45,15 @@ func (m *Message) encode(pe packetEncoder) error {
|
|
|
pe.putInt8(attributes)
|
|
pe.putInt8(attributes)
|
|
|
|
|
|
|
|
if m.Version >= 1 {
|
|
if m.Version >= 1 {
|
|
|
- pe.putInt64(m.Timestamp.UnixNano() / int64(time.Millisecond))
|
|
|
|
|
|
|
+ timestamp := int64(-1)
|
|
|
|
|
+
|
|
|
|
|
+ if !m.Timestamp.Before(time.Unix(0, 0)) {
|
|
|
|
|
+ timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond)
|
|
|
|
|
+ } else if !m.Timestamp.IsZero() {
|
|
|
|
|
+ return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)}
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ pe.putInt64(timestamp)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
err := pe.putBytes(m.Key)
|
|
err := pe.putBytes(m.Key)
|
|
@@ -125,7 +133,15 @@ func (m *Message) decode(pd packetDecoder) (err error) {
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
- m.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // negative timestamps are invalid, in these cases we should return
|
|
|
|
|
+ // a zero time
|
|
|
|
|
+ timestamp := time.Time{}
|
|
|
|
|
+ if millis >= 0 {
|
|
|
|
|
+ timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ m.Timestamp = timestamp
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
m.Key, err = pd.getBytes()
|
|
m.Key, err = pd.getBytes()
|