为了在不重新启动 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题分区订阅模式下定期发现新分区。要启用分区发现,请为属性partition.discovery.interval.ms设置一个非负值。
参数:scan.topic-partition-discovery.interval
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
Connector Options:
Option | Required | Default | Type | Description |
---|---|---|---|---|
scan.topic-partition-discovery.interval | optional | (none) | Duration | 消费者定期发现动态创建的Kafka主题和分区的时间间隔。 |
参数:partition.discovery.interval.ms
Java
KafkaSource.builder()
.setProperty("partition.discovery.interval.ms", "10000");
// discover new partitions per 10 seconds
Python
KafkaSource.builder() \
.set_property("partition.discovery.interval.ms", "10000")
# discover new partitions per 10 seconds
阅读量:860
点赞量:0
收藏量:0