创建好了表,接下来就是对表进行查询转换。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。
Flink为我们提供了两种查询方式:SQL和Table API。
基于表执行SQL语句,是我们最为熟悉的查询方式。Flink基于Apache Calcite来提供对SQL的支持,Calcite是一个为不同的计算平台提供标准SQL查询的底层工具,很多大数据框架如Apache Hive、Apache Kylin中的SQL支持都是通过集成Calcite来实现的。
在代码中,我们只要调用表环境sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象。
创建表环境
TableEnvironment tableEnv = ...;
创建表
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ('connector' = ...)");
查询用户Alice的点击事件,并提取表中前两个字段
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user,url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
目前Flink支持标准SQL中的绝大部分用法,并提供了丰富的计算函数。这样我们就可以把已有的技术迁移过来,像在MySQL、Hive中那样直接通过编写SQL实现自己的处理需求,从而大大降低了Flink上手的难度。
例如,我们可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行统计计算:
Table urlCountTable = tableEnv.sqlQuery(
"SELECT user,COUNT(url) " +
"FROM EventTable " +
"GROUP BY user "
);
上面的例子得到的是一个新的Table对象,我们可以再次将它注册为虚拟表继续在SQL中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入的是一个INSERT语句:
注册表:
tableEnv.executeSql("CREATE TABLE EventTable ... WITH ('connector' = ...)");
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ('connector' = ...)");
将查询结果输出到OutputTable中
tableEnv.executeSql(
"INSERT INTO OutputTable "+
"SELECT user,url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
将得到的结果写入输出表
TableResult tableResult = table1.executeInsert("outputTable");
Table eventTable = tableEnv.from("EventTable");
传入的参数就是注册好的表名。注意这里eventTable是一个Table对象,而EventTable是在环境中注册的表名。得到Table对象之后,就可以调用API进行各种转换操作了,得到的是一个新的Table对象:
Table maryClickTable = eventTable.where($("user").isEqual("Alice"))
.select($("url"),$("user"));
这里每个方法的参数都是一个表达式,用方法调用的形式直观地说明了想要表达的内容。“$”符号用来指定表中的一个字段。上面的代码和直接执行SQL是等效的。
Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦些。目前Table API支持的功能相对更少,可以预见未来Flink社区也会以扩展SQL为主,为大家提供更加通用的接口方式。所以接下来也会以介绍SQL为主。
调用Table API进行表的查询转换
Table clickTable = tableEnv.from("clickTable");
Table resultTable = clickTable.where($("user_name").isEqual("bom")).select($("username"), $(
"url"));
tableEnv.createTemporaryView("result",resultTable);
阅读量:1452
点赞量:0
收藏量:0