Setup:
- Flink 1.19.1
- Python 3.10
- apache-flink 1.19.1 Python package
- Java 11
- Kafka Flink connector 3.4.0
I am creating a `KafkaSource` that reads from a Kafka topic whose messages have timestamps in their headers. I pass it a deserialization schema created with the `AvroRowDeserializationSchema` class. The `KafkaSource` is then passed to the `env.from_source` API, along with a source name and a watermark strategy that has a custom timestamp assigner, to create a data stream. However, the source never seems to use the custom timestamp assigner; instead, it simply attaches the timestamp from the Kafka message header to each element. Is this expected?
Interestingly, when I call the `assign_timestamps_and_watermarks` method on the stream returned by `from_source`, it does use the custom timestamp assigner to get the event timestamp.
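
The setup can be sketched roughly as follows. This is a minimal illustration, not the exact code: the Avro schema, the `event_time_ms` field, the broker address, topic, group id, and the names `extract_event_time_ms`, `PayloadTimestampAssigner`, and `build_streams` are all placeholders, and it assumes the deserialized row can be indexed by field name. The Kafka connector JAR is assumed to be on the classpath already.

```python
import json

# Hypothetical Avro schema; the real topic's schema is not part of the question.
AVRO_SCHEMA = json.dumps({
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "event_time_ms", "type": "long"},
    ],
})


def extract_event_time_ms(row) -> int:
    """Return the event time (epoch millis) from the assumed payload field."""
    return int(row["event_time_ms"])


def build_streams(env, bootstrap_servers="localhost:9092", topic="events"):
    """Build both variants: from_source alone, and from_source + reassignment."""
    # PyFlink imports are kept local so the helper above works without Flink.
    from pyflink.common import WatermarkStrategy
    from pyflink.common.watermark_strategy import TimestampAssigner
    from pyflink.datastream.connectors.kafka import (
        KafkaOffsetsInitializer,
        KafkaSource,
    )
    from pyflink.datastream.formats.avro import AvroRowDeserializationSchema

    class PayloadTimestampAssigner(TimestampAssigner):
        def extract_timestamp(self, value, record_timestamp: int) -> int:
            # record_timestamp is the timestamp the element already carries.
            return extract_event_time_ms(value)

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_topics(topic)
        .set_group_id("timestamp-assigner-demo")
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .set_value_only_deserializer(
            AvroRowDeserializationSchema(avro_schema_string=AVRO_SCHEMA)
        )
        .build()
    )

    strategy = (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(PayloadTimestampAssigner())
    )

    # Variant 1: the strategy is only handed to from_source.
    from_source_stream = env.from_source(source, strategy, "kafka-source")

    # Variant 2: the same strategy is applied again on the resulting stream.
    reassigned_stream = from_source_stream.assign_timestamps_and_watermarks(strategy)
    return from_source_stream, reassigned_stream
```

Calling `build_streams(StreamExecutionEnvironment.get_execution_environment())` and attaching a process function that reads the element timestamp to each returned stream would be one way to compare the two variants side by side.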