diff --git a/connectors-common/debezium-bucket/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java b/connectors-common/debezium-bucket/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java index 9725bcd55..bef582636 100644 --- a/connectors-common/debezium-bucket/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java +++ b/connectors-common/debezium-bucket/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java @@ -322,33 +322,34 @@ protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream i * (3 bytes in total) * * + fractional-seconds storage (size depends on meta) + * * The fractional part: + * read 1 byte, if meta is 1 or 2 + * read 2 bytes, if meta is 3 or 4 + * read 3 bytes, if meta is 5 or 6 */ - long time = bigEndianLong(inputStream.read(3), 0, 3); - boolean is_negative = bitSlice(time, 0, 1, 24) == 0; - int hours = bitSlice(time, 2, 10, 24); - int minutes = bitSlice(time, 12, 6, 24); - int seconds = bitSlice(time, 18, 6, 24); - int nanoSeconds; - if (is_negative) { // mysql binary arithmetic for negative encoded values - hours = ~hours & MASK_10_BITS; - hours = hours & ~(1 << 10); // unset sign bit - minutes = ~minutes & MASK_6_BITS; - minutes = minutes & ~(1 << 6); // unset sign bit - seconds = ~seconds & MASK_6_BITS; - seconds = seconds & ~(1 << 6); // unset sign bit - nanoSeconds = deserializeFractionalSecondsInNanosNegative(meta, inputStream); - if (nanoSeconds == 0 && seconds < 59) { // weird java Duration behavior - ++seconds; - } - hours = -hours; - minutes = -minutes; - seconds = -seconds; - nanoSeconds = -nanoSeconds; - } - else { - nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream); - } - return Duration.ofHours(hours).plusMinutes(minutes).plusSeconds(seconds).plusNanos(nanoSeconds); + int fractionBytes = (meta + 1) / 2; + int payloadBytes = 3 + fractionBytes; + int payloadBits = payloadBytes * 8; + long time = bigEndianLong(inputStream.read(payloadBytes), 0, payloadBytes); + boolean is_negative = bitSlice(time, 0, 1, payloadBits) == 0; + + if (is_negative) { + /* + * Negative numbers are stored in two's complement form. + * To get the positive value of a negative number in two's complement form, + * we should invert the bits of the number and add 1 to the result. + * Then we can take the number from the corresponding bits on the final result. + */ + time = ~time + 1; + } + + int hours = bitSlice(time, 2, 10, payloadBits); + int minutes = bitSlice(time, 12, 6, payloadBits); + int seconds = bitSlice(time, 18, 6, payloadBits); + int fraction = bitSlice(time, 24, fractionBytes * 8, payloadBits); + long nanoSeconds = (long) (fraction / (0.0000001 * Math.pow(100, fractionBytes - 1))); + final Duration duration = Duration.ofHours(hours).plusMinutes(minutes).plusSeconds(seconds).plusNanos(nanoSeconds); + return is_negative && !duration.isNegative() ? duration.negated() : duration; } /**