Sometimes when defining an Apache Flink® table using SQL we need to map an epoch timestamp and use it as record/message timestamp.
Doing it properly is not complex, but it needs some attention, since small changes can have a huge impact (ask me how I know... I lost a good hour on it today).
When dealing with epoch timestamps, Flink offers the TO_TIMESTAMP_LTZ function, with a good set of documentation around it. Basically, the function can be used with epoch timestamps in either seconds or milliseconds.
- For epoch timestamps in seconds, use: TO_TIMESTAMP_LTZ(timestamp_column, 0)
- For epoch timestamps in milliseconds, use: TO_TIMESTAMP_LTZ(timestamp_column, 3)
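A quick way to see the effect of the precision argument is to call the function on literal values. This is a minimal sketch, assuming a Flink SQL client session; the literals are the sample value from the error message later in this post and its equivalent in seconds, and the column aliases are only illustrative. How the result is displayed depends on the session time zone.

```sql
-- Precision 0: the number is read as seconds since the epoch.
-- Precision 3: the number is read as milliseconds since the epoch.
SELECT
  TO_TIMESTAMP_LTZ(1661760207, 0) AS from_seconds,
  TO_TIMESTAMP_LTZ(CAST(1661760207278 AS BIGINT), 3) AS from_millis;
```

Both columns should point to the same moment, with from_millis also carrying the millisecond part.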
Ok, ok, that works! What about the definition of the timestamp_column? You need to pay attention to it as well!
If the epoch timestamp is in seconds, you can also get it working by declaring the column as INT.
But if you do the same when dealing with epoch timestamps in milliseconds, you'll face an error like:
Caused by: java.lang.NumberFormatException: For input string: "1661760207278"
This is because the number is greater than the maximum INT value (2,147,483,647). Therefore it is better to declare the column as BIGINT, like the following:
ts BIGINT,
name STRING,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
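To try these column definitions end to end, they can be wrapped in a complete table definition. The sketch below is only an illustration: the table name epoch_ms_demo is made up, and the datagen connector with its options is an assumption chosen to have test data without an external system. In a real pipeline the WITH clause would point to your actual source instead.

```sql
-- Hypothetical table for local testing; the name and connector options are assumptions.
CREATE TABLE epoch_ms_demo (
  ts BIGINT,                            -- epoch timestamp in milliseconds
  name STRING,
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),    -- precision 3 = milliseconds
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10',
  -- keep the generated values within a plausible range of epoch milliseconds
  'fields.ts.min' = '1661760000000',
  'fields.ts.max' = '1661760600000',
  'fields.name.length' = '8'
);

SELECT ts, name, ts_ltz FROM epoch_ms_demo;
```

Changing ts to INT in this definition is a quick way to check whether values of this size are still accepted.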