Flink系列Table API和SQL之:快速上手Table API和SQL-灵析社区

打酱油的后端

一、引入flink相关jar包

在 pom.xml 中引入 Flink 相关依赖(jar 包):

	<!-- Shared version properties referenced by the dependency declarations below. -->
	<properties>
		<flink.version>1.13.0</flink.version>
		<java.version>1.8</java.version>
		<scala.binary.version>2.12</scala.binary.version>
		<slf4j.version>1.7.30</slf4j.version>
	</properties>
<!-- Flink core dependencies: Java API, streaming runtime and clients. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Logging dependencies (SLF4J API, its log4j 1.2 binding, and the Log4j 2 to SLF4J adapter). -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-to-slf4j</artifactId>
			<version>2.14.0</version>
		</dependency>
		<!-- JDBC connector plus the MySQL driver. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.26</version>
		</dependency>

		<!-- Table API and SQL: Java bridge, Blink planner, Scala streaming API and common table classes. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- CSV format support. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- Complex event processing (CEP) library. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-cep_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

二、ClickSource类


import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Unbounded demo source that emits one randomly generated click {@link Event} per second.
 *
 * <p>Each event pairs a user and a URL picked at random from small fixed lists with the
 * wall-clock time (epoch milliseconds) at which it was generated. Emission continues until
 * {@link #cancel()} is called.
 */
public class ClickSource implements SourceFunction<Event> {
    // Cancellation flag. Flink may call cancel() from a different thread than the one
    // executing run(), so the flag must be volatile for the loop to reliably see the update.
    private volatile boolean running = true;

    /**
     * Emits random click events through {@code ctx}, one per second, until cancelled.
     *
     * @param ctx the source context used to emit events
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();
        // Candidate values for the randomly chosen fields.
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav"};

        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            ctx.collect(new Event(user, url, timestamp));
            // Throttle to one event per second.
            Thread.sleep(1000L);
        }
    }

    /** Signals the emitting loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}

三、Event类

import java.sql.Timestamp;

/**
 * A single click event: which user visited which URL, and when.
 *
 * <p>The public fields and the public no-argument constructor are kept as-is; presumably
 * they let Flink treat this class as a POJO type - confirm before changing them.
 */
public class Event {
    /** Name of the user who produced the click. */
    public String user;
    /** URL that was visited. */
    public String url;
    /** Event time in epoch milliseconds; {@code null} until assigned. */
    public Long timestamp;

    /** Creates an empty event with all fields left {@code null}. */
    public Event() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param user      name of the user
     * @param url       visited URL
     * @param timestamp event time in epoch milliseconds
     */
    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    /**
     * Renders the event with its timestamp formatted as a {@link Timestamp}.
     *
     * @return a string of the form {@code Event{user='..', url='..', timestamp=..}};
     *         the timestamp part is {@code null} when no timestamp has been set
     */
    @Override
    public String toString() {
        // The no-arg constructor leaves timestamp null; unboxing it for
        // new Timestamp(long) would throw a NullPointerException.
        String formattedTime = (timestamp == null) ? "null" : new Timestamp(timestamp).toString();
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + formattedTime +
                '}';
    }
}

四、快速上手Table API和SQL

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Minimal Table API / SQL walkthrough: turns a stream of click events into a table,
 * queries it once with SQL and once with the Table API, and prints both results.
 */
public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(1);

        // Event time comes from the event's own timestamp field, with zero allowed lateness.
        WatermarkStrategy<Event> watermarks = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((click, recordTimestamp) -> click.timestamp);
        SingleOutputStreamOperator<Event> clicks = execEnv
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(watermarks);

        // Table environment on top of the streaming environment.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv);

        // View the click stream as a table.
        Table clickTable = tEnv.fromDataStream(clicks);

        // Query 1: plain SQL; the table object is concatenated into the query string.
        Table sqlResult = tEnv.sqlQuery("select user,url from " + clickTable);

        // Query 2: Table API projection, keeping only Alice's clicks.
        Table aliceResult = clickTable
                .select($("user"), $("url"))
                .where($("user").isEqual("Alice"));

        // Convert both results back to streams and print them under distinct labels.
        tEnv.toDataStream(aliceResult).print("result2");
        tEnv.toDataStream(sqlResult).print("result");

        execEnv.execute();
    }
}

五、运行输出如下所示

result2> +I[Alice, ./cart]
result> +I[Alice, ./cart]
result> +I[Mary, ./cart]
result> +I[Mary, ./fav]
result> +I[Mary, ./cart]
result2> +I[Alice, ./fav]
result> +I[Alice, ./fav]
result> +I[Cary, ./fav]
result> +I[Bob, ./cart]
result> +I[Mary, ./fav]
result> +I[Bob, ./home]
result> +I[Bob, ./home]
result> +I[Mary, ./home]
result> +I[Mary, ./home]


阅读量:174

点赞量:0

收藏量:0