pom.xml 中引入 Flink 相关依赖（jar 包）
<!-- Centralized version properties so all Flink artifacts stay on one version. -->
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<!-- Core Flink dependencies: Java API, DataStream API, and client (job submission). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging: SLF4J API + log4j 1.2 binding; log4j-to-slf4j routes Log4j2 API calls into SLF4J. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<!-- JDBC connector + MySQL driver for JDBC sinks/sources. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
<!-- Table API / SQL: Java bridge, Blink planner (default planner in 1.13), common types. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Scala streaming API is required by the Blink planner at runtime. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- CSV format support and the CEP (complex event processing) library. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
/**
 * Unbounded test source that emits one random click {@link Event} per second.
 *
 * <p>Per the {@code SourceFunction} contract, {@link #cancel()} is invoked from a
 * different thread than {@link #run(SourceContext)}, so the stop flag must be
 * {@code volatile} to guarantee the running loop observes the cancellation.
 */
public class ClickSource implements SourceFunction<Event> {
    // Stop flag; volatile so the write in cancel() (another thread) is visible to run().
    // A plain Boolean field here is a cross-thread visibility bug: the loop may never stop.
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // Random generator for picking users/urls.
        Random random = new Random();
        // Candidate values for the generated events.
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav"};
        // Emit one event per second until cancelled.
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            ctx.collect(new Event(user, url, timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        // Called by the Flink runtime from a separate thread; ends the run() loop.
        running = false;
    }
}
import java.sql.Timestamp;
/**
 * A user click event: who clicked which URL at what time (epoch millis).
 *
 * <p>Kept as a Flink POJO: public class, public non-final fields, and a public
 * no-argument constructor, so Flink can (de)serialize it efficiently. Do not
 * make the fields private without adding getters/setters.
 */
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    /** No-arg constructor required by Flink's POJO serializer. */
    public Event() {
    }

    /**
     * @param user      user name that performed the click
     * @param url       clicked URL
     * @param timestamp event time in epoch milliseconds
     */
    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        // Guard against the null timestamp reachable via the no-arg constructor,
        // which previously caused a NullPointerException here.
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + (timestamp == null ? "null" : new Timestamp(timestamp)) +
                '}';
    }
}
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Minimal Table API / SQL demo: converts a click-event stream into a table,
 * queries it both via SQL and via the fluent Table API, and prints both results.
 */
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallelism keeps the printed output in a readable order.
        env.setParallelism(1);

        // Source stream with event-time timestamps and zero-lateness watermarks.
        // The two-arg lambda unambiguously selects the SerializableTimestampAssigner overload.
        SingleOutputStreamOperator<Event> clickStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner((event, recordTs) -> event.timestamp));

        // Table environment bridging the DataStream and Table APIs.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the stream as an (inline) table.
        Table eventTable = tableEnv.fromDataStream(clickStream);

        // Variant 1: plain SQL over the inline table.
        Table sqlResult = tableEnv.sqlQuery("select user,url from " + eventTable);

        // Variant 2: fluent Table API, additionally filtered to Alice's clicks.
        Table apiResult = eventTable
                .select($("user"), $("url"))
                .where($("user").isEqual("Alice"));

        // Convert back to streams and print; prefixes distinguish the two pipelines.
        tableEnv.toDataStream(apiResult).print("result2");
        tableEnv.toDataStream(sqlResult).print("result");
        env.execute();
    }
}
result2> +I[Alice, ./cart]
result> +I[Alice, ./cart]
result> +I[Mary, ./cart]
result> +I[Mary, ./fav]
result> +I[Mary, ./cart]
result2> +I[Alice, ./fav]
result> +I[Alice, ./fav]
result> +I[Cary, ./fav]
result> +I[Bob, ./cart]
result> +I[Mary, ./fav]
result> +I[Bob, ./home]
result> +I[Bob, ./home]
result> +I[Mary, ./home]
result> +I[Mary, ./home]
阅读量:174
点赞量:0
收藏量:0