调用toDataStream()方法
Table aliceVisitTable = tableEnv.sqlQuery(
"SELECT user,url " +
"FROM EventTable " +
"WHERE user = 'Alice' "
);
将表转换成数据流,这里需要将要转换的Table对象作为参数传入。
tableEnv.toDataStream(aliceVisitTable).print();
调用toChangelogStream()方法
tableEnv.createTemporaryView("clickTable",eventTable);
Table aggResult = tableEnv.sqlQuery("select user,COUNT(url) as cnt from clickTable group by user");
tableEnv.toChangelogStream(aggResult).print("agg");
调用fromDataStream()方法
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
获取表环境
//创建表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
读取数据源
SingleOutputStreamOperator<Event> eventStream = env.addSource(...)
将数据流转换成表
Table eventTable = tableEnv.fromDataStream(eventStream);
由于流中的数据本身就是定义好的POJO类型Event,所以我们将流转换成表之后,每一行数据就对应着一个Event,而表中的列名就对应着Event中的属性。
另外,还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置。
提取Event中的timestamp和url作为表中的列
Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp"),$("url"));
需要注意的是,timestamp本身是SQL中的关键字,所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的as()方法对字段进行重命名。
Table eventTable2 = tableEnv.fromDataStream(eventStream,$("timestamp").as("ts"),$("url"));
调用createTemporaryView()方法
tableEnv.createTemporaryView("EventTable",eventStream,$("timestamp").as("ts"),$("url"));
这样接下来就可以直接在SQL中引用表EventTable了。
调用fromChangelogStream()方法
表环境还提供了一个方法fromChangelogStream(),可以将一个更新日志流转换成表。这个方法要求流中的数据类型只能是Row,而且每一个数据都需要指定当前航的更新类型(RowKind)。所以一般是由连接器帮我们实现的。
原子类型:
StreamTableEnvironment tableEnv = ...;
DataStream<Long> stream = ...;
将数据流转换成动态表,动态表只有一个字段,重命名为myLong
Table table = tableEnv.fromDataStream(stream,$("myLong"));
Tuple类型
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long,Integer>> stream = ... ;
将数据流转换成只包含f1字段的表
Table table = tableEnv.fromDataStream(stream,$("f1"));
将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换
Table table = tableEnv.fromDataStream(stream,$("f1"),$("f0"));
将f1字段命名为myInt,f0命名为myLong
Table table = tableEnv.fromDataStream(stream,$("f1").as("myInt"),$("f0").as("myLong"));
Row类型
DataStream<Row> dataStream = env.fromElements(
Row.ofKind(RowKind.INSERT,"Alice",12),
Row.ofKind(RowKind.INSERT,"Bob",5),
Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),
Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100)
);
将更新日志流转换为表
Table table = tableEnv.fromChangelogStream(dataStream);
阅读量:202
点赞量:0
收藏量:0