推荐 最新
打酱油的后端

Flink系列Table API和SQL之:开窗Over聚合

一、开窗(Over)聚合与标准SQL中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值,比如说,可以以每一行数据为基准,计算它之前1小时内所有数据的平均值,也可以计算它之前10个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的"开窗函数"。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口TVF聚合都是多对一的关系,将数据分组之后每组只会得到一个聚合结果。开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个多对多的关系。与标准SQL中一致,Flink SQL中的开窗函数也是通过OVER子句来实现的,所以有时开窗聚合也叫做OVER聚合(Over Aggregation)。基本语法如下:SELECT <聚合函数> OVER ( [PARTITION BY <字段1>[,<字段2>,...]] ORDER BY <时间属性字段> <开窗范围>), ... FROM ...这里OVER关键字前面是一个聚合函数,会应用在后面OVER定义的窗口上。在OVER子句中主要有以下几个部分:PARTITION BY(可选):用来指定分区的键key,类似于GROUP BY的分组,这部分是可选的ORDER BY:OVER窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。不论哪种定义,数据都应该是以某种顺序排列好的,而表中的数据本身是无序的。所以在OVER子句中必须用ORDER BY明确地指出数据基于哪个字段排序。在Flink的流处理中,目前只支持按照时间属性的升序排列,所以这里ORDER BY后面的字段必须是定义好的时间属性。二、开窗范围对于开窗函数而言,还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由BETWEEN<下界> AND <上界> 来定义的,也就是"从下界到上界"的范围。目前支持的上界只能是CURRENT ROW,也就是定义一个从之前某一行到当前行的范围,所以一般的形式为:BETWEEN ... PRECEDING AND CURRENT ROW前面提到,开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间作出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。范围间隔:范围间隔以RANGE为前缀,就是基于ORDER BY指定的时间字段去选择一个范围,一般就是当前行时间戳之前的一段时间,例如开窗范围选择当前行之前1小时的数据:RANGE BETWEENT INTERVAL '1' HOUR PRECEDING AND CURRENT ROW行间隔:行间隔以ROWS为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了,例如开窗范围选择当前行之前的5行数据(最终聚合会包括当前行,所以一共6条数据)ROWS BETWEEN 5 PRECEDING AND CURRENT ROW三、开窗(Over)聚合代码示例import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.time.Duration; import static org.apache.flink.table.api.Expressions.$; /** * Copyright (c) 2020-2030 尚硅谷 All Rights Reserved * <p> * Project: FlinkTutorial * <p> * Created by wushengran */ public class TimeAndWindowTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 1. 在创建表的DDL中直接定义时间属性 String createDDL = "CREATE TABLE clickTable (" + " user_name STRING, " + " url STRING, " + " ts BIGINT, " + " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " + " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + ") WITH (" + " 'connector' = 'filesystem', " + " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " + " 'format' = 'csv' " + ")"; tableEnv.executeSql(createDDL); // 2. 在流转换成Table时定义时间属性 SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource()) .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event event, long l) { return event.timestamp; } })); Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime()); clickTable.printSchema(); // 聚合查询转换 // 4. 开窗聚合 Table overWindowResultTable = tableEnv.sqlQuery("SELECT user_name, " + " avg(ts) OVER (" + " PARTITION BY user_name " + " ORDER BY et " + " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" + ") AS avg_ts " + "FROM clickTable"); // 结果表转换成流打印输出 tableEnv.toDataStream(overWindowResultTable).print("over window: "); env.execute(); } }over window: > +I[Mary, 1000] over window: > +I[Bob, 2000] over window: > +I[Alice, 3000] over window: > +I[Bob, 2500] over window: > +I[Alice, 3500] over window: > +I[Bob, 6000] over window: > +I[Alice, 10333]

0
0
0
浏览量2013
打酱油的后端

Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据一、引入flink相关依赖二、properties保存连接kafka的配置三、构建flink实时消费环境四、添加Kafka源和处理数据五、完整代码六、执行程序查看消费到的数据一、引入flink相关依赖 <groupId>com.bigdata</groupId> <artifactId>flink</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <flink.version>1.13.1</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- <scope>provided</scope>--> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>二、properties保存连接kafka的配置 //用properties保存kafka连接的相关配置 val properties = new Properties() properties.setProperty("bootstrap.servers","10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092") properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"swlfalfal\";") properties.setProperty("security.protocol","SASL_PLAINTEXT") properties.setProperty("sasl.mechanism", "PLAIN") properties.setProperty("group.id","flink-test") properties.setProperty("auto.offset.reset","earliest")三、构建flink实时消费环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setRestartStrategy(RestartStrategies.noRestart())四、添加Kafka源和处理数据 val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String] ("debezium-test-optics_uds",new SimpleStringSchema(),properties)) lines.print() //触发执行 env.execute()五、完整代码import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import java.util.Properties object SourceKafka { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setRestartStrategy(RestartStrategies.noRestart()) //用properties保存kafka连接的相关配置 val properties = new Properties() properties.setProperty("bootstrap.servers","10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092") properties.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";") properties.setProperty("security.protocol","SASL_PLAINTEXT") properties.setProperty("sasl.mechanism", "PLAIN") properties.setProperty("group.id","flink-test") properties.setProperty("auto.offset.reset","earliest") //添加kafka源,并打印数据 val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String] ("debezium-test-optics_uds",new SimpleStringSchema(),properties)) lines.print() //触发执行 env.execute() } }六、执行程序查看消费到的数据{ "schema":{ "type":"struct", "fields":[ { "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"sid" }, { "type":"string", "optional":false, "field":"sname" }, { "type":"int64", "optional":false, "name":"io.debezium.time.Timestamp", "version":1, "field":"updatetime" }, { "type":"string", "optional":false, "field":"ssex" } ], "optional":true, "name":"debezium_test_optics_uds.Value", "field":"before" }, { "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"sid" }, { "type":"string", "optional":false, "field":"sname" }, { "type":"int64", "optional":false, "name":"io.debezium.time.Timestamp", "version":1, "field":"updatetime" }, { "type":"string", "optional":false, "field":"ssex" } ], "optional":true, "name":"debezium_test_optics_uds.Value", "field":"after" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"version" }, { "type":"string", "optional":false, "field":"connector" }, { "type":"string", "optional":false, "field":"name" }, { "type":"int64", "optional":false, "field":"ts_ms" }, { "type":"string", "optional":true, "name":"io.debezium.data.Enum", "version":1, "parameters":{ "allowed":"true,last,false,incremental" }, "default":"false", "field":"snapshot" }, { "type":"string", "optional":false, "field":"db" }, { "type":"string", "optional":true, "field":"sequence" }, { "type":"string", "optional":true, "field":"table" }, { "type":"int64", "optional":false, "field":"server_id" }, { "type":"string", "optional":true, "field":"gtid" }, { "type":"string", "optional":false, "field":"file" }, { "type":"int64", "optional":false, "field":"pos" }, { "type":"int32", "optional":false, "field":"row" }, { "type":"int64", "optional":true, "field":"thread" }, { "type":"string", "optional":true, "field":"query" } ], "optional":false, "name":"io.debezium.connector.mysql.Source", "field":"source" }, { "type":"string", "optional":false, "field":"op" }, { "type":"int64", "optional":true, "field":"ts_ms" }, { "type":"struct", "fields":[ { "type":"string", "optional":false, "field":"id" }, { "type":"int64", "optional":false, "field":"total_order" }, { "type":"int64", "optional":false, "field":"data_collection_order" } ], "optional":true, "field":"transaction" } ], "optional":false, "name":"debezium_test_optics_uds.Envelope" }, "payload":{ "before":null, "after":{ "sid":3600, "sname":"f", "updatetime":1661126400000, "ssex":"a" }, "source":{ "version":"1.9.6.Final", "connector":"mysql", "name":"debezium-uds8-optics8-test_1h", "ts_ms":1665155935000, "snapshot":"false", "db":"dw", "sequence":null, "table":"student", "server_id":223344, "gtid":null, "file":"mysql-bin.000012", "pos":6193972, "row":0, "thread":66072, "query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')" }, "op":"c", "ts_ms":1665155935640, "transaction":null } }

0
0
0
浏览量2022
打酱油的后端

Flink系列之:深入理解ttl和checkpoint,Flink SQL应用ttl案例

一、深入理解Flink TTLFlink TTL(Time To Live)是一种机制,用于设置数据的过期时间,控制数据在内存或状态中的存活时间。通过设置TTL,可以自动删除过期的数据,从而释放资源并提高性能。在Flink中,TTL可以应用于不同的组件和场景,包括窗口、状态和表。窗口:对于窗口操作,可以将TTL应用于窗口中的数据。当窗口中的数据过期时,Flink会自动丢弃这些数据,从而保持窗口中的数据只包含最新的和有效的内容。这样可以减少内存的使用,同时提高窗口操作的计算性能。状态:对于有状态的操作,如键控状态或算子状态,可以为状态设置TTL。当状态中的数据过期时,Flink会自动清理过期的状态,释放资源。这对于长时间运行的应用程序特别有用,可以避免状态无限增长,消耗过多的内存。表:在Flink中,TTL也可以应用于表。可以通过在CREATE TABLE语句的WITH子句中指定TTL的选项来设置表的过期时间。当表中的数据过期时,Flink会自动删除过期的数据行。这对于处理具有实效性(例如日志)的数据特别有用,可以自动清理过期的数据,保持表的内容的新鲜和有效。TTL在实际应用中的作用主要体现在以下几个方面:节省资源:通过设置合适的TTL,可以有效地管理和控制内存和状态的使用。过期的数据会被自动清理,释放资源。这样可以避免无效或过时的数据占用过多的资源,提高应用程序的性能和可扩展性。数据清理:对于具有实效性的数据,如日志数据,可以使用TTL自动清理过期的数据。这可以减少手动管理和维护数据的工作量,保持数据的新鲜和有效。数据一致性:通过设置合适的TTL,可以确保数据在一定时间内保持一致性。过期的数据不再被读取或使用,可以避免数据不一致性的问题。性能优化:TTL可以通过自动清理过期数据来优化查询和计算的性能。只有最新和有效的数据被保留,可以减少数据的处理量,提高计算效率。总而言之,TTL是Flink中一种重要的机制,用于控制数据的过期时间和生命周期。通过适当配置TTL,可以优化资源使用、提高系统性能,并保持数据的一致性和有效性。二、Flink SQL设置TTLFlink SQL中可以使用TTL(Time To Live)来设置数据的过期时间,以控制数据在内存或状态中的存留时间。通过设置TTL,可以自动删除过期的数据,从而节省资源并提高性能。要在Flink SQL中设置TTL,可以使用CREATE TABLE语句的WITH选项来指定TTL的配置。以下是一个示例:CREATE TABLE myTable ( id INT, name STRING, eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime - INTERVAL '5' MINUTE -- 定义Watermark ) WITH ( 'connector' = 'kafka', 'topic' = 'myTopic', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'ttl' = '10m' -- 设置TTL为10分钟 );在上述示例中,通过在CREATE TABLE语句的WITH子句中的’ttl’选项中指定TTL的值(10m),即设置数据在内存中的存活时间为10分钟。过期的数据会自动被删除。需要注意的是,引入TTL机制会增加一定的性能和资源开销。因此,在使用TTL时需要权衡好过期时间和系统的性能需求。三、Flink设置TTL在需要设置TTL的数据源或状态上,使用相应的API(例如DataStream API或KeyedState API)设置TTL值。// DataStream API dataStream.keyBy(<key_selector>).mapStateDescriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>)); // KeyedState API descriptor.enableTimeToLive(Duration.ofMillis(<ttl_in_milliseconds>));在Flink作业中配置TTL检查间隔(默认值为每分钟一次):state.backend.rocksdb.ttl.compaction.interval: <interval_in_milliseconds>四、深入理解checkpointFlink的Checkpoint是一种容错机制,用于在Flink作业执行过程中定期保存数据的一致性检查点。它可以保证作业在发生故障时能够从检查点恢复,并继续进行。下面是一些深入介绍Checkpoint的关键概念和特性:一致性保证:Flink的Checkpoint机制通过保存作业状态的快照来实现一致性保证。在Checkpoint期间,Flink会确保所有的输入数据都已经被处理,并将结果写入状态后再进行检查点的保存。这样可以确保在恢复时,从检查点恢复的作业状态仍然是一致的。保存顺序:Flink的Checkpoint机制保证了保存检查点的顺序。检查点的保存是有序的,即在一个检查点完成之前,不会开始下一个检查点的保存。这种有序的保存方式能够保证在恢复时按照检查点的顺序进行恢复。并行度一致性:Flink的Checkpoint机制能够保证在作业的不同并行任务之间保持一致性。即使在分布式的情况下,Flink也能够确保所有并行任务在某个检查点的位置上都能保持一致。这是通过分布式快照算法和超时机制来实现的。可靠性保证:Flink的Checkpoint机制对于作业的故障恢复非常可靠。当一个任务发生故障时,Flink会自动从最近的检查点进行恢复。如果某个检查点无法满足一致性要求,Flink会自动选择前一个检查点进行恢复,以确保作业能够在一个一致的状态下继续执行。容错机制:Flink的Checkpoint机制提供了容错机制来应对各种故障情况。例如,如果某个任务在保存检查点时失败,Flink会尝试重新保存检查点,直到成功为止。此外,Flink还支持增量检查点,它可以在不保存整个作业状态的情况下只保存修改的部分状态,从而提高了保存检查点的效率。高可用性:Flink的Checkpoint机制还提供了高可用性的选项。可以将检查点数据保存在分布式文件系统中,以防止单点故障。此外,还可以配置备份作业管理器(JobManager)和任务管理器(TaskManager)以确保在某个节点发生故障时能够快速恢复。总结起来,Flink的Checkpoint机制是一种强大且可靠的容错机制,它能够确保作业在发生故障时能够从一致性检查点恢复,并继续进行。通过保存作业状态的快照,Flink能够保证作业的一致性,并提供了高可用性和高效率的保存和恢复机制。Checkpoint是Flink中一种重要的容错机制,用于保证作业在发生故障时能够从上一次检查点恢复,并继续进行处理,从而实现容错性。以下是Checkpoint的主要用途:容错和故障恢复:Checkpoint可以将作业的状态和数据进行持久化,当发生故障时,Flink可以使用最近的检查点来恢复作业的状态和数据,从而避免数据丢失,并继续处理未完成的任务。Exactly-Once语义:通过将检查点和事务(如果应用程序使用Flink的事务支持)结合起来,Flink可以实现Exactly-Once语义,确保结果的一致性和准确性。当作业从检查点恢复时,它将只会处理一次输入数据,并产生一次输出,避免了重复和丢失的数据写入。冷启动和部署:可以使用检查点来实现作业的冷启动,即在作业启动时,从最近的检查点恢复状态和数据,并从上一次检查点的位置继续处理。这对于在作业启动或重新部署时非常有用,可以快速恢复到之前的状态,减少恢复所需的时间。跨版本迁移:当使用不同版本的Flink或更改作业的代码时,可以使用检查点将作业从旧的版本转移到新的版本,从而实现跨版本迁移。总之,Checkpoint是Flink中的关键机制,其用途包括容错和故障恢复、Exactly-Once语义、冷启动和部署以及跨版本迁移。通过使用Checkpoint,可以提高作业的可靠性、一致性和可恢复性。五、Flink设置Checkpoint要设置Flink的Checkpoint和TTL,可以按照以下步骤进行操作:设置Checkpoint:在Flink作业中启用Checkpoint:可以通过在Flink配置文件(flink-conf.yaml)中设置以下属性来开启Checkpoint:execution.checkpointing.enabled: true设置Checkpoint间隔:可以通过以下属性设置Checkpoint的间隔时间(默认值为10秒):execution.checkpointing.interval: <interval_in_milliseconds>设置Checkpoint保存路径:可以通过以下属性设置Checkpoint文件的保存路径(默认为jobmanager根路径):state.checkpoints.dir: <checkpoint_directory_path>六、Flink SQL关联多张表在Flink SQL中,可以通过使用窗口操作来保证在一段时间内多张表的数据总能关联到。窗口操作可以用于基于时间的数据处理,将数据划分为窗口,并在每个窗口上执行关联操作。 下面是一个示例,演示如何在一段时间内关联多张表的数据: ```sql -- 创建两个输入表 CREATE TABLE table1 ( id INT, name STRING, eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'topic1', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json' ); CREATE TABLE table2 ( id INT, value STRING, eventTime TIMESTAMP(3), WATERMARK FOR eventTime AS eventTime - INTERVAL '1' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'topic2', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json' ); -- 执行关联操作 SELECT t1.id, t1.name, t2.value FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id AND t1.eventTime BETWEEN t2.eventTime - INTERVAL '5' MINUTE AND t2.eventTime + INTERVAL '5' MINUTE在上面的例子中,首先创建了两个输入表table1和table2,并分别指定了输入源(此处使用了Kafka作为示例输入源)。然后,在执行关联操作时,使用了通过窗口操作进行时间范围的过滤条件,即"t1.eventTime BETWEEN t2.eventTime - INTERVAL ‘5’ MINUTE AND t2.eventTime + INTERVAL ‘5’ MINUTE",确保了在一段时间内两张表的数据能够关联到。通过使用窗口操作,可以根据具体的时间范围来进行数据关联,从而保证在一段时间内多张表的数据总能关联到。七、Flink SQL使用TTL关联多表Flink还提供了Time-To-Live (TTL)功能,可以用于在表中定义数据的生存时间。当数据的时间戳超过定义的TTL时,Flink会自动将其从表中删除。这在处理实时数据时非常有用,可以自动清理过期的数据。在Flink中使用TTL可以通过创建表时指定TTL属性来实现,如下所示:CREATE TABLE myTable ( id INT, name STRING, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND, PRIMARY KEY (id) NOT ENFORCED, TTL (event_time) AS event_time + INTERVAL '1' HOUR ) WITH ( 'connector.type' = 'kafka', ... )在这个例子中,表myTable定义了一个event_time列,并使用TTL函数指定了数据的生存时间为event_time加上1小时。当数据的event_time超过1小时时,Flink会自动删除这些数据。通过在Flink SQL中同时使用JOIN和TTL,你可以实现多张表的关联,并根据指定的条件删除过期的数据,从而更灵活地处理和管理数据。

0
0
0
浏览量2018
打酱油的后端

Flink系列之:Table API Connectors之Raw Format

一、Raw FormatRaw format 允许读写原始(基于字节)值作为单个列。注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。Raw format 连接器是内置的。二、示例例如,你可能在 Kafka 中具有原始日志数据,并希望使用 Flink SQL 读取和分析此类数据。47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"下面的代码创建了一张表,使用 raw format 以 UTF-8 编码的形式从中读取(也可以写入)底层的 Kafka topic 作为匿名字符串值:CREATE TABLE nginx_log ( log STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'nginx_log', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'raw' )然后,你可以将原始数据读取为纯字符串,之后使用用户自定义函数将其分为多个字段进行进一步分析。例如 示例中的 my_split。SELECT t.hostname, t.datetime, t.url, t.browser, ... FROM( SELECT my_split(log) as t FROM nginx_log );相对应的,你也可以将一个 STRING 类型的列以 UTF-8 编码的匿名字符串值写入 Kafka topic。三、Format 参数参数 是否必选 默认值类型描述format必选 (none)String指定要使用的格式, 这里应该是 ‘raw’。raw.charset可选 UTF-8String指定字符集来编码文本字符串。raw.endianness可选 big-endianString指定字节序来编码数字值的字节。有效值为’big-endian’和’little-endian’。四、数据类型映射下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。

0
0
0
浏览量1981
打酱油的后端

Flink系列之:集合操作

一、集合操作适用于流、批操作二、UNIONUNION 和 UNION ALL 返回两个表中的数据。 UNION 会去重,UNION ALL 不会去重。Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c'); Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b'); Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2); +---+ | s| +---+ | c| | a| | b| | d| | e| +---+ Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2); +---+ | c| +---+ | c| | a| | b| | b| | c| | d| | e| | a| | b| | b| +---+三、INTERSECTINTERSECT 和 INTERSECT ALL 返回两个表中共有的数据。 INTERSECT 会去重,INTERSECT ALL 不会去重。Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2); +---+ | s| +---+ | a| | b| +---+ Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2); +---+ | s| +---+ | a| | b| | b| +---+四、EXCEPTEXCEPT 和 EXCEPT ALL 返回在一个表中存在,但在另一个表中不存在数据。 EXCEPT 会去重,EXCEPT ALL不会去重。Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2); +---+ | s | +---+ | c | +---+ Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2); +---+ | s | +---+ | c | | c | +---+五、IN如果表达式(可以是列,也可以是函数等)存在于子查询的结果中,则返回 true。子查询的表结果必须由一列组成。此列必须与表达式具有相同的数据类型。SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts )优化器会把 IN 条件重写为 join 和 group 操作。对于流式查询,计算查询结果所需的状态可能会根据输入行数而无限增长。你可以设置一个合适的状态 time-to-live(TTL)来淘汰过期数据以防止状态过大。注意:这可能会影响查询结果的正确性。六、XISTSSELECT user, amount FROM Orders WHERE product EXISTS ( SELECT product FROM NewProducts )如果子查询返回至少一行,则为 true。只支持能被重写为 join 和 group 的操作。优化器会把 EXIST 重写为 join 和 group 操作.对于流式查询,计算查询结果所需的状态可能会根据输入行数而无限增长。你可以设置一个合适的状态 time-to-live(TTL)来淘汰过期数据以防止状态过大。注意:这可能会影响查询结果的正确性。

0
0
0
浏览量1994
打酱油的后端

PyFlink系列之一:PyFlink安装和PyFlink使用的详细技术

一、下载PyFlink命令行下载PyFlink:pip install apache-flinkPycharm下载PyFlink:二、创建TableEnvironment创建 TableEnvironment 的推荐方式是通过 EnvironmentSettings 对象创建:from pyflink.table import EnvironmentSettings, TableEnvironment # create a streaming TableEnvironment env_settings = EnvironmentSettings.in_streaming_mode() # or a batch TableEnvironment # env_settings = EnvironmentSettings.in_batch_mode() table_env = TableEnvironment.create(env_settings) 或者,用户可以从现有的 StreamExecutionEnvironment 创建 StreamTableEnvironment,以与 DataStream API 进行互操作。from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment # create a streaming TableEnvironment from a StreamExecutionEnvironment env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env) 三、TableEnvironment API1.Table/SQL 操作这些 APIs 用来创建或者删除 Table API/SQL 表和写查询:APIs描述from_elements(elements, schema=None, verify_schema=True)通过元素集合来创建表。from_pandas(pdf, schema=None, split_num=1)通过 pandas DataFrame 来创建表。from_path(path)通过指定路径下已注册的表来创建一个表,例如通过 create_temporary_view 注册表。sql_query(query)执行一条 SQL 查询,并将查询的结果作为一个 Table 对象。create_temporary_view(view_path, table) 将一个 Table 对象注册为一张临时表,类似于 SQL 的临时表。drop_temporary_view(view_path) 删除指定路径下已注册的临时表drop_temporary_table(table_path)删除指定路径下已注册的临时表。 你可以使用这个接口来删除临时 source 表和临时 sink 表。execute_sql(stmt)执行指定的语句并返回执行结果。 执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。注意,对于 “INSERT INTO” 语句,这是一个异步操作,通常在向远程集群提交作业时才需要使用。 但是,如果在本地集群或者 IDE 中执行作业时,你需要等待作业执行完成2.执行/解释作业这些 APIs 是用来执行/解释作业。注意,execute_sql API 也可以用于执行作业。APIs 描述explain_sql(stmt, *extra_details) 返回指定语句的抽象语法树和执行计划。create_statement_set() 创建一个可接受 DML 语句或表的 StatementSet 实例。 它可用于执行包含多个 sink 的作业。3.创建/删除用户自定义函数这些 APIs 用来注册 UDFs 或者 删除已注册的 UDFs。 注意,execute_sql API 也可以用于注册/删除 UDFs。APIs描述create_temporary_function(path, function)将一个 Python 用户自定义函数注册为临时 catalog 函数。create_temporary_system_function(name, function) 将一个 Python 用户自定义函数注册为临时系统函数。 如果临时系统函数的名称与临时 catalog 函数名称相同,优先使用临时系统函数。create_java_function(path, function_class_name, ignore_if_exists=None)将 Java 用户自定义函数注册为指定路径下的 catalog 函数。 如果 catalog 是持久化的,则可以跨多个 Flink 会话和集群使用已注册的 catalog 函数。create_java_temporary_function(path, function_class_name)将 Java 用户自定义函数注册为临时 catalog 函数。create_java_temporary_system_function(name, function_class_name)将 Java 用户定义的函数注册为临时系统函数。drop_function(path)删除指定路径下已注册的 catalog 函数。drop_temporary_function(path)删除指定名称下已注册的临时系统函数。drop_temporary_system_function(name)删除指定名称下已注册的临时系统函数。4.依赖管理这些 APIs 用来管理 Python UDFs 所需要的 Python 依赖。APIs 描述add_python_file(file_path)添加 Python 依赖,可以是 Python 文件,Python 包或者本地目录。 它们将会被添加到 Python UDF 工作程序的 PYTHONPATH 中。set_python_requirements(requirements_file_path, requirements_cache_dir=None) 指定一个 requirements.txt 文件,该文件定义了第三方依赖关系。 这些依赖项将安装到一个临时 catalog 中,并添加到 Python UDF 工作程序的 PYTHONPATH 中。add_python_archive(archive_path, target_dir=None) 添加 Python 归档文件。该文件将被解压到 Python UDF 程序的工作目录中。5.配置APIs描述get_config()返回 table config,可以通过 table config 来定义 Table API 的运行时行为。下面的代码示例展示了如何通过这个 API 来设置配置选项:# set the parallelism to 8 table_env.get_config().get_configuration().set_string("parallelism.default", "8") 四、Catalog APIs这些 APIs 用于访问 catalog 和模块五、Statebackend,Checkpoint 以及重启策略在 Flink 1.10 之前,你可以通过 StreamExecutionEnvironment 来配置 statebackend,checkpointing 以及重启策略。 现在你可以通过在 TableConfig 中,通过设置键值选项来配置它们。下面代码示例展示了如何通过 Table API 来配置 statebackend,checkpoint 以及重启策略:# 设置重启策略为 "fixed-delay" table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay") table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3") table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s") # 设置 checkpoint 模式为 EXACTLY_ONCE table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE") table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min") # 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager" # 你也可以将这个属性设置为 StateBackendFactory 的完整类名 # e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory table_env.get_config().get_configuration().set_string("state.backend", "rocksdb") # 设置 RocksDB statebackend 所需要的 checkpoint 目录 table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/") 六、Table APITable API 是批处理和流处理的统一的关系型 API。Table API 的查询不需要修改代码就可以采用批输入或流输入来运行。Table API 是 SQL 语言的超集,并且是针对 Apache Flink 专门设计的。Table API 集成了 Scala,Java 和 Python 语言的 API。Table API 的查询是使用 Java,Scala 或 Python 语言嵌入的风格定义的,有诸如自动补全和语法校验的 IDE 支持,而不是像普通 SQL 一样使用字符串类型的值来指定查询。Table API 和 Flink SQL 共享许多概念以及部分集成的 API。通过查看公共概念 & API来学习如何注册表或如何创建一个表对象。流概念页面讨论了诸如动态表和时间属性等流特有的概念。下面的例子中假定有一张叫 Orders 的表,表中有属性 (a, b, c, rowtime) 。rowtime 字段是流任务中的逻辑时间属性或是批任务中的普通时间戳字段。概述 & 示例Table API 支持 Scala, Java 和 Python 语言。Scala 语言的 Table API 利用了 Scala 表达式,Java 语言的 Table API 支持 DSL 表达式和解析并转换为等价表达式的字符串,Python 语言的 Table API 仅支持解析并转换为等价表达式的字符串。下面的例子展示了 Scala、Java 和 Python 语言的 Table API 的不同之处。表程序是在批环境下执行的。程序扫描了 Orders 表,通过字段 a 进行分组,并计算了每组结果的行数。JavaJava 的 Table API 通过引入 org.apache.flink.table.api.java.* 来使用。下面的例子展示了如何创建一个 Java 的 Table API 程序,以及表达式是如何指定为字符串的。 使用DSL表达式时也需要引入静态的 org.apache.flink.table.api.Expressions.*。import org.apache.flink.table.api.*; import static org.apache.flink.table.api.Expressions.*; EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); // 在表环境中注册 Orders 表 // ... // 指定表程序 Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) Table counts = orders .groupBy($("a")) .select($("a"), $("b").count().as("cnt")); // 打印 counts.execute().print(); ScalaScala 的 Table API 通过引入 org.apache.flink.table.api.、org.apache.flink.api.scala. 和 org.apache.flink.table.api.bridge.scala._(开启数据流的桥接支持)来使用。下面的例子展示了如何创建一个 Scala 的 Table API 程序。通过 Scala 的带美元符号($)的字符串插值来实现表字段引用。import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // 环境配置 val settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); val tEnv = TableEnvironment.create(settings); // 在表环境中注册 Orders 表 // ... // 指定表程序 val orders = tEnv.from("Orders") // schema (a, b, c, rowtime) val result = orders .groupBy($"a") .select($"a", $"b".count as "cnt") .execute() .print() Python下面的例子展示了如何创建一个 Python 的 Table API 程序,以及表达式是如何指定为字符串的。from pyflink.table import * # 环境配置 t_env = TableEnvironment.create( environment_settings=EnvironmentSettings.in_batch_mode()) # 在表环境中注册 Orders 表和结果 sink 表 source_data_path = "/path/to/source/directory/" result_data_path = "/path/to/result/directory/" source_ddl = f""" create table Orders( a VARCHAR, b BIGINT, c BIGINT, rowtime TIMESTAMP(3), WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{source_data_path}' ) """ t_env.execute_sql(source_ddl) sink_ddl = f""" create table `Result`( a VARCHAR, cnt BIGINT ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{result_data_path}' ) """ t_env.execute_sql(sink_ddl) # 指定表程序 orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait() 下一个例子展示了一个更加复杂的 Table API 程序。这个程序也扫描 Orders 表。程序过滤了空值,使字符串类型的字段 a 标准化,并且每个小时进行一次计算并返回 a 的平均账单金额 b。Java// 环境配置 // ... // 指定表程序 Table orders = tEnv.from("Orders"); // schema (a, b, c, rowtime) Table result = orders .filter( and( $("a").isNotNull(), $("b").isNotNull(), $("c").isNotNull() )) .select($("a").lowerCase().as("a"), $("b"), $("rowtime")) .window(Tumble.over(lit(1).hours()).on($("rowtime")).as("hourlyWindow")) .groupBy($("hourlyWindow"), $("a")) .select($("a"), $("hourlyWindow").end().as("hour"), $("b").avg().as("avgBillingAmount")); Scala// 环境配置 // ... // 指定表程序 val orders: Table = tEnv.from("Orders") // schema (a, b, c, rowtime) val result: Table = orders .filter($"a".isNotNull && $"b".isNotNull && $"c".isNotNull) .select($"a".lowerCase() as "a", $"b", $"rowtime") .window(Tumble over 1.hour on $"rowtime" as "hourlyWindow") .groupBy($"hourlyWindow", $"a") .select($"a", $"hourlyWindow".end as "hour", $"b".avg as "avgBillingAmount") Python# 指定表程序 from pyflink.table.expressions import col, lit orders = t_env.from_path("Orders") # schema (a, b, c, rowtime) result = orders.filter(orders.a.is_not_null & orders.b.is_not_null & orders.c.is_not_null) \ .select(orders.a.lower_case.alias('a'), orders.b, orders.rowtime) \ .window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \ .group_by(col('hourly_window'), col('a')) \ .select(col('a'), col('hourly_window').end.alias('hour'), b.avg.alias('avg_billing_amount')) 因为 Table API 的批数据 API 和流数据 API 是统一的,所以这两个例子程序不需要修改代码就可以运行在流输入或批输入上。在这两种情况下,只要流任务没有数据延时,程序将会输出相同的结果。七、OperationsTable API支持如下操作。请注意不是所有的操作都可以既支持流也支持批;这些操作都具有相应的标记。1.From:Batch Streaming和 SQL 查询的 FROM 子句类似。 执行一个注册过的表的扫描。Java:Table orders = tableEnv.from("Orders");Scala:val orders = tableEnv.from("Orders")Python:orders = t_env.from_path("Orders")2.FromValues:Batch Streaming和 SQL 查询中的 VALUES 子句类似。 基于提供的行生成一张内联表。你可以使用 row(…) 表达式创建复合行:Java:Table table = tEnv.fromValues( row(1, "ABC"), row(2L, "ABCDE") ); Scala:table = tEnv.fromValues( row(1, "ABC"), row(2L, "ABCDE") ) Python:table = t_env.from_elements([(1, 'ABC'), (2, 'ABCDE')]) 这将生成一张结构如下的表:root |-- f0: BIGINT NOT NULL // original types INT and BIGINT are generalized to BIGINT |-- f1: VARCHAR(5) NOT NULL // original types CHAR(3) and CHAR(5) are generalized // to VARCHAR(5). VARCHAR is used instead of CHAR so that // no padding is applied 这个方法会根据输入的表达式自动获取类型。如果在某一个特定位置的类型不一致,该方法会尝试寻找一个所有类型的公共超类型。如果公共超类型不存在,则会抛出异常。你也可以明确指定所需的类型。指定如 DECIMAL 这样的一般类型或者给列命名可能是有帮助的。Java:Table table = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), DataTypes.FIELD("name", DataTypes.STRING()) ), row(1, "ABC"), row(2L, "ABCDE") ); Scala:val table = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)), DataTypes.FIELD("name", DataTypes.STRING()) ), row(1, "ABC"), row(2L, "ABCDE") ) Python:table = t_env.from_elements( [(1, 'ABC'), (2, 'ABCDE')], schema=DataTypes.Row([DataTypes.FIELD('id', DataTypes.DECIMAL(10, 2)), DataTypes.FIELD('name', DataTypes.STRING())]))这将生成一张结构如下的表:root |-- id: DECIMAL(10, 2) |-- name: STRING 3.Select:Batch Streaming和 SQL 的 SELECT 子句类似。 执行一个 select 操作。JavaTable orders = tableEnv.from("Orders"); Table result = orders.select($("a"), $("c").as("d"));Scalaval orders = tableEnv.from("Orders") Table result = orders.select($"a", $"c" as "d");Pythonorders = t_env.from_path("Orders") result = orders.select(orders.a, orders.c.alias('d'))可以选择星号(*)作为通配符,select 表中的所有列。JavaTable result = orders.select($("*")); ScalaTable result = orders.select($"*")Pythonfrom pyflink.table.expressions import col result = orders.select(col("*"))4.As:Batch Streaming重命名字段。JavaTable orders = tableEnv.from("Orders"); Table result = orders.as("x, y, z, t");scalaval orders: Table = tableEnv.from("Orders").as("x", "y", "z", "t")Pythonorders = t_env.from_path("Orders") result = orders.alias("x, y, z, t")5.Where / Filter:Batch Streaming和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。Java:Table orders = tableEnv.from("Orders"); Table result = orders.where($("b").isEqual("red")); Table orders = tableEnv.from("Orders"); Table result = orders.filter($("b").isEqual("red")); Scala:val orders: Table = tableEnv.from("Orders") val result = orders.filter($"a" % 2 === 0) val orders: Table = tableEnv.from("Orders") val result = orders.filter($"a" % 2 === 0) Python:orders = t_env.from_path("Orders") result = orders.where(orders.a == 'red') orders = t_env.from_path("Orders") result = orders.filter(orders.a == 'red') 八、列操作1.AddColumns:Batch Streaming执行字段添加操作。 如果所添加的字段已经存在,将抛出异常。JavaTable orders = tableEnv.from("Orders"); Table result = orders.addColumns(concat($("c"), "sunny")); Scalaval orders = tableEnv.from("Orders"); val result = orders.addColumns(concat($"c", "Sunny")) Pythonfrom pyflink.table.expressions import concat orders = t_env.from_path("Orders") result = orders.add_columns(concat(orders.c, 'sunny')) 2.AddOrReplaceColumns:Batch Streaming执行字段添加操作。 如果添加的列名称和已存在的列名称相同,则已存在的字段将被替换。 此外,如果添加的字段里面有重复的字段名,则会使用最后一个字段。JavaTable orders = tableEnv.from("Orders"); Table result = orders.addOrReplaceColumns(concat($("c"), "sunny").as("desc")); Scalaval orders = tableEnv.from("Orders"); val result = orders.addOrReplaceColumns(concat($"c", "Sunny") as "desc") Pythonfrom pyflink.table.expressions import concat orders = t_env.from_path("Orders") result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc')) 3.DropColumns:Batch StreamingJavaable orders = tableEnv.from("Orders"); Table result = orders.dropColumns($("b"), $("c")); Scalaval orders = tableEnv.from("Orders"); val result = orders.dropColumns($"b", $"c") Pythonorders = t_env.from_path("Orders") result = orders.drop_columns(orders.b, orders.c) 4.RenameColumns:Batch Streaming执行字段重命名操作。 字段表达式应该是别名表达式,并且仅当字段已存在时才能被重命名。JavaTable orders = tableEnv.from("Orders"); Table result = orders.renameColumns($("b").as("b2"), $("c").as("c2")); Scalaval orders = tableEnv.from("Orders"); val result = orders.renameColumns($"b" as "b2", $"c" as "c2") Pythonorders = t_env.from_path("Orders") result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2')) 九、Aggregations1.GroupBy Aggregation:Batch Streaming和 SQL 的 GROUP BY 子句类似。 使用分组键对行进行分组,使用伴随的聚合算子来按照组进行聚合行。JavaTable orders = tableEnv.from("Orders"); Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d")); Scalaval orders: Table = tableEnv.from("Orders") val result = orders.groupBy($"a").select($"a", $"b".sum().as("d")) Pythonorders = t_env.from_path("Orders") result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d')) 2.GroupBy Window Aggregation:Batch Streaming使用分组窗口结合单个或者多个分组键对表进行分组和聚合。JavaTable orders = tableEnv.from("Orders"); Table result = orders .window(Tumble.over(lit(5).minutes()).on($("rowtime")).as("w")) // 定义窗口 .groupBy($("a"), $("w")) // 按窗口和键分组 // 访问窗口属性并聚合 .select( $("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").sum().as("d") ); Scalaval orders: Table = tableEnv.from("Orders") val result: Table = orders .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口 .groupBy($"a", $"w") // 按窗口和键分组 .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".sum as "d") // 访问窗口属性并聚合 Pythonfrom pyflink.table.window import Tumble from pyflink.table.expressions import lit, col orders = t_env.from_path("Orders") result = orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \ .group_by(orders.a, col('w')) \ .select(orders.a, col('w').start, col('w').end, orders.b.sum.alias('d')) 3.Over Window Aggregation和 SQL 的 OVER 子句类似。JavaTable orders = tableEnv.from("Orders"); Table result = orders // 定义窗口 .window( Over .partitionBy($("a")) .orderBy($("rowtime")) .preceding(UNBOUNDED_RANGE) .following(CURRENT_RANGE) .as("w")) // 滑动聚合 .select( $("a"), $("b").avg().over($("w")), $("b").max().over($("w")), $("b").min().over($("w")) ); Scalaval orders: Table = tableEnv.from("Orders") val result: Table = orders // 定义窗口 .window( Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE following CURRENT_RANGE as "w") .select($"a", $"b".avg over $"w", $"b".max().over($"w"), $"b".min().over($"w")) // 滑动聚合 Pythonfrom pyflink.table.window import Over from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE orders = t_env.from_path("Orders") result = orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime) .preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE) .alias("w")) \ .select(orders.a, orders.b.avg.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) 所有的聚合必须定义在同一个窗口上,比如同一个分区、排序和范围内。目前只支持 PRECEDING 到当前行范围(无界或有界)的窗口。尚不支持 FOLLOWING 范围的窗口。ORDER BY 操作必须指定一个单一的时间属性。4.Distinct Aggregation:Batch Streaming和 SQL DISTINCT 聚合子句类似,例如 COUNT(DISTINCT a)。 Distinct 聚合声明的聚合函数(内置或用户定义的)仅应用于互不相同的输入值。 Distinct 可以应用于 GroupBy Aggregation、GroupBy Window Aggregation 和 Over Window Aggregation。Java:Table orders = tableEnv.from("Orders"); // 按属性分组后的的互异(互不相同、去重)聚合 Table groupByDistinctResult = orders .groupBy($("a")) .select($("a"), $("b").sum().distinct().as("d")); // 按属性、时间窗口分组后的互异(互不相同、去重)聚合 Table groupByWindowDistinctResult = orders .window(Tumble .over(lit(5).minutes()) .on($("rowtime")) .as("w") ) .groupBy($("a"), $("w")) .select($("a"), $("b").sum().distinct().as("d")); // over window 上的互异(互不相同、去重)聚合 Table result = orders .window(Over .partitionBy($("a")) .orderBy($("rowtime")) .preceding(UNBOUNDED_RANGE) .as("w")) .select( $("a"), $("b").avg().distinct().over($("w")), $("b").max().over($("w")), $("b").min().over($("w")) ); Scala:val orders: Table = tableEnv.from("Orders"); // 按属性分组后的的互异(互不相同、去重)聚合 val groupByDistinctResult = orders .groupBy($"a") .select($"a", $"b".sum.distinct as "d") // 按属性、时间窗口分组后的互异(互不相同、去重)聚合 val groupByWindowDistinctResult = orders .window(Tumble over 5.minutes on $"rowtime" as "w").groupBy($"a", $"w") .select($"a", $"b".sum.distinct as "d") // over window 上的互异(互不相同、去重)聚合 val result = orders .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as $"w") .select($"a", $"b".avg.distinct over $"w", $"b".max over $"w", $"b".min over $"w") Python:from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE orders = t_env.from_path("Orders") # 按属性分组后的的互异(互不相同、去重)聚合 group_by_distinct_result = orders.group_by(orders.a) \ .select(orders.a, orders.b.sum.distinct.alias('d')) # 按属性、时间窗口分组后的互异(互不相同、去重)聚合 group_by_window_distinct_result = orders.window( Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a, col('w')) \ .select(orders.a, orders.b.sum.distinct.alias('d')) # over window 上的互异(互不相同、去重)聚合 result = orders.over_window(Over .partition_by(orders.a) .order_by(orders.rowtime) .preceding(UNBOUNDED_RANGE) .alias("w")) \ .select(orders.a, orders.b.avg.distinct.over(col('w')), orders.b.max.over(col('w')), orders.b.min.over(col('w'))) 用户定义的聚合函数也可以与 DISTINCT 修饰符一起使用。如果计算不同(互异、去重的)值的聚合结果,则只需向聚合函数添加 distinct 修饰符即可。JavaTable orders = tEnv.from("Orders"); // 对 user-defined aggregate functions 使用互异(互不相同、去重)聚合 tEnv.registerFunction("myUdagg", new MyUdagg()); orders.groupBy("users") .select( $("users"), call("myUdagg", $("points")).distinct().as("myDistinctResult") ); Scalaval orders: Table = tableEnv.from("Orders") val result = orders.distinct() Pythonorders = t_env.from_path("Orders") result = orders.distinct() 十、Joins1.Inner Join:Batch Streaming和 SQL 的 JOIN 子句类似。关联两张表。两张表必须有不同的字段名,并且必须通过 join 算子或者使用 where 或 filter 算子定义至少一个 join 等式连接谓词。Java:Table orders = tableEnv.from("Orders"); Table result = orders.distinct(); Scala:val left = tableEnv.from("MyTable").select($"a", $"b", $"c") val right = tableEnv.from("MyTable").select($"d", $"e", $"f") val result = left.join(right).where($"a" === $"d").select($"a", $"b", $"e") Python:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) result = left.join(right).where(left.a == right.d).select(left.a, left.b, right.e) 2.Outer Join:Batch Streaming和 SQL LEFT/RIGHT/FULL OUTER JOIN 子句类似。 关联两张表。 两张表必须有不同的字段名,并且必须定义至少一个等式连接谓词。Java:Table left = tableEnv.from("MyTable).select($("a"), $("b"), $("c")); Table right = tableEnv.from("MyTable).select($("d"), $("e"), $("f")); Table leftOuterResult = left.leftOuterJoin(right, $("a").isEqual($("d"))) .select($("a"), $("b"), $("e")); Table rightOuterResult = left.rightOuterJoin(right, $("a").isEqual($("d"))) .select($("a"), $("b"), $("e")); Table fullOuterResult = left.fullOuterJoin(right, $("a").isEqual($("d"))) .select($("a"), $("b"), $("e")); Scala:val left = tableEnv.from("MyTable").select($"a", $"b", $"c") val right = tableEnv.from("MyTable").select($"d", $"e", $"f") val leftOuterResult = left.leftOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e") val rightOuterResult = left.rightOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e") val fullOuterResult = left.fullOuterJoin(right, $"a" === $"d").select($"a", $"b", $"e") Python:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) 3.Interval Join:Batch StreamingInterval join 是可以通过流模式处理的常规 join 的子集。Interval join 至少需要一个 equi-join 谓词和一个限制双方时间界限的 join 条件。这种条件可以由两个合适的范围谓词(<、<=、>=、>)或一个比较两个输入表相同时间属性(即处理时间或事件时间)的等值谓词来定义。Java:Table left = tableEnv.from("MyTable).select($("a"), $("b"), $("c"), $("ltime")); Table right = tableEnv.from("MyTable).select($("d"), $("e"), $("f"), $("rtime")); Table result = left.join(right) .where( and( $("a").isEqual($("d")), $("ltime").isGreaterOrEqual($("rtime").minus(lit(5).minutes())), $("ltime").isLess($("rtime").plus(lit(10).minutes())) )) .select($("a"), $("b"), $("e"), $("ltime")); Scala:val left = tableEnv.from("MyTable").select($"a", $"b", $"c", $"ltime") val right = tableEnv.from("MyTable").select($"d", $"e", $"f", $"rtime") val result = left.join(right) .where($"a" === $"d" && $"ltime" >= $"rtime" - 5.minutes && $"ltime" < $"rtime" + 10.minutes) .select($"a", $"b", $"e", $"ltime") Python:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'), col('rowtime1')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'), col('rowtime2')) joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >= right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 + lit(2).seconds)) result = joined_table.select(joined_table.a, joined_table.b, joined_table.e, joined_table.rowtime1) 4.Inner Join with Table Function (UDTF):Batch Streamingjoin 表和表函数的结果。左(外部)表的每一行都会 join 表函数相应调用产生的所有行。 如果表函数调用返回空结果,则删除左侧(外部)表的一行。Java:Table orders = tableEnv.from("Orders"); Table result = orders.groupBy($("a")).select($("a"), $("b").sum().as("d")); Scala Python Scala:// 实例化 User-Defined Table Function val split: TableFunction[_] = new MySplitUDTF() // join val result: Table = table .joinLateral(split($"c") as ("s", "t", "v")) .select($"a", $"b", $"s", $"t", $"v") Python:# 注册 User-Defined Table Function @udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT()]) def split(x): return [Row(1, 2, 3)] # join orders = t_env.from_path("Orders") joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v")) result = joined_table.select(joined_table.a, joined_table.b, joined_table.s, joined_table.t, joined_table.v) 5.Join with Temporal TableTemporal table 是跟踪随时间变化的表。Temporal table 函数提供对特定时间点 temporal table 状态的访问。表与 temporal table 函数进行 join 的语法和使用表函数进行 inner join 的语法相同。目前仅支持与 temporal table 的 inner join。Java:Table ratesHistory = tableEnv.from("RatesHistory"); // 注册带有时间属性和主键的 temporal table function TemporalTableFunction rates = ratesHistory.createTemporalTableFunction( "r_proctime", "r_currency"); tableEnv.registerFunction("rates", rates); // 基于时间属性和键与“Orders”表关联 Table orders = tableEnv.from("Orders"); Table result = orders .joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency"))); Python:目前不支持 Python 的 Table API。 十一、Set Operations1.Union:Batch和 SQL UNION 子句类似。Union 两张表会删除重复记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.union(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.union(right) 2.UnionAll:Batch Streaming和 SQL UNION ALL 子句类似。Union 两张表。 两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.unionAll(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.unionAll(right) 3.Intersect:Batch和 SQL INTERSECT 子句类似。Intersect 返回两个表中都存在的记录。如果一条记录在一张或两张表中存在多次,则只返回一条记录,也就是说,结果表中不存在重复的记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.intersect(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.intersect(right) 4.IntersectAll:Batch和 SQL INTERSECT ALL 子句类似。IntersectAll 返回两个表中都存在的记录。如果一条记录在两张表中出现多次,那么该记录返回的次数同该记录在两个表中都出现的次数一致,也就是说,结果表可能存在重复记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.intersectAll(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.intersectAll(right) 5.Minus:Batch和 SQL EXCEPT 子句类似。Minus 返回左表中存在且右表中不存在的记录。左表中的重复记录只返回一次,换句话说,结果表中没有重复记录。两张表必须具有相同的字段类型。JavaTable left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.minus(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.minus(right) 6.MinusAll:Batch和 SQL EXCEPT ALL 子句类似。MinusAll 返回右表中不存在的记录。在左表中出现 n 次且在右表中出现 m 次的记录,在结果表中出现 (n - m) 次,例如,也就是说结果中删掉了在右表中存在重复记录的条数的记录。两张表必须具有相同的字段类型。Java:Table left = tableEnv.from("orders1"); Table right = tableEnv.from("orders2"); left.minusAll(right); Scala:val left = tableEnv.from("orders1") val right = tableEnv.from("orders2") left.minusAll(right) 7.In:Batch Streaming和 SQL IN 子句类似。如果表达式的值存在于给定表的子查询中,那么 In 子句返回 true。子查询表必须由一列组成。这个列必须与表达式具有相同的数据类型。Java:Table left = tableEnv.from("Orders1") Table right = tableEnv.from("Orders2"); Table result = left.select($("a"), $("b"), $("c")).where($("a").in(right)); Scala:val left = tableEnv.from("Orders1") val right = tableEnv.from("Orders2"); val result = left.select($"a", $"b", $"c").where($"a".in(right)) Python:left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('a')) result = left.select(left.a, left.b, left.c).where(left.a.in_(right)) 十二、OrderBy, Offset & Fetch1.Order By:Batch Streaming和 SQL ORDER BY 子句类似。返回跨所有并行分区的全局有序记录。对于无界表,该操作需要对时间属性进行排序或进行后续的 fetch 操作。Java:from pyflink.table.expressions import col left = t_env.from_path("Source1").select(col('a'), col('b'), col('c')) right = t_env.from_path("Source2").select(col('d'), col('e'), col('f')) left_outer_result = left.left_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) right_outer_result = left.right_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) full_outer_result = left.full_outer_join(right, left.a == right.d).select(left.a, left.b, right.e) Scala:val result = in.orderBy($"a".asc)Python:result = in.order_by(in.a.asc)2.Offset & Fetch:Batch Streaming和 SQL 的 OFFSET 和 FETCH 子句类似。Offset 操作根据偏移位置来限定(可能是已排序的)结果集。Fetch 操作将(可能已排序的)结果集限制为前 n 行。通常,这两个操作前面都有一个排序操作。对于无界表,offset 操作需要 fetch 操作。Java:// 从已排序的结果集中返回前5条记录 Table result1 = in.orderBy($("a").asc()).fetch(5); // 从已排序的结果集中返回跳过3条记录之后的所有记录 Table result2 = in.orderBy($("a").asc()).offset(3); // 从已排序的结果集中返回跳过10条记录之后的前5条记录 Table result3 = in.orderBy($("a").asc()).offset(10).fetch(5); Scala:// 从已排序的结果集中返回前5条记录 val result1: Table = in.orderBy($"a".asc).fetch(5) // 从已排序的结果集中返回跳过3条记录之后的所有记录 val result2: Table = in.orderBy($"a".asc).offset(3) // 从已排序的结果集中返回跳过10条记录之后的前5条记录 val result3: Table = in.orderBy($"a".asc).offset(10).fetch(5) Python:# 从已排序的结果集中返回前5条记录 result1 = table.order_by(table.a.asc).fetch(5) # 从已排序的结果集中返回跳过3条记录之后的所有记录 result2 = table.order_by(table.a.asc).offset(3) # 从已排序的结果集中返回跳过10条记录之后的前5条记录 result3 = table.order_by(table.a.asc).offset(10).fetch(5) 3.Insert:Batch Streaming和 SQL 查询中的 INSERT INTO 子句类似,该方法执行对已注册的输出表的插入操作。executeInsert() 方法将立即提交执行插入操作的 Flink job。输出表必须已注册在 TableEnvironment(详见表连接器)中。此外,已注册表的 schema 必须与查询中的 schema 相匹配。Java:Table orders = tableEnv.from("Orders"); orders.executeInsert("OutOrders");Scala:val orders = tableEnv.from("Orders") orders.executeInsert("OutOrders")Python:orders = t_env.from_path("Orders") orders.execute_insert("OutOrders")十三、Windows1.Group WindowsGroup window 聚合根据时间或行计数间隔将行分为有限组,并为每个分组进行一次聚合函数计算。对于批处理表,窗口是按时间间隔对记录进行分组的便捷方式。窗口是使用 window(GroupWindow w) 子句定义的,并且需要使用 as 子句来指定别名。为了按窗口对表进行分组,窗口别名必须像常规分组属性一样在 groupBy(…) 子句中引用。 以下示例展示了如何在表上定义窗口聚合Java:Table table = input .window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w .groupBy($("w")) // 以窗口 w 对表进行分组 .select($("b").sum()); // 聚合Scala:val table = input .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w .groupBy($"w") // 以窗口 w 对表进行分组 .select($"b".sum) // 聚合Python:# 定义窗口并指定别名为 w,以窗口 w 对表进行分组,然后再聚合 table = input.window([w: GroupWindow].alias("w")) \ .group_by(col('w')).select(input.b.sum)在流环境中,如果窗口聚合除了窗口之外还根据一个或多个属性进行分组,则它们只能并行计算,例如,groupBy(…) 子句引用了一个窗口别名和至少一个附加属性。仅引用窗口别名(例如在上面的示例中)的 groupBy(…) 子句只能由单个非并行任务进行计算。 以下示例展示了如何定义有附加分组属性的窗口聚合。Java:Table table = input .window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w .groupBy($("w"), $("a")) // 以属性 a 和窗口 w 对表进行分组 .select($("a"), $("b").sum()); // 聚合Scala:val table = input .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w .groupBy($"w", $"a") // 以属性 a 和窗口 w 对表进行分组 .select($"a", $"b".sum) // 聚合Python:# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组, # 然后再聚合 table = input.window([w: GroupWindow].alias("w")) \ .group_by(col('w'), input.a).select(input.b.sum)时间窗口的开始、结束或行时间戳等窗口属性可以作为窗口别名的属性添加到 select 子句中,如 w.start、w.end 和 w.rowtime。窗口开始和行时间戳是包含的上下窗口边界。相反,窗口结束时间戳是唯一的上窗口边界。例如,从下午 2 点开始的 30 分钟滚动窗口将 “14:00:00.000” 作为开始时间戳,“14:29:59.999” 作为行时间时间戳,“14:30:00.000” 作为结束时间戳。Java:able table = input .window([GroupWindow w].as("w")) // 定义窗口并指定别名为 w .groupBy($("w"), $("a")) // 以属性 a 和窗口 w 对表进行分组 .select($("a"), $("w").start(), $("w").end(), $("w").rowtime(), $("b").count()); // 聚合并添加窗口开始、结束和 rowtime 时间戳Scala:val table = input .window([w: GroupWindow] as $"w") // 定义窗口并指定别名为 w .groupBy($"w", $"a") // 以属性 a 和窗口 w 对表进行分组 .select($"a", $"w".start, $"w".end, $"w".rowtime, $"b".count) // 聚合并添加窗口开始、结束和 rowtime 时间戳Python:# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组, # 然后再聚合并添加窗口开始、结束和 rowtime 时间戳 table = input.window([w: GroupWindow].alias("w")) \ .group_by(col('w'), input.a) \ .select(input.a, col('w').start, col('w').end, col('w').rowtime, input.b.count)Window 参数定义了如何将行映射到窗口。 Window 不是用户可以实现的接口。相反,Table API 提供了一组具有特定语义的预定义 Window 类。下面列出了支持的窗口定义。2.Tumble (Tumbling Windows)滚动窗口将行分配给固定长度的非重叠连续窗口。例如,一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。滚动窗口是通过 Tumble 类定义的,具体如下:MethodDescriptionover将窗口的长度定义为时间或行计数间隔。on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。alias指定窗口的别名。别名用于在 group_by() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。Java:// Tumbling Event-time Window .window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w")); // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w")); // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));Scala:// Tumbling Event-time Window .window(Tumble over 10.minutes on $"rowtime" as $"w") // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.minutes on $"proctime" as $"w") // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.rows on $"proctime" as $"w")Python:# Tumbling Event-time Window .window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias("w")) # Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble.over(lit(10).minutes).on(col('proctime')).alias("w")) # Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble.over(row_interval(10)).on(col('proctime')).alias("w"))3.Slide (Sliding Windows)滑动窗口具有固定大小并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,行可能分配给多个窗口。例如,15 分钟大小和 5 分钟滑动间隔的滑动窗口将每一行分配给 3 个不同的 15 分钟大小的窗口,以 5 分钟的间隔进行一次计算。滑动窗口可以定义在事件时间、处理时间或行数上。滑动窗口是通过 Slide 类定义的,具体如下:MethodDescriptionover 将窗口的长度定义为时间或行计数间隔。every将窗口的长度定义为时间或行计数间隔。滑动间隔的类型必须与窗口长度的类型相同。on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。Java:// Sliding Event-time Window .window(Slide.over(lit(10).minutes()) .every(lit(5).minutes()) .on($("rowtime")) .as("w")); // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide.over(lit(10).minutes()) .every(lit(5).minutes()) .on($("proctime")) .as("w")); // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));Scala:// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on $"rowtime" as $"w") // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on $"proctime" as $"w") // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on $"proctime" as $"w")Python:# Sliding Event-time Window .window(Slide.over(lit(10).minutes).every(lit(5).minutes).on(col('rowtime')).alias("w")) # Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide.over(lit(10).minutes).every(lit(5).minutes).on(col('proctime')).alias("w")) # Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide.over(row_interval(10)).every(row_interval(5)).on(col('proctime')).alias("w"))4.Session (Session Windows)会话窗口没有固定的大小,其边界是由不活动的间隔定义的,例如,如果在定义的间隔期内没有事件出现,则会话窗口将关闭。例如,定义30 分钟间隔的会话窗口,当观察到一行在 30 分钟内不活动(否则该行将被添加到现有窗口中)且30 分钟内没有添加新行,窗口会关闭。会话窗口支持事件时间和处理时间。MethodDescriptionwithGap将两个窗口之间的间隙定义为时间间隔。on要对数据进行分组(时间间隔)或排序(行计数)的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。as指定窗口的别名。别名用于在 groupBy() 子句中引用窗口,并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。Java:// Session Event-time Window .window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w")); // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));Scala:// Session Event-time Window .window(Session withGap 10.minutes on $"rowtime" as $"w") // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on $"proctime" as $"w")Python:# Session Event-time Window .window(Session.with_gap(lit(10).minutes).on(col('rowtime')).alias("w")) # Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session.with_gap(lit(10).minutes).on(col('proctime')).alias("w"))5.Over WindowsOver window 聚合聚合来自在标准的 SQL(OVER 子句),可以在 SELECT 查询子句中定义。与在“GROUP BY”子句中指定的 group window 不同, over window 不会折叠行。相反,over window 聚合为每个输入行在其相邻行的范围内计算聚合。Over windows 使用 window(w: OverWindow*) 子句(在 Python API 中使用 over_window(*OverWindow))定义,并通过 select() 方法中的别名引用。以下示例显示如何在表上定义 over window 聚合。Java:Table table = input .window([OverWindow w].as("w")) // define over window with alias w .select($("a"), $("b").sum().over($("w")), $("c").min().over($("w"))); // aggregate over the over window wScala:val table = input .window([w: OverWindow] as $"w") // define over window with alias w .select($"a", $"b".sum over $"w", $"c".min over $"w") // aggregate over the over window wPython:# define over window with alias w and aggregate over the over window w table = input.over_window([w: OverWindow].alias("w")) \ .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))OverWindow 定义了计算聚合的行范围。OverWindow 不是用户可以实现的接口。相反,Table API 提供了Over 类来配置 over window 的属性。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围内定义 over window 。可以通过 Over 类(和其他类)上的方法来定义 over window,具体如下:Partition By可选的在一个或多个属性上定义输入的分区。每个分区单独排序,聚合函数分别应用于每个分区。注意:在流环境中,如果窗口包含 partition by 子句,则只能并行计算 over window 聚合。如果没有 partitionBy(…),数据流将由单个非并行任务处理。Order By必须的定义每个分区内行的顺序,从而定义聚合函数应用于行的顺序。注意:对于流处理查询,必须声明事件时间或处理时间属性。目前,仅支持单个排序属性。Preceding可选的定义了包含在窗口中并位于当前行之前的行的间隔。间隔可以是时间或行计数间隔。有界 over window 用间隔的大小指定,例如,时间间隔为10分钟或行计数间隔为10行。无界 over window 通过常量来指定,例如,用UNBOUNDED_RANGE指定时间间隔或用 UNBOUNDED_ROW 指定行计数间隔。无界 over windows 从分区的第一行开始。如果省略前面的子句,则使用 UNBOUNDED_RANGE 和 CURRENT_RANGE 作为窗口前后的默认值。Following可选的定义包含在窗口中并在当前行之后的行的窗口间隔。间隔必须以与前一个间隔(时间或行计数)相同的单位指定。目前,不支持在当前行之后有行的 over window。相反,你可以指定两个常量之一:CURRENT_ROW 将窗口的上限设置为当前行。CURRENT_RANGE 将窗口的上限设置为当前行的排序键,例如,与当前行具有相同排序键的所有行都包含在窗口中。如果省略后面的子句,则时间间隔窗口的上限定义为 CURRENT_RANGE,行计数间隔窗口的上限定义为CURRENT_ROW。As必须的为 over window 指定别名。别名用于在之后的 select() 子句中引用该 over window。注意:目前,同一个 select() 调用中的所有聚合函数必须在同一个 over window 上计算。6.Unbounded Over WindowsJava// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_RANGE).as("w")); // 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy("proctime").preceding(UNBOUNDED_RANGE).as("w")); // 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(UNBOUNDED_ROW).as("w")); // 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(UNBOUNDED_ROW).as("w"));Scala// 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_RANGE as "w") // 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_RANGE as "w") // 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding UNBOUNDED_ROW as "w") // 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding UNBOUNDED_ROW as "w")Python# 无界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(UNBOUNDED_RANGE).alias("w")) # 无界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(UNBOUNDED_RANGE).alias("w")) # 无界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(UNBOUNDED_ROW).alias("w")) # 无界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(UNBOUNDED_ROW).alias("w"))7.Bounded Over WindowsJava// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(lit(1).minutes()).as("w")); // 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(lit(1).minutes()).as("w")); // 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over.partitionBy($("a")).orderBy($("rowtime")).preceding(rowInterval(10)).as("w")); // 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over.partitionBy($("a")).orderBy($("proctime")).preceding(rowInterval(10)).as("w"));Scala:// 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding 1.minutes as "w") // 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding 1.minutes as "w") // 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .window(Over partitionBy $"a" orderBy $"rowtime" preceding 10.rows as "w") // 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .window(Over partitionBy $"a" orderBy $"proctime" preceding 10.rows as "w")Python:# 有界的事件时间 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(lit(1).minutes).alias("w")) # 有界的处理时间 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(lit(1).minutes).alias("w")) # 有界的事件时间行数 over window(假定有一个叫“rowtime”的事件时间属性) .over_window(Over.partition_by(col('a')).order_by(col('rowtime')).preceding(row_interval(10)).alias("w")) # 有界的处理时间行数 over window(假定有一个叫“proctime”的处理时间属性) .over_window(Over.partition_by(col('a')).order_by(col('proctime')).preceding(row_interval(10)).alias("w")) 十四、Row-based Operations基于行生成多列输出的操作。1.Map:Batch Streaming使用用户定义的标量函数或内置标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。Java:public class MyMapFunction extends ScalarFunction { public Row eval(String a) { return Row.of(a, "pre-" + a); } @Override public TypeInformation<?> getResultType(Class<?>[] signature) { return Types.ROW(Types.STRING(), Types.STRING()); } } ScalarFunction func = new MyMapFunction(); tableEnv.registerFunction("func", func); Table table = input .map(call("func", $("c")).as("a", "b")); Scala:class MyMapFunction extends ScalarFunction { def eval(a: String): Row = { Row.of(a, "pre-" + a) } override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.ROW(Types.STRING, Types.STRING) } val func = new MyMapFunction() val table = input .map(func($"c")).as("a", "b") Python:使用 python 的通用标量函数或向量化标量函数执行 map 操作。如果输出类型是复合类型,则输出将被展平。from pyflink.common import Row from pyflink.table import DataTypes from pyflink.table.udf import udf def map_function(a: Row) -> Row: return Row(a.a + 1, a.b * a.b) # 使用 python 通用标量函数进行 map 操作 func = udf(map_function, result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT())])) table = input.map(func).alias('a', 'b') # 使用 python 向量化标量函数进行 map 操作 pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT()))]), func_type='pandas') table = input.map(pandas_func).alias('a', 'b') 2.FlatMap:Batch StreamingJava:使用表函数执行 flatMap 操作。public class MyFlatMapFunction extends TableFunction<Row> { public void eval(String str) { if (str.contains("#")) { String[] array = str.split("#"); for (int i = 0; i < array.length; ++i) { collect(Row.of(array[i], array[i].length())); } } } @Override public TypeInformation<Row> getResultType() { return Types.ROW(Types.STRING(), Types.INT()); } } TableFunction func = new MyFlatMapFunction(); tableEnv.registerFunction("func", func); Table table = input .flatMap(call("func", $("c")).as("a", "b")); Scala:class MyFlatMapFunction extends TableFunction[Row] { def eval(str: String): Unit = { if (str.contains("#")) { str.split("#").foreach({ s => val row = new Row(2) row.setField(0, s) row.setField(1, s.length) collect(row) }) } } override def getResultType: TypeInformation[Row] = { Types.ROW(Types.STRING, Types.INT) } } val func = new MyFlatMapFunction val table = input .flatMap(func($"c")).as("a", "b") Python:from pyflink.table.udf import udtf from pyflink.table import DataTypes from pyflink.common import Row @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()]) def split(x: Row) -> Row: for s in x.b.split(","): yield x.a, s input.flat_map(split) 3.Aggregate:Batch StreamingJava:使用聚合函数来执行聚合操作。你必须使用 select 子句关闭 aggregate,并且 select 子句不支持聚合函数。如果输出类型是复合类型,则聚合的输出将被展平。public class MyMinMaxAcc { public int min = 0; public int max = 0; } public class MyMinMax extends AggregateFunction<Row, MyMinMaxAcc> { public void accumulate(MyMinMaxAcc acc, int value) { if (value < acc.min) { acc.min = value; } if (value > acc.max) { acc.max = value; } } @Override public MyMinMaxAcc createAccumulator() { return new MyMinMaxAcc(); } public void resetAccumulator(MyMinMaxAcc acc) { acc.min = 0; acc.max = 0; } @Override public Row getValue(MyMinMaxAcc acc) { return Row.of(acc.min, acc.max); } @Override public TypeInformation<Row> getResultType() { return new RowTypeInfo(Types.INT, Types.INT); } } AggregateFunction myAggFunc = new MyMinMax(); tableEnv.registerFunction("myAggFunc", myAggFunc); Table table = input .groupBy($("key")) .aggregate(call("myAggFunc", $("a")).as("x", "y")) .select($("key"), $("x"), $("y")); Scala:case class MyMinMaxAcc(var min: Int, var max: Int) class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] { def accumulate(acc: MyMinMaxAcc, value: Int): Unit = { if (value < acc.min) { acc.min = value } if (value > acc.max) { acc.max = value } } override def createAccumulator(): MyMinMaxAcc = MyMinMaxAcc(0, 0) def resetAccumulator(acc: MyMinMaxAcc): Unit = { acc.min = 0 acc.max = 0 } override def getValue(acc: MyMinMaxAcc): Row = { Row.of(Integer.valueOf(acc.min), Integer.valueOf(acc.max)) } override def getResultType: TypeInformation[Row] = { new RowTypeInfo(Types.INT, Types.INT) } } val myAggFunc = new MyMinMax val table = input .groupBy($"key") .aggregate(myAggFunc($"a") as ("x", "y")) .select($"key", $"x", $"y") Python:使用 python 的通用聚合函数或 向量化聚合函数来执行聚合操作。你必须使用 select 子句关闭 aggregate ,并且 select 子句不支持聚合函数。如果输出类型是复合类型,则聚合的输出将被展平。from pyflink.common import Row from pyflink.table import DataTypes from pyflink.table.udf import AggregateFunction, udaf class CountAndSumAggregateFunction(AggregateFunction): def get_value(self, accumulator): return Row(accumulator[0], accumulator[1]) def create_accumulator(self): return Row(0, 0) def accumulate(self, accumulator, row: Row): accumulator[0] += 1 accumulator[1] += row.b def retract(self, accumulator, row: Row): accumulator[0] -= 1 accumulator[1] -= row.b def merge(self, accumulator, accumulators): for other_acc in accumulators: accumulator[0] += other_acc[0] accumulator[1] += other_acc[1] def get_accumulator_type(self): return DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT())]) def get_result_type(self): return DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT()), DataTypes.FIELD("b", DataTypes.BIGINT())]) function = CountAndSumAggregateFunction() agg = udaf(function, result_type=function.get_result_type(), accumulator_type=function.get_accumulator_type(), name=str(function.__class__.__name__)) # 使用 python 通用聚合函数进行聚合 result = t.group_by(t.a) \ .aggregate(agg.alias("c", "d")) \ .select("a, c, d") # 使用 python 向量化聚合函数进行聚合 pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.FLOAT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") t.aggregate(pandas_udaf.alias("a", "b")) \ .select("a, b") 4.Group Window Aggregate:Batch Streaming在 group window 和可能的一个或多个分组键上对表进行分组和聚合。你必须使用 select 子句关闭 aggregate。并且 select 子句不支持“*“或聚合函数。Java:AggregateFunction myAggFunc = new MyMinMax(); tableEnv.registerFunction("myAggFunc", myAggFunc); Table table = input .window(Tumble.over(lit(5).minutes()) .on($("rowtime")) .as("w")) // 定义窗口 .groupBy($("key"), $("w")) // 以键和窗口分组 .aggregate(call("myAggFunc", $("a")).as("x", "y")) .select($("key"), $("x"), $("y"), $("w").start(), $("w").end()); // 访问窗口属性与聚合结果Scala:val myAggFunc = new MyMinMax val table = input .window(Tumble over 5.minutes on $"rowtime" as "w") // 定义窗口 .groupBy($"key", $"w") // 以键和窗口分组 .aggregate(myAggFunc($"a") as ("x", "y")) .select($"key", $"x", $"y", $"w".start, $"w".end) // 访问窗口属性与聚合结果Python:from pyflink.table import DataTypes from pyflink.table.udf import AggregateFunction, udaf pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()), result_type=DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.FLOAT()), DataTypes.FIELD("b", DataTypes.INT())]), func_type="pandas") tumble_window = Tumble.over(expr.lit(1).hours) \ .on(expr.col("rowtime")) \ .alias("w") t.select(t.b, t.rowtime) \ .window(tumble_window) \ .group_by("w") \ .aggregate(pandas_udaf.alias("d", "e")) \ .select("w.rowtime, d, e")5.FlatAggregate和 GroupBy Aggregation 类似。使用运行中的表之后的聚合算子对分组键上的行进行分组,以按组聚合行。和 AggregateFunction 的不同之处在于,TableAggregateFunction 的每个分组可能返回0或多条记录。你必须使用 select 子句关闭 flatAggregate。并且 select 子句不支持聚合函数。除了使用 emitValue 输出结果,你还可以使用 emitUpdateWithRetract 方法。和 emitValue 不同的是,emitUpdateWithRetract 用于下发已更新的值。此方法在retract 模式下增量输出数据,例如,一旦有更新,我们必须在发送新的更新记录之前收回旧记录。如果在表聚合函数中定义了这两个方法,则将优先使用 emitUpdateWithRetract 方法而不是 emitValue 方法,这是因为该方法可以增量输出值,因此被视为比 emitValue 方法更有效。Java:/** * Top2 Accumulator。 */ public class Top2Accum { public Integer first; public Integer second; } /** * 用户定义的聚合函数 top2。 */ public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> { @Override public Top2Accum createAccumulator() { Top2Accum acc = new Top2Accum(); acc.first = Integer.MIN_VALUE; acc.second = Integer.MIN_VALUE; return acc; } public void accumulate(Top2Accum acc, Integer v) { if (v > acc.first) { acc.second = acc.first; acc.first = v; } else if (v > acc.second) { acc.second = v; } } public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) { for (Top2Accum otherAcc : iterable) { accumulate(acc, otherAcc.first); accumulate(acc, otherAcc.second); } } public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) { // 下发 value 与 rank if (acc.first != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.first, 1)); } if (acc.second != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.second, 2)); } } } tEnv.registerFunction("top2", new Top2()); Table orders = tableEnv.from("Orders"); Table result = orders .groupBy($("key")) .flatAggregate(call("top2", $("a")).as("v", "rank")) .select($("key"), $("v"), $("rank");Scala:import java.lang.{Integer => JInteger} import org.apache.flink.table.api.Types import org.apache.flink.table.functions.TableAggregateFunction /** * Top2 Accumulator。 */ class Top2Accum { var first: JInteger = _ var second: JInteger = _ } /** * 用户定义的聚合函数 top2。 */ class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum] { override def createAccumulator(): Top2Accum = { val acc = new Top2Accum acc.first = Int.MinValue acc.second = Int.MinValue acc } def accumulate(acc: Top2Accum, v: Int) { if (v > acc.first) { acc.second = acc.first acc.first = v } else if (v > acc.second) { acc.second = v } } def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = { val iter = its.iterator() while (iter.hasNext) { val top2 = iter.next() accumulate(acc, top2.first) accumulate(acc, top2.second) } } def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): Unit = { // 下发 value 与 rank if (acc.first != Int.MinValue) { out.collect(JTuple2.of(acc.first, 1)) } if (acc.second != Int.MinValue) { out.collect(JTuple2.of(acc.second, 2)) } } } val top2 = new Top2 val orders: Table = tableEnv.from("Orders") val result = orders .groupBy($"key") .flatAggregate(top2($"a") as ($"v", $"rank")) .select($"key", $"v", $"rank")Python:使用 python 通用 Table Aggregate Function 执行 flat_aggregate 操作。和 GroupBy Aggregation 类似。使用运行中的表之后的聚合运算符对分组键上的行进行分组,以按组聚合行。和 AggregateFunction 的不同之处在于,TableAggregateFunction 的每个分组可能返回0或多条记录。你必须使用 select 子句关闭 flat_aggregate。并且 select 子句不支持聚合函数。from pyflink.common import Row from pyflink.table.udf import TableAggregateFunction, udtaf from pyflink.table import DataTypes class Top2(TableAggregateFunction): def emit_value(self, accumulator): yield Row(accumulator[0]) yield Row(accumulator[1]) def create_accumulator(self): return [None, None] def accumulate(self, accumulator, row: Row): if row.a is not None: if accumulator[0] is None or row.a > accumulator[0]: accumulator[1] = accumulator[0] accumulator[0] = row.a elif accumulator[1] is None or row.a > accumulator[1]: accumulator[1] = row.a def merge(self, accumulator, accumulators): for other_acc in accumulators: self.accumulate(accumulator, other_acc[0]) self.accumulate(accumulator, other_acc[1]) def get_accumulator_type(self): return DataTypes.ARRAY(DataTypes.BIGINT()) def get_result_type(self): return DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.BIGINT())]) mytop = udtaf(Top2()) t = t_env.from_elements([(1, 'Hi', 'Hello'), (3, 'Hi', 'hi'), (5, 'Hi2', 'hi'), (7, 'Hi', 'Hello'), (2, 'Hi', 'Hello')], ['a', 'b', 'c']) result = t.select(t.a, t.c) \ .group_by(t.c) \ .flat_aggregate(mytop) \ .select(t.a) \ .flat_aggregate(mytop.alias("b"))

0
0
0
浏览量2014
打酱油的后端

Flink系列Table API和SQL之:创建表环境和创建表

一、快速上手Table API和SQL创建表环境TableEnvironment tableEnv = ...;创建输入表,连接外部系统读取数据tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ('connector' = ... )");注册一个表,连接到外部系统,用于输出tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ('connector' = ... )");执行SQL对表进行查询转换,得到一个新的表Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");使用Table API对表进行查询转换,得到一个新的表Table table2 = tableEnv.from("inputTable").select(...);将得到的结果写入输出表TableResult tableResult = table1.executeInsert("outputTable");二、创建表环境对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的表环境(TableEnvironment)。主要负责:注册Catalog和表执行SQL查询注册用户自定义函数(UDF)DataStream和表之间的转换这里的Catalog就是目录,与标准SQL中的概念是一致的,主要用来管理所有数据库(database)和表(table)的元数据(metadata)。通过Catalog可以方便地对数据库和表进行查询的管理,所以可以认为我们所定义的表都会挂靠在某个目录下,这样就可以快速检索。在表环境中可以由用户自定义Catalog,并在其中注册表和自定义函数(UDF)。默认的Catalog就叫作default_catalog。每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式。计划器默认使用blink planner。import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class CommonApiTest { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment talbeEnv = StreamTableEnvironment.create(env); //1。定义环境配置来创建表执行环境 EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings); //2.基于blink版本planner进行批处理 EnvironmentSettings settings2 = EnvironmentSettings.newInstance() .inBatchMode() .useBlinkPlanner() .build(); TableEnvironment tableEnv3 = TableEnvironment.create(settings2); } }三、创建表表(Table)是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。Flink中的表是由多个行数据构成的,每个行(Row)又可以有定义好的多个列(Column)字段。整体来看,表就是固定类型的数据组成的二维矩阵。为了方便的查询表,表环境中会维护一个目录(Catalog)和表的对应关系。所以表都是通过Catalog来进行注册创建的。表在环境中有一个唯一的ID,由三部分组成:目录(catalog)名,数据库(database)名,以及表名。在默认情况下,目录名为default_catalog,数据库名为default_database。所以如果我们直接创建一个叫做MyTable的表,它的ID就是:default_catalog.default_database.MyTable具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。1.连接器表(Connector Tables)最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。例如我们可以连接到Kafka或者文件系统,将存储在这些外部系统的数据以表的形式定义出来,这样对表的读写就可以通过连接器转换成对外部系统的读写了。当我们在表环境中读取这张表,连接器就会从外部系统读取数据并进行转换。而当我们向这张表写入数据,连接器就会将数据输出(Sink)到外部系统中。在代码中,可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ('connector' = ... )");这里的TEMPORARY关键字可以省略。这里没有定义Catalog和Database,所以都是默认的,表的完整ID就是default_catalog.default_database.MyTable。如果希望使用自定义的目录名和库名,可以在环境中进行设置:tEnv.useCatalog("custom_catalog"); tEnv.useDatabase("custom_database");这样创建的表完整ID就变成了custom_catalog.custom_database.MyTable。之后在表环境中创建的所有表,ID也会都以custom_catalog.custom_database作为前缀。 //2。创建表 String creatDDL = "CREATE TABLE clickTable (" + "user STRING, " + "url STRING, " + "ts BIGINT " + ") WITH (" + " 'connector' = 'filesystem',"+ " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink" + "/input/clicks.txt'," + " 'format' = 'csv'" + ")"; tableEnv.executeSql(creatDDL); //创建一张用于输出的表 //2。创建表 String creatOutDDL = "CREATE TABLE outTable (" + "user STRING, " + "url STRING, " + "ts BIGINT " + ") WITH (" + " 'connector' = 'filesystem',"+ " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink" + "/input/output.txt'," + " 'format' = 'csv'" + ")"; tableEnv.executeSql(creatOutDDL);2.虚拟表(Virtual Tables)在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable ...");这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。得到的newTable是一个中间转换结果,如果之后又希望直接使用这个表执行SQL,又该怎么做呢?由于newTable是一个Table对象,并没有在表环境中注册。所以我们还需要将这个中间结果表注册到环境中,才能在SQL中使用:tableEnv.createTemporaryView("NewTable",newTable);我们发现,这里的注册其实是创建了一个虚拟表(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建虚拟视图(createTemporaryView)。视图之所以是虚拟的,是因为我们并不会直接保存这个表的内容,并没有实体。只是在用到这张表的时候,会将它对应的查询语句嵌入到SQL中。

0
0
0
浏览量2016
打酱油的后端

Flink系列之:背压下的检查点

一、Checkpointing under backpressure通常情况下,对齐 Checkpoint 的时长主要受 Checkpointing 过程中的同步和异步两个部分的影响。 然而,当 Flink 作业正运行在严重的背压下时,Checkpoint 端到端延迟的主要影响因子将会是传递 Checkpoint Barrier 到 所有的算子/子任务的时间。这在 checkpointing process) 的概述中有说明原因。并且可以通过高 alignment time and start delay metrics 观察到。 当这种情况发生并成为一个问题时,有三种方法可以解决这个问题:消除背压源头,通过优化 Flink 作业,通过调整 Flink 或 JVM 参数,抑或是通过扩容。减少 Flink 作业中缓冲在 In-flight 数据的数据量。启用非对齐 Checkpoints。 这些选项并不是互斥的,可以组合在一起。本文档重点介绍后两个选项。二、缓冲区 DebloatingFlink 1.14 引入了一个新的工具,用于自动控制在 Flink 算子/子任务之间缓冲的 In-flight 数据的数据量。缓冲区 Debloating 机 制可以通过将属性taskmanager.network.memory.buffer-debloat.enabled设置为true来启用。此特性对对齐和非对齐 Checkpoint 都生效,并且在这两种情况下都能缩短 Checkpointing 的时间,不过 Debloating 的效果对于 对齐 Checkpoint 最明显。 当在非对齐 Checkpoint 情况下使用缓冲区 Debloating 时,额外的好处是 Checkpoint 大小会更小,并且恢复时间更快 (需要保存 和恢复的 In-flight 数据更少)。有关缓冲区 Debloating 功能如何工作以及如何配置的更多信息,可以参考 network memory tuning guide。 请注意,您仍然可以继续使用在前面调优指南中介绍过的方式来手动减少缓冲在 In-flight 数据的数据量。三、非对齐 Checkpoints从Flink 1.11开始,Checkpoint 可以是非对齐的。 Unaligned checkpoints 包含 In-flight 数据(例如,存储在缓冲区中的数据)作为 Checkpoint State的一部分,允许 Checkpoint Barrier 跨越这些缓冲区。因此, Checkpoint 时长变得与当前吞吐量无关,因为 Checkpoint Barrier 实际上已经不再嵌入到数据流当中。如果您的 Checkpointing 由于背压导致周期非常的长,您应该使用非对齐 Checkpoint。这样,Checkpointing 时间基本上就与 端到端延迟无关。请注意,非对齐 Checkpointing 会增加状态存储的 I/O,因此当状态存储的 I/O 是 整个 Checkpointing 过程当中真 正的瓶颈时,您不应当使用非对齐 Checkpointing。为了启用非对齐 Checkpoint,您可以:Java代码StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 启用非对齐 Checkpoint env.getCheckpointConfig().enableUnalignedCheckpoints();Scala代码:val env = StreamExecutionEnvironment.getExecutionEnvironment() // 启用非对齐 Checkpoint env.getCheckpointConfig.enableUnalignedCheckpoints()Python代码:env = StreamExecutionEnvironment.get_execution_environment() # 启用非对齐 Checkpoint env.get_checkpoint_config().enable_unaligned_checkpoints()或者在 flink-conf.yml 配置文件中增加配置:execution.checkpointing.unaligned: true四、对齐 Checkpoint 的超时在启用非对齐 Checkpoint 后,你依然可以通过编程的方式指定对齐 Checkpoint 的超时:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));或是在 flink-conf.yml 配置文件中配置:execution.checkpointing.aligned-checkpoint-timeout: 30 s在启动时,每个 Checkpoint 仍然是 aligned checkpoint,但是当全局 Checkpoint 持续时间超过 aligned-checkpoint-timeout 时, 如果 aligned checkpoint 还没完成,那么 Checkpoint 将会转换为 Unaligned Checkpoint。五、限制并发 CheckpointFlink 当前并不支持并发的非对齐 Checkpoint。然而,由于更可预测的和更短的 Checkpointing 时长,可能也根本就不需要并发的 Checkpoint。此外,Savepoint 也不能与非对齐 Checkpoint 同时发生,因此它们将会花费稍长的时间。与 Watermark 的相互影响非对齐 Checkpoint 在恢复的过程中改变了关于 Watermark 的一个隐式保证。目前,Flink 确保了 Watermark 作为恢复的第一步, 而不是将最近的 Watermark 存放在 Operator 中,以方便扩缩容。在非对齐 Checkpoint 中,这意味着当恢复时,Flink 会在恢复 In-flight 数据后再生成 Watermark。如果您的 Pipeline 中使用了对每条记录都应用最新的 Watermark 的算子将会相对于 使用对齐 Checkpoint产生不同的结果。如果您的 Operator 依赖于最新的 Watermark 始终可用,解决办法是将 Watermark 存放在 OperatorState 中。在这种情况下,Watermark 应该使用单键 group 存放在 UnionState 以方便扩缩容。与长时间运行的记录处理的相互作用尽管未对齐的检查点障碍仍然能够超越队列中的所有其他记录。如果当前记录需要花费大量时间来处理,则此屏障的处理仍然可能会被延迟。当同时触发多个计时器时(例如在窗口操作中),可能会发生这种情况。当系统在处理单个输入记录时被阻塞等待多个网络缓冲区可用性时,可能会出现第二种有问题的情况。 Flink 无法中断单个输入记录的处理,未对齐的检查点必须等待当前处理的记录被完全处理。这可能会在两种情况下导致问题。由于不适合单个网络缓冲区的大记录的序列化或在 flatMap 操作中,会为一个输入记录生成许多输出记录。在这种情况下,背压可能会阻止未对齐的检查点,直到处理单个输入记录所需的所有网络缓冲区都可用。当处理单个记录需要一段时间时,它也可能发生在任何其他情况下。因此,检查点的时间可能会比预期的时间长,或者可能会有所不同。某些数据分布模式没有检查点有些属性包含的连接无法与 Channel 中的数据一样保存在 Checkpoint 中。为了保留这些功能并确保没有状态冲突或非预期的行为,非同一 Checkpoint 对于这些类型的连接是禁用的。所有其他的交换仍然执行非单色检查点。点对点连接我们目前没有任何对于点对点连接中有关数据有序性的强保证。然而,由于数据已经被以前置的 Source 或是 KeyBy 相同的方式隐式 组织,一些用户会依靠这种特性在提供的有序性保证的同时将计算敏感型的任务划分为更小的块。只要并行度不变,非对齐 Checkpoint(UC) 将会保留这些特性。但是如果加上UC的伸缩容,这些特性将会被改变。针对如下任务如果我们想将并行度从 p=2 扩容到 p=3,那么需要根据 KeyGroup 将 KeyBy 的 Channel 中的数据突然的划分到3个 Channel 中去。这 很容易做到,通过使用 Operator 的 KeyGroup 范围和确定记录属于某个 Key(group) 的方法(不管实际使用的是什么方法)。对于 Forward 的 Channel,我们根本没有 KeyContext。Forward Channel 里也没有任何记录被分配了任何 KeyGroup;也无法计算它,因为无法保证 Key仍然存在。广播 Connections广播 Connection 带来了另一个问题。无法保证所有 Channel 中的记录都以相同的速率被消费。这可能导致某些 Task 已经应用了与 特定广播事件对应的状态变更,而其他任务则没有,如图所示。广播分区通常用于实现广播状态,它应该跨所有 Operator 都相同。Flink 实现广播状态,通过仅 Checkpointing 有状态算子的 SubTask 0 中状态的单份副本。在恢复时,我们将该份副本发往所有的 Operator。因此,可能会发生以下情况:某个算子将很快从它的 Checkpointed Channel 消费数据并将修改应有于记录来获得状态。六、故障排除Corrupted in-flight data以下描述的操作是最后采取的手段,因为它们将会导致数据的丢失。为了防止 In-flight 数据损坏,或者由于其他原因导致作业应该在没有 In-flight 数据的情况下恢复,可以使用 recover-without-channel-state.checkpoint-id 属性。该属性需要指定一个 Checkpoint Id,对它来说 In-flight 中的数据将会被忽略。除非已经持久化的 In-flight 数据内部的损坏导致无 法恢复的情况,否则不要设置该属性。只有在重新部署作业后该属性才会生效,这就意味着只有启用 externalized checkpoint时,此操作才有意义。

0
0
0
浏览量1932
打酱油的后端

Flink系列Table API和SQL之:普通Top N和窗口Top N

一、普通Top N在Flink SQL中,是通过OVER聚合和一个条件筛选来实现Top N的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来。然后再用WHERE子句筛选行号小于等于N的那些行返回。基本语法如下:SELECT ... FROM ( SELECT ..., ROW_NUMBER() OVER( [PARTITION BY <字段1>[,<字段1>...]] ORDER BY <排序字段1> [asc|desc][,<排序字段2> [asc|desc]...] ) AS row_num FROM ...) WHERE row_num <= N [AND <其他条件>]OVER窗口定义,目的就是利用ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为row_num,并在外层的查询中以row_num<=N作为条件进行筛选,就可以得到根据排序字段统计的Top N结果了。二、普通Top N代码实现import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TopNExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //在创建表的DDL中直接定义时间属性 String createDDL = "create table clickTable (" + " `user` String, " + " url String, " + " ts bigint, " + " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," + " watermark for et as et-interval '1' second " + ") with ( " + " 'connector' = 'filesystem'," + " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " + " 'format' = 'csv'" + ") "; tableEnv.executeSql(createDDL); //普通topN 选取当前所有用户中浏览量最大的两个 Table topNResultTable = tableEnv.sqlQuery("select user,cnt,row_num " + "from (" + " select*,row_number() over(" + " order by cnt desc" + " ) as row_num" + " from ( select user,count(url) as cnt from clickTable group by user)" + " ) where row_num<=2"); tableEnv.toChangelogStream(topNResultTable).print("top 2: "); env.execute(); } }top 2: > +I[Mary, 1, 1] top 2: > +I[Bob, 1, 2] top 2: > -U[Mary, 1, 1] top 2: > +U[Bob, 2, 1] top 2: > -U[Bob, 1, 2] top 2: > +U[Mary, 1, 2] top 2: > -U[Mary, 1, 2] top 2: > +U[Alice, 2, 2] top 2: > -U[Bob, 2, 1] top 2: > +U[Bob, 3, 1] top 2: > -U[Alice, 2, 2] top 2: > +U[Alice, 3, 2]三、窗口Top N除了直接对数据进行Top N的选取,也可以针对窗口来做Top N。具体来说,可以先做一个窗口聚合,将窗口信息window_start、window_end连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的Top N那样定义OVER窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前N行就可以得到结果。所以窗口Top N的实现就是窗口聚合与OVER聚合的结合使用。四、窗口Top N代码实现import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class TopNExample { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //在创建表的DDL中直接定义时间属性 String createDDL = "create table clickTable (" + " `user` String, " + " url String, " + " ts bigint, " + " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," + " watermark for et as et-interval '1' second " + ") with ( " + " 'connector' = 'filesystem'," + " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " + " 'format' = 'csv'" + ") "; tableEnv.executeSql(createDDL); //窗口topN 一段时间内活跃用户统计 String subQuery = " select user, count(url) as cnt, window_start, window_end " + " from table ( " + " tumble( table clickTable, descriptor(et), interval '10' second )" + ") " + " group by user, window_start, window_end "; Table windowTopNResultTable = tableEnv.sqlQuery("select user, cnt, row_num, window_end " + "from (" + " select * , row_number() over(" + " partition by window_start, window_end " + // 固定写法 " order by cnt desc" + " ) as row_num" + " from ( " + subQuery + " ) " + ") where row_num <= 2"); tableEnv.toDataStream(windowTopNResultTable).print("window Top N :"); env.execute(); } }window Top N :> +I[Bob, 2, 1, 1970-01-01T08:00:10] window Top N :> +I[Alice, 2, 2, 1970-01-01T08:00:10] window Top N :> +I[Bob, 1, 1, 1970-01-01T08:00:20] window Top N :> +I[Alice, 1, 1, 1970-01-01T08:00:30]

0
0
0
浏览量1868
打酱油的后端

Flink系列之:窗口聚合

一、窗口表值函数(TVF)聚合适用于流批窗口聚合在 GROUP BY 子句中定义,包含应用窗口 TVF 的关系的“window_start”和“window_end”列。就像使用常规 GROUP BY 子句的查询一样,使用按窗口聚合进行分组的查询将计算每个组的单个结果行。SELECT ... FROM <windowed_table> -- relation applied windowing TVF GROUP BY window_start, window_end, ...与连续表上的其他聚合不同,窗口聚合不会发出中间结果,而只会发出最终结果,即窗口末尾的总聚合。此外,窗口聚合会在不再需要时清除所有中间状态。二、窗口表值函数TVFFlink 支持 TUMBLE、HOP 和 CUMULATE 类型的窗口聚合。在流模式下,窗口表值函数的时间属性字段必须位于事件或处理时间属性上。在批处理模式下,窗口表值函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的属性。以下是 TUMBLE、HOP 和 CUMULATE 窗口聚合的一些示例。-- 表必须具有时间属性,例如该表中的“bidtime” Flink SQL> desc Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | | supplier_id | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ Flink SQL> SELECT * FROM Bid; +------------------+-------+------+-------------+ | bidtime | price | item | supplier_id | +------------------+-------+------+-------------+ | 2020-04-15 08:05 | 4.00 | C | supplier1 | | 2020-04-15 08:07 | 2.00 | A | supplier1 | | 2020-04-15 08:09 | 5.00 | D | supplier2 | | 2020-04-15 08:11 | 3.00 | B | supplier2 | | 2020-04-15 08:13 | 1.00 | E | supplier1 | | 2020-04-15 08:17 | 6.00 | F | supplier2 | +------------------+-------+------+-------------+ -- 翻转窗口聚合 Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +------------------+------------------+-------+ -- 跳跃窗口聚合 Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | | 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 | +------------------+------------------+-------+ -- 累积窗口聚合 Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +------------------+------------------+-------+注意:为了更好地理解窗口的行为,我们简化了时间戳值的显示,不显示尾随零,例如如果类型为 TIMESTAMP(3),2020-04-15 08:05 在 Flink SQL Client 中应显示为 2020-04-15 08:05:00.000。三、分组集窗口聚合还支持 GROUPING SETS 语法。分组集允许比标准 GROUP BY 描述的分组操作更复杂的分组操作。行按每个指定的分组集单独分组,并为每个组计算聚合,就像简单的 GROUP BY 子句一样。使用 GROUPING SETS 的窗口聚合要求 window_start 和 window_end 列必须位于 GROUP BY 子句中,但不能位于 GROUPING SETS 子句中。Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ()); +------------------+------------------+-------------+-------+ | window_start | window_end | supplier_id | price | +------------------+------------------+-------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 | +------------------+------------------+-------------+-------+GROUPING SETS 的每个子列表可以指定零个或多个列或表达式,并以与直接在 GROUP BY 子句中使用相同的方式进行解释。空分组集意味着所有行都聚合到一个组,即使不存在输入行也会输出该组。对于未出现这些列的分组集,对分组列或表达式的引用将替换为结果行中的空值。四、ROLLUPROLLUP 是用于指定常见类型的分组集的简写符号。它表示给定的表达式列表和列表的所有前缀,包括空列表。使用 ROLLUP 进行窗口聚合要求 window_start 和 window_end 列必须位于 GROUP BY 子句中,但不能位于 ROLLUP 子句中。例如,以下查询与上面的查询等效。SELECT window_start, window_end, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, ROLLUP (supplier_id);五、CUBECUBE 是用于指定常见类型的分组集的简写符号。它表示给定的列表及其所有可能的子集 - 幂集。使用 CUBE 的窗口聚合要求 window_start 和 window_end 列必须位于 GROUP BY 子句中,但不能位于 CUBE 子句中。例如,以下两个查询是等效的。SELECT window_start, window_end, item, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, CUBE (supplier_id, item); SELECT window_start, window_end, item, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, GROUPING SETS ( (supplier_id, item), (supplier_id ), ( item), ( ) )六、选择组窗口开始和结束时间戳分组窗口的开始和结束时间戳可以通过 window_start 和 window_end 来选定.七、多级窗口聚合window_start 和 window_end 列是普通的时间戳字段,并不是时间属性。因此它们不能在后续的操作中当做时间属性进行基于时间的操作。 为了传递时间属性,需要在 GROUP BY 子句中添加 window_time 列。window_time 是 Windowing TVFs 产生的三列之一,它是窗口的时间属性。 window_time 添加到 GROUP BY 子句后就能被选定了。下面的查询可以把它用于后续基于时间的操作,比如:多级窗口聚合 和 Window TopN。下面展示了一个多级窗口聚合:第一个窗口聚合后把时间属性传递给第二个窗口聚合。-- 每个supplier_id翻滚5分钟 CREATE VIEW window1 AS -- 注意:内部Window TVF 的窗口开始和窗口结束字段在select 子句中是可选的。但是,如果它们出现在子句中,则需要为它们起别名,以防止名称与外部窗口 TVF 的窗口开始和窗口结束发生冲突。 SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY supplier_id, window_start, window_end, window_time; -- 在第一个窗口翻滚 10 分钟 SELECT window_start, window_end, SUM(partial_price) as total_price FROM TABLE( TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;八、分组窗口聚合警告:分组窗口聚合已经过时。推荐使用更加强大和有效的窗口表值函数聚合。“窗口表值函数聚合"相对于"分组窗口聚合"有如下优点:包含 性能调优 中提到的所有性能优化。支持标准的 GROUPING SETS 语法。可以在窗口聚合结果上使用 窗口 TopN。等等。分组窗口聚合定义在 SQL 的 GROUP BY 子句中。和普通的 GROUP BY 子句一样,包含分组窗口函数的 GROUP BY 子句的查询会对各组分别计算,各产生一个结果行。批处理表和流表上的SQL支持以下分组窗口函数:分组窗口函数Group Window FunctionDescriptionTUMBLE(time_attr, interval)定义一个滚动时间窗口。它把数据分配到连续且不重叠的固定时间区间(interval),例如:一个5分钟的滚动窗口以5分钟为间隔对数据进行分组。滚动窗口可以被定义在事件时间(流 + 批)或者处理时间(流)上。HOP(time_attr, interval, interval)定义一个滑动时间窗口,它有窗口大小(第二个 interval 参数)和滑动间隔(第一个 interval 参数)两个参数。如果滑动间隔小于窗口大小,窗口会产生重叠。所以,数据可以被指定到多个窗口。例如:一个15分钟大小和5分钟滑动间隔的滑动窗口将每一行分配给3个15分钟大小的不同窗口,这些窗口以5分钟的间隔计算。滑动窗口可以被定义在事件时间(流 + 批)或者处理时间(流)上。SESSION(time_attr, interval)定义一个会话时间窗口。会话时间窗口没有固定的时间区间,但其边界是通过不活动的时间 interval 定义的,即:一个会话窗口会在指定的时长内没有事件出现时关闭。例如:一个30分钟间隔的会话窗口收到一条数据时,如果之前已经30分钟不活动了(否则,这条数据会被分配到已经存在的窗口中),它会开启一个新窗口,如果30分钟之内没有新数据到来,就会关闭。会话窗口可以被定义在事件时间(流 + 批) 或者处理时间(流)上。九、时间属性在流处理模式,分组窗口函数的 time_attr 属性必须是一个有效的处理或事件时间。在批处理模式,分组窗口函数的 time_attr 参数必须是一个 TIMESTAMP 类型的属性。十、选取分组窗口开始和结束时间戳分组窗口的开始和结束时间戳以及时间属性也可以通过下列辅助函数的方式获取到:辅助函数描述TUMBLE_START(time_attr, interval)、HOP_START(time_attr, interval, interval)、SESSION_START(time_attr, interval)返回相应的滚动,滑动或会话窗口的下限的时间戳(inclusive),即窗口开始时间。TUMBLE_END(time_attr, interval)、HOP_END(time_attr, interval, interval)、SESSION_END(time_attr, interval)返回相应滚动窗口,跳跃窗口或会话窗口的上限的时间戳(exclusive),即窗口结束时间。注意: 上限时间戳(exlusive)不能作为 rowtime attribute 用于后续基于时间的操作,例如:interval joins 和 group window 或 over window aggregations。TUMBLE_ROWTIME(time_attr, interval)、HOP_ROWTIME(time_attr, interval, interval)、SESSION_ROWTIME(time_attr, interval)返回相应滚动窗口,跳跃窗口或会话窗口的上限的时间戳(inclusive),即窗口事件时间,或窗口处理时间。返回的值是 rowtime attribute,可以用于后续基于时间的操作,比如:interval joins 和 group window 或 over window aggregations。TUMBLE_PROCTIME(time_attr, interval)、HOP_PROCTIME(time_attr, interval, interval)、SESSION_PROCTIME(time_attr, interval)返回的值是 proctime attribute,可以用于后续基于时间的操作,比如: interval joins 和 group window 或 over window aggregations。注意: 辅助函数的参数必须和 GROUP BY 子句中的分组窗口函数一致。下面的例子展示了在流式表上如何使用分组窗口 SQL 查询:CREATE TABLE Orders ( user BIGINT, product STRING, amount INT, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '1' MINUTE ) WITH (...); SELECT user, TUMBLE_START(order_time, INTERVAL '1' DAY) AS wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(order_time, INTERVAL '1' DAY), user

0
0
0
浏览量1954