在Flink SQL中,是通过OVER聚合和一个条件筛选来实现Top N的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来。然后再用WHERE子句筛选行号小于等于N的那些行返回。
基本语法如下:
SELECT ...
FROM (
SELECT ...,
ROW_NUMBER() OVER(
[PARTITION BY <字段1>[,<字段2>...]]
ORDER BY <排序字段1> [asc|desc][,<排序字段2> [asc|desc]...]
) AS row_num
FROM ...)
WHERE row_num <= N [AND <其他条件>]
这里的OVER窗口定义,目的就是利用ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为row_num,并在外层的查询中以row_num<=N作为条件进行筛选,就可以得到根据排序字段统计的Top N结果了。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TopNExample {
    /**
     * Plain (non-windowed) Top-N example: continuously selects the two users
     * with the highest click counts from an unbounded click stream.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Define the event-time attribute (et) and its watermark directly in the CREATE TABLE DDL.
        String createDDL = "create table clickTable (" +
                " `user` String, " +
                " url String, " +
                " ts bigint, " +
                " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," +
                " watermark for et as et-interval '1' second " +
                ") with ( " +
                " 'connector' = 'filesystem'," +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " +
                " 'format' = 'csv'" +
                ") ";
        tableEnv.executeSql(createDDL);

        // Plain Top-N: pick the two users with the largest click counts so far.
        // NOTE: `user` is a reserved keyword in Flink SQL (Calcite parser), so it
        // must be back-quoted in queries exactly as it is in the DDL above —
        // leaving it unquoted causes a SQL parse error.
        Table topNResultTable = tableEnv.sqlQuery("select `user`, cnt, row_num " +
                "from (" +
                "  select *, row_number() over(" +
                "    order by cnt desc" +
                "  ) as row_num" +
                "  from ( select `user`, count(url) as cnt from clickTable group by `user` )" +
                ") where row_num <= 2");

        // The Top-N result over an unbounded stream is an updating table
        // (rows are retracted and re-emitted as counts change), so it must be
        // converted with toChangelogStream rather than toDataStream.
        tableEnv.toChangelogStream(topNResultTable).print("top 2: ");
        env.execute();
    }
}
top 2: > +I[Mary, 1, 1]
top 2: > +I[Bob, 1, 2]
top 2: > -U[Mary, 1, 1]
top 2: > +U[Bob, 2, 1]
top 2: > -U[Bob, 1, 2]
top 2: > +U[Mary, 1, 2]
top 2: > -U[Mary, 1, 2]
top 2: > +U[Alice, 2, 2]
top 2: > -U[Bob, 2, 1]
top 2: > +U[Bob, 3, 1]
top 2: > -U[Alice, 2, 2]
top 2: > +U[Alice, 3, 2]
除了直接对数据进行Top N的选取,也可以针对窗口来做Top N。
具体来说,可以先做一个窗口聚合,将窗口信息window_start、window_end连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的Top N那样定义OVER窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前N行就可以得到结果。所以窗口Top N的实现就是窗口聚合与OVER聚合的结合使用。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TopNExample {
    /**
     * Window Top-N example: for each 10-second tumbling event-time window,
     * selects the two most active users (highest click counts) in that window.
     * Implemented as a window aggregation combined with an OVER aggregation
     * partitioned by the window bounds.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Define the event-time attribute (et) and its watermark directly in the CREATE TABLE DDL.
        String createDDL = "create table clickTable (" +
                " `user` String, " +
                " url String, " +
                " ts bigint, " +
                " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," +
                " watermark for et as et-interval '1' second " +
                ") with ( " +
                " 'connector' = 'filesystem'," +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " +
                " 'format' = 'csv'" +
                ") ";
        tableEnv.executeSql(createDDL);

        // Window aggregation: per-user click counts within 10-second tumbling windows.
        // NOTE: `user` is a reserved keyword in Flink SQL (Calcite parser) and must
        // be back-quoted in queries, just as it is in the DDL above.
        String subQuery = " select `user`, count(url) as cnt, window_start, window_end " +
                " from table ( " +
                "   tumble( table clickTable, descriptor(et), interval '10' second )" +
                " ) " +
                " group by `user`, window_start, window_end ";

        // Window Top-N: the OVER clause must partition by window_start and
        // window_end (this is the fixed pattern Flink recognizes as window Top-N).
        Table windowTopNResultTable = tableEnv.sqlQuery("select `user`, cnt, row_num, window_end " +
                "from (" +
                "  select * , row_number() over(" +
                "    partition by window_start, window_end " +
                "    order by cnt desc" +
                "  ) as row_num" +
                "  from ( " + subQuery + " ) " +
                ") where row_num <= 2");

        // Window Top-N results are emitted once per closed window (append-only),
        // so a plain DataStream conversion is sufficient here.
        tableEnv.toDataStream(windowTopNResultTable).print("window Top N :");
        env.execute();
    }
}
window Top N :> +I[Bob, 2, 1, 1970-01-01T08:00:10]
window Top N :> +I[Alice, 2, 2, 1970-01-01T08:00:10]
window Top N :> +I[Bob, 1, 1, 1970-01-01T08:00:20]
window Top N :> +I[Alice, 1, 1, 1970-01-01T08:00:30]
阅读量:1863
点赞量:0
收藏量:0