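Standard SQL has another, rather special kind of aggregation that computes an aggregate value for every single row. For example, taking each row as the reference point, we can compute the average of all data in the hour before it, or the average of the 10 rows before it. It is as if a window were opened on each row to collect data for statistics, which is why this is called a "window function". Window function aggregation differs fundamentally from the two kinds of aggregation covered earlier: group aggregation and window TVF aggregation are both many-to-one, because after the data is grouped each group yields only one result. A window function performs one aggregation per row, so the number of rows in the table does not decrease at all; the relationship is many-to-many.
As in standard SQL, window functions in Flink SQL are implemented with the OVER clause, so this kind of aggregation is also called OVER aggregation (Over Aggregation). The basic syntax is: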
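To make the many-to-one versus many-to-many distinction concrete, here is a minimal plain-Java sketch that does not use Flink at all. The class name `GroupVsOverSketch`, the keys, and the values are hypothetical, and for simplicity the per-row window is assumed to cover all earlier rows with the same key rather than a bounded range.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupVsOverSketch {

    // Group aggregation: many-to-one, one average per key.
    static Map<String, Double> groupAvg(List<String> keys, List<Long> values) {
        Map<String, long[]> acc = new LinkedHashMap<>(); // key -> {sum, count}
        for (int i = 0; i < keys.size(); i++) {
            long[] sc = acc.computeIfAbsent(keys.get(i), k -> new long[2]);
            sc[0] += values.get(i);
            sc[1]++;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        acc.forEach((k, sc) -> result.put(k, (double) sc[0] / sc[1]));
        return result;
    }

    // Over aggregation: one result per input row, here the running average
    // of all rows with the same key seen so far (assumed unbounded window).
    static List<Double> overAvg(List<String> keys, List<Long> values) {
        Map<String, long[]> acc = new LinkedHashMap<>(); // key -> {sum, count}
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            long[] sc = acc.computeIfAbsent(keys.get(i), k -> new long[2]);
            sc[0] += values.get(i);
            sc[1]++;
            result.add((double) sc[0] / sc[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "a", "b", "a");
        List<Long> values = List.of(10L, 20L, 30L, 40L, 50L);
        System.out.println(groupAvg(keys, values)); // {a=30.0, b=30.0}
        System.out.println(overAvg(keys, values));  // [10.0, 20.0, 20.0, 30.0, 30.0]
    }
}
```

Five input rows collapse to two results under group aggregation, while the over-style aggregation returns five results, one for each input row.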
SELECT
  <aggregate function> OVER (
    [PARTITION BY <field1>[, <field2>, ...]]
    ORDER BY <time attribute field>
    <window range>),
  ...
FROM ...
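Here an aggregate function precedes the OVER keyword and is applied to the window defined by the OVER clause that follows. As the syntax above shows, the OVER clause has three main parts: an optional PARTITION BY, an ORDER BY on a time attribute field, and the window range. The window range extends backward from the current row and is written as: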
BETWEEN ... PRECEDING AND CURRENT ROW
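As mentioned earlier, the window range can be based on time or on a number of rows. The window range therefore also has to choose between two modes: range intervals (RANGE intervals) and row intervals (ROWS intervals).
Range interval:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
Row interval:
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW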
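The difference between the two modes can be sketched in plain Java, without Flink. The class `RowsVsRangeSketch`, its method names, and the sample data are hypothetical. The sketch assumes rows are already sorted by timestamp and that timestamps are distinct.

```java
import java.util.ArrayList;
import java.util.List;

final class RowsVsRangeSketch {

    // ROWS BETWEEN n PRECEDING AND CURRENT ROW:
    // the current row plus at most n rows before it, regardless of time.
    static List<Double> rowsAvg(long[] values, int n) {
        List<Double> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            int from = Math.max(0, i - n);
            long sum = 0;
            for (int j = from; j <= i; j++) {
                sum += values[j];
            }
            out.add((double) sum / (i - from + 1));
        }
        return out;
    }

    // RANGE BETWEEN INTERVAL ... PRECEDING AND CURRENT ROW:
    // every row whose timestamp lies in [current - interval, current].
    static List<Double> rangeAvg(long[] timestamps, long[] values, long intervalMillis) {
        List<Double> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            long sum = 0;
            int count = 0;
            for (int j = 0; j <= i; j++) {
                if (timestamps[j] >= timestamps[i] - intervalMillis) {
                    sum += values[j];
                    count++;
                }
            }
            out.add((double) sum / count);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 1000L, 2000L, 10000L};
        long[] values = {1L, 2L, 3L, 4L};
        System.out.println(rowsAvg(values, 2));                 // [1.0, 1.5, 2.0, 3.0]
        System.out.println(rangeAvg(timestamps, values, 2000)); // [1.0, 1.5, 2.0, 4.0]
    }
}
```

The last row shows the difference: the row-based window still reaches back two rows, while the time-based window contains only the current row because the earlier rows are more than 2 seconds old.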
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
/**
* Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
* <p>
* Project: FlinkTutorial
* <p>
* Created by wushengran
*/
// Note: Event and ClickSource are not defined in this listing; they are assumed
// to be classes from the same tutorial project.
public class TimeAndWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Define the time attribute directly in the CREATE TABLE DDL
        String createDDL = "CREATE TABLE clickTable (" +
                " user_name STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem', " +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv', " +
                " 'format' = 'csv' " +
                ")";
        tableEnv.executeSql(createDDL);

        // 2. Define the time attribute when converting a stream into a Table
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));
        Table clickTable = tableEnv.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"),
                $("et").rowtime());
        clickTable.printSchema();

        // Aggregation queries
        // 4. Over aggregation: per user, average ts over the current row and the 3 rows before it
        Table overWindowResultTable = tableEnv.sqlQuery("SELECT user_name, " +
                " avg(ts) OVER (" +
                " PARTITION BY user_name " +
                " ORDER BY et " +
                " ROWS BETWEEN 3 PRECEDING AND CURRENT ROW" +
                ") AS avg_ts " +
                "FROM clickTable");

        // Convert the result table into a stream and print it
        tableEnv.toDataStream(overWindowResultTable).print("over window: ");
        env.execute();
    }
}
over window: > +I[Mary, 1000]
over window: > +I[Bob, 2000]
over window: > +I[Alice, 3000]
over window: > +I[Bob, 2500]
over window: > +I[Alice, 3500]
over window: > +I[Bob, 6000]
over window: > +I[Alice, 10333]
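The printed values can be traced by hand with a small plain-Java emulation of `PARTITION BY user_name ... ROWS BETWEEN 3 PRECEDING AND CURRENT ROW`. The contents of clicks.csv are not shown here, so the input below is hypothetical: it is one possible data set that reproduces the values printed above. The class name `PartitionedRowsWindowSketch` is also hypothetical. Because the printed averages are whole numbers, the sketch uses integer division, and it assumes rows arrive in event-time order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionedRowsWindowSketch {

    // For each row, average ts over the current row and up to `preceding`
    // earlier rows of the same user. One output line per input row.
    static List<String> overAvg(String[] users, long[] ts, int preceding) {
        Map<String, Deque<Long>> windows = new HashMap<>(); // one window per partition
        List<String> out = new ArrayList<>();
        for (int i = 0; i < users.length; i++) {
            Deque<Long> window = windows.computeIfAbsent(users[i], u -> new ArrayDeque<>());
            window.addLast(ts[i]);
            if (window.size() > preceding + 1) {
                window.removeFirst(); // drop the row that fell out of the window
            }
            long sum = 0;
            for (long v : window) {
                sum += v;
            }
            out.add("+I[" + users[i] + ", " + (sum / window.size()) + "]");
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input, not the actual clicks.csv.
        String[] users = {"Mary", "Bob", "Alice", "Bob", "Alice", "Bob", "Alice"};
        long[] ts = {1000L, 2000L, 3000L, 3000L, 4000L, 13000L, 24000L};
        overAvg(users, ts, 3).forEach(System.out::println);
    }
}
```

With this input, Bob's third row averages 2000, 3000 and 13000 to 6000, and Alice's third row averages 3000, 4000 and 24000 to 10333 after truncation. No user has more than four rows, so the 3 PRECEDING bound never drops a row in this run.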