可以将关系型表/SQL与流处理做一个对比。
关系型表/SQL | 流处理 | |
---|---|---|
流处理 | 字段元组的有界集合 | 字段元祖的无限序列 |
查询 | 可以访问完整的数据输入 | 无法访问到所有数据,必须持续等待流式输入 |
对数据的访问 | ||
查询终止条件 | 生成固定大小的结果集合终止 | 永不停止,根据持续收到的数据不断更新查询结果 |
流处理面对的数据是连续不断的,这导致了流处理中的表跟我们熟悉的关系型数据库中的表完全不同。基于表的查询操作,也就有了新的含义。
希望把流数据换成表的形式,那么这表中的数据就会不断增长。如果进一步基于表执行SQL查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。
持续查询的过程,可以清晰地看到流、动态表和持续查询的关系:
持续查询的步骤如下:
这样,只要API将流和动态表的转换封装起来,就可以直接在数据流上执行SQL查询,用处理表的方式来做流处理了。
流转换成动态表的过程:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
//创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//将dataStream转换成table
Table eventTable = tableEnv.fromDataStream(eventStream);
//直接写sql进行转换
Table resultTable = tableEnv.sqlQuery("select user,url from " + eventTable);
//基于table直接转换
Table resultTable2 = eventTable.select($("user"), $("url"))
.where($("user").isEqual("Alice"));
//转换成流打印输出
tableEnv.toDataStream(resultTable2).print("result2");
tableEnv.toDataStream(resultTable).print("result");
env.execute();
在代码中定义了一个SQL查询
Table urlCountTable = tableEnv.sqlQuery("SELECT user,COUNT(url) as cnt FROM EnentTable GROUP BY user")
这个查询很简答,主要是分组聚合统计每个用户点击次数。把原始的动态表注册为EventTable,经过查询转换后得到urlCountTable。这个结果动态表中包含两个字断,具体定义如下:
[
user: VARCHAR, //用户名
cnt: BIGINT。//用户访问url的次数
]
当原始动态表不停地插入新的数据时,查询得到的urlCountTable会持续地进行更改。由于count数量可能会叠加增长,因此这里的更改操作可以是简单的插入(Insert),也可以是对之前数据的更新(Update)。换句话说,用来定义结果表的更新日志(changelog)流中,包含了INSERT和UPDATE两种操作。这种持续查询被称为更新查询(Update Query),更新查询得到的结果表如果想要转换成DataStream,必须调用toChangelogStream()方法。
查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表EventTable一样,只有插入Insert操作了。
Table aliceVisitTable = tableEnv.sqlQuery("SELECT url,user FROM EventTable WHERE user = 'Cary' ");
可以考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在接过表中增加一个endT字段,表示当前统计窗口的结束时间。这时结果表的字段定义如下:
[
user: VARCHAR, //用户名
endT: TIMESTAMP, //窗口结束时间
cnt: BIGINT. //用户访问url的次数
]
将动态表转换为撤回流的过程:
既然更新插入流中不区分插入(insert)和更新(update),自然会想到一个问题,如果希望更新一行数据时,怎么保证最后做的操作不是插入呢?
这就需要动态表中必须有唯一的键(key)。通过这个key进行查询,如果存在对应的数据就做更新(update),如果不存在就直接插入(insert)。这是一个动态表可以转换为更新插入流的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键(key),这样才能正确地处理消息。
将动态表转换为更新插入流的过程:
阅读量:1698
点赞量:0
收藏量:0