Flink系列之:分组聚合-灵析社区

打酱油的后端

像大多数数据系统一样,Apache Flink支持聚合函数;包括内置的和用户定义的。用户自定义函数在使用前必须在目录中注册。

聚合函数把多行输入数据计算为一行结果。例如,有一些聚合函数可以计算一组行的 “COUNT”、“SUM”、“AVG”(平均)、“MAX”(最大)和 “MIN”(最小)。

SELECT COUNT(*) FROM Orders

对于流式查询,重要的是要理解 Flink 运行的是连续查询,永远不会终止。而且它们会根据其输入表的更新来更新其结果表。对于上述查询,每当有新行插入 Orders 表时,Flink 都会实时计算并输出更新后的结果。

Apache Flink 支持标准的 GROUP BY 子句来聚合数据。

SELECT COUNT(*)
FROM Orders
GROUP BY order_id

对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于分组的数量以及聚合函数的数量和类型。例如:MIN/MAX 的状态是重量级的,COUNT 是轻量级的。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。

Flink 对于分组聚合提供了一系列性能优化的方法。

一、DISTINCT 聚合

DISTINCT 聚合在聚合函数前去掉重复的数据。下面的示例计算 Orders 表中不同 order_ids 的数量,而不是总行数。

SELECT COUNT(DISTINCT order_id) FROM Orders

对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小大多数情况下取决于去重行的数量和分组持续的时间,持续时间较短的 group 窗口不会产生状态过大的问题。可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这可能会影响查询结果的正确性。

二、GROUPING SETS

Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。

SELECT supplier_id, rating, COUNT(*) AS total
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())

这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商的评分总数。

  • 首先,我们定义了一个包含供应商ID、产品ID和评分的VALUES子句,表示我们的原始数据。每个元组代表了一个产品的供应商、产品和评分。
  • 然后,我们使用AS关键字给VALUES子句指定别名为Products,并指定了三个列名:supplier_id、product_id和rating。
  • 接下来,我们使用GROUP BY子句和GROUPING SETS关键字来分组数据。GROUP BY子句定义了我们想要按照哪些列进行分组。在这个查询中,我们定义了三个分组集合:(supplier_id, rating)、(supplier_id)和()。它们分别表示按照supplier_id和rating分组、只按照supplier_id分组以及不进行任何分组。
  • 最后,我们使用COUNT(*)函数来计算每个分组的产品评分总数,并将结果作为"total"列返回。
  • 这个查询的结果将为每个供应商和评分组合提供评分总数,以及每个供应商的总评分数和所有供应商的总评分

结果:

+-------------+--------+-------+
| supplier_id | rating | total |
+-------------+--------+-------+
|   supplier1 |      4 |     1 |
|   supplier1 | (NULL) |     2 |
|      (NULL) | (NULL) |     4 |
|   supplier1 |      3 |     1 |
|   supplier2 |      3 |     1 |
|   supplier2 | (NULL) |     2 |
|   supplier2 |      4 |     1 |
+-------------+--------+-------+

GROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。

对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。例如,上例中的 GROUPING SETS ((supplier_id), ()) 里的 () 就是空子列表,与其对应的结果数据中的 supplier_id 列使用 NULL 填充。

对于流式查询,用于计算查询结果的状态可能无限膨胀。状态的大小取决于 Grouping Sets 的数量以及聚合函数的类型。可以提供一个合适的状态 time-to-live (TTL)配置来防止状态过大.注意:这可能会影响查询结果的正确性。

三、ROLLUP

ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。

例如:下面这个查询和上个例子是等效的。

SELECT supplier_id, rating, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, rating)

这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商和评分组合的评分总数。

  • 首先,我们定义了一个包含供应商ID、产品ID和评分的VALUES子句,表示我们的原始数据。每个元组代表了一个产品的供应商、产品和评分。
  • 然后,我们使用AS关键字给VALUES子句指定别名为Products,并指定了三个列名:supplier_id、product_id和rating。
  • 接下来,我们使用GROUP BY子句和ROLLUP关键字来进行分组。ROLLUP允许我们构造多个层次的大汇总。在这个查询中,我们使用ROLLUP(supplier_id, rating)来创建了两个层次的分组:一个按供应商ID和评分进行分组的层次,以及一个只按供应商ID进行分组的层次。
  • 最后,我们使用COUNT()函数来计算每个分组的产品评分总数,并返回结果中的"supplier_id"、"rating"和"COUNT()"三列。
  • 这个查询的结果将为每个供应商和评分组合提供评分总数,以及每个供应商在不同评分水平上的总评分数。同时,结果还包括以评分水平为基础的总评分数和所有供应商的总评分数。

四、CUBE

CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。

例如:下面两个查询是等效的。

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, rating, product_id)

SELECT supplier_id, rating, product_id, COUNT(*)
FROM (VALUES
    ('supplier1', 'product1', 4),
    ('supplier1', 'product2', 3),
    ('supplier2', 'product3', 3),
    ('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
    ( supplier_id, product_id, rating ),
    ( supplier_id, product_id         ),
    ( supplier_id,             rating ),
    ( supplier_id                     ),
    (              product_id, rating ),
    (              product_id         ),
    (                          rating ),
    (                                 )
)

这个Flink SQL查询的目标是,基于给定的产品评分数据,计算每个供应商、评分和产品ID组合的评分总数。

  • 首先,我们定义了一个包含供应商ID、产品ID和评分的VALUES子句,表示我们的原始数据。每个元组代表了一个产品的供应商、产品和评分。
  • 然后,我们使用AS关键字给VALUES子句指定别名为Products,并指定了三个列名:supplier_id、product_id和rating。
  • 接下来,我们使用GROUP BY子句和CUBE关键字来进行分组。CUBE允许我们构造所有可能的组合。在这个查询中,我们使用CUBE(supplier_id, rating, product_id)来创建了所有可能的组合:按供应商ID、评分和产品ID进行分组的组合、只按供应商ID和评分进行分组的组合、只按供应商ID和产品ID进行分组的组合、只按评分和产品ID进行分组的组合,以及只按供应商ID进行分组的组合,只按评分进行分组的组合,只按产品ID进行分组的组合,以及不进行任何分组的组合。
  • 最后,我们使用COUNT()函数来计算每个分组的产品评分总数,并返回结果中的"supplier_id"、“rating”、"product_id"和"COUNT()"四列。
  • 这个查询的结果将为每个供应商、评分和产品ID组合提供评分总数,以及不同组合下的总评分数。同时,结果还包括每个供应商、每个评分和每个产品ID的总评分数,以及所有供应商、所有评分和所有产品ID的总评分数。

五、HAVING

HAVING 会删除 group 后不符合条件的行。 HAVING 和 WHERE 的不同点:WHERE 在 GROUP BY 之前过滤单独的数据行。HAVING 过滤 GROUP BY 生成的数据行。 HAVING 条件中的每一列引用必须是明确的 grouping 列,除非它出现在聚合函数中。

SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50

即使没有 GROUP BY 子句,HAVING 的存在也会使查询变成一个分组查询。这与查询包含聚合函数但没有 GROUP BY 子句时的情况相同。查询认为所有被选中的行形成一个单一的组,并且 SELECT 列表和 HAVING 子句只能从聚合函数中引用列。如果 HAVING 条件为真,这样的查询将发出一条记录,如果不为真,则发出零条记录。

阅读量:1405

点赞量:0

收藏量:0