像大多数数据系统一样,Apache Flink支持聚合函数;包括内置的和用户定义的。用户自定义函数在使用前必须在目录中注册。
聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”(平均)、“MAX”(最大)和 “MIN”(最小)。
SELECT COUNT(*) FROM Orders
对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止。而且它们会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。
Apache Flink 支持标准的 GROUP BY 子句来聚合数据。
SELECT COUNT(*)
FROM Orders
GROUP BY order_id
对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。
Flink 对于分组聚合提供了一系列性能优化的方法。
DISTINCT 聚合在聚合函数前去掉重复的数据。下面的示例计算 Orders 表中不同 order_ids 的数量,而不是总行数。
SELECT COUNT(DISTINCT order_id) FROM Orders
对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小大多数情况下取决于去重行的数量和分组持续的时间,持续时间较短的 group 窗口不会产生状态过大的问题。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。
Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。
SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())
这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商的评分总数。
结果:
+-------------+--------+-------+
| supplier_id | rating | total |
+-------------+--------+-------+
| supplier1 | 4 | 1 |
| supplier1 | (NULL) | 2 |
| (NULL) | (NULL) | 4 |
| supplier1 | 3 | 1 |
| supplier2 | 3 | 1 |
| supplier2 | (NULL) | 2 |
| supplier2 | 4 | 1 |
+-------------+--------+-------+
GROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。
对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。例如,上例中的 GROUPING SETS ((supplier_id), ()) 里的 () 就是空子列表,与其对应的结果数据中的 supplier_id 列使用 NULL 填充。
对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于 Grouping Sets 的数量以及聚合函数的类型。可以提供一个合适的状态 time-to-live (TTL)配置来防止状态过大.注意:这可能会影响查询结果的正确性。
ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。
例如:下面这个查询和上个例子是等效的。
SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)
这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商和评分组合的评分总数。
CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。
例如:下面两个查询是等效的。
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)
SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)
这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商、评分和产品ID组合的评分总数。
HAVING 会删除 group 后不符合条件的行。 HAVING 和 WHERE 的不同点:WHERE 在 GROUP BY 之前过滤单独的数据行。HAVING 过滤 GROUP BY 生成的数据行。 HAVING 条件中的每一列引用必须是明确的 grouping 列,除非它出现在聚合函数中。
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
即使没有 GROUP BY 子句,HAVING 的存在也会使查询变成一个分组查询。这与查询包含聚合函数但没有 GROUP BY 子句时的情况相同。查询认为所有被选中的行形成一个单一的组,并且 SELECT 列表和 HAVING 子句只能从聚合函数中引用列。如果 HAVING 条件为真,这样的查询将发出一条记录,如果不为真,则发出零条记录。
阅读量:1405
点赞量:0
收藏量:0