例如,你可能在 Kafka 中具有原始日志数据,并希望使用 Flink SQL 读取和分析此类数据。
47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
下面的代码创建了一张表,使用 raw format 以 UTF-8 编码的形式从中读取(也可以写入)底层的 Kafka topic 作为匿名字符串值:
CREATE TABLE nginx_log (
log STRING
) WITH (
'connector' = 'kafka',
'topic' = 'nginx_log',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'raw'
)
然后,你可以将原始数据读取为纯字符串,之后使用用户自定义函数将其分为多个字段进行进一步分析。例如 示例中的 my_split。
SELECT t.hostname, t.datetime, t.url, t.browser, ...
FROM(
SELECT my_split(log) as t FROM nginx_log
);
相对应的,你也可以将一个 STRING 类型的列以 UTF-8 编码的匿名字符串值写入 Kafka topic。
参数 | 是否必选 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必选 | (none) | String | 指定要使用的格式, 这里应该是 ‘raw’。 |
raw.charset | 可选 | UTF-8 | String | 指定字符集来编码文本字符串。 |
raw.endianness | 可选 | big-endian | String | 指定字节序来编码数字值的字节。有效值为’big-endian’和’little-endian’。 |
下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。
阅读量:1976
点赞量:0
收藏量:0