《Delta Lake Up & Running》第三章:Delta表的基本操作-灵析社区

攻城狮无远

Delta表可以通过多种方式创建。创建表的方式主要取决于您对工具集的熟悉程度。如果您主要是SQL开发人员,可以使用SQL的CREATE TABLE来创建Delta表,而Python用户可能更喜欢使用DataFrameWriter API或细粒度且易于使用的DeltaTableBuilder API。

在创建表时,您可以定义生成列,其值是根据Delta表中其他列上的用户指定函数自动生成的。虽然存在一些限制,但生成列是丰富Delta表模式的强大方式。

Delta表可以通过标准的ANSI SQL或使用流行的PySpark DataFrameReader API来读取。您可以使用经典的SQL INSERT语句写入Delta表,也可以将DataFrame附加到表中。最后,利用SQL的COPY INTO选项是快速附加大量数据的绝佳方式。

根据您经常使用的查询模式对Delta表进行分区可以显着改善查询和DML性能。组成Delta表的各个文件将以子目录的形式组织,这些子目录与分区列的值对齐。

Delta Lake允许您在事务日志中的提交条目中关联自定义元数据。这可以用于标记敏感的提交以进行审计。您还可以在表属性中存储自定义标签,就像您可以为云资源添加标签一样,现在您也可以将这些标签与Delta表关联起来。您还可以修改某些Delta功能。例如,您可以将delta.appendonly属性关联到表上以防止删除和更新。

创建Delta表

Delta Lake允许我们以三种不同的方式创建表:

  1. SQL数据定义语言(DDL)命令 SQL开发人员已经非常熟悉经典的CREATE TABLE命令,您只需添加一些属性即可使用它来创建Delta表。
  2. PySpark DataFrameWriter API 大数据Python(和Scala)开发人员很可能已经非常熟悉这个API,您可以继续使用它来操作Delta表。
  3. DeltaTableBuilder API 这是专为Delta表设计的新API。它采用了流行的Builder模式,为每个Delta表和列属性提供非常精细的控制。

在接下来的章节中,我们将亲自体验这些表创建方法中的每一种。

使用SQL DDL创建Delta表

在Spark计算环境中使用的SQL版本被称为Spark SQL,它是Spark支持的ANSI SQL的变种。Spark SQL通常与ANSI标准SQL兼容。有关Spark SQL变种的更多详细信息,请参考Spark文档。

正如前面提到的,您可以在Spark SQL中使用标准的SQL DDL命令来创建Delta表:

%sql
-- Create a Delta table by specifying the delta format, followed
-- by the path in quotes
CREATE TABLE IF NOT EXISTS delta.`/mnt/datalake/book/chapter03/rateCard`
(
    rateCodeId   INT,
    rateCodeDesc STRING
)
USING DELTA

您使用的表名的表示法是file_format | path_to_table,其中file_format是delta,而path_to_table是Delta表的路径。在实际应用中,使用这种格式可能会变得繁琐,因为文件路径可能会变得相当长。这就是目录的用处。目录允许您使用database.table_name表示法注册表,其中database是表的逻辑分组,而table_name是表的缩写。例如,如果您首先创建了一个名为taxidb的数据库,如下所示:

%sql
CREATE DATABASE IF NOT EXISTS taxidb;

然后,您可以按照以下方式创建上述表:

%sql
-- Create the table using the taxidb catalog
CREATE TABLE IF NOT EXISTS taxidb.rateCard
(
    rateCodeId   INT,
    rateCodeDesc STRING
)
USING DELTA
LOCATION '/mnt/datalake/book/chapter03/rateCard'

从此时开始,您可以将这个Delta表称为taxidb.rateCard,这比delta./mnt/datalake/book/chapter03/rateCard或者可能更长的路径更容易记忆和键入。Spark生态系统中最广泛使用的目录是Hive目录。

当在创建表的数据湖位置运行目录列表时,您会看到我们的目录是空的(因为您尚未加载任何数据),除了包含表的事务日志的_delta_log目录:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/rateCard
total 12
drwxrwxrwx 2 root root 4096 Dec  2 19:02 .
drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
drwxrwxrwx 2 root root 4096 Dec  2 16:40 _delta_log

当您打开_delta_log目录时,您会看到我们的第一个事务日志条目:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/rateCard/_delta_log
total 15
drwxrwxrwx 2 root root 4096 Dec  2 19:02 .
drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
-rwxrwxrwx 1 root root 1886 Dec  2 19:02 00000000000000000000.crc
-rwxrwxrwx 1 root root  939 Dec  2 19:02 00000000000000000000.json

在第2章关于事务日志的讨论中,您已经了解了可以写入事务日志条目的不同操作。其中一个操作是元数据操作,它描述了表的模式、分区列(如果适用)和其他信息。这个元数据操作总是写入为我们的新表创建的第一个事务日志条目中。

要找到这个元数据操作,您可以在事务条目中搜索字符串"metadata":

%sh
grep metadata /dbfs/mnt/datalake/book/chapter03/rateCard
  /_delta_log/00000.json > /tmp/metadata.json
python -m json.tool /tmp/metadata.json

这会产生以下输出:

{
    "metaData": {
        "id": "f79c4c11-a807-49bc-93f4-2bbe778e2a04",
        "format": {
            "provider": "parquet",
            "options": {}
        },
        "schemaString": "{\"type\":\"struct\",
        \"fields\":[{\"name\":\"rateCodeId\",
        \"type\":\"integer\",\"nullable\":true,
        \"metadata\":{}},{\"name\":\"rateCodeDesc\",
        \"type\":\"string\",\"nullable\":true,
        \"metadata\":{}}]}",
        "partitionColumns": [],
        "configuration": {},
        "createdTime": 1670007736533
    }
}

在这里,您可以看到Delta Lake已将表的模式写入事务日志条目,以及一些审计和分区信息。

DESCRIBE语句

SQL的DESCRIBE命令可用于返回Parquet文件或Delta表的基本元数据信息。返回表的元数据包括每个列的以下信息:

  • 列名
  • 列数据类型
  • 应用于列的任何注释

以下是一个表级别的DESCRIBE命令示例:

%sql
DESCRIBE TABLE taxidb.rateCard;

+--------------+-----------+---------+
| col_name     | data_type | comment |
+--------------+-----------+---------+
| rateCodeId   | int       | <null>  |
| rateCodeDesc | string    | <null>  |
+--------------+-----------+---------+

当您想要查找Delta Lake特定的属性时,还可以使用DESCRIBE TABLE EXTENDED命令,它提供更详细的元数据信息,包括以下通用属性:

  • 创建表的数据库的目录名称(在这种情况下是Hive元数据存储)
  • Hive数据库
  • 表名
  • 底层文件的位置
  • 表的所有者
  • 表属性

还包括以下Delta Lake特定的属性:

  • delta.minReaderVersion 读取该Delta表的读取器所需的最低协议读取器版本。
  • delta.minWriterVersion 向该Delta表写入数据的写入器所需的最低协议写入器版本。有关所有可用表属性的完整列表,请参考Delta Lake文档。

以下是DESCRIBE TABLE EXTENDED命令的示例:

%sql
DESCRIBE TABLE EXTENDED taxidb.rateCard;

这会生成以下输出:

+------------------------------+------------------------------+---------+
| col_name                     | data_type                    | comment |
+------------------------------+------------------------------+---------+
| rateCodeId                   | int                          | <null>  |
| rateCodeDesc                 | string                       | <null>  |
|                              |                              |         |
| # Detailed Table Information |                              |         |
| Catalog                      | hive_metastore               |         |
| Database                     | taxidb                       |         |
| Table                        | ratecard                     |         |
| Type                         | EXTERNAL                     |         |
| Location                     | dbfs:/.../chapter03/rateCard |         |
| Provider                     | delta                        |         |
| Owner                        | root                         |         |
| Table Properties             | [delta.minReaderVersion=1,   |         |
|                              |  delta.minWriterVersion=2]   |         |
+------------------------------+------------------------------+---------+

到目前为止,我们已经介绍了如何使用SQL DDL创建Delta表。在接下来的部分,我们将切换回Python,并看看如何使用熟悉的PySpark DataFrames来创建新的Delta表。

使用DataFrameWriter API创建Delta表

Spark DataFrames类似于关系数据库表或带有标题的Excel电子表格。数据以不同数据类型的行和列的形式存在。用于读取、写入和操作DataFrame的函数集合被统称为Spark DataFrameWriter API。

创建托管表

DataFrameWriter API的一个好处是你可以同时创建表并将来自Spark DataFrame的数据插入其中,如下面的代码片段所示:

INPUT_PATH = '/databricks-datasets/nyctaxi/taxizone/taxi_rate_code.csv'
DELTALAKE_PATH = \
  'dbfs:/mnt/datalake/book/chapter03/createDeltaTableWithDataFrameWriter'

# Read the DataFrame from the input path
df_rate_codes = spark                                              \
                .read                                              \
                .format("csv")                                     \
                .option("inferSchema", True)                       \
                .option("header", True)                            \
                .load(INPUT_PATH)

# Save our DataFrame as a managed Hive table
df_rate_codes.write.format("delta").saveAsTable('taxidb.rateCard')

在这里,我们首先从taxi_rate_code.csv文件中填充DataFrame,然后通过指定.format("delta")选项将DataFrame保存为Delta表。表的模式将是我们DataFrame的模式。请注意,这将是一个托管表,因为我们没有为数据文件指定位置。您可以通过运行SQL的DESCRIBE TABLE EXTENDED命令来验证这一点:

%sql
DESCRIBE TABLE EXTENDED taxidb.rateCard;
+------------------------------+----------------------------------------------+
| col_name                     | data_type                                    |
+------------------------------+----------------------------------------------+
| RateCodeID                   | int                                          |
| RateCodeDesc                 | string                                       |
|                              |                                              |
| # Detailed Table Information |                                              |
| Catalog                      | hive_metastore                               |
| Database                     | taxidb                                       |
| Table                        | ratecard                                     |
| Type                         | MANAGED                                      |
| Location                     | dbfs:/user/hive/warehouse/taxidb.db/ratecard |
| Provider                     | delta                                        |
| Owner                        | root                                         |
| Is_managed_location          | true                                         |
| Table Properties             | [delta.minReaderVersion=1,                   |
|                              |  delta.minWriterVersion=2]                   |
+------------------------------+----------------------------------------------+

我们可以看到表的数据存储在/user/hive/warehouse位置,并且表的类型设置为MANAGED。

如果在表上运行SELECT命令,您可以看到数据确实成功从CSV文件加载:

%sql
SELECT * FROM taxidb.rateCard
+------------+-----------------------+
| RateCodeID | RateCodeDesc          |
+------------+-----------------------+
| 1          | Standard Rate         |
| 2          | JFK                   |
| 3          | Newark                |
| 4          | Nassau or Westchester |
| 5          | Negotiated fare       |
| 6          | Group ride            |
+------------+-----------------------+

创建一个非托管表

您可以通过同时指定Delta表的路径和名称来创建一个非托管表。在以下代码中,我们按顺序执行了这两个步骤。首先,删除现有的表:

%sql
-- Drop the existing table
DROP TABLE IF EXISTS taxidb.rateCard;

接下来,写入并创建表:

# Next, create our Delta table, specifying both
# the path and the Delta table N=name
df_rate_codes                           \
        .write                          \
        .format("delta")                \
        .mode("overwrite")              \
        .option('path', DELTALAKE_PATH) \
        .saveAsTable('taxidb.rateCard')

再次通过执行简单的SELECT语句,我们可以验证DataFrame的数据已被加载:

%sql
SELECT * FROM taxidb.rateCard

+------------+-----------------------+
| RateCodeID | RateCodeDesc          |
+------------+-----------------------+
| 1          | Standard Rate         |
| 2          | JFK                   |
| 3          | Newark                |
| 4          | Nassau or Westchester |
| 5          | Negotiated fare       |
| 6          | Group ride            |
+------------+-----------------------+

使用DeltaTableBuilder API创建Delta表

创建Delta表的最后一种方法是使用DeltaTableBuilder API。由于它专门针对Delta表设计,与传统的DataFrameWriter API相比,它提供了更高程度的细粒度控制。用户可以更轻松地指定附加信息,如列注释、表属性和生成的列。

生成器设计模式在软件语言中非常流行。生成器模式旨在“将复杂对象的构建与其表示分离,以便相同的构建过程可以创建不同的表示”。它用于逐步构建复杂对象,最后一步将返回该对象。

在这种情况下,我们正在构建的复杂对象是一个Delta表。Delta表支持许多选项,设计一个具有许多参数的标准API是有挑战的。因此,DeltaTableBuilder具有许多小方法,比如addColumn(),它们都返回对生成器对象的引用。这样,我们可以继续添加其他对addColumn()或生成器的方法的调用。我们调用的最后一个方法是execute(),它收集接收到的所有属性,创建Delta表,并将对表的引用返回给调用者。要使用DeltaTableBuilder,我们需要进行以下导入:

from delta.tables import *

这个示例创建了一个托管表:

# In this Create Table, you do NOT specify a location, so you are
# creating a MANAGED table
DeltaTable.createIfNotExists(spark)                              \
    .tableName("taxidb.greenTaxis")                              \
    .addColumn("RideId", "INT", comment = "Primary Key")         \
    .addColumn("VendorId", "INT", comment = "Ride Vendor")       \
    .addColumn("EventType", "STRING")                            \
    .addColumn("PickupTime", "TIMESTAMP")                        \
    .addColumn("PickupLocationId", "INT")                        \
    .addColumn("CabLicense", "STRING")                           \
    .addColumn("DriversLicense", "STRING")                       \
    .addColumn("PassengerCount", "INT")                          \
    .addColumn("DropTime", "TIMESTAMP")                          \
    .addColumn("DropLocationId", "INT")                          \
    .addColumn("RateCodeId", "INT", comment = "Ref to RateCard") \
    .addColumn("PaymentType", "INT")                             \
    .addColumn("TripDistance", "DOUBLE")                         \
    .addColumn("TotalAmount", "DOUBLE")                          \
    .execute()

由于每个方法都返回对生成器对象的引用,我们可以继续调用 .addColumn() 来添加每个列。最后,我们调用 .execute() 来创建Delta表。

生成的列

Delta Lake支持生成列,这是一种特殊类型的列,其值根据用户指定的函数在Delta表中的其他列上自动生成。当写入具有生成列的Delta表并且未明确为它们提供值时,Delta Lake会自动计算这些值。

让我们创建一个示例。为了保持与出租车主题的一致性,我们将创建一个黄色出租车表的简化版本:

%sql
CREATE TABLE taxidb.YellowTaxis
(
    RideId               INT        COMMENT 'This is our primary Key column',
    VendorId             INT,
    PickupTime           TIMESTAMP,
    PickupYear           INT        GENERATED ALWAYS AS(YEAR  (PickupTime)),
    PickupMonth          INT        GENERATED ALWAYS AS(MONTH (PickupTime)),
    PickupDay            INT        GENERATED ALWAYS AS(DAY   (PickupTime)),
    DropTime             TIMESTAMP,
    CabNumber            STRING     COMMENT 'Official Yellow Cab Number'
) USING DELTA
LOCATION "/mnt/datalake/book/chapter03/YellowTaxis.delta"
COMMENT 'Table to store Yellow Taxi data'

我们看到了使用GENERATED ALWAYS AS的列,它从PickupTime列中提取了YEAR、MONTH和DAY。当我们插入记录时,这些列的值将自动填充:

%sql
INSERT INTO taxidb.YellowTaxis
    (RideId, VendorId, PickupTime, DropTime, CabNumber)
VALUES
    (5, 101, '2021-7-1T8:43:28UTC+3', '2021-7-1T8:43:28UTC+3', '51-986')

当我们选择该记录时,我们可以看到生成的列已自动填充:

%sql
SELECT PickupTime, PickupYear, PickupMonth, PickupDay FROM taxidb.YellowTaxis

+---------------------------+------------+-------------+-----------+
| pickupTime                | pickupYear | pickupMonth | pickupDay |
+---------------------------+------------+-------------+-----------+
| 2021-07-01 05:43:28+00:00 | 2021       | 7           | 1         |
+---------------------------+------------+-------------+-----------+

当我们明确为生成列提供一个值时,该值必须满足约束条件(<value> ⇔ generation expression)IS TRUE,否则插入操作将失败并报错。

在GENERATED ALWAYS AS中使用的表达式可以是任何Spark SQL函数,只要在给定相同的参数值时始终返回相同的结果,有一些例外情况我们将很快提到。您可以考虑使用一个生成列来生成一个类似这样的唯一ID列:

%sql
CREATE OR REPLACE TABLE default.dummy
(
    ID   STRING GENERATED ALWAYS AS (UUID()),
    Name STRING
) USING DELTA

然而,当您尝试运行此操作时,您会收到以下错误消息:

Found uuid(). A generated column cannot use a non deterministic expression.

UUID()函数在每次调用时都会返回不同的值,这违反了前面的规则。以下是此规则的一些例外情况,适用于以下类型的函数:

  • 用户自定义函数
  • 聚合函数
  • 窗口函数
  • 返回多行的函数

使用列出的函数创建的GENERATED ALWAYS AS列是有效的,并可以在多种场景中非常有用,例如计算给定记录样本的标准差。

读取Delta表

在读取表时,我们有几种选项:使用DataFrameReader的SQL和PySpark。当我们在Databricks Community Edition中使用笔记本时,通常在笔记本内部同时使用SQL和PySpark单元格。有些操作,比如快速的SELECT,用SQL更简单和更快,而复杂的操作有时更容易在PySpark和DataFrameReader中表达。当然,这也取决于工程师的经验和偏好。我们建议采用一种务实的方法,根据您当前正在解决的问题,合理混合使用这两种方法。

使用SQL读取Delta表

要读取Delta表,我们只需打开一个SQL单元格并编写SQL查询。如果按照GitHub README文件中的说明设置了环境,我们将在/mnt/datalake/book/chapter03/YellowTaxisDelta文件夹中拥有一个Delta表:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDelta
total 236955
drwxrwxrwx 2 root root      4096 Dec  4 18:04 .
drwxrwxrwx 2 root root      4096 Dec  2 19:02 ..
drwxrwxrwx 2 root root      4096 Dec  4 16:41 _delta_log
-rwxrwxrwx 1 root root 134759123 Dec  4 18:04 part-00000-...-c000.snappy.parquet
-rwxrwxrwx 1 root root 107869302 Dec  4 18:04 part-00001-...-c000.snappy.parquet

我们可以快速将Delta表位置注册到元数据存储中,如下所示:

%sql
CREATE TABLE taxidb.YellowTaxis
USING DELTA
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDelta/"

创建表之后,我们可以快速统计记录的数量:

%sql
SELECT
    COUNT(*)
FROM
    taxidb.yellowtaxis

这会给我们以下的计数结果:

+----------+
| count(1) |
+----------+
| 9999995  |
+----------+

我们可以看到有近1000万行可供使用。我们可以使用另一种DESCRIBE命令变体来获取表的详细信息:

%sql
DESCRIBE TABLE FORMATTED taxidb.YellowTaxis;

DESCRIBE TABLE FORMATTED命令格式化输出,使其更易阅读:

+------------------------------+--------------------------------------+
| col_name                     | data_type                            |
+------------------------------+--------------------------------------+
| RideId                       | int                                  |
| VendorId                     | int                                  |
| PickupTime                   | timestamp                            |
| DropTime                     | timestamp                            |
| PickupLocationId             | int                                  |
| DropLocationId               | int                                  |
| CabNumber                    | string                               |
| DriverLicenseNumber          | string                               |
| PassengerCount               | int                                  |
| TripDistance                 | double                               |
| RatecodeId                   | int                                  |
| PaymentType                  | int                                  |
| TotalAmount                  | double                               |
| FareAmount                   | double                               |
| Extra                        | double                               |
| MtaTax                       | double                               |
| TipAmount                    | double                               |
| TollsAmount                  | double                               |
| ImprovementSurcharge         | double                               |
|                              |                                      |
| # Detailed Table Information |                                      |
| Catalog                      | hive_metastore                       |
| Database                     | taxidb                               |
| Table                        | YellowTaxis                          |
| Type                         | EXTERNAL                             |
| Location                     | dbfs:/.../chapter03/YellowTaxisDelta |
| Provider                     | delta                                |
| Owner                        | root                                 |
| Table Properties             | [delta.minReaderVersion=1,           |
|                              |  delta.minWriterVersion=2]           |
+------------------------------+--------------------------------------+

由于Spark SQL支持大多数ANSI SQL子集,我们可以使用任何类型的复杂查询。以下是一个示例,返回FareAmount超过50美元的CabNumbers:

%sql
SELECT
    CabNumber,
    AVG(FareAmount) AS AverageFare
FROM
    taxidb.yellowtaxis
GROUP BY
    CabNumber
HAVING
     AVG(FareAmount) > 50
ORDER BY
    2 DESC
LIMIT 5

这给我们的结果是:

+-----------+-------------+
| cabnumber | AverageFare |
+-----------+-------------+
| SIR104    | 111.5       |
| T628190C  | 109.0       |
| PEACE16   | 89.7        |
| T439972C  | 89.5        |
| T802013C  | 85.0        |
+-----------+-------------+

我们还可以在Python中直接使用spark.sql,使用标准SQL作为参数。以下是一个简单的Python代码片段,执行与之前的SQL查询相同的操作:

number_of_results = 5
sql_statement = f"""
SELECT
    CabNumber,
    AVG(FareAmount) AS AverageFare
FROM
    taxidb.yellowtaxis
GROUP BY
    CabNumber
HAVING
     AVG(FareAmount) > 50
ORDER BY
    2 DESC
LIMIT {number_of_results}"""

df = spark.sql(sql_statement)
display(df)

这会生成与SQL相同的结果:

+-----------+-------------+
| cabnumber | AverageFare |
+-----------+-------------+
| SIR104    | 111.5       |
| T628190C  | 109.0       |
| PEACE16   | 89.7        |
| T439972C  | 89.5        |
| T802013C  | 85.0        |
+-----------+-------------+

我们建议使用三重引号语法,这样可以轻松跨多行定义字符串而无需使用继续行。此外,请注意我们有一个名为number_of_results的变量,然后将三重引号字符串转换为f-string,并使用{}语法将变量插入到限制中。

使用PySpark读取表

要在PySpark中读取相同的表,您可以使用DataFrameReader。例如,要实现记录的计数,我们使用以下方式:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
print(f"Number of records: {df.count():,}")

输出:

Number of records: 9,999,995

请注意,我们指定了Delta格式,因为我们的表是Delta表,我们可以使用.table()方法指定我们要读取整个表。最后,我们使用了一个f-string,这次使用了“:,”格式化符号,它会在每三位数字之间使用逗号分隔符。

接下来,让我们重新创建之前在SQL中完成的按出租车编号排序的前五个平均费用的代码。以下是Python代码:

# Make sure to import the functions you want to use
from pyspark.sql.functions import col, avg, desc

# Read YellowTaxis into our DataFrame
df = spark.read.format("delta").table("taxidb.YellowTaxis")

# Perform the GROUP BY, average (AVG), HAVING and order by equivalents
# in pySpark
results = df.groupBy("CabNumber")                          \
            .agg(avg("FareAmount").alias("AverageFare"))   \
            .filter(col("AverageFare") > 50)               \
            .sort(col("AverageFare").desc())               \
            .take(5)                                      

# Print out the result, since this is a list and not a DataFrame
# you an use list comprehension to output the results in a single
# line
[print(result) for result in results]

我们将得到以下输出:

Row(CabNumber='SIR104', AverageFare=111.5)
Row(CabNumber='T628190C', AverageFare=109.0)
Row(CabNumber='PEACE16', AverageFare=89.7)
Row(CabNumber='T439972C', AverageFare=89.5)
Row(CabNumber='T802013C', AverageFare=85.0)

我们可以简单地使用 groupBy() 函数按列分组:

要计算平均值,首先我们要使用.agg()方法。在方法内部,我们可以指定要计算的聚合函数,本例中是.avg()(平均值)。在Python中,等效于HAVING条件的是.filter()方法,其中可以使用过滤表达式来指定筛选条件。最后,我们使用.sort()方法对数据进行排序,然后使用.take()来提取前五个结果。需要注意的是,.take()函数将返回一个Python列表。由于我们有一个列表,可以使用列表推导式来输出列表中的每个结果。

写入到 Delta 表

有多种方法可以写入Delta表。您可能希望重新创建整个表,或者只是想追加数据到表中。更高级的主题,如更新和合并,将在第四章中讨论。 首先,我们将清空YellowTaxis表,以便重新开始,然后我们将使用传统的SQL INSERT语句插入数据。接下来,我们将追加来自较小CSV文件的数据。我们还将简要介绍在写入Delta表时的覆盖模式,最后,我们将使用SQL的COPY INTO功能来合并一个大型的CSV文件。

清空YellowTaxis表

我们可以使用CREATE TABLE语句重新创建我们的Delta表:

%sql
CREATE TABLE taxidb.YellowTaxis
(
    RideId                  INT,
    VendorId                INT,
    PickupTime              TIMESTAMP,
    DropTime                TIMESTAMP,
    PickupLocationId        INT,
    DropLocationId          INT,
    CabNumber               STRING,
    DriverLicenseNumber     STRING,
    PassengerCount          INT,
    TripDistance            DOUBLE,
    RatecodeId              INT,
    PaymentType             INT,
    TotalAmount             DOUBLE,
    FareAmount              DOUBLE,
    Extra                   DOUBLE,
    MtaTax                  DOUBLE,
    TipAmount               DOUBLE,
    TollsAmount             DOUBLE,        
    ImprovementSurcharge    DOUBLE
   
) USING DELTA        
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDelta"

表已准备好,我们可以开始插入数据。

使用SQL INSERT插入数据

要向YellowTaxis Delta表插入记录,我们可以使用SQL INSERT命令:

%sql
INSERT INTO taxidb.yellowtaxis
(RideId, VendorId, PickupTime, DropTime,
 PickupLocationId, DropLocationId, CabNumber,
 DriverLicenseNumber, PassengerCount, TripDistance,
 RatecodeId, PaymentType, TotalAmount,
 FareAmount, Extra, MtaTax, TipAmount,
 TollsAmount, ImprovementSurcharge)

 VALUES(9999995, 1, '2019-11-01T00:00:00.000Z',
        '2019-11-01T00:02:23.573Z', 65, 71, 'TAC304',
        '453987', 2, 4.5, 1, 1, 20.34, 15.0, 0.5,
        0.4, 2.0, 2.0, 1.1)

这将插入一行:

+---------------------+---------------------+
| num_affected_rows   | num_inserted_rows |
+---------------------+---------------------+
|          1          |          1          |
+---------------------+---------------------+

使用带有插入的RideId的SQL SELECT语句和WHERE子句来验证数据是否已正确加载:

%sql
SELECT count(RideId) AS count FROM taxidb.YellowTaxis
WHERE RideId = 9999995

输出:

+-------+
| count |
+-------+
|   1   |
+-------+

输出显示所有数据已正确加载。

将DataFrame追加到表中

现在让我们将一个DataFrame追加到我们的表中。在这种情况下,我们将从一个CSV文件加载DataFrame。为了正确加载数据,我们不希望自动推断模式。相反,我们将使用我们知道是正确的YellowTaxis表的模式。 我们可以通过从表中加载一个DataFrame轻松提取模式:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
yellowTaxiSchema = df.schema
print(yellowTaxiSchema)

这显示了表的模式如下:

root
 |-- RideId: integer (nullable = true)
 |-- VendorId: integer (nullable = true)
 |-- PickupTime: timestamp (nullable = true)
 |-- DropTime: timestamp (nullable = true)
 |-- PickupLocationId: integer (nullable = true)
 |-- DropLocationId: integer (nullable = true)
 |-- CabNumber: string (nullable = true)
 |-- DriverLicenseNumber: string (nullable = true)
 |-- PassengerCount: integer (nullable = true)
 |-- TripDistance: double (nullable = true)
 |-- RatecodeId: integer (nullable = true)
 |-- PaymentType: integer (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- FareAmount: double (nullable = true)
 |-- Extra: double (nullable = true)
 |-- MtaTax: double (nullable = true)
 |-- TipAmount: double (nullable = true)
 |-- TollsAmount: double (nullable = true)
 |-- ImprovementSurcharge: double (nullable = true)

现在我们有了模式,我们可以从追加的CSV文件中加载一个新的DataFrame(df_for_append):

df_for_append = spark.read                                       \
     .option("header", "true")                                   \
     .schema(yellowTaxiSchema)                                   \
     .csv("/mnt/datalake/book/data files/YellowTaxis_append.csv")
display(df_for_append)

我们看到以下输出(显示部分输出):

+---------+-----------+---------------------+---------------------+
| RideId  | VendorId  | PickupTime          | DropTime            |
+---------+-----------+---------------------+---------------------+
| 9999996 | 1         | 2019-01-01T00:00:00 | 2022-03-01T00:13:13 |
+---------+-----------+---------------------+---------------------+
| 9999997 | 1         | 2019-01-01T00:00:00 | 2022-03-01T00:09:21 |
+---------+-----------+---------------------+---------------------+
| 9999998 | 1         | 2019-01-01T00:00:00 | 2022-03-01T00:09:15 |
+---------+-----------+---------------------+---------------------+
| 9999999 | 1         | 2019-01-01T00:00:00 | 2022-03-01T00:10:01 |             
+---------+-----------+---------------------+---------------------+

我们现在有了四行额外的数据,它们都具有VendorId为1。现在我们可以将这个CSV文件追加到Delta表中:

df_for_append.write              \
     .mode("append")             \
     .format("delta")            \
     .save("/mnt/datalake/book/chapter03/YellowTaxisDelta")

这将数据直接追加到Delta表中。由于我们在执行INSERT语句之前表中已有一行数据,并且插入了额外的四行数据,所以我们知道YellowTaxis表现在应该有五行数据:

%sql
SELECT
    COUNT(*)
FROM
    taxidb.YellowTaxis
+----------+
| count(1) |
+----------+
| 5        |
+----------+

现在我们有五行数据。

在写入Delta表时使用覆盖模式

在前面的示例中,我们在使用DataFrameWriter API写入Delta表时使用了.mode("append")。Delta Lake还支持在写入Delta表时使用覆盖模式。当使用此模式时,您将以原子方式替换表中的所有数据。 如果在前一个代码块中使用.mode("overwrite"),我们将用df_for_append DataFrame覆盖整个YellowTaxis Delta表。 即使在代码中使用.mode("overwrite"),旧的分区文件也不会立即物理删除。为了支持时间旅行等功能,这些文件不能立即删除。当确保不再需要这些文件时,我们可以使用像VACUUM这样的命令来物理删除它们。时间旅行和VACUUM命令将在第6章中详细介绍。

使用SQL的COPY INTO命令插入数据

我们可以使用SQL的COPY INTO命令将数据追加到我们的表中。当我们需要快速追加大量数据时,这个命令特别有用。 我们可以使用以下命令将来自CSV文件的数据追加到表中:

%sql
COPY INTO taxidb.yellowtaxis
FROM (
    SELECT     RideId::Int
             , VendorId::Int
             , PickupTime::Timestamp
             , DropTime::Timestamp
             , PickupLocationId::Int
             , DropLocationId::Int
             , CabNumber::String
             , DriverLicenseNumber::String
             , PassengerCount::Int
             , TripDistance::Double
             , RateCodeId::Int
             , PaymentType::Int
             , TotalAmount::Double
             , FareAmount::Double
             , Extra::Double
             , MtaTax::Double
             , TipAmount::Double
             , TollsAmount::Double
             , ImprovementSurcharge::Double
             
        FROM '/mnt/datalake/book/DataFiles/YellowTaxisLargeAppend.csv'
)
FILEFORMAT = CSV
FORMAT_OPTIONS ("header" = "true")

CSV文件中的所有字段都是字符串,因此在加载数据时,我们需要在SQL SELECT语句中提供某种类型的模式。这会提供每列的类型,以确保我们加载了正确的模式。请注意,在这种情况下指定了FILEFORMAT,即CSV。最后,由于我们的文件有标题行,我们需要在FORMAT_OPTIONS中指定标题行。 该语句的输出为:

+---------------------+---------------------+
|  num_affected_rows  |  num_inserted_rows  |
+---------------------+---------------------+
|      9999995        |      9999995        |
+---------------------+---------------------+

您可以看到,我们仅用几秒钟就插入了近1000万行数据。COPY INTO命令还会跟踪并不会重新加载先前加载过的文件。我们可以通过再次运行COPY INTO命令来测试这一点:

+---------------------+---------------------+
|  num_affected_rows  |  num_inserted_rows  |
+---------------------+---------------------+
|         0           |          0          |
+---------------------+---------------------+

正如您所见,没有加载额外的行。最后,当我们检查最终的行数时,我们会看到现在有一百万行:

%sql
SELECT
    COUNT(*)
FROM    
    taxidb.YellowTaxis

输出:

+-----------+
|  count(1) |
+-----------+
| 10000000  |
+-----------+

分区

Delta表通常采用标准查询模式进行访问。例如,来自物联网系统的数据通常按天、小时甚至分钟进行访问。查询黄色出租车数据的分析师可能希望按VendorId等方式访问数据。

这些用例非常适合分区。将数据分区以与查询模式对齐可以显著加快查询性能,特别是当与其他性能优化(例如Z-ordering)结合使用时。Delta表的分区由包含具有相同一个或多个列的值的数据行子集的文件夹组成。

例如,对于黄色出租车数据,分区列可以是VendorId。在对表进行分区后,将为每个VendorId创建单独的文件夹。文件夹名称的最后部分将具有VendorId=XX:

drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=2
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=4

一旦对表进行了分区,所有包含分区列的谓词的查询将运行得更快,因为Spark可以立即选择具有正确分区的文件夹。您可以在创建Delta表时通过指定PARTITIONED BY子句来进行数据分区。

按单一列进行分区

让我们拿YellowTaxis表来创建一个按VendorId分区的新版本。首先,创建分区表:

%sql
CREATE TABLE taxidb.YellowTaxisPartitioned
(
    RideId                  INT,
    VendorId                INT,
    PickupTime              TIMESTAMP,
    DropTime                TIMESTAMP,
    PickupLocationId        INT,
    DropLocationId          INT,
    CabNumber               STRING,
    DriverLicenseNumber     STRING,
    PassengerCount          INT,
    TripDistance            DOUBLE,
    RatecodeId              INT,
    PaymentType             INT,
    TotalAmount             DOUBLE,
    FareAmount              DOUBLE,
    Extra                   DOUBLE,
    MtaTax                  DOUBLE,
    TipAmount               DOUBLE,
    TollsAmount             DOUBLE,        
    ImprovementSurcharge    DOUBLE
   
) USING DELTA        
PARTITIONED BY(VendorId)
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned"

注意PARTITIONED BY(VendorId)子句。现在您已经有了表,接下来将从旧的YellowTaxis表中加载数据,并将数据写入新表。首先,使用DataFrameReader读取数据:

input_df = spark.read.format("delta").table("taxidb.YellowTaxis")

接下来,使用DataFrameWriter将数据写入分区的Delta表:

input_df                                                               \
    .write                                                             \
    .format("delta")                                                   \
    .mode("overwrite")                                                 \
    .save("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned")

现在,当我们查看表的目录时,我们将看到每个VendorID的子目录:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned
drwxrwxrwx 2 root root 4096 Dec  5 17:39 .
drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
drwxrwxrwx 2 root root 4096 Dec  5 16:44 VendorId=1
drwxrwxrwx 2 root root 4096 Dec  5 16:44 VendorId=2
drwxrwxrwx 2 root root 4096 Dec  5 16:44 VendorId=4
drwxrwxrwx 2 root root 4096 Dec  5 16:44 _delta_log

当我们查看不同的VendorId时,我们确实只看到了这三个ID:

%sql
SELECT
    DISTINCT(VendorId)
FROM
    taxidb.YellowTaxisPartitioned;

我们会看到相同的ID:

+----------+
| VendorId |
+----------+
| 2        |
| 1        |
| 4        |
+----------+

VendorId子目录包含个别的Parquet文件,如图所示,VendorId=4的情况:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned/VendorId=4
total 3378
drwxrwxrwx 2 root root   4096 Dec  5 17:41 .
drwxrwxrwx 2 root root   4096 Dec  5 17:39 ..
-rwxrwxrwx 1 root root 627551 Dec  5 17:41 part-00000-...parquet
-rwxrwxrwx 1 root root 618844 Dec  5 17:41 part-00001-...parquet
-rwxrwxrwx 1 root root 616377 Dec  5 17:41 part-00002-...parquet
-rwxrwxrwx 1 root root 614035 Dec  5 17:41 part-00003-...parquet
-rwxrwxrwx 1 root root 612410 Dec  5 17:41 part-00004-...parquet
-rwxrwxrwx 1 root root 360432 Dec  5 17:41 part-00005-...parquet

多列分区

不必仅通过一个列进行分区。我们可以使用多个层次的列作为分区列。例如,对于物联网数据,我们可能希望按天、小时和分钟进行分区,因为这是最常用的查询模式。

举个例子,假设我们不仅希望将YellowTaxis表按VendorId分区,还希望按RateCodeId分区。首先,我们需要删除现有的YellowTaxisPartitioned表及其底层文件。接下来,我们可以重新创建该表:

%sql
-- Create the table
CREATE TABLE taxidb.YellowTaxisPartitioned
(
    RideId                  INT,
    …
) USING DELTA        
PARTITIONED BY(VendorId, RatecodeId) -- Partition by VendorId AND rateCodeId
LOCATION "/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned"

注意更新的分区子句:PARTITIONED BY(VendorId, RatecodeId)。 完成这一步之后,我们可以以与之前相同的方式重新加载表格。一旦表格加载完成,我们可以再次查看目录结构。第一层看起来仍然相同:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned
drwxrwxrwx 2 root root 4096 Dec 13 15:33 .
drwxrwxrwx 2 root root 4096 Dec  2 19:02 ..
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=2
drwxrwxrwx 2 root root 4096 Dec 13 15:16 VendorId=4
drwxrwxrwx 2 root root 4096 Dec 13 15:16 _delta_log

当我们查看VendorId=1的目录时,我们可以看到按RatecodeId进行的分区:

%sh
ls -al /dbfs/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned/VendorId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:35 .
drwxrwxrwx 2 root root 4096 Dec 13 15:33 ..
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=1
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=2
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=3
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=4
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=5
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=6
drwxrwxrwx 2 root root 4096 Dec 13 15:16 RatecodeId=99

最后,当我们在RatecodeId级别进行查询时:

%sh
ls -al /dbfs/.../chapter03/YellowTaxisDeltaPartitioned/VendorId=1/RatecodeId=1

我们可以看到该分区的Parquet文件:

drwxrwxrwx 2 root root     4096 Dec 13 15:35 .
drwxrwxrwx 2 root root     4096 Dec 13 15:35 ..
-rwxrwxrwx 1 root root 10621353 Dec 13 15:35 part-00000-...parquet
-rwxrwxrwx 1 root root 10547673 Dec 13 15:35 part-00001-...parquet
-rwxrwxrwx 1 root root 10566377 Dec 13 15:35 part-00002-...parquet
-rwxrwxrwx 1 root root 10597523 Dec 13 15:35 part-00003-...parquet
-rwxrwxrwx 1 root root 10570937 Dec 13 15:35 part-00004-...parquet
-rwxrwxrwx 1 root root  6119491 Dec 13 15:35 part-00005-...parquet
-rwxrwxrwx 1 root root 13820133 Dec 13 15:35 part-00007-...parquet
-rwxrwxrwx 1 root root 24076060 Dec 13 15:35 part-00008-...parquet
-rwxrwxrwx 1 root root  6772609 Dec 13 15:35 part-00009-...parquet

检查分区是否存在

要确定表是否包含特定分区,您可以使用以下语句:

SELECT COUNT(*) > 0 FROM <table-name> WHERE <partition-column> = <value>

如果分区存在,将返回true。以下SQL语句检查VendorId = 1和RatecodeId = 99的分区是否存在:

%sql
SELECT
    COUNT(*) > 0 AS `Partition exists`
FROM
    taxidb.YellowTaxisPartitioned
WHERE
    VendorId = 2 AND RateCodeId = 99

这将返回true,因为正如之前所示,这个分区是存在的。

有选择性地使用replaceWhere更新Delta分区

在前一部分,我们看到了通过分区数据可以显著加快查询操作的方法。我们还可以使用replaceWhere选项有选择性地更新一个或多个分区。有选择地将更新应用于某些分区并不总是可能的;有些更新需要应用于整个数据湖。但是,在适用的情况下,这些有选择性的更新可以带来显著的性能提升。Delta Lake可以以卓越的性能更新分区,同时保证数据完整性。

要查看replaceWhere的实际应用,让我们来看一个特定的分区:

%sql
SELECT  
    RideId, VendorId, PaymentType
FROM    
    taxidb.yellowtaxispartitioned
WHERE  
    VendorID = 1 AND RatecodeId = 99 LIMIT 5

在结果中,我们看到了各种支付类型的混合。

+---------+----------+-------------+
| RideId  | VendorId | PaymentType |
+---------+----------+-------------+
| 1137733 | 1        | 1           |
| 1144423 | 1        | 2           |
| 1214030 | 1        | 1           |
| 1223028 | 1        | 1           |
| 1300054 | 1        | 2           |
+---------+----------+-------------+

假设我们有一个业务原因,规定了VendorId = 1和RatecodeId = 9的所有PaymentTypes应为3。我们可以使用以下PySpark表达式和replaceWhere来实现这个结果:

from pyspark.sql.functions import *
spark.read                                                              \
    .format("delta")                                                    \
    .load("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned")   \
    .where((col("VendorId") == 1) & (col("RatecodeId") == 99))          \
    .withColumn("PaymentType", lit(3))                                  \
    .write                                                              \
    .format("delta")                                                    \
    .option("replaceWhere", "VendorId = 1 AND RateCodeId = 99")         \
    .mode("overwrite")                                                  \
    .save("/mnt/datalake/book/chapter03/YellowTaxisDeltaPartitioned")

现在当我们查找这个分区的不同PaymentTypes时:

%sql
SELECT  
    DISTINCT(PaymentType)
FROM    
    taxidb.yellowtaxispartitioned
WHERE 
    VendorID = 1 AND RatecodeId = 99

我们可以看到,我们只有PaymentType = 3。

+-------------+
| PaymentType |
+-------------+
| 3           |
+-------------+

我们可以验证其他分区未受影响:

%sql
SELECT  
    DISTINCT(PaymentType)
FROM    
    taxidb.yellowtaxispartitioned
ORDER BY
    PaymentType

这显示了所有的PaymentTypes:

+-------------+
| PaymentType |
+-------------+
| 1           |
| 2           |
| 3           |
| 4           |
+-------------+

当您需要运行一个计算成本较高的操作,但只需要在特定分区上运行时,replaceWhere可以特别有用。

在黄色出租车的情景中,假设数据科学团队要求您在YellowTaxis表上运行他们的算法。最初,您可以在最小的分区上运行它,快速检索结果,得到批准后,可以在整个剩余的分区上进行算法运行,通常在夜间执行。

用户定义的元数据

出于审计或法规遵从的目的,我们可能希望为某些SQL操作添加标签。例如,我们的项目可能要求您在向某些表插入数据时使用通用数据保护法规(GDPR)标签。一旦我们用这个标签标记了INSERT操作,审计工具将能够生成包含这个特定标签的语句的完整列表。

我们可以在由SQL操作生成的元数据提交中指定这些标签作为用户自定义的字符串。我们可以使用DataFrameWriter的选项userMetadata,或者SparkSession配置spark.databricks.delta.commitInfo.userMetadata来实现这一点。如果两个选项都被指定,DataFrameWriter的选项将优先。

使用SparkSession设置自定义元数据

首先让我们看一下SparkSession配置。假设我们有一个INSERT操作,我们想要为审计目的分配一个GDPR标签。以下是一个SQL示例:

%sql
SET spark.databricks.delta.commitInfo.userMetadata=my-custom-metadata= 
  { "GDPR": "INSERT Request 1x965383" };

这个标签将应用于接下来的操作,这是一个标准的INSERT操作:

INSERT INTO taxidb.yellowtaxisPartitioned
(RideId, VendorId, PickupTime, DropTime,
 PickupLocationId, DropLocationId, CabNumber,
 DriverLicenseNumber, PassengerCount, TripDistance,
 RatecodeId, PaymentType, TotalAmount,
 FareAmount, Extra, MtaTax, TipAmount,
 TollsAmount, ImprovementSurcharge)
 VALUES(10000000, 3, '2019-11-01T00:00:00.000Z',
        '2019-11-01T00:02:23.573Z', 65, 71, 'TAC304',
        '453987', 2, 4.5, 1, 1, 20.34, 15.0, 0.5,
        0.4, 2.0, 2.0, 1.1)


阅读量:1341

点赞量:0

收藏量:0