Flink系列Table API和SQL之:普通Top N和窗口Top N-灵析社区

打酱油的后端

一、普通Top N

在Flink SQL中,是通过OVER聚合和一个条件筛选来实现Top N的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来。然后再用WHERE子句筛选行号小于等于N的那些行返回。

基本语法如下:

SELECT ...
FROM (
	SELECT ...,
	ROW_NUMBER() OVER(
		[PARTITION BY <字段1>[,<字段2>...]]
		ORDER BY <排序字段1> [asc|desc][,<排序字段2> [asc|desc]...]
	) AS row_num
	FROM ...)
WHERE row_num <= N [AND <其他条件>]

OVER窗口定义,目的就是利用ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为row_num,并在外层的查询中以row_num<=N作为条件进行筛选,就可以得到根据排序字段统计的Top N结果了。

二、普通Top N代码实现

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so the printed changelog is in a single deterministic stream.
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Define the event-time attribute directly in the DDL:
        // `et` is computed from the epoch-millis column `ts`, and the watermark
        // lags event time by 1 second.
        // NOTE: `user` is a reserved keyword in Flink SQL (Calcite), so it must
        // be escaped with backticks everywhere it is referenced, not only here.
        String createDDL  = "create table clickTable (" +
                " `user` String, " +
                " url String, " +
                " ts bigint, " +
                " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," +
                " watermark for et as et - interval '1' second " +
                ") with ( " +
                " 'connector' = 'filesystem'," +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " +
                " 'format' = 'csv'" +
                ") ";

        tableEnv.executeSql(createDDL);

        // Plain Top-N: pick the two users with the largest overall click counts.
        // Inner query aggregates clicks per user; ROW_NUMBER() ordered by cnt
        // DESC assigns a rank; the outer WHERE keeps ranks 1 and 2.
        // Fix: `user` escaped with backticks (reserved keyword) and a space
        // added in "select *".
        Table topNResultTable = tableEnv.sqlQuery("select `user`,cnt,row_num " +
                "from (" +
                " select *,row_number() over(" +
                " order by cnt desc" +
                " ) as row_num" +
                " from ( select `user`,count(url) as cnt from clickTable group by `user`)" +
                " ) where row_num<=2");

        // Top-N results are continuously updated as data arrives, so the result
        // is a changelog (retract/update) stream rather than append-only.
        tableEnv.toChangelogStream(topNResultTable).print("top 2: ");
        env.execute();
    }
}
top 2: > +I[Mary, 1, 1]
top 2: > +I[Bob, 1, 2]
top 2: > -U[Mary, 1, 1]
top 2: > +U[Bob, 2, 1]
top 2: > -U[Bob, 1, 2]
top 2: > +U[Mary, 1, 2]
top 2: > -U[Mary, 1, 2]
top 2: > +U[Alice, 2, 2]
top 2: > -U[Bob, 2, 1]
top 2: > +U[Bob, 3, 1]
top 2: > -U[Alice, 2, 2]
top 2: > +U[Alice, 3, 2]

三、窗口Top N

除了直接对数据进行Top N的选取,也可以针对窗口来做Top N。

具体来说,可以先做一个窗口聚合,将窗口信息window_start、window_end连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。接下来就可以像一般的Top N那样定义OVER窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前N行就可以得到结果。所以窗口Top N的实现就是窗口聚合与OVER聚合的结合使用。

四、窗口Top N代码实现

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TopNExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 so the printed results come out in a single deterministic stream.
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Define the event-time attribute directly in the DDL:
        // `et` is computed from the epoch-millis column `ts`, and the watermark
        // lags event time by 1 second.
        // NOTE: `user` is a reserved keyword in Flink SQL (Calcite), so it must
        // be escaped with backticks everywhere it is referenced, not only here.
        String createDDL  = "create table clickTable (" +
                " `user` String, " +
                " url String, " +
                " ts bigint, " +
                " et AS TO_TIMESTAMP( from_unixtime(ts/1000))," +
                " watermark for et as et - interval '1' second " +
                ") with ( " +
                " 'connector' = 'filesystem'," +
                " 'path' = '/Users/fei.yang4/project/learn/src/main/java/com/bigdata/plus/flink/input/clicks.csv' , " +
                " 'format' = 'csv'" +
                ") ";

        tableEnv.executeSql(createDDL);

        // Window Top-N: most active users within each 10-second tumbling window.
        // The sub-query does the windowed aggregation (TVF syntax), keeping
        // window_start/window_end so the outer query can rank within each window.
        // Fix: `user` escaped with backticks (reserved keyword).
        String subQuery = " select `user`, count(url) as cnt, window_start, window_end " +
                " from  table ( " +
                " tumble( table clickTable, descriptor(et), interval '10' second )" +
                ") " +
                " group by `user`, window_start, window_end  ";
        // Rank per window: PARTITION BY window_start, window_end is the fixed
        // pattern Flink recognizes as window Top-N; ORDER BY cnt DESC +
        // row_num <= 2 keeps the top two users of each window.
        Table windowTopNResultTable = tableEnv.sqlQuery("select `user`, cnt, row_num, window_end " +
                "from (" +
                " select * , row_number() over(" +
                "        partition by window_start, window_end " +
                "        order by cnt desc" +
                "       ) as row_num" +
                "       from  ( " + subQuery + " ) " +
                ") where row_num <= 2");
        // Window Top-N emits a final result per window once the watermark passes
        // window_end, so an append-only DataStream is sufficient here.
        tableEnv.toDataStream(windowTopNResultTable).print("window Top N :");
        env.execute();
    }
}
window Top N :> +I[Bob, 2, 1, 1970-01-01T08:00:10]
window Top N :> +I[Alice, 2, 2, 1970-01-01T08:00:10]
window Top N :> +I[Bob, 1, 1, 1970-01-01T08:00:20]
window Top N :> +I[Alice, 1, 1, 1970-01-01T08:00:30]


阅读量:1863

点赞量:0

收藏量:0