DEV Community

Francesco Tisiot
Francesco Tisiot

Posted on • Originally published at ftisiot.net

Convert Epoch to Timestamps in Apache Flink with TO_TIMESTAMP_LTZ

Sometimes when defining an Apache Flink® table using SQL we need to map an epoch timestamp and use it as record/message timestamp.

Doing it properly is not complex, but some attention needs to be paid since small changes can have huge impact (ask me how I know... lost a good hour on it today).

The TO_TIMESTAMP_LTZ function

When dealing with epoch timestamps, Flink offers the TO_TIMESTAMP_LTZ function, with a good set of documentation around it. Basically the function can be used with epoch timestamps in either seconds or milliseconds.

  • For epoch timestamp in seconds use: TO_TIMESTAMP_LTZ(timestamp_column, 0)
  • For epoch timestamp in milliseconds use: TO_TIMESTAMP_LTZ(timestamp_column, 3)

What about the timestamp_column definition?

Ok, ok, that works! What about the definition of the timestamp_column? You need to pay attention to it as well!

If the epoch timestamps in seconds you can have it working also by declaring the column as INT.
But if you do the same when dealing with epoch timestamps in milliseconds you'll face an error like

Caused by: java.lang.NumberFormatException: For input string: "1661760207278"
Enter fullscreen mode Exit fullscreen mode

This is because the number is greater than the INT range (-2,147,483,648 to 2,147,483,647). Therefore is better to have the column declared as BIGINT, like the following

ts BIGINT,
name string,
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
Enter fullscreen mode Exit fullscreen mode

Top comments (0)