推荐 最新
打酱油的后端

Flink系列Table API和SQL之:开窗Over聚合

一、开窗(Over)聚合与标准SQL中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值,比如说,可以以每一行数据为基准,计算它之前1小时内所有数据的平均值,也可以计算它之前10个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的"开窗函数"。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口TVF聚合都是多对一的关系,将数据分组之后每组只会得到一个聚合结果。开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个多对多的关系。与标准SQL中一致,Flink SQL中的开窗函数也是通过OVER子句来实现的,所以有时开窗聚合也叫做OVER聚合(Over Aggregation)。基本语法如下:SELECT <聚合函数> OVER ( [PARTITION BY <字段1>[,<字段2>,...]] ORDER BY <时间属性字段> <开窗范围>), ... FROM ...这里OVER关键字前面是一个聚合函数,会应用在后面OVER定义的窗口上。在OVER子句中主要有以下几个部分:PARTITION BY(可选):用来指定分区的键key,类似于GROUP BY的分组,这部分是可选的ORDER BY:OVER窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论哪种定义,数据都应该是以某种顺序排列好的,而表中的数据本身是无序的。所以在OVER子句中必须用ORDER BY明确地指出数据基于哪个字段排序。在Flink的流处理中,目前只支持按照时间属性的升序排列,所以这里ORDER BY后面的字段必须是定义好的时间属性。二、开窗范围对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由BETWEEN<下界> AND <上界> 来定义的,也就是"从下界到上界"的范围。目前支持的上界只能是CURRENT ROW,也就是定义一个从之前某一行到当前行的范围,所以一般的形式为:BETWEEN ... PRECEDING AND CURRENT ROW前面提到,开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间作出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。范围间隔:范围间隔以RANGE为前缀,就是基于ORDER BY指定的时间字段去选择一个范围,一般就是当前行时间戳之前的一段时间,例如开窗范围选择当前行之前1小时的数据:RANGE BETWEENT INTERVAL '1' HOUR PRECEDING AND CURRENT ROW行间隔:行间隔以ROWS为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了,例如开窗范围选择当前行之前的5行数据(最终聚合会包括当前行,所以一共6条数据)ROWS BETWEEN 5 PRECEDING AND CURRENT ROW三、开窗(Over)聚合代码示例import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import static org.apache.flink.table.api.Expressions.$; /** * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved * <p> * Project: FlinkTutorial * <p> * Created by wushengran */ public class TimeAndWindowTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1. 在创建表的DDL中直接定义时间属性 String createDDL = "CREATE TABLE clickTable (" + " user_name STRING, " + " url STRING, " + " ts BIGINT, " + " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " + " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + ") WITH (" + " 'connector' = 'filesystem', " + " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " + " 'format' = 'csv' " + ")"; tableEnv.executeSql(createDDL); // 2. 在流转换成Table时定义时间属性 SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource()) .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event event, long l) { return event.timestamp; } })); Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime()); clickTable.printSchema(); // 聚合查询转换 // 4. 开窗聚合 Table overWindowResultTable = tableEnv.sqlQuery("SELECT user_name, " + " avg(ts) OVER (" + " PARTITION BY user_name " + " ORDER BY et " + " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" + ") AS avg_ts " + "FROM clickTable"); // 结果表转换成流打印输出 tableEnv.toDataStream(overWindowResultTable).print("over window: "); env.execute(); } }over window: > +I[Mary, 1000] over window: > +I[Bob, 2000] over window: > +I[Alice, 3000] over window: > +I[Bob, 2500] over window: > +I[Alice, 3500] over window: > +I[Bob, 6000] over window: > +I[Alice, 10333]

0
0
0
浏览量2009
打酱油的后端

Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据一、引入flink相关依赖二、properties保存连接kafka的配置三、构建flink实时消费环境四、添加Kafka源和处理数据五、完整代码六、执行程序查看消费到的数据一、引入flink相关依赖 <groupId>com.bigdata</groupId> <artifactId>flink</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <flink.version>1.13.1</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>二、properties保存连接kafka的配置 //用properties保存kafka连接的相关配置 val properties = new Properties() properties.setProperty("bootstrap.servers","10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092") properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"swlfalfal\";") properties.setProperty("security.protocol","SASL_PLAINTEXT") properties.setProperty("sasl.mechanism", "PLAIN") properties.setProperty("group.id","flink-test") properties.setProperty("auto.offset.reset","earliest")三、构建flink实时消费环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setRestartStrategy(RestartStrategies.noRestart())四、添加Kafka源和处理数据 val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String] ("debezium-test-optics_uds",new SimpleStringSchema(),properties)) lines.print() //触发执行 env.execute()五、完整代码import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import java.util.Properties object SourceKafka { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setRestartStrategy(RestartStrategies.noRestart()) //用properties保存kafka连接的相关配置 val properties = new Properties() properties.setProperty("bootstrap.servers","10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092") properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";") properties.setProperty("security.protocol","SASL_PLAINTEXT") properties.setProperty("sasl.mechanism", "PLAIN") properties.setProperty("group.id","flink-test") properties.setProperty("auto.offset.reset","earliest") //添加kafka源,并打印数据 val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String] ("debezium-test-optics_uds",new SimpleStringSchema(),properties)) lines.print() //触发执行 env.execute() } }六、执行程序查看消费到的数据{ "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"sid" }, { "type":"string", "optional":false, "field":"sname" }, { "type":"int64", "optional":false, "name":"io.debezium.time.Timestamp", "version":1, "field":"updatetime" }, { "type":"string", "optional":false, "field":"ssex" } ], "optional":true, "name":"debezium_test_optics_uds.Value", "field":"before" }, { "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"sid" }, { "type":"string", "optional":false, "field":"sname" }, { "type":"int64", "optional":false, "name":"io.debezium.time.Timestamp", "version":1, "field":"updatetime" }, { "type":"string", "optional":false, "field":"ssex" } ], "optional":true, "name":"debezium_test_optics_uds.Value", "field":"after" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"version" }, { "type":"string", "optional":false, "field":"connector" }, { "type":"string", "optional":false, "field":"name" }, { "type":"int64", "optional":false, "field":"ts_ms" }, { "type":"string", "optional":true, "name":"io.debezium.data.Enum", "version":1, "parameters":{ "allowed":"true,last,false,incremental" }, "default":"false", "field":"snapshot" }, { "type":"string", "optional":false, "field":"db" }, { "type":"string", "optional":true, "field":"sequence" }, { "type":"string", "optional":true, "field":"table" }, { "type":"int64", "optional":false, "field":"server_id" }, { "type":"string", "optional":true, "field":"gtid" }, { "type":"string", "optional":false, "field":"file" }, { "type":"int64", "optional":false, "field":"pos" }, { "type":"int32", "optional":false, "field":"row" }, { "type":"int64", "optional":true, "field":"thread" }, { "type":"string", "optional":true, "field":"query" } ], "optional":false, "name":"io.debezium.connector.mysql.Source", "field":"source" }, { "type":"string", "optional":false, "field":"op" }, { "type":"int64", "optional":true, "field":"ts_ms" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"id" }, { "type":"int64", "optional":false, "field":"total_order" }, { "type":"int64", "optional":false, "field":"data_collection_order" } ], "optional":true, "field":"transaction" } ], "optional":false, "name":"debezium_test_optics_uds.Envelope" }, "payload":{ "before":null, "after":{ "sid":3600, "sname":"f", "updatetime":1661126400000, "ssex":"a" }, "source":{ "version":"1.9.6.Final", "connector":"mysql", "name":"debezium-uds8-optics8-test_1h", "ts_ms":1665155935000, "snapshot":"false", "db":"dw", "sequence":null, "table":"student", "server_id":223344, "gtid":null, "file":"mysql-bin.000012", "pos":6193972, "row":0, "thread":66072, "query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')" }, "op":"c", "ts_ms":1665155935640, "transaction":null } }

0
0
0
浏览量2017
打酱油的后端

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

一、深入理解Flink TTLFlink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。窗口:对于窗口操作,可以将TTL应用于窗口中的数据。当窗口中的数据过期时,Flink会自动丢弃这些数据,从而保持窗口中的数据只包含最新的和有效的内容。这样可以减少内存的使用,同时提高窗口操作的计算性能。状态:对于有状态的操作,如键控状态或算子状态,可以为状态设置TTL。当状态中的数据过期时,Flink会自动清理过期的状态,释放资源。这对于长时间运行的应用程序特别有用,可以避免状态无限增长,消耗过多的内存。表:在Flink中,TTL也可以应用于表。可以通过在CREATE TABLE语句的WITH子句中指定TTL的选项来设置表的过期时间。当表中的数据过期时,Flink会自动删除过期的数据行。这对于处理具有实效性(例如日志)的数据特别有用,可以自动清理过期的数据,保持表的内容的新鲜和有效。TTL在实际应用中的作用主要体现在以下几个方面:节省资源:通过设置合适的TTL,可以有效地管理和控制内存和状态的使用。过期的数据会被自动清理,释放资源。这样可以避免无效或过时的数据占用过多的资源,提高应用程序的性能和可扩展性。数据清理:对于具有实效性的数据,如日志数据,可以使用TTL自动清理过期的数据。这可以减少手动管理和维护数据的工作量,保持数据的新鲜和有效。数据一致性:通过设置合适的TTL,可以确保数据在一定时间内保持一致性。过期的数据不再被读取或使用,可以避免数据不一致性的问题。性能优化:TTL可以通过自动清理过期数据来优化查询和计算的性能。只有最新和有效的数据被保留,可以减少数据的处理量,提高计算效率。总而言之,TTL是Flink中一种重要的机制,用于控制数据的过期时间和生命周期。通过适当配置TTL,可以优化资源使用、提高系统性能,并保持数据的一致性和有效性。二、Flink SQL设置TTLFlink SQL中可以使用TTL(Time To Live)来设置数据的过期时间,以控制数据在内存或状态中的存留时间。通过设置TTL,可以自动删除过期的数据,从而节省资源并提高性能。要在Flink SQL中设置TTL,可以使用CREATE TABLE语句的WITH选项来指定TTL的配置。以下是一个示例:CREATE TABLE myTable ( id INT, name STRING, eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime - INTERVAL '5' MINUTE -- 定义Watermark ) WITH ( 'connector' = 'kafka', 'topic' = 'myTopic', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'ttl' = '10m' -- 设置TTL为10分钟 );在上述示例中,通过在CREATE TABLE语句的WITH子句中的’ttl’选项中指定TTL的值(10m),即设置数据在内存中的存活时间为10分钟。过期的数据会自动被删除。需要注意的是,引入TTL机制会增加一定的性能和资源开销。因此,在使用TTL时需要权衡好过期时间和系统的性能需求。三、Flink设置TTL在需要设置TTL的数据源或状态上,使用相应的API(例如DataStream API或KeyedState API)设置TTL值。// DataStream API dataStream.keyBy(<key_selector>).mapStateDescriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>)); // KeyedState API descriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>));在Flink作业中配置TTL检查间隔(默认值为每分钟一次):state.backend.rocksdb.ttl.compaction.interval: <interval_in_milliseconds>四、深入理解checkpointFlink的Checkpoint是一种容错机制,用于在Flink作业执行过程中定期保存数据的一致性检查点。它可以保证作业在发生故障时能够从检查点恢复,并继续进行。下面是一些深入介绍Checkpoint的关键概念和特性:一致性保证:Flink的Checkpoint机制通过保存作业状态的快照来实现一致性保证。在Checkpoint期间,Flink会确保所有的输入数据都已经被处理,并将结果写入状态后再进行检查点的保存。这样可以确保在恢复时,从检查点恢复的作业状态仍然是一致的。保存顺序:Flink的Checkpoint机制保证了保存检查点的顺序。检查点的保存是有序的,即在一个检查点完成之前,不会开始下一个检查点的保存。这种有序的保存方式能够保证在恢复时按照检查点的顺序进行恢复。并行度一致性:Flink的Checkpoint机制能够保证在作业的不同并行任务之间保持一致性。即使在分布式的情况下,Flink也能够确保所有并行任务在某个检查点的位置上都能保持一致。这是通过分布式快照算法和超时机制来实现的。可靠性保证:Flink的Checkpoint机制对于作业的故障恢复非常可靠。当一个任务发生故障时,Flink会自动从最近的检查点进行恢复。如果某个检查点无法满足一致性要求,Flink会自动选择前一个检查点进行恢复,以确保作业能够在一个一致的状态下继续执行。容错机制:Flink的Checkpoint机制提供了容错机制来应对各种故障情况。例如,如果某个任务在保存检查点时失败,Flink会尝试重新保存检查点,直到成功为止。此外,Flink还支持增量检查点,它可以在不保存整个作业状态的情况下只保存修改的部分状态,从而提高了保存检查点的效率。高可用性:Flink的Checkpoint机制还提供了高可用性的选项。可以将检查点数据保存在分布式文件系统中,以防止单点故障。此外,还可以配置备份作业管理器(JobManager)和任务管理器(TaskManager)以确保在某个节点发生故障时能够快速恢复。总结起来,Flink的Checkpoint机制是一种强大且可靠的容错机制,它能够确保作业在发生故障时能够从一致性检查点恢复,并继续进行。通过保存作业状态的快照,Flink能够保证作业的一致性,并提供了高可用性和高效率的保存和恢复机制。Checkpoint是Flink中一种重要的容错机制,用于保证作业在发生故障时能够从上一次检查点恢复,并继续进行处理,从而实现容错性。以下是Checkpoint的主要用途:容错和故障恢复:Checkpoint可以将作业的状态和数据进行持久化,当发生故障时,Flink可以使用最近的检查点来恢复作业的状态和数据,从而避免数据丢失,并继续处理未完成的任务。Exactly-Once语义:通过将检查点和事务(如果应用程序使用Flink的事务支持)结合起来,Flink可以实现Exactly-Once语义,确保结果的一致性和准确性。当作业从检查点恢复时,它将只会处理一次输入数据,并产生一次输出,避免了重复和丢失的数据写入。冷启动和部署:可以使用检查点来实现作业的冷启动,即在作业启动时,从最近的检查点恢复状态和数据,并从上一次检查点的位置继续处理。这对于在作业启动或重新部署时非常有用,可以快速恢复到之前的状态,减少恢复所需的时间。跨版本迁移:当使用不同版本的Flink或更改作业的代码时,可以使用检查点将作业从旧的版本转移到新的版本,从而实现跨版本迁移。总之,Checkpoint是Flink中的关键机制,其用途包括容错和故障恢复、Exactly-Once语义、冷启动和部署以及跨版本迁移。通过使用Checkpoint,可以提高作业的可靠性、一致性和可恢复性。五、Flink设置Checkpoint要设置Flink的Checkpoint和TTL,可以按照以下步骤进行操作:设置Checkpoint:在Flink作业中启用Checkpoint:可以通过在Flink配置文件(flink-conf.yaml)中设置以下属性来开启Checkpoint:execution.checkpointing.enabled: true设置Checkpoint间隔:可以通过以下属性设置Checkpoint的间隔时间(默认值为10秒):execution.checkpointing.interval: <interval_in_milliseconds>设置Checkpoint保存路径:可以通过以下属性设置Checkpoint文件的保存路径(默认为jobmanager根路径):state.checkpoints.dir: <checkpoint_directory_path>六、Flink SQL关联多张表在Flink SQL中,可以通过使用窗口操作来保证在一段时间内多张表的数据总能关联到。窗口操作可以用于基于时间的数据处理,将数据划分为窗口,并在每个窗口上执行关联操作。 下面是一个示例,演示如何在一段时间内关联多张表的数据: ```sql -- 创建两个输入表 CREATE TABLE table1 ( id INT, name STRING, eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'topic1', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json' ); CREATE TABLE table2 ( id INT, value STRING, eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'topic2', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json' ); -- 执行关联操作 SELECT t1.id, t1.name, t2.value FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id AND t1.eventTime BETWEEN t2.eventTime - INTERVAL '5' MINUTE AND t2.eventTime + INTERVAL '5' MINUTE在上面的例子中,首先创建了两个输入表table1和table2,并分别指定了输入源(此处使用了Kafka作为示例输入源)。然后,在执行关联操作时,使用了通过窗口操作进行时间范围的过滤条件,即"t1.eventTime BETWEEN t2.eventTime - INTERVAL ‘5’ MINUTE AND t2.eventTime + INTERVAL ‘5’ MINUTE",确保了在一段时间内两张表的数据能够关联到。通过使用窗口操作,可以根据具体的时间范围来进行数据关联,从而保证在一段时间内多张表的数据总能关联到。七、Flink SQL使用TTL关联多表Flink还提供了Time-To-Live (TTL)功能,可以用于在表中定义数据的生存时间。当数据的时间戳超过定义的TTL时,Flink会自动将其从表中删除。这在处理实时数据时非常有用,可以自动清理过期的数据。在Flink中使用TTL可以通过创建表时指定TTL属性来实现,如下所示:CREATE TABLE myTable ( id INT, name STRING, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND, PRIMARY KEY (id) NOT ENFORCED, TTL (event_time) AS event_time + INTERVAL '1' HOUR ) WITH ( 'connector.type' = 'kafka', ... )在这个例子中,表myTable定义了一个event_time列,并使用TTL函数指定了数据的生存时间为event_time加上1小时。当数据的event_time超过1小时时,Flink会自动删除这些数据。通过在Flink SQL中同时使用JOIN和TTL,你可以实现多张表的关联,并根据指定的条件删除过期的数据,从而更灵活地处理和管理数据。

0
0
0
浏览量2009
打酱油的后端

Flink系列之:Table API Connectors之Raw Format

一、Raw FormatRaw format 允许读写原始(基于字节)值作为单个列。注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。Raw format 连接器是内置的。二、示例例如,你可能在 Kafka 中具有原始日志数据,并希望使用 Flink SQL 读取和分析此类数据。47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"下面的代码创建了一张表,使用 raw format 以 UTF-8 编码的形式从中读取(也可以写入)底层的 Kafka topic 作为匿名字符串值:CREATE TABLE nginx_log ( log STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'nginx_log', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'raw' )然后,你可以将原始数据读取为纯字符串,之后使用用户自定义函数将其分为多个字段进行进一步分析。例如 示例中的 my_split。SELECT t.hostname, t.datetime, t.url, t.browser, ... FROM( SELECT my_split(log) as t FROM nginx_log );相对应的,你也可以将一个 STRING 类型的列以 UTF-8 编码的匿名字符串值写入 Kafka topic。三、Format 参数参数 是否必选 默认值类型描述format必选 (none)String指定要使用的格式, 这里应该是 ‘raw’。raw.charset可选 UTF-8String指定字符集来编码文本字符串。raw.endianness可选 big-endianString指定字节序来编码数字值的字节。有效值为’big-endian’和’little-endian’。四、数据类型映射下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。

0
0
0
浏览量1975
打酱油的后端

Flink系列之:集合操作

一、集合操作适用于流、批操作二、UNIONUNION 和 UNION ALL 返回两个表中的数据。 UNION 会去重,UNION ALL 不会去重。Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c'); Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b'); Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2); +---+ | s| +---+ | c| | a| | b| | d| | e| +---+ Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2); +---+ | c| +---+ | c| | a| | b| | b| | c| | d| | e| | a| | b| | b| +---+三、INTERSECTINTERSECT 和 INTERSECT ALL 返回两个表中共有的数据。 INTERSECT 会去重,INTERSECT ALL 不会去重。Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2); +---+ | s| +---+ | a| | b| +---+ Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2); +---+ | s| +---+ | a| | b| | b| +---+四、EXCEPTEXCEPT 和 EXCEPT ALL 返回在一个表中存在,但在另一个表中不存在数据。 EXCEPT 会去重,EXCEPT ALL不会去重。Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2); +---+ | s | +---+ | c | +---+ Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2); +---+ | s | +---+ | c | | c | +---+五、IN如果表达式(可以是列,也可以是函数等)存在于子查询的结果中,则返回 true。子查询的表结果必须由一列组成。此列必须与表达式具有相同的数据类型。SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts )优化器会把 IN 条件重写为 join 和 group 操作。对于流式查询,计算查询结果所需的状态可能会根据输入行数而无限增长。你可以设置一个合适的状态 time-to-live(TTL)来淘汰过期数据以防止状态过大。注意:这可能会影响查询结果的正确性。六、XISTSSELECT user, amount FROM Orders WHERE product EXISTS ( SELECT product FROM NewProducts )如果子查询返回至少一行,则为 true。只支持能被重写为 join 和 group 的操作。优化器会把 EXIST 重写为 join 和 group 操作.对于流式查询,计算查询结果所需的状态可能会根据输入行数而无限增长。你可以设置一个合适的状态 time-to-live(TTL)来淘汰过期数据以防止状态过大。注意:这可能会影响查询结果的正确性。

0
0
0
浏览量1989
打酱油的后端

PyFlink系列之一:PyFlink安装和PyFlink使用的详细技术

一、下载PyFlink命令行下载PyFlink:pip install apache-flinkPycharm下载PyFlink:二、创建TableEnvironment创建 TableEnvironment 的推荐方式是通过 EnvironmentSettings 对象创建:from pyflink.table import EnvironmentSettings, TableEnvironment # create a streaming TableEnvironment env_settings = EnvironmentSettings.in_streaming_mode() # or a batch TableEnvironment # env_settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(env_settings) 或者,用户可以从现有的 StreamExecutionEnvironment 创建 StreamTableEnvironment,以与 DataStream API 进行互操作。from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment # create a streaming TableEnvironment from a StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) 三、TableEnvironment API1.Table/SQL 操作这些 APIs 用来创建或者删除 Table API/SQL 表和写查询:APIs描述from_elements(elements, schema=None, verify_schema=True)通过元素集合来创建表。from_pandas(pdf, schema=None, split_num=1)通过 pandas DataFrame 来创建表。from_path(path)通过指定路径下已注册的表来创建一个表,例如通过 create_temporary_view 注册表。sql_query(query)执行一条 SQL 查询,并将查询的结果作为一个 Table 对象。create_temporary_view(view_path, table) 将一个 Table 对象注册为一张临时表,类似于 SQL 的临时表。drop_temporary_view(view_path) 删除指定路径下已注册的临时表drop_temporary_table(table_path)删除指定路径下已注册的临时表。 你可以使用这个接口来删除临时 source 表和临时 sink 表。execute_sql(stmt)执行指定的语句并返回执行结果。 执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。注意,对于 “INSERT INTO” 语句,这是一个异步操作,通常在向远程集群提交作业时才需要使用。 但是,如果在本地集群或者 IDE 中执行作业时,你需要等待作业执行完成2.执行/解释作业这些 APIs 是用来执行/解释作业。注意,execute_sql API 也可以用于执行作业。APIs 描述explain_sql(stmt, *extra_details) 返回指定语句的抽象语法树和执行计划。create_statement_set() 创建一个可接受 DML 语句或表的 StatementSet 实例。 它可用于执行包含多个 sink 的作业。3.创建/删除用户自定义函数这些 APIs 用来注册 UDFs 或者 删除已注册的 UDFs。 注意,execute_sql API 也可以用于注册/删除 UDFs。APIs描述create_temporary_function(path, function)将一个 Python 用户自定义函数注册为临时 catalog 函数。create_temporary_system_function(name, function) 将一个 Python 用户自定义函数注册为临时系统函数。 如果临时系统函数的名称与临时 catalog 函数名称相同,优先使用临时系统函数。create_java_function(path, function_class_name, ignore_if_exists=None)将 Java 用户自定义函数注册为指定路径下的 catalog 函数。 如果 catalog 是持久化的,则可以跨多个 Flink 会话和集群使用已注册的 catalog 函数。create_java_temporary_function(path, function_class_name)将 Java 用户自定义函数注册为临时 catalog 函数。create_java_temporary_system_function(name, function_class_name)将 Java 用户定义的函数注册为临时系统函数。drop_function(path)删除指定路径下已注册的 catalog 函数。drop_temporary_function(path)删除指定名称下已注册的临时系统函数。drop_temporary_system_function(name)删除指定名称下已注册的临时系统函数。4.依赖管理这些 APIs 用来管理 Python UDFs 所需要的 Python 依赖。APIs 描述add_python_file(file_path)添加 Python 依赖,可以是 Python 文件,Python 包或者本地目录。 它们将会被添加到 Python UDF 工作程序的 PYTHONPATH 中。set_python_requirements(requirements_file_path, requirements_cache_dir=None) 指定一个 requirements.txt 文件,该文件定义了第三方依赖关系。 这些依赖项将安装到一个临时 catalog 中,并添加到 Python UDF 工作程序的 PYTHONPATH 中。add_python_archive(archive_path, target_dir=None) 添加 Python 归档文件。该文件将被解压到 Python UDF 程序的工作目录中。5.配置APIs描述get_config()返回 table config,可以通过 table config 来定义 Table API 的运行时行为。下面的代码示例展示了如何通过这个 API 来设置配置选项:# set the parallelism to 8 table_env.get_config().get_configuration().set_string("parallelism.default", "8") 四、Catalog APIs这些 APIs 用于访问 catalog 和模块五、Statebackend,Checkpoint 以及重启策略在 Flink 1.10 之前,你可以通过 StreamExecutionEnvironment 来配置 statebackend,checkpointing 以及重启策略。 现在你可以通过在 TableConfig 中,通过设置键值选项来配置它们。下面代码示例展示了如何通过 Table API 来配置 statebackend,checkpoint 以及重启策略:# 设置重启策略为 "fixed-delay" table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay") table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3") table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s") # 设置 checkpoint 模式为 EXACTLY_ONCE table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE") table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min") # 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager" # 你也可以将这个属性设置为 StateBackendFactory 的完整类名 # e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory table_env.get_config().get_configuration().set_string("state.backend", "rocksdb") # 设置 RocksDB statebackend 所需要的 checkpoint 目录 table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/") 六、Table APITable API 是批处理和流处理的统一的关系型 API。Table API 的查询不需要修改代码就可以采用批输入或流输入来运行。Table API 是 SQL 语言的超集,并且是针对 Apache Flink 专门设计的。Table API 集成了 Scala,Java 和 Python 语言的 API。Table API 的查询是使用 Java,Scala 或 Python 语言嵌入的风格定义的,有诸如自动补全和语法校验的 IDE 支持,而不是像普通 SQL 一样使用字符串类型的值来指定查询。Table API 和 Flink SQL 共享许多概念以及部分集成的 API。通过查看公共概念 & API来学习如何注册表或如何创建一个表对象。流概念页面讨论了诸如动态表和时间属性等流特有的概念。下面的例子中假定有一张叫 Orders 的表,表中有属性 (a, b, c, rowtime) 。rowtime 字段是流任务中的逻辑时间属性或是批任务中的普通时间戳字段。概述 & 示例Table API 支持 Scala, Java 和 Python 语言。Scala 语言的 Table API 利用了 Scala 表达式,Java 语言的 Table API 支持 DSL 表达式和解析并转换为等价表达式的字符串,Python 语言的 Table API 仅支持解析并转换为等价表达式的字符串。下面的例子展示了 Scala、Java 和 Python 语言的 Table API 的不同之处。表程序是在批环境下执行的。程序扫描了 Orders 表,通过字段 a 进行分组,并计算了每组结果的行数。JavaJava 的 Table API 通过引入 org.apache.flink.table.api.java.* 来使用。下面的例子展示了如何创建一个 Java 的 Table API 程序,以及表达式是如何指定为字符串的。 使用DSL表达式时也需要引入静态的 org.apache.flink.table.api.Expressions.*。import org.apache.flink.table.api.*; import static org.apache.flink.table.api.Expressions.*; EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 在表环境中注册 Orders 表 // ... // 指定表程序 Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) Table counts = orders .groupBy($("a")) .select($("a"), $("b").count().as("cnt")); // 打印 counts.execute().print(); ScalaScala 的 Table API 通过引入 org.apache.flink.table.api.、org.apache.flink.api.scala. 和 org.apache.flink.table.api.bridge.scala._(开启数据流的桥接支持)来使用。下面的例子展示了如何创建一个 Scala 的 Table API 程序。通过 Scala 的带美元符号($)的字符串插值来实现表字段引用。import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // 环境配置 val settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); val tEnv = TableEnvironment.create(settings); // 在表环境中注册 Orders 表 // ... // 指定表程序 val orders = tEnv.from("Orders") // schema (a, b, c, rowtime) val result = orders .groupBy($"a") .select($"a", $"b".count as "cnt") .execute() .print() Python下面的例子展示了如何创建一个 Python 的 Table API 程序,以及表达式是如何指定为字符串的。from pyflink.table import * # 环境配置 t_env = TableEnvironment.create( environment_settings=EnvironmentSettings.in_batch_mode()) # 在表环境中注册 Orders 表和结果 sink 表 source_data_path = "/path/to/source/directory/" result_data_path = "/path/to/result/directory/" source_ddl = f""" create table Orders( a VARCHAR, b BIGINT, c BIGINT, rowtime TIMESTAMP(3), WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{source_data_path}' ) """ t_env.execute_sql(source_ddl) sink_ddl = f""" create table `Result`( a VARCHAR, cnt BIGINT ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{result_data_path}' ) """ t_env.execute_sql(sink_ddl) # 指定表程序 orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait() 下一个例子展示了一个更加复杂的 Table API 程序。这个程序也扫描 Orders 表。程序过滤了空值,使字符串类型的字段 a 标准化,并且每个小时进行一次计算并返回 a 的平均账单金额 b。Java// 环境配置 // ... // 指定表程序 Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) Table result = orders .filter( and( $("a").isNotNull(), $("b").isNotNull(), $("c").isNotNull() )) .select($("a").lowerCase().as("a"), $("b"), $("rowtime")) .window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow")) .groupBy($("hourlyWindow"), $("a")) .select($("a"), $("hourlyWindow").end().as("hour"), $("b").avg().as("avgBillingAmount")); Scala// 环境配置 // ... // 指定表程序 val orders: Table = tEnv.from("Orders") // schema (a, b, c, rowtime) val result: Table = orders .filter($"a".isNotNull && $"b".isNotNull && $"c".isNotNull) .select($"a".lowerCase() as "a", $"b", $"rowtime") .window(Tumble over 1.hour on $"rowtime" as "hourlyWindow") .groupBy($"hourlyWindow", $"a") .select($"a", $"hourlyWindow".end as "hour", $"b".avg as "avgBillingAmount") Python# 指定表程序 from pyflink.table.expressions import col, lit orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \ .select(orders.a.lower_case.alias('a'), orders.b, orders.rowtime) \ .window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \ .group_by(col('hourly_window'), col('a')) \ .select(col('a'), col('hourly_window').end.alias('hour'), b.avg.alias('avg_billing_amount')) 因为 Table API 的批数据 API 和流数据 API 是统一的,所以这两个例子程序不需要修改代码就可以运行在流输入或批输入上。在这两种情况下,只要流任务没有数据延时,程序将会输出相同的结果。七、OperationsTable API支持如下操作。请注意不是所有的操作都可以既支持流也支持批;这些操作都具有相应的标记。1.From:Batch Streaming和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。Java:Table orders = tableEnv.from("Orders");Scala:val orders = tableEnv.from("Orders")Python:orders = t_env.from_path("Orders")2.FromValues:Batch Streaming和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。你可以使用 row(…) 表达式创建复合行:Java:Table table = tEnv.fromValues( row(1, "ABC"), row(2L, "ABCDE") ); Scala:table = tEnv.fromValues( row(1, "ABC"), row(2L, "ABCDE") ) Python:table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')]) 这将生成一张结构如下的表:root |-- f0: BIGINT NOT NULL // original types INT and BIGINT are generalized to BIGINT |-- f1: VARCHAR(5) NOT NULL // original types CHAR(3) and CHAR(5) are generalized // to VARCHAR(5). VARCHAR is used instead of CHAR so that // no padding is applied 这个方法会根据输入的表达式自动获取类型。如果在某一个特定位置的类型不一致,该方法会尝试寻找一个所有类型的公共超类型。如果公共超类型不存在,则会抛出异常。你也可以明确指定所需的类型。指定如 DECIMAL 这样的一般类型或者给列命名可能是有帮助的。Java:Table table = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), DataTypes.FIELD("name", DataTypes.STRING()) ), row(1, "ABC"), row(2L, "ABCDE") ); Scala:val table = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), DataTypes.FIELD("name", DataTypes.STRING()) ), row(1, "ABC"), row(2L, "ABCDE") ) Python:table = t_env.from_elements( [(1, 'ABC'), (2, 'ABCDE')], schema=DataTypes.Row([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)), DataTypes.FIELD('name', DataTypes.STRING())]))这将生成一张结构如下的表:root |-- id: DECIMAL(10, 2) |-- name: STRING 3.Select:Batch Streaming和 SQL 的 SELECT 子句类似。 执行一个 select 操作。JavaTable orders = tableEnv.from("Orders"); Table result = orders.select($("a"), $("c").as("d"));Scalaval orders = tableEnv.from("Orders") Table result = orders.select($"a", $"c" as "d");Pythonorders = t_env.from_path("Orders") result = orders.select(orders.a, orders.c.alias('d'))可以选择星号(*)作为通配符,select 表中的所有列。JavaTable result = orders.select($("*")); ScalaTable result = orders.select($"*")Pythonfrom pyflink.table.expressions import col result = orders.select(col("*"))4.As:Batch Streaming重命名字段。JavaTable orders = tableEnv.from("Orders"); Table result = orders.as("x, y, z, t");scalaval orders: Table = tableEnv.from("Orders").as("x", "y", "z", "t")Pythonorders = t_env.from_path("Orders") result = orders.alias("x, y, z, t")5.Where / Filter:Batch Streaming和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。Java:Table orders = tableEnv.from("Orders"); Table result = orders.where($("b").isEqual("red")); Table orders = tableEnv.from("Orders"); Table result = orders.filter($("b").isEqual("red")); Scala:val orders: Table = tableEnv.from("Orders") val result = orders.filter($"a" % 2 === 0) val orders: Table = tableEnv.from("Orders") val result = orders.filter($"a" % 2 === 0) Python:orders = t_env.from_path("Orders") result = orders.where(orders.a == 'red') orders = t_env.from_path("Orders") result = orders.filter(orders.a == 'red') 八、列操作1.AddColumns:Batch Streaming执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。JavaTable orders = tableEnv.from("Orders"); Table result = orders.addColumns(concat($("c"), "sunny")); Scalaval orders = tableEnv.from("Orders"); val result = orders.addColumns(concat($"c", "Sunny")) Pythonfrom pyflink.table.expressions import concat orders = t_env.from_path("Orders") result = orders.add_columns(concat(orders.c, 'sunny')) 2.AddOrReplaceColumns:Batch Streaming执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。JavaTable orders = tableEnv.from("Orders"); Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc")); Scalaval orders = tableEnv.from("Orders"); val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc") Pythonfrom pyflink.table.expressions import concat orders = t_env.from_path("Orders") result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc')) 3.DropColumns:Batch StreamingJavaable orders = tableEnv.from("Orders"); Table result = orders.dropColumns($("b"), $("c")); Scalaval orders = tableEnv.from("Orders"); val result = orders.dropColumns($"b", $"c") Pythonorders = t_env.from_path("Orders") result = orders.drop_columns(orders.b, orders.c) 4.RenameColumns:Batch Streaming执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。JavaTable orders = tableEnv.from("Orders"); Table result = orders.renameColumns($("b").as("b2"), $("c").as("c2")); Scalaval orders = tableEnv.from("Orders"); val result = orders.renameColumns($"b" as "b2", $"c" as "c2") Pythonorders = t_env.from_path("Orders") result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2')) 九、Aggregations1.GroupBy Aggregation:Batch Streaming和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。JavaTable orders = tableEnv.from("Orders"); Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d")); Scalaval orders: Table = tableEnv.from("Orders") val result = orders.groupBy($"a").select($"a", $"b".sum().as("d")) Pythonorders = t_env.from_path("Orders") result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d')) 2.GroupBy Window Aggregation:Batch Streaming使用分组窗口结合单个或者多个分组键对表进行分组和聚合。JavaTable orders = tableEnv.from("Orders"); Table result = orders .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // 定义窗口 .groupBy($("a"), $("w")) // 按窗口和键分组 // 访问窗口属性并聚合 .select( $("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").sum().as("d") ); Scalaval orders: Table = tableEnv.from("Orders") val result: Table = orders .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口 .groupBy($"a", $"w") // 按窗口和键分组 .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".sum as "d") // 访问窗口属性并聚合 Pythonfrom pyflink.table.window import Tumble from pyflink.table.expressions import lit, col orders = t_env.from_path("Orders") result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \ .group_by(orders.a, col('w')) \ .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d')) 3.Over Window Aggregation和 SQL 的 OVER 子句类似。JavaTable orders = tableEnv.from("Orders"); Table result = orders // 定义窗口 .window( Over .partitionBy($("a")) .orderBy($("rowtime")) .preceding(UNBOUNDED_RANGE) .following(CURRENT_RANGE) .as("w")) // 滑动聚合 .select( $("a"), $("b").avg().over($("w")), $("b").max().over($("w")), $("b").min().over($("w")) ); Scalaval orders: Table = tableEnv.from("Orders") val result: Table = orders // 定义窗口 .window( Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE following CURRENT_RANGE as "w") .select($"a", $"b".avg over $"w", $"b".max().over($"w"), $"b".min().over($"w")) // 滑动聚合 Pythonfrom pyflink.table.window import Over from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE orders = t_env.from_path("Orders") result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime) .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE) .alias("w")) \ .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) 所有的聚合必须定义在同一个窗口上,比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围(无界或有界)的窗口。尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。4.Distinct Aggregation:Batch Streaming和 SQL DISTINCT 聚合子句类似,例如 COUNT(DISTINCT a)。 Distinct 聚合声明的聚合函数(内置或用户定义的)仅应用于互不相同的输入值。 Distinct 可以应用于 GroupBy Aggregation、GroupBy Window Aggregation 和 Over Window Aggregation。Java:Table orders = tableEnv.from("Orders"); // 按属性分组后的的互异(互不相同、去重)聚合 Table groupByDistinctResult = orders .groupBy($("a")) .select($("a"), $("b").sum().distinct().as("d")); // 按属性、时间窗口分组后的互异(互不相同、去重)聚合 Table groupByWindowDistinctResult = orders .window(Tumble .over(lit(5).minutes()) .on($("rowtime")) .as("w") ) .groupBy($("a"), $("w")) .select($("a"), $("b").sum().distinct().as("d")); // over window 上的互异(互不相同、去重)聚合 Table result = orders .window(Over .partitionBy($("a")) .orderBy($("rowtime")) .preceding(UNBOUNDED_RANGE) .as("w")) .select( $("a"), $("b").avg().distinct().over($("w")), $("b").max().over($("w")), $("b").min().over($("w")) ); Scala:val orders: Table = tableEnv.from("Orders"); // 按属性分组后的的互异(互不相同、去重)聚合 val groupByDistinctResult = orders .groupBy($"a") .select($"a", $"b".sum.distinct as "d") // 按属性、时间窗口分组后的互异(互不相同、去重)聚合 val groupByWindowDistinctResult = orders .window(Tumble over 5.minutes on $"rowtime" as "w").groupBy($"a", $"w") .select($"a", $"b".sum.distinct as "d") // over window 上的互异(互不相同、去重)聚合 val result = orders .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w") .select($"a", $"b".avg.distinct over $"w", $"b".max over $"w", $"b".min over $"w") Python:from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE orders = t_env.from_path("Orders") # 按属性分组后的的互异(互不相同、去重)聚合 group_by_distinct_result = orders.group_by(orders.a) \ .select(orders.a, orders.b.sum.distinct.alias('d')) # 按属性、时间窗口分组后的互异(互不相同、去重)聚合 group_by_window_distinct_result = orders.window( Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \ .select(orders.a, orders.b.sum.distinct.alias('d')) # over window 上的互异(互不相同、去重)聚合 result = orders.over_window(Over .partition_by(orders.a) .order_by(orders.rowtime) .preceding(UNBOUNDED_RANGE) .alias("w")) \ .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) 用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。如果计算不同(互异、去重的)值的聚合结果,则只需向聚合函数添加 distinct 修饰符即可。JavaTable orders = tEnv.from("Orders"); // 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合 tEnv.registerFunction("myUdagg", new MyUdagg()); orders.groupBy("users") .select( $("users"), call("myUdagg", $("points")).distinct().as("myDistinctResult") ); Scalaval orders: Table = tableEnv.from("Orders") val result = orders.distinct() Pythonorders = t_env.from_path("Orders") result = orders.distinct() 十、Joins1.Inner Join:Batch Streaming和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名,并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。Java:Table orders = tableEnv.from("Orders"); Table result = orders.distinct(); Scala:val left = tableEnv.from("MyTable").select($"a", $"b", $"c") val right = tableEnv.from("MyTable").select($"d", $"e", $"f") val result = left.join(right).where($"a" === $"d").select($"a", $"b", $"e") Python:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e) 2.Outer Join:Batch Streaming和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。Java:Table left = tableEnv.from("MyTable).select($("a"), $("b"), $("c")); Table right = tableEnv.from("MyTable).select($("d"), $("e"), $("f")); Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d"))) .select($("a"), $("b"), $("e")); Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d"))) .select($("a"), $("b"), $("e")); Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d"))) .select($("a"), $("b"), $("e")); Scala:val left = tableEnv.from("MyTable").select($"a", $"b", $"c") val right = tableEnv.from("MyTable").select($"d", $"e", $"f") val leftOuterResult = left.leftOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e") val rightOuterResult = left.rightOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e") val fullOuterResult = left.fullOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e") Python:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) 3.Interval Join:Batch StreamingInterval join 是可以通过流模式处理的常规 join 的子集。Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。Java:Table left = tableEnv.from("MyTable).select($("a"), $("b"), $("c"), $("ltime")); Table right = tableEnv.from("MyTable).select($("d"), $("e"), $("f"), $("rtime")); Table result = left.join(right) .where( and( $("a").isEqual($("d")), $("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())), $("ltime").isLess($("rtime").plus(lit(10).minutes())) )) .select($("a"), $("b"), $("e"), $("ltime")); Scala:val left = tableEnv.from("MyTable").select($"a", $"b", $"c", $"ltime") val right = tableEnv.from("MyTable").select($"d", $"e", $"f", $"rtime") val result = left.join(right) .where($"a" === $"d" && $"ltime" >= $"rtime" - 5.minutes && $"ltime" < $"rtime" + 10.minutes) .select($"a", $"b", $"e", $"ltime") Python:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), col('rowtime1')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), col('rowtime2')) joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + lit(2).seconds)) result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, joined_table.rowtime1) 4.Inner Join with Table Function (UDTF):Batch Streamingjoin 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。 如果表函数调用返回空结果,则删除左侧(外部)表的一行。Java:Table orders = tableEnv.from("Orders"); Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d")); Scala Python Scala:// 实例化 User-Defined Table Function val split: TableFunction[_] = new MySplitUDTF() // join val result: Table = table .joinLateral(split($"c") as ("s", "t", "v")) .select($"a", $"b", $"s", $"t", $"v") Python:# 注册 User-Defined Table Function @udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()]) def split(x): return [Row(1, 2, 3)] # join orders = t_env.from_path("Orders") joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v")) result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v) 5.Join with Temporal TableTemporal table 是跟踪随时间变化的表。Temporal table 函数提供对特定时间点 temporal table 状态的访问。表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。目前仅支持与 temporal table 的 inner join。Java:Table ratesHistory = tableEnv.from("RatesHistory"); // 注册带有时间属性和主键的 temporal table function TemporalTableFunction rates = ratesHistory.createTemporalTableFunction( "r_proctime", "r_currency"); tableEnv.registerFunction("rates", rates); // 基于时间属性和键与“Orders”表关联 Table orders = tableEnv.from("Orders"); Table result = orders .joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency"))); Python:目前不支持 Python 的 Table API。 十一、Set Operations1.Union:Batch和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.union(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.union(right) 2.UnionAll:Batch Streaming和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.unionAll(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.unionAll(right) 3.Intersect:Batch和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。如果一条记录在一张或两张表中存在多次,则只返回一条记录,也就是说,结果表中不存在重复的记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.intersect(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.intersect(right) 4.IntersectAll:Batch和 SQL INTERSECT ALL 子句类似。IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次,那么该记录返回的次数同该记录在两个表中都出现的次数一致,也就是说,结果表可能存在重复记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.intersectAll(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.intersectAll(right) 5.Minus:Batch和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。左表中的重复记录只返回一次,换句话说,结果表中没有重复记录。两张表必须具有相同的字段类型。JavaTable left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.minus(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.minus(right) 6.MinusAll:Batch和 SQL EXCEPT ALL 子句类似。MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n - m) 次,例如,也就是说结果中删掉了在右表中存在重复记录的条数的记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.minusAll(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.minusAll(right) 7.In:Batch Streaming和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中,那么 In 子句返回 true。子查询表必须由一列组成。这个列必须与表达式具有相同的数据类型。Java:Table left = tableEnv.from("Orders1") Table right = tableEnv.from("Orders2"); Table result = left.select($("a"), $("b"), $("c")).where($("a").in(right)); Scala:val left = tableEnv.from("Orders1") val right = tableEnv.from("Orders2"); val result = left.select($"a", $"b", $"c").where($"a".in(right)) Python:left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('a')) result = left.select(left.a, left.b, left.c).where(left.a.in_(right)) 十二、OrderBy, Offset & Fetch1.Order By:Batch Streaming和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。Java:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) Scala:val result = in.orderBy($"a".asc)Python:result = in.order_by(in.a.asc)2.Offset & Fetch:Batch Streaming和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。Java:// 从已排序的结果集中返回前5条记录 Table result1 = in.orderBy($("a").asc()).fetch(5); // 从已排序的结果集中返回跳过3条记录之后的所有记录 Table result2 = in.orderBy($("a").asc()).offset(3); // 从已排序的结果集中返回跳过10条记录之后的前5条记录 Table result3 = in.orderBy($("a").asc()).offset(10).fetch(5); Scala:// 从已排序的结果集中返回前5条记录 val result1: Table = in.orderBy($"a".asc).fetch(5) // 从已排序的结果集中返回跳过3条记录之后的所有记录 val result2: Table = in.orderBy($"a".asc).offset(3) // 从已排序的结果集中返回跳过10条记录之后的前5条记录 val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5) Python:# 从已排序的结果集中返回前5条记录 result1 = table.order_by(table.a.asc).fetch(5) # 从已排序的结果集中返回跳过3条记录之后的所有记录 result2 = table.order_by(table.a.asc).offset(3) # 从已排序的结果集中返回跳过10条记录之后的前5条记录 result3 = table.order_by(table.a.asc).offset(10).fetch(5) 3.Insert:Batch Streaming和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。executeInsert() 方法将立即提交执行插入操作的 Flink job。输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。Java:Table orders = tableEnv.from("Orders"); orders.executeInsert("OutOrders");Scala:val orders = tableEnv.from("Orders") orders.executeInsert("OutOrders")Python:orders = t_env.from_path("Orders") orders.execute_insert("OutOrders")十三、Windows1.Group WindowsGroup window 聚合根据时间或行计数间隔将行分为有限组,并为每个分组进行一次聚合函数计算。对于批处理表,窗口是按时间间隔对记录进行分组的便捷方式。窗口是使用 window(GroupWindow w) 子句定义的,并且需要使用 as 子句来指定别名。为了按窗口对表进行分组,窗口别名必须像常规分组属性一样在 groupBy(…) 子句中引用。 以下示例展示了如何在表上定义窗口聚合Java:Table table = input .window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w .groupBy($("w")) // 以窗口 w 对表进行分组 .select($("b").sum()); // 聚合Scala:val table = input .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w .groupBy($"w") // 以窗口 w 对表进行分组 .select($"b".sum) // 聚合Python:# 定义窗口并指定别名为 w,以窗口 w 对表进行分组,然后再聚合 table = input.window([w: GroupWindow].alias("w")) \ .group_by(col('w')).select(input.b.sum)在流环境中,如果窗口聚合除了窗口之外还根据一个或多个属性进行分组,则它们只能并行计算,例如,groupBy(…) 子句引用了一个窗口别名和至少一个附加属性。仅引用窗口别名(例如在上面的示例中)的 groupBy(…) 子句只能由单个非并行任务进行计算。 以下示例展示了如何定义有附加分组属性的窗口聚合。Java:Table table = input .window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w .groupBy($("w"), $("a")) // 以属性 a 和窗口 w 对表进行分组 .select($("a"), $("b").sum()); // 聚合Scala:val table = input .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w .groupBy($"w", $"a") // 以属性 a 和窗口 w 对表进行分组 .select($"a", $"b".sum) // 聚合Python:# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组, # 然后再聚合 table = input.window([w: GroupWindow].alias("w")) \ .group_by(col('w'), input.a).select(input.b.sum)时间窗口的开始、结束或行时间戳等窗口属性可以作为窗口别名的属性添加到 select 子句中,如 w.start、w.end 和 w.rowtime。窗口开始和行时间戳是包含的上下窗口边界。相反,窗口结束时间戳是唯一的上窗口边界。例如,从下午 2 点开始的 30 分钟滚动窗口将 “14:00:00.000” 作为开始时间戳,“14:29:59.999” 作为行时间时间戳,“14:30:00.000” 作为结束时间戳。Java:able table = input .window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w .groupBy($("w"), $("a")) // 以属性 a 和窗口 w 对表进行分组 .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); // 聚合并添加窗口开始、结束和 rowtime 时间戳Scala:val table = input .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w .groupBy($"w", $"a") // 以属性 a 和窗口 w 对表进行分组 .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count) // 聚合并添加窗口开始、结束和 rowtime 时间戳Python:# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组, # 然后再聚合并添加窗口开始、结束和 rowtime 时间戳 table = input.window([w: GroupWindow].alias("w")) \ .group_by(col('w'), input.a) \ .select(input.a, col('w').start, col('w').end, col('w').rowtime, input.b.count)Window 参数定义了如何将行映射到窗口。 Window 不是用户可以实现的接口。相反,Table API 提供了一组具有特定语义的预定义 Window 类。下面列出了支持的窗口定义。2.Tumble (Tumbling Windows)滚动窗口将行分配给固定长度的非重叠连续窗口。例如,一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。滚动窗口是通过 Tumble 类定义的,具体如下:MethodDescriptionover将窗口的长度定义为时间或行计数间隔。on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。alias指定窗口的别名。别名用于在 group_by() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。Java:// Tumbling Event-time Window .window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w")); // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w")); // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));Scala:// Tumbling Event-time Window .window(Tumble over 10.minutes on $"rowtime" as $"w") // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.minutes on $"proctime" as $"w") // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.rows on $"proctime" as $"w")Python:# Tumbling Event-time Window .window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias("w")) # Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble.over(lit(10).minutes).on(col('proctime')).alias("w")) # Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble.over(row_interval(10)).on(col('proctime')).alias("w"))3.Slide (Sliding Windows)滑动窗口具有固定大小并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,行可能分配给多个窗口。例如,15 分钟大小和 5 分钟滑动间隔的滑动窗口将每一行分配给 3 个不同的 15 分钟大小的窗口,以 5 分钟的间隔进行一次计算。滑动窗口可以定义在事件时间、处理时间或行数上。滑动窗口是通过 Slide 类定义的,具体如下:MethodDescriptionover 将窗口的长度定义为时间或行计数间隔。every将窗口的长度定义为时间或行计数间隔。滑动间隔的类型必须与窗口长度的类型相同。on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。Java:// Sliding Event-time Window .window(Slide.over(lit(10).minutes()) .every(lit(5).minutes()) .on($("rowtime")) .as("w")); // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide.over(lit(10).minutes()) .every(lit(5).minutes()) .on($("proctime")) .as("w")); // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));Scala:// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w") // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w") // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on $"proctime" as $"w")Python:# Sliding Event-time Window .window(Slide.over(lit(10).minutes).every(lit(5).minutes).on(col('rowtime')).alias("w")) # Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide.over(lit(10).minutes).every(lit(5).minutes).on(col('proctime')).alias("w")) # Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide.over(row_interval(10)).every(row_interval(5)).on(col('proctime')).alias("w"))4.Session (Session Windows)会话窗口没有固定的大小,其边界是由不活动的间隔定义的,例如,如果在定义的间隔期内没有事件出现,则会话窗口将关闭。例如,定义30 分钟间隔的会话窗口,当观察到一行在 30 分钟内不活动(否则该行将被添加到现有窗口中)且30 分钟内没有添加新行,窗口会关闭。会话窗口支持事件时间和处理时间。MethodDescriptionwithGap将两个窗口之间的间隙定义为时间间隔。on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。Java:// Session Event-time Window .window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w")); // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));Scala:// Session Event-time Window .window(Session withGap 10.minutes on $"rowtime" as $"w") // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on $"proctime" as $"w")Python:# Session Event-time Window .window(Session.with_gap(lit(10).minutes).on(col('rowtime')).alias("w")) # Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session.with_gap(lit(10).minutes).on(col('proctime')).alias("w"))5.Over WindowsOver window 聚合聚合来自在标准的 SQL(OVER 子句),可以在 SELECT 查询子句中定义。与在“GROUP BY”子句中指定的 group window 不同, over window 不会折叠行。相反,over window 聚合为每个输入行在其相邻行的范围内计算聚合。Over windows 使用 window(w: OverWindow*) 子句(在 Python API 中使用 over_window(*OverWindow))定义,并通过 select() 方法中的别名引用。以下示例显示如何在表上定义 over window 聚合。Java:Table table = input .window([OverWindow w].as("w")) // define over window with alias w .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w"))); // aggregate over the over window wScala:val table = input .window([w: OverWindow] as $"w") // define over window with alias w .select($"a", $"b".sum over $"w", $"c".min over $"w") // aggregate over the over window wPython:# define over window with alias w and aggregate over the over window w table = input.over_window([w: OverWindow].alias("w")) \ .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))OverWindow 定义了计算聚合的行范围。OverWindow 不是用户可以实现的接口。相反,Table API 提供了Over 类来配置 over window 的属性。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围内定义 over window 。可以通过 Over 类(和其他类)上的方法来定义 over window,具体如下:Partition By可选的在一个或多个属性上定义输入的分区。每个分区单独排序,聚合函数分别应用于每个分区。注意:在流环境中,如果窗口包含 partition by 子句,则只能并行计算 over window 聚合。如果没有 partitionBy(…),数据流将由单个非并行任务处理。Order By必须的定义每个分区内行的顺序,从而定义聚合函数应用于行的顺序。注意:对于流处理查询,必须声明事件时间或处理时间属性。目前,仅支持单个排序属性。Preceding可选的定义了包含在窗口中并位于当前行之前的行的间隔。间隔可以是时间或行计数间隔。有界 over window 用间隔的大小指定,例如,时间间隔为10分钟或行计数间隔为10行。无界 over window 通过常量来指定,例如,用UNBOUNDED_RANGE指定时间间隔或用 UNBOUNDED_ROW 指定行计数间隔。无界 over windows 从分区的第一行开始。如果省略前面的子句,则使用 UNBOUNDED_RANGE 和 CURRENT_RANGE 作为窗口前后的默认值。Following可选的定义包含在窗口中并在当前行之后的行的窗口间隔。间隔必须以与前一个间隔(时间或行计数)相同的单位指定。目前,不支持在当前行之后有行的 over window。相反,你可以指定两个常量之一:CURRENT_ROW 将窗口的上限设置为当前行。CURRENT_RANGE 将窗口的上限设置为当前行的排序键,例如,与当前行具有相同排序键的所有行都包含在窗口中。如果省略后面的子句,则时间间隔窗口的上限定义为 CURRENT_RANGE,行计数间隔窗口的上限定义为CURRENT_ROW。As必须的为 over window 指定别名。别名用于在之后的 select() 子句中引用该 over window。注意:目前,同一个 select() 调用中的所有聚合函数必须在同一个 over window 上计算。6.Unbounded Over WindowsJava// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w")); // 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w")); // 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w")); // 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));Scala// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as "w") // 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_RANGE as "w") // 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_ROW as "w") // 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_ROW as "w")Python# 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(UNBOUNDED_RANGE).alias("w")) # 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(UNBOUNDED_RANGE).alias("w")) # 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(UNBOUNDED_ROW).alias("w")) # 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(UNBOUNDED_ROW).alias("w"))7.Bounded Over WindowsJava// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w")); // 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as("w")); // 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w")); // 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));Scala:// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as "w") // 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding 1.minutes as "w") // 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding 10.rows as "w") // 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding 10.rows as "w")Python:# 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(lit(1).minutes).alias("w")) # 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(lit(1).minutes).alias("w")) # 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(row_interval(10)).alias("w")) # 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(row_interval(10)).alias("w")) 十四、Row-based Operations基于行生成多列输出的操作。1.Map:Batch Streaming使用用户定义的标量函数或内置标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。Java:public class MyMapFunction extends ScalarFunction { public Row eval(String a) { return Row.of(a, "pre-" + a); } @Override public TypeInformation<?> getResultType(Class<?>[] signature) { return Types.ROW(Types.STRING(), Types.STRING()); } } ScalarFunction func = new MyMapFunction(); tableEnv.registerFunction("func", func); Table table = input .map(call("func", $("c")).as("a", "b")); Scala:class MyMapFunction extends ScalarFunction { def eval(a: String): Row = { Row.of(a, "pre-" + a) } override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.ROW(Types.STRING, Types.STRING) } val func = new MyMapFunction() val table = input .map(func($"c")).as("a", "b") Python:使用 python 的通用标量函数或向量化标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。from pyflink.common import Row from pyflink.table import DataTypes from pyflink.table.udf import udf def map_function(a: Row) -> Row: return Row(a.a + 1, a.b * a.b) # 使用 python 通用标量函数进行 map 操作 func = udf(map_function, result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT())])) table = input.map(func).alias('a', 'b') # 使用 python 向量化标量函数进行 map 操作 pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT()))]), func_type='pandas') table = input.map(pandas_func).alias('a', 'b') 2.FlatMap:Batch StreamingJava:使用表函数执行 flatMap 操作。public class MyFlatMapFunction extends TableFunction<Row> { public void eval(String str) { if (str.contains("#")) { String[] array = str.split("#"); for (int i = 0; i < array.length; ++i) { collect(Row.of(array[i], array[i].length())); } } } @Override public TypeInformation<Row> getResultType() { return Types.ROW(Types.STRING(), Types.INT()); } } TableFunction func = new MyFlatMapFunction(); tableEnv.registerFunction("func", func); Table table = input .flatMap(call("func", $("c")).as("a", "b")); Scala:class MyFlatMapFunction extends TableFunction[Row] { def eval(str: String): Unit = { if (str.contains("#")) { str.split("#").foreach({ s => val row = new Row(2) row.setField(0, s) row.setField(1, s.length) collect(row) }) } } override def getResultType: TypeInformation[Row] = { Types.ROW(Types.STRING, Types.INT) } } val func = new MyFlatMapFunction val table = input .flatMap(func($"c")).as("a", "b") Python:from pyflink.table.udf import udtf from pyflink.table import DataTypes from pyflink.common import Row @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()]) def split(x: Row) -> Row: for s in x.b.split(","): yield x.a, s input.flat_map(split) 3.Aggregate:Batch StreamingJava:使用聚合函数来执行聚合操作。你必须使用 select 子句关闭 aggregate,并且 select 子句不支持聚合函数。如果输出类型是复合类型,则聚合的输出将被展平。public class MyMinMaxAcc { public int min = 0; public int max = 0; } public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> { public void accumulate(MyMinMaxAcc acc, int value) { if (value < acc.min) { acc.min = value; } if (value > acc.max) { acc.max = value; } } @Override public MyMinMaxAcc createAccumulator() { return new MyMinMaxAcc(); } public void resetAccumulator(MyMinMaxAcc acc) { acc.min = 0; acc.max = 0; } @Override public Row getValue(MyMinMaxAcc acc) { return Row.of(acc.min, acc.max); } @Override public TypeInformation<Row> getResultType() { return new RowTypeInfo(Types.INT, Types.INT); } } AggregateFunction myAggFunc = new MyMinMax(); tableEnv.registerFunction("myAggFunc", myAggFunc); Table table = input .groupBy($("key")) .aggregate(call("myAggFunc", $("a")).as("x", "y")) .select($("key"), $("x"), $("y")); Scala:case class MyMinMaxAcc(var min: Int, var max: Int) class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] { def accumulate(acc: MyMinMaxAcc, value: Int): Unit = { if (value < acc.min) { acc.min = value } if (value > acc.max) { acc.max = value } } override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0, 0) def resetAccumulator(acc: MyMinMaxAcc): Unit = { acc.min = 0 acc.max = 0 } override def getValue(acc: MyMinMaxAcc): Row = { Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max)) } override def getResultType: TypeInformation[Row] = { new RowTypeInfo(Types.INT, Types.INT) } } val myAggFunc = new MyMinMax val table = input .groupBy($"key") .aggregate(myAggFunc($"a") as ("x", "y")) .select($"key", $"x", $"y") Python:使用 python 的通用聚合函数或 向量化聚合函数来执行聚合操作。你必须使用 select 子句关闭 aggregate ,并且 select 子句不支持聚合函数。如果输出类型是复合类型,则聚合的输出将被展平。from pyflink.common import Row from pyflink.table import DataTypes from pyflink.table.udf import AggregateFunction, udaf class CountAndSumAggregateFunction(AggregateFunction): def get_value(self, accumulator): return Row(accumulator[0], accumulator[1]) def create_accumulator(self): return Row(0, 0) def accumulate(self, accumulator, row: Row): accumulator[0] += 1 accumulator[1] += row.b def retract(self, accumulator, row: Row): accumulator[0] -= 1 accumulator[1] -= row.b def merge(self, accumulator, accumulators): for other_acc in accumulators: accumulator[0] += other_acc[0] accumulator[1] += other_acc[1] def get_accumulator_type(self): return DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT())]) def get_result_type(self): return DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT())]) function = CountAndSumAggregateFunction() agg = udaf(function, result_type=function.get_result_type(), accumulator_type=function.get_accumulator_type(), name=str(function.__class__.__name__)) # 使用 python 通用聚合函数进行聚合 result = t.group_by(t.a) \ .aggregate(agg.alias("c", "d")) \ .select("a, c, d") # 使用 python 向量化聚合函数进行聚合 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.FLOAT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") t.aggregate(pandas_udaf.alias("a", "b")) \ .select("a, b") 4.Group Window Aggregate:Batch Streaming在 group window 和可能的一个或多个分组键上对表进行分组和聚合。你必须使用 select 子句关闭 aggregate。并且 select 子句不支持“*“或聚合函数。Java:AggregateFunction myAggFunc = new MyMinMax(); tableEnv.registerFunction("myAggFunc", myAggFunc); Table table = input .window(Tumble.over(lit(5).minutes()) .on($("rowtime")) .as("w")) // 定义窗口 .groupBy($("key"), $("w")) // 以键和窗口分组 .aggregate(call("myAggFunc", $("a")).as("x", "y")) .select($("key"), $("x"), $("y"), $("w").start(), $("w").end()); // 访问窗口属性与聚合结果Scala:val myAggFunc = new MyMinMax val table = input .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口 .groupBy($"key", $"w") // 以键和窗口分组 .aggregate(myAggFunc($"a") as ("x", "y")) .select($"key", $"x", $"y", $"w".start, $"w".end) // 访问窗口属性与聚合结果Python:from pyflink.table import DataTypes from pyflink.table.udf import AggregateFunction, udaf pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.FLOAT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") tumble_window = Tumble.over(expr.lit(1).hours) \ .on(expr.col("rowtime")) \ .alias("w") t.select(t.b, t.rowtime) \ .window(tumble_window) \ .group_by("w") \ .aggregate(pandas_udaf.alias("d", "e")) \ .select("w.rowtime, d, e")5.FlatAggregate和 GroupBy Aggregation 类似。使用运行中的表之后的聚合算子对分组键上的行进行分组,以按组聚合行。和 AggregateFunction 的不同之处在于,TableAggregateFunction 的每个分组可能返回0或多条记录。你必须使用 select 子句关闭 flatAggregate。并且 select 子句不支持聚合函数。除了使用 emitValue 输出结果,你还可以使用 emitUpdateWithRetract 方法。和 emitValue 不同的是,emitUpdateWithRetract 用于下发已更新的值。此方法在retract 模式下增量输出数据,例如,一旦有更新,我们必须在发送新的更新记录之前收回旧记录。如果在表聚合函数中定义了这两个方法,则将优先使用 emitUpdateWithRetract 方法而不是 emitValue 方法,这是因为该方法可以增量输出值,因此被视为比 emitValue 方法更有效。Java:/** * Top2 Accumulator。 */ public class Top2Accum { public Integer first; public Integer second; } /** * 用户定义的聚合函数 top2。 */ public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> { @Override public Top2Accum createAccumulator() { Top2Accum acc = new Top2Accum(); acc.first = Integer.MIN_VALUE; acc.second = Integer.MIN_VALUE; return acc; } public void accumulate(Top2Accum acc, Integer v) { if (v > acc.first) { acc.second = acc.first; acc.first = v; } else if (v > acc.second) { acc.second = v; } } public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) { for (Top2Accum otherAcc : iterable) { accumulate(acc, otherAcc.first); accumulate(acc, otherAcc.second); } } public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) { // 下发 value 与 rank if (acc.first != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.first, 1)); } if (acc.second != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.second, 2)); } } } tEnv.registerFunction("top2", new Top2()); Table orders = tableEnv.from("Orders"); Table result = orders .groupBy($("key")) .flatAggregate(call("top2", $("a")).as("v", "rank")) .select($("key"), $("v"), $("rank");Scala:import java.lang.{Integer => JInteger} import org.apache.flink.table.api.Types import org.apache.flink.table.functions.TableAggregateFunction /** * Top2 Accumulator。 */ class Top2Accum { var first: JInteger = _ var second: JInteger = _ } /** * 用户定义的聚合函数 top2。 */ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] { override def createAccumulator(): Top2Accum = { val acc = new Top2Accum acc.first = Int.MinValue acc.second = Int.MinValue acc } def accumulate(acc: Top2Accum, v: Int) { if (v > acc.first) { acc.second = acc.first acc.first = v } else if (v > acc.second) { acc.second = v } } def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = { val iter = its.iterator() while (iter.hasNext) { val top2 = iter.next() accumulate(acc, top2.first) accumulate(acc, top2.second) } } def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = { // 下发 value 与 rank if (acc.first != Int.MinValue) { out.collect(JTuple2.of(acc.first, 1)) } if (acc.second != Int.MinValue) { out.collect(JTuple2.of(acc.second, 2)) } } } val top2 = new Top2 val orders: Table = tableEnv.from("Orders") val result = orders .groupBy($"key") .flatAggregate(top2($"a") as ($"v", $"rank")) .select($"key", $"v", $"rank")Python:使用 python 通用 Table Aggregate Function 执行 flat_aggregate 操作。和 GroupBy Aggregation 类似。使用运行中的表之后的聚合运算符对分组键上的行进行分组,以按组聚合行。和 AggregateFunction 的不同之处在于,TableAggregateFunction 的每个分组可能返回0或多条记录。你必须使用 select 子句关闭 flat_aggregate。并且 select 子句不支持聚合函数。from pyflink.common import Row from pyflink.table.udf import TableAggregateFunction, udtaf from pyflink.table import DataTypes class Top2(TableAggregateFunction): def emit_value(self, accumulator): yield Row(accumulator[0]) yield Row(accumulator[1]) def create_accumulator(self): return [None, None] def accumulate(self, accumulator, row: Row): if row.a is not None: if accumulator[0] is None or row.a > accumulator[0]: accumulator[1] = accumulator[0] accumulator[0] = row.a elif accumulator[1] is None or row.a > accumulator[1]: accumulator[1] = row.a def merge(self, accumulator, accumulators): for other_acc in accumulators: self.accumulate(accumulator, other_acc[0]) self.accumulate(accumulator, other_acc[1]) def get_accumulator_type(self): return DataTypes.ARRAY(DataTypes.BIGINT()) def get_result_type(self): return DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT())]) mytop = udtaf(Top2()) t = t_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (7, 'Hi', 'Hello'), (2, 'Hi', 'Hello')], ['a', 'b', 'c']) result = t.select(t.a, t.c) \ .group_by(t.c) \ .flat_aggregate(mytop) \ .select(t.a) \ .flat_aggregate(mytop.alias("b"))

0
0
0
浏览量2010
打酱油的后端

Flink系列之:背压下的检查点

一、Checkpointing under backpressure通常情况下,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响。 然而,当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间。这在 checkpointing process) 的概述中有说明原因。并且可以通过高 alignment time and start delay metrics 观察到。 当这种情况发生并成为一个问题时,有三种方法可以解决这个问题:消除背压源头,通过优化 Flink 作业,通过调整 Flink 或 JVM 参数,抑或是通过扩容。减少 Flink 作业中缓冲在 In-flight 数据的数据量。启用非对齐 Checkpoints。 这些选项并不是互斥的,可以组合在一起。本文档重点介绍后两个选项。二、缓冲区 DebloatingFlink 1.14 引入了一个新的工具,用于自动控制在 Flink 算子/子任务之间缓冲的 In-flight 数据的数据量。缓冲区 Debloating 机 制可以通过将属性taskmanager.network.memory.buffer-debloat.enabled设置为true来启用。此特性对对齐和非对齐 Checkpoint 都生效,并且在这两种情况下都能缩短 Checkpointing 的时间,不过 Debloating 的效果对于 对齐 Checkpoint 最明显。 当在非对齐 Checkpoint 情况下使用缓冲区 Debloating 时,额外的好处是 Checkpoint 大小会更小,并且恢复时间更快 (需要保存 和恢复的 In-flight 数据更少)。有关缓冲区 Debloating 功能如何工作以及如何配置的更多信息,可以参考 network memory tuning guide。 请注意,您仍然可以继续使用在前面调优指南中介绍过的方式来手动减少缓冲在 In-flight 数据的数据量。三、非对齐 Checkpoints从Flink 1.11开始,Checkpoint 可以是非对齐的。 Unaligned checkpoints 包含 In-flight 数据(例如,存储在缓冲区中的数据)作为 Checkpoint State的一部分,允许 Checkpoint Barrier 跨越这些缓冲区。因此, Checkpoint 时长变得与当前吞吐量无关,因为 Checkpoint Barrier 实际上已经不再嵌入到数据流当中。如果您的 Checkpointing 由于背压导致周期非常的长,您应该使用非对齐 Checkpoint。这样,Checkpointing 时间基本上就与 端到端延迟无关。请注意,非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真 正的瓶颈时,您不应当使用非对齐 Checkpointing。为了启用非对齐 Checkpoint,您可以:Java代码StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 启用非对齐 Checkpoint env.getCheckpointConfig().enableUnalignedCheckpoints();Scala代码:val env = StreamExecutionEnvironment.getExecutionEnvironment() // 启用非对齐 Checkpoint env.getCheckpointConfig.enableUnalignedCheckpoints()Python代码:env = StreamExecutionEnvironment.get_execution_environment() # 启用非对齐 Checkpoint env.get_checkpoint_config().enable_unaligned_checkpoints()或者在 flink-conf.yml 配置文件中增加配置:execution.checkpointing.unaligned: true四、对齐 Checkpoint 的超时在启用非对齐 Checkpoint 后,你依然可以通过编程的方式指定对齐 Checkpoint 的超时:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));或是在 flink-conf.yml 配置文件中配置:execution.checkpointing.aligned-checkpoint-timeout: 30 s在启动时,每个 Checkpoint 仍然是 aligned checkpoint,但是当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时, 如果 aligned checkpoint 还没完成,那么 Checkpoint 将会转换为 Unaligned Checkpoint。五、限制并发 CheckpointFlink 当前并不支持并发的非对齐 Checkpoint。然而,由于更可预测的和更短的 Checkpointing 时长,可能也根本就不需要并发的 Checkpoint。此外,Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费稍长的时间。与 Watermark 的相互影响非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证。目前,Flink 确保了 Watermark 作为恢复的第一步, 而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。在非对齐 Checkpoint 中,这意味着当恢复时,Flink 会在恢复 In-flight 数据后再生成 Watermark。如果您的 Pipeline 中使用了对每条记录都应用最新的 Watermark 的算子将会相对于 使用对齐 Checkpoint产生不同的结果。如果您的 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在 OperatorState 中。在这种情况下,Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。与长时间运行的记录处理的相互作用尽管未对齐的检查点障碍仍然能够超越队列中的所有其他记录。如果当前记录需要花费大量时间来处理,则此屏障的处理仍然可能会被延迟。当同时触发多个计时器时(例如在窗口操作中),可能会发生这种情况。当系统在处理单个输入记录时被阻塞等待多个网络缓冲区可用性时,可能会出现第二种有问题的情况。 Flink 无法中断单个输入记录的处理,未对齐的检查点必须等待当前处理的记录被完全处理。这可能会在两种情况下导致问题。由于不适合单个网络缓冲区的大记录的序列化或在 flatMap 操作中,会为一个输入记录生成许多输出记录。在这种情况下,背压可能会阻止未对齐的检查点,直到处理单个输入记录所需的所有网络缓冲区都可用。当处理单个记录需要一段时间时,它也可能发生在任何其他情况下。因此,检查点的时间可能会比预期的时间长,或者可能会有所不同。某些数据分布模式没有检查点有些属性包含的连接无法与 Channel 中的数据一样保存在 Checkpoint 中。为了保留这些功能并确保没有状态冲突或非预期的行为,非同一 Checkpoint 对于这些类型的连接是禁用的。所有其他的交换仍然执行非单色检查点。点对点连接我们目前没有任何对于点对点连接中有关数据有序性的强保证。然而,由于数据已经被以前置的 Source 或是 KeyBy 相同的方式隐式 组织,一些用户会依靠这种特性在提供的有序性保证的同时将计算敏感型的任务划分为更小的块。只要并行度不变,非对齐 Checkpoint(UC) 将会保留这些特性。但是如果加上UC的伸缩容,这些特性将会被改变。针对如下任务如果我们想将并行度从 p=2 扩容到 p=3,那么需要根据 KeyGroup 将 KeyBy 的 Channel 中的数据突然的划分到3个 Channel 中去。这 很容易做到,通过使用 Operator 的 KeyGroup 范围和确定记录属于某个 Key(group) 的方法(不管实际使用的是什么方法)。对于 Forward 的 Channel,我们根本没有 KeyContext。Forward Channel 里也没有任何记录被分配了任何 KeyGroup;也无法计算它,因为无法保证 Key仍然存在。广播 Connections广播 Connection 带来了另一个问题。无法保证所有 Channel 中的记录都以相同的速率被消费。这可能导致某些 Task 已经应用了与 特定广播事件对应的状态变更,而其他任务则没有,如图所示。广播分区通常用于实现广播状态,它应该跨所有 Operator 都相同。Flink 实现广播状态,通过仅 Checkpointing 有状态算子的 SubTask 0 中状态的单份副本。在恢复时,我们将该份副本发往所有的 Operator。因此,可能会发生以下情况:某个算子将很快从它的 Checkpointed Channel 消费数据并将修改应有于记录来获得状态。六、故障排除Corrupted in-flight data以下描述的操作是最后采取的手段,因为它们将会导致数据的丢失。为了防止 In-flight 数据损坏,或者由于其他原因导致作业应该在没有 In-flight 数据的情况下恢复,可以使用 recover-without-channel-state.checkpoint-id 属性。该属性需要指定一个 Checkpoint Id,对它来说 In-flight 中的数据将会被忽略。除非已经持久化的 In-flight 数据内部的损坏导致无 法恢复的情况,否则不要设置该属性。只有在重新部署作业后该属性才会生效,这就意味着只有启用 externalized checkpoint时,此操作才有意义。

0
0
0
浏览量1927
打酱油的后端

Flink系列Table API和SQL之:创建表环境和创建表

一、快速上手Table API和SQL创建表环境TableEnvironment tableEnv = ...;创建输入表,连接外部系统读取数据tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ('connector' = ... )");注册一个表,连接到外部系统,用于输出tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ('connector' = ... )");执行SQL对表进行查询转换,得到一个新的表Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");使用Table API对表进行查询转换,得到一个新的表Table table2 = tableEnv.from("inputTable").select(...);将得到的结果写入输出表TableResult tableResult = table1.executeInsert("outputTable");二、创建表环境对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的表环境(TableEnvironment)。主要负责:注册Catalog和表执行SQL查询注册用户自定义函数(UDF)DataStream和表之间的转换这里的Catalog就是目录,与标准SQL中的概念是一致的,主要用来管理所有数据库(database)和表(table)的元数据(metadata)。通过Catalog可以方便地对数据库和表进行查询的管理,所以可以认为我们所定义的表都会挂靠在某个目录下,这样就可以快速检索。在表环境中可以由用户自定义Catalog,并在其中注册表和自定义函数(UDF)。默认的Catalog就叫作default_catalog。每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式。计划器默认使用blink planner。import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class CommonApiTest { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment talbeEnv = StreamTableEnvironment.create(env); //1。定义环境配置来创建表执行环境 EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); //2.基于blink版本planner进行批处理 EnvironmentSettings settings2 = EnvironmentSettings.newInstance() .inBatchMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv3 = TableEnvironment.create(settings2); } }三、创建表表(Table)是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。Flink中的表是由多个行数据构成的,每个行(Row)又可以有定义好的多个列(Column)字段。整体来看,表就是固定类型的数据组成的二维矩阵。为了方便的查询表,表环境中会维护一个目录(Catalog)和表的对应关系。所以表都是通过Catalog来进行注册创建的。表在环境中有一个唯一的ID,由三部分组成:目录(catalog)名,数据库(database)名,以及表名。在默认情况下,目录名为default_catalog,数据库名为default_database。所以如果我们直接创建一个叫做MyTable的表,它的ID就是:default_catalog.default_database.MyTable具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。1.连接器表(Connector Tables)最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。例如我们可以连接到Kafka或者文件系统,将存储在这些外部系统的数据以表的形式定义出来,这样对表的读写就可以通过连接器转换成对外部系统的读写了。当我们在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换。而当我们向这张表写入数据,连接器就会将数据输出(Sink)到外部系统中。在代码中,可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ('connector' = ... )");这里的TEMPORARY关键字可以省略。这里没有定义Catalog和Database,所以都是默认的,表的完整ID就是default_catalog.default_database.MyTable。如果希望使用自定义的目录名和库名,可以在环境中进行设置:tEnv.useCatalog("custom_catalog"); tEnv.useDatabase("custom_database");这样创建的表完整ID就变成了custom_catalog.custom_database.MyTable。之后在表环境中创建的所有表,ID也会都以custom_catalog.custom_database作为前缀。 //2。创建表 String creatDDL = "CREATE TABLE clickTable (" + "user STRING, " + "url STRING, " + "ts BIGINT " + ") WITH (" + " 'connector' = 'filesystem',"+ " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink" + "/input/clicks.txt'," + " 'format' = 'csv'" + ")"; tableEnv.executeSql(creatDDL); //创建一张用于输出的表 //2。创建表 String creatOutDDL = "CREATE TABLE outTable (" + "user STRING, " + "url STRING, " + "ts BIGINT " + ") WITH (" + " 'connector' = 'filesystem',"+ " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink" + "/input/output.txt'," + " 'format' = 'csv'" + ")"; tableEnv.executeSql(creatOutDDL);2.虚拟表(Virtual Tables)在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable ...");这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。得到的newTable是一个中间转换结果,如果之后又希望直接使用这个表执行SQL,又该怎么做呢?由于newTable是一个Table对象,并没有在表环境中注册。所以我们还需要将这个中间结果表注册到环境中,才能在SQL中使用:tableEnv.createTemporaryView("NewTable",newTable);我们发现,这里的注册其实是创建了一个虚拟表(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建虚拟视图(createTemporaryView)。视图之所以是虚拟的,是因为我们并不会直接保存这个表的内容,并没有实体。只是在用到这张表的时候,会将它对应的查询语句嵌入到SQL中。

0
0
0
浏览量2010
打酱油的后端

Flink系列Table API和SQL之:普通Top N和窗口Top N

一、普通Top N在Flink SQL中,是通过OVER聚合和一个条件筛选来实现Top N的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来。然后再用WHERE子句筛选行号小于等于N的那些行返回。基本语法如下:SELECT ... FROM ( SELECT ..., ROW_NUMBER() OVER( [PARTITION BY <字段1>[,<字段1>...]] ORDER BY <排序字段1> [asc|desc][,<排序字段2> [asc|desc]...] ) AS row_num FROM ...) WHERE row_num <= N [AND <其他条件>]OVER窗口定义,目的就是利用ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为row_num,并在外层的查询中以row_num<=N作为条件进行筛选,就可以得到根据排序字段统计的Top N结果了。二、普通Top N代码实现import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TopNExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //在创建表的DDL中直接定义时间属性 String createDDL = "create table clickTable (" + " `user` String, " + " url String, " + " ts bigint, " + " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," + " watermark for et as et-interval '1' second " + ") with ( " + " 'connector' = 'filesystem'," + " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " + " 'format' = 'csv'" + ") "; tableEnv.executeSql(createDDL); //普通topN 选取当前所有用户中浏览量最大的两个 Table topNResultTable = tableEnv.sqlQuery("select user,cnt,row_num " + "from (" + " select*,row_number() over(" + " order by cnt desc" + " ) as row_num" + " from ( select user,count(url) as cnt from clickTable group by user)" + " ) where row_num<=2"); tableEnv.toChangelogStream(topNResultTable).print("top 2: "); env.execute(); } }top 2: > +I[Mary, 1, 1] top 2: > +I[Bob, 1, 2] top 2: > -U[Mary, 1, 1] top 2: > +U[Bob, 2, 1] top 2: > -U[Bob, 1, 2] top 2: > +U[Mary, 1, 2] top 2: > -U[Mary, 1, 2] top 2: > +U[Alice, 2, 2] top 2: > -U[Bob, 2, 1] top 2: > +U[Bob, 3, 1] top 2: > -U[Alice, 2, 2] top 2: > +U[Alice, 3, 2]三、窗口Top N除了直接对数据进行Top N的选取,也可以针对窗口来做Top N。具体来说,可以先做一个窗口聚合,将窗口信息window_start、window_end连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的Top N那样定义OVER窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前N行就可以得到结果。所以窗口Top N的实现就是窗口聚合与OVER聚合的结合使用。四、窗口Top N代码实现import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TopNExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //在创建表的DDL中直接定义时间属性 String createDDL = "create table clickTable (" + " `user` String, " + " url String, " + " ts bigint, " + " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," + " watermark for et as et-interval '1' second " + ") with ( " + " 'connector' = 'filesystem'," + " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " + " 'format' = 'csv'" + ") "; tableEnv.executeSql(createDDL); //窗口topN 一段时间内活跃用户统计 String subQuery = " select user, count(url) as cnt, window_start, window_end " + " from table ( " + " tumble( table clickTable, descriptor(et), interval '10' second )" + ") " + " group by user, window_start, window_end "; Table windowTopNResultTable = tableEnv.sqlQuery("select user, cnt, row_num, window_end " + "from (" + " select * , row_number() over(" + " partition by window_start, window_end " + // 固定写法 " order by cnt desc" + " ) as row_num" + " from ( " + subQuery + " ) " + ") where row_num <= 2"); tableEnv.toDataStream(windowTopNResultTable).print("window Top N :"); env.execute(); } }window Top N :> +I[Bob, 2, 1, 1970-01-01T08:00:10] window Top N :> +I[Alice, 2, 2, 1970-01-01T08:00:10] window Top N :> +I[Bob, 1, 1, 1970-01-01T08:00:20] window Top N :> +I[Alice, 1, 1, 1970-01-01T08:00:30]

0
0
0
浏览量1862
打酱油的后端

Flink系列之:Savepoints

一、SavepointsSavepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork 或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,…) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(相对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。二、分配算子ID强烈建议你按照本节所述调整你的程序,以便将来能够升级你的程序。主要通过 uid(String) 方法手动指定算子 ID 。这些 ID 将用于恢复每个算子的状态。DataStream<String> stream = env. // Stateful source (e.g. Kafka) with ID .addSource(new StatefulSource()) .uid("source-id") // ID for the source operator .shuffle() // Stateful mapper with ID .map(new StatefulMapper()) .uid("mapper-id") // ID for the mapper // Stateless printing sink .print(); // Auto-generated ID这段代码使用Apache Flink框架创建了一个名为stream的数据流(DataStream类型)。代码中的每一个操作都代表了流处理的一步。首先,代码通过调用addSource方法添加一个名为StatefulSource的有状态源。这个源可以是任何有状态的数据源,比如Kafka。uid(“source-id”)方法用于给源操作符指定一个唯一的ID。接下来,代码调用shuffle方法对流进行一次shuffle操作,该操作可用于重新分区数据。然后,代码调用map方法,使用一个名为StatefulMapper的有状态映射函数对流进行映射操作。同样地,uid(“mapper-id”)方法用于给映射操作符指定一个唯一的ID。最后,代码调用print方法通过无状态的打印sink将流的内容输出到控制台。这个操作会自动生成一个ID。整个代码段构建了一个有状态的数据流处理图,其中包含了源操作符、映射操作符和一个打印sink。该数据流会根据源生成的数据进行一系列的转换和处理,并将结果打印输出。如果不手动指定 ID ,则会自动生成 ID 。只要这些 ID 不变,就可以从 Savepoint 自动恢复。生成的 ID 取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些 ID 。三、Savepoint 状态可以将 Savepoint 想象为每个有状态的算子保存一个映射“算子 ID ->状态”:Operator ID | State ------------+------------------------ source-id | State of StatefulSource mapper-id | State of StatefulMapper在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。四、算子你可以使用命令行客户端来触发 Savepoint,触发 Savepoint 并取消作业,从 Savepoint 恢复,以及删除 Savepoint。从 Flink 1.2.0 开始,还可以使用 webui 从 Savepoint 恢复。五、触发Savepoint当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录(参见:targetDirectory参数来控制该目录的位置。注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统(或者对象存储系统)上的位置。以 FsStateBackend 或 RocksDBStateBackend 为例:# Savepoint 目标目录 /savepoint/ # Savepoint 目录 /savepoint/savepoint-:shortjobid-:savepointid/ # Savepoint 文件包含 Checkpoint元数据 /savepoint/savepoint-:shortjobid-:savepointid/_metadata # Savepoint 状态 /savepoint/savepoint-:shortjobid-:savepointid/...从 1.11.0 开始,你可以通过移动(拷贝)savepoint 目录到任意地方,然后再进行恢复。在如下两种情况中不支持 savepoint 目录的移动:1)如果启用了 entropy injection :这种情况下,savepoint 目录不包含所有的数据文件,因为注入的路径会分散在各个路径中。 由于缺乏一个共同的根目录,因此 savepoint 将包含绝对路径,从而导致无法支持 savepoint 目录的迁移。2)作业包含了 task-owned state(比如 GenericWriteAhreadLog sink)。和 savepoint 不同,checkpoint 不支持任意移动文件,因为 checkpoint 可能包含一些文件的绝对路径。如果你使用 MemoryStateBackend 的话,metadata 和 savepoint 的数据都会保存在 _metadata 文件中,因此不要因为没看到目录下没有数据文件而困惑。注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对精确一次的接收器有副作用,为了确保精确一次的语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。六、Savepoint 格式你可以在 savepoint 的两种二进制格式之间进行选择:标准格式 - 一种在所有 state backends 间统一的格式,允许你使用一种状态后端创建 savepoint 后,使用另一种状态后端恢复这个 savepoint。这是最稳定的格式,旨在与之前的版本、模式、修改等保持最大兼容性。原生格式 - 标准格式的缺点是它的创建和恢复速度通常很慢。原生格式以特定于使用的状态后端的格式创建快照(例如 RocksDB 的 SST 文件)。以原生格式创建 savepoint 的能力在 Flink 1.15 中引入,在那之前 savepoint 都是以标准格式创建的。七、触发 Savepoint$ bin/flink savepoint :jobId [:targetDirectory这将触发 ID 为 :jobId 的作业的 Savepoint,并返回创建的 Savepoint 路径。 你需要此路径来恢复和删除 Savepoint 。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。$ bin/flink savepoint --type [native/canonical] :jobId [:targetDirectory]八、使用 YARN 触发 Savepoint$ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId这将触发 ID 为 :jobId 和 YARN 应用程序 ID :yarnAppId 的作业的 Savepoint,并返回创建的 Savepoint 的路径。九、使用 Savepoint 停止作业$ bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId这将自动触发 ID 为 :jobid 的作业的 Savepoint,并停止该作业。此外,你可以指定一个目标文件系统目录来存储 Savepoint 。该目录需要能被 JobManager(s) 和 TaskManager(s) 访问。你也可以指定创建 Savepoint 的格式。如果没有指定,会采用标准格式创建 Savepoint。十、从 Savepoint 恢复$ bin/flink run -s :savepointPath [:runArgs]这将提交作业并指定要从中恢复的 Savepoint 。 你可以给出 Savepoint 目录或 _metadata 文件的路径。十一、跳过无法映射的状态恢复默认情况下,resume 操作将尝试将 Savepoint 的所有状态映射回你要还原的程序。 如果删除了运算符,则可以通过 --allowNonRestoredState(short:-n)选项跳过无法映射到新程序的状态:十二、Restore 模式Restore 模式 决定了在 restore 之后谁拥有Savepoint 或者 externalized checkpoint的文件的所有权。在这种语境下 Savepoint 和 externalized checkpoint 的行为相似。 这里我们将它们都称为“快照”,除非另有明确说明。如前所述,restore 模式决定了谁来接管我们从中恢复的快照文件的所有权。快照可被用户或者 Flink 自身拥有。如果快照归用户所有,Flink 不会删除其中的文件,而且 Flink 不能依赖该快照中文件的存在,因为它可能在 Flink 的控制之外被删除。每种 restore 模式都有特定的用途。尽管如此,我们仍然认为默认的 NO_CLAIM 模式在大多数情况下是一个很好的折中方案,因为它在提供明确的所有权归属的同时只给恢复后第一个 checkpoint 带来较小的代价。你可以通过如下方式指定 restore 模式:$ bin/flink run -s :savepointPath -restoreMode :mode -n [:runArgs]十三、NO_CLAIM (默认的)在 NO_CLAIM 模式下,Flink 不会接管快照的所有权。它会将快照的文件置于用户的控制之中,并且永远不会删除其中的任何文件。该模式下可以从同一个快照上启动多个作业。为保证 Flink 不会依赖于该快照的任何文件,它会强制第一个(成功的) checkpoint 为全量 checkpoint 而不是增量的。这仅对state.backend: rocksdb 有影响,因为其他 backend 总是创建全量 checkpoint。一旦第一个全量的 checkpoint 完成后,所有后续的 checkpoint 会照常创建。所以,一旦一个 checkpoint 成功制作,就可以删除原快照。在此之前不能删除原快照,因为没有任何完成的 checkpoint,Flink 会在故障时尝试从初始的快照恢复。十四、CLAIM另一个可选的模式是 CLAIM 模式。该模式下 Flink 将声称拥有快照的所有权,并且本质上将其作为 checkpoint 对待:控制其生命周期并且可能会在其永远不会被用于恢复的时候删除它。因此,手动删除快照和从同一个快照上启动两个作业都是不安全的。Flink 会保持配置数量的 checkpoint。注意:Retained checkpoints 被存储在 <checkpoint_dir>/<job_id>/chk- 这样的目录中。Flink 不会接管 <checkpoint_dir>/<job_id> 目录的所有权,而只会接管 chk- 的所有权。Flink 不会删除旧作业的目录。Native 格式支持增量的 RocksDB savepoints。对于这些 savepoints,Flink 将所有 SST 存储在 savepoints 目录中。这意味着这些 savepoints 是自包含和目录可移动的。然而,在 CLAIM 模式下恢复时,后续的 checkpoints 可能会复用一些 SST 文件,这反过来会阻止在 savepoints 被清理时删除 savepoints 目录。 Flink 之后运行期间可能会删除复用的SST 文件,但不会删除 savepoints 目录。因此,如果在 CLAIM 模式下恢复,Flink 可能会留下一个空的 savepoints 目录。十五、LEGACYLegacy 模式是 Flink 在 1.15 之前的工作方式。该模式下 Flink 永远不会删除初始恢复的 checkpoint。同时,用户也不清楚是否可以删除它。导致该的问题原因是, Flink 会在用来恢复的 checkpoint 之上创建增量的 checkpoint,因此后续的 checkpoint 都有可能会依赖于用于恢复的那个 checkpoint。总而言之,恢复的 checkpoint 的所有权没有明确的界定。十六、删除 Savepoint$ bin/flink savepoint -d :savepointPath这将删除存储在 :savepointPath 中的 Savepoint。请注意,还可以通过常规文件系统操作手动删除 Savepoint ,而不会影响其他 Savepoint 或 Checkpoint(请记住,每个 Savepoint 都是自包含的)。 在 Flink 1.2 之前,使用上面的 Savepoint 命令执行是一个更乏味的任务。十七、配置你可以通过 state.savepoints.dir 配置 savepoint 的默认目录。 触发 savepoint 时,将使用此目录来存储 savepoint。 你可以通过使用触发器命令指定自定义目标目录来覆盖缺省值# 默认 Savepoint 目标目录 state.savepoints.dir: hdfs:///flink/savepoints如果既未配置缺省值也未指定自定义目标目录,则触发 Savepoint 将失败。注意: 目标目录必须是 JobManager(s) 和 TaskManager(s) 可访问的位置,例如,分布式文件系统上的位置。

0
0
0
浏览量1807