正如第三章所解释的,数据可观察性结合了技术和人员的作用,从数据角度收集系统状态的信息以及对该状态的期望。然后,它利用这些信息来使系统更具适应性或更加弹性。
本章将解释如何应用数据可观察性实践。我将从“数据源的数据可观察性”开始,这是一种引入收集策略到日常数据工作的方法,并向您展示如何最大程度地减少对效率的影响。然后,本章将详细说明如何实现订阅软件交付生命周期的期望,例如持续集成和持续部署(CI/CD)。
与任何新兴的实践和技术一样,为了增加数据可观察性的采用率,您需要降低参与的门槛;这样,人们就没有太多理由反对这种变化。然而,人员也是解决方案的一部分,因为他们在该过程中的参与对于确定他们的期望并对规则进行编码至关重要。为此,您将学习减少生成观察结果所需的工作量的几种方法,并了解如何在开发生命周期的适当阶段引入它们。
第二章解释了帮助观察者的信息的来源和类型。但是,如何从这些来源生成和收集信息呢? 这始于数据源头的数据可观察性。术语“源头”指的是负责读取、转换和写入数据的应用程序。正如第3章所解释的,这些应用程序可能是问题的根本原因,也可能是解决问题的手段。此外,与数据本身不同,这些应用程序在工程师和组织的控制之下。
因此,数据源头的数据可观察性方法依赖于应用程序能够生成遵循第2章中介绍的模型的数据观察,换句话说,这些应用程序被制作成数据可观察的。 纯数据和分析观察,如数据源、模式、血统和指标,与读取、转换和写入活动相关。本节中描述的策略通过解释运行这些活动时要考虑什么来解决这些活动。
在源头生成数据观察始于生成额外的信息,捕捉数据的特定活动行为:读取、转换和写入。例如,开发人员可以添加包含生成有关其应用程序执行的可见性所需信息的日志行。指南使用第2章中介绍的通道,即日志、指标、跟踪或血统,来传达可以集中在日志系统中的观察结果。
在下一节中,您将学习如何创建JSON格式的数据观察,这些观察可以收集(发布)到本地文件、本地服务、远程(网络)服务或类似的目标。例如,根据第2章中的数据可观察核心模型,Postgres表的数据源和模式实体将如示例4-1所示。
{
"id": "f1813697-339f-5576-a7ce-6eff6eb63249",
"name": "gold.crm.customer",
"location": "main-pg:5432/gold/crm/table",
"format": "postgres"
}
{
"id": "f1813697-339f-5576-a7ce-6eff6eb63249",
"data_source_ref": {"by_id": "e21ce0a8-a01e-5225-8a30-5dd809c0952e"},
"fields": [
{ "name": "lastname", "type": "string", "nullable": true },
{ "name": "id", "type": "int", "nullable": false }
]
}
能够在一个集中的平台上以相同的模型收集数据观察结果,对于在规模上生成数据可观察性的价值至关重要,比如跨应用程序、团队和部门。这就是为什么使用数据可观察核心模型对于轻松汇总数据观察结果,尤其是沿着血统线的汇总,非常重要。
让我们看看如何使用Python生成数据观察结果,使用低级API,这将用于介绍更高级的抽象(在接下来的章节中介绍)。
使用低级API的策略需要大量的时间和参与,因为您需要明确地创建每个观察结果。但是,这种策略也为您提供了最大的灵活性,因为它不涉及任何高级抽象。 另一方面,在这个级别支持数据可观察性,特别是在探索和维护期间,需要开发人员保持一致,并始终考虑他们可能在生产环境中想要观察的内容(例如,任何高级开发人员都应该为日志和检查生成与业务逻辑一样多的行)。在开发过程中,开发人员必须通过生成相关的观察结果来为应用程序的逻辑或行为修改创建可见性。此类观察的示例包括与新表的连接、新文件的创建或带有新字段的结构更改。 在接下来的几节中,您将通过一个完整的示例,了解使用Python编写的数据应用程序,这些应用程序在处理数据时生成数据观察,并完成以下操作:
在本章的其余部分,我将使用一种用Python编写的数据管道,我们将使其具有数据可观测性。GitHub上的管道代码允许您运行本章中的示例。它使用pandas库来处理CSV文件,由两个应用程序组成,即入库和报告,如图4-1所示。
两个应用程序(入库和报告)都使用Python和pandas,并从“每日股票价格”数据源中共享数据,以创建两个下游报告(BuzzFeed股票和AppTech)。
入库应用程序读取股市团队每月提供的每日股票价格的CSV文件。团队按年份和月份对文件进行了分区,然后将价格合并到存储为单独文件的月度视图中,如示例4-2中的入库所示。
import pandas as pd
AppTech = pd.read_csv(
"data/AppTech.csv",
parse_dates=["Date"],
dtype={"Symbol": "category"},
)
Buzzfeed = pd.read_csv(
"data/Buzzfeed.csv",
parse_dates=["Date"],
dtype={"Symbol": "category"},
)
monthly_assets = pd.concat([AppTech, Buzzfeed]) \
.astype({"Symbol": "category"})
monthly_assets.to_csv(
"data/monthly_assets.csv", index=False
)
在运行了第一个脚本并且文件可用之后,可以运行其余的流水线脚本来生成BuzzFeed和AppTech股票报告。这个示例只有一个脚本,即报告的Python文件,如示例4-3所示。
import pandas as pd
all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date'])
apptech = all_assets[all_assets['Symbol'] == 'APCX']
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD']
buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open']
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']
kept_values = ['Open', 'Adj Close', 'Intraday_Delta']
buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False)
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)
为了正确执行流水线,尊重报告和摄取应用程序之间的依赖关系非常重要。换句话说,必须在运行报告应用程序之前成功运行摄取应用程序。否则,流水线将失败。实现这一目标的方法是使用一个编排器,在报告之前运行摄取应用程序,比如Dagster和Airflow。事实上,编排器是另一个需要配置的应用程序,需要在数据级别明确硬编码应用程序之间的依赖关系。但是,应用程序本身仍然不了解它们的下游依赖关系。
在编排器中硬编码依赖关系的反面是,这是一个需要维护的新资产(例如,明确依赖关系的准确性)。此外,在创建流水线时会施加额外的约束,因为团队必须保持独立,因此不能简单地更新现有的流水线以满足他们自己的需求。因此,必须在更高的级别创建一个扩展,通过添加一个新的分离的DAG来实现,这可能会断开明确的依赖关系。
回到我们的流水线,让我们讨论应用程序之间的功能依赖关系;也就是说,在运行报告之前,必须成功运行摄取。但成功是什么意思呢?
确定数据流水线的执行是否成功,我将分析相反的问题:摄取过程中可能发生哪些失败?明确的失败会导致应用程序崩溃。如果您使用编排器,这种类型的故障很容易处理,因为它是一个标志,用于不触发下一个应用程序,例如我们的示例中的报告应用程序。
另一方面,静默失败是指应用程序在没有错误代码或日志的情况下完成。因为它没有按预期运行,所以您必须考虑第二章介绍的期望的概念。
摄取应用程序的观察者可能会遇到以下明确的失败:
但从工程师观察的角度来看,以下示例表明了静默失败:
由于这些故障中的任何一个都可能发生,因此必须在它们发生时具备可见性,并且更好的做法是在摄取应用程序中及早预防它们的传播(请参阅“快速失败和安全失败”)。已经将明确的失败制作成了可见性,作为开发实践,以明确捕获这些错误(在Python中使用try...except)。但是,为了使观察者能够识别和发现静默失败,他们需要应用程序生成适当的观察结果。
在本节中,我将概述数据流水线必须生成的数据观察结果。为此,让我们快速查看图4-2,该图显示了低级API如何实现第2章中提出的模型。有趣的是,它们具有相似的结构,甚至一些实体(标记为)也相同;在接下来的段落中,我将逐个详细介绍每个部分,以突出这些事实。
在此图中,您会注意到用大写字母A、B、C和D标记的实体,它们位于圆圈内。 "A" 数据源突出显示了由摄取应用程序生成的观察结果,这些观察结果涉及其生成的数据,以及由报告应用程序读取数据时生成的观察结果,因此明确了隐式依赖关系。 实际上,这两个应用程序都生成了多个类似的观察结果,这些观察结果代表将它们联系在一起的所有依赖关系。在图4-2中,还突出显示了以下类似的观察结果:
在这一部分,我将介绍生成关于摄取应用程序执行上下文的观察所需的代码,如图4-3所示(请注意,报告可以重用相同的代码)。
将示例4-4中的代码插入到文件开头,以生成摄取应用程序的观察。
app_user = getpass.getuser()
repo = git.Repo(os.getcwd(), search_parent_directories=True)
code_repo = repo.remote().url
commit = repo.head.commit
code_version = commit.hexsha
code_author = commit.author.name
application_name = os.path.basename(os.path.realpath(__file__))
application_start_time = datetime.datetime.now().isoformat()
示例4-5中的额外指令创建了用于观察的变量,但目前还没有使用它们。如前所述,要记录信息,我们使用与示例4-5中编码的信息模型的JSON表示。
application_observations = {
"name": application_name,
"code": {
"repo": code_repo,
"version": code_version,
"author": code_author
},
"execution": {
"start": application_start_time,
"user": app_user
}
}
此代码创建了一个包含到目前为止创建的所有观察结果的JSON。但是,本节是关于使用数据可观察性的低级API。随着我们的继续,我们将遇到类似的模式,这为我们提供了创建函数的机会,以简化代码并在未来的数据收集和报告应用程序中共享它们。为了创建API,我们创建一个模型,模仿JSON中的观察核心模型,将每个实体转换为一个类,并将关系转换为引用(请参见示例4-6)。
class Application:
name: str
def __init__(self, name: str, repository: ApplicationRepository) -> None:
self.name = name
self.repository = repository
def to_json(self):
return {"name": self.name, "repository": self.repository.to_json()}
class ApplicationRepository:
location: str
def __init__(self, location: str) -> None:
self.location = location
def to_json(self):
return {"location": self.location}
app_repo = ApplicationRepository(code_repo)
app = Application(application_name, app_repo)
这意味着应用程序实体必须具有一个Application
类,具有一个能够保存文件名的属性name
,该属性可以将文件名存储为一个application_name
变量,并引用一个ApplicationRepository
实例。这个ApplicationRepository
实体将被编码为一个ApplicationRepository
类,具有属性location
,设置为git远程位置。
这个结构将有助于构建模型并生成更容易重用且能够导致标准化的JSON表示。 将概念编码成API类的一个附加好处是,它们有责任提供助手来提取相关的观察结果,就像示例4-7中所示。
location: str
# [...]
@staticmethod
def fetch_git_location():
import git
code_repo = git.Repo(os.getcwd(), search_parent_directories=True).remote().url
return code_repo
class Application:
name: str
# [...]
@staticmethod
def fetch_file_name():
import os
application_name = os.path.basename(os.path.realpath(__file__))
return application_name
app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location())
app = Application(Application.fetch_file_name(), app_repo)
这种策略可能是实现该模型的一种直接方式。然而,我们更喜欢另一种方法,它减弱了实体之间的联系。在示例4-8中,所有信息都将记录在一个JSON中,实体分散在信息树中,其中Application是根实体。这种编码方式迫使我们在记录根实体之前创建所有观察结果,而根实体就是Application实例。Application构造函数将变成类似示例4-8的内容。
class Application:
name: str
def __init__(self, name: str, version: ApplicationVersion,
repo: ApplicationRepository, execution: ApplicationExecution,
server: Server, author: User) -> None:
pass
为了避免这种复杂性和约束,更好的方法是颠倒实体之间的依赖关系。不再让Application包含其ApplicationVersion或ApplicationRepository,而是创建单独的Application,然后在ApplicationVersion和ApplicationRepository内部添加对它的弱引用。示例4-9展示了这种模型的外观。
class ApplicationRepository:
location: str
application: Application
id: str
def __init__(self, location: str, application: Application) -> None:
self.location = location
self.application = application
id_content = ",".join([self.location, self.application.id])
self.id = md5(content.encode("utf-8")).hexdigest()
def to_json(self):
return { "id": self.id,
"location": self.location,
"application": self.application.id }
@staticmethod
def fetch_git_location():
import git
code_repo = git.Repo(os.getcwd(),
search_parent_directories=True).remote().url
return code_repo
有了这个模型,我们可以单独记录每个观察结果——logging.info
的两个调用——从而减少需要保存的信息量。因为我们需要重新构建实体之间的关系,所以引入了id
变量,以减少需要记录的信息和需要链接的观察结果数量。使用这些日志,id
可以通过它们的id
来重建模型中的链接,例如ApplicationRepository
和Application
之间的依赖关系,因为它们已经被记录下来了。
在这个示例中,应用程序在本地生成了id
,导致了一个设计上的问题,使其在多次执行中不一致。为了解决这个问题,我们必须定义一个功能性的id
,可以在多次执行、部署和应用程序之间标识实体。这个概念在建模中称为主键。您可以将主键用作散列算法的输入,例如使用hashlib
以确定性方式生成id
,在本例中使用md5
。
示例4-9说明了如何使用主键来一致生成id
,例如通过使用md5
。在本章的后续部分,我们将使用这种策略来生成实体。
现在让我们讨论主要的数据可观测性组件的观察结果:数据源、模式和数据指标。图4-4显示了我们必须记录的观察结果。
要生成的观察结果与所读取和写入的数据源相关。然后,处理转换(谱系)的问题。在摄入代码中,许多源使用pandas read_csv函数读取。
数据源的观察主要是提供给read_csv函数作为参数的文件路径。另一个观察是数据源格式,由于read_csv没有使用任何特定的解析器属性,如sep,因此数据是“真正的”CSV。
此外,read_csv没有提供关于标头的任何信息。因此,第一个非空行应该是列名,这将有助于创建关于摄入应用程序访问的数据的观察。
列的类型是可以观察的另一项信息。 pandas推断出大多数数值类型。但是,字符串、类别和日期保留为对象。 read_csv函数提供了两个提示:Date是日期信息,Symbol是分类数据,或者在这种情况下是字符串。
通过这些值,pandas创建了一个DataFrame,一种我们可以使用来捕获到目前为止列出的信息的表格。此外,DataFrame还允许我们计算关于值本身的附加内在信息,例如描述性统计信息。我们使用describe函数来计算这些基本统计信息,包括数字值和其他类型(如分类或日期值)的统计信息,使用参数:include='all'。
最后,摄入应用程序使用to_csv函数将月度资产CSV文件写入到保存要写入的值的内存中的DataFrame中。此函数提供的信息几乎与read_csv函数相同。因此,我们可以得出文件路径、列和类型。
让我们看看如何在可重用的API中建模这些观察结果。第一个类是DataSource,相对来说比较容易建模,如示例4-10所示。我们主要想知道格式以及应用程序正在读取或写入的数据源的位置。因此,它将模拟到文件的路径。
class DataSource:
location: str
format: str
id: str
def __init__(self, location: str, format: str = None) -> None:
self.location = location
self.format = format
id_content = ",".join([self.location, self.application.id])
self.id = md5(content.encode("utf-8")).hexdigest()
def to_json(self):
return {"id": self.id, "location": self.location, "format": self.format}
此外,id是基于提供的路径生成的,该路径是相对路径(以./开头),应扩展为使用绝对路径。此外,如果我们使用数据库表,例如除read_csv之外的其他方法,我们需要添加其他辅助方法来处理连接字符串。
接下来,示例4-11可以对应用程序在数据源中操作的模式进行建模。
class Schema:
fields: list[tuple[str, str]]
data_source: DataSource
id: str
def __init__(self, fields: list[tuple[str, str]], data_source: DataSource) -> None:
self.fields = fields
self.data_source = data_source
linearized_fields = ",".join(list(map(lambda x: x[0] + "-" + x[1],
sorted(self.fields))))
id_content = ",".join([linearized_fields, self.data_source.id])
self.id = hashlib.md5(id_content.encode("utf-8")).hexdigest()
def to_json(self):
from functools import reduce
jfields = reduce(lambda x, y: dict(**x, **y),
map(lambda f: {f[0]: f[1]},
self.fields))
return {"id": self.id, "fields": jfields, "data_source": self.data_source.id}
@staticmethod
def extract_fields_from_dataframe(df: pd.DataFrame):
fs = list(zip(df.columns.values.tolist(), map(lambda x: str(x),
df.dtypes.values.tolist())))
return fs
Schema类具有fields属性,用于模拟CSV文件的列。在这里,我们可以从pandas DataFrame的元数据中提取它:columns和dtypes。我们选择将字段表示为一对(字段名称,字段类型)的列表。
这部分的最后一个类是DataMetrics,它将模拟摄取应用程序读取和写入的文件的指标。但是,这是该类的不完整版本,因为它只编码了与Schema的关系。必须扩展它以确保它提供了特定用途的数据指标的可见性。这一目标需要一个后续的lineage(参见示例4-15)。
当前的类看起来像示例4-12。
class DataMetrics:
schema: Schema
metrics: list[tuple[str, float]]
id: str
def __init__(self, metrics: list[tuple[str, float]], schema: Schema) -> None:
self.metrics = metrics
self.schema = schema
self.id = hashlib.md5(",".join([self.schema.id]).encode("utf-8")).hexdigest()
def to_json(self):
from functools import reduce
jfields = reduce(lambda x, y: dict(**x, **y),
map(lambda f: {f[0]: f[1]},
self.metrics))
return {"id": self.id, "metrics": jfields, "schema": self.schema.id}
@staticmethod
def extract_metrics_from_dataframe(df: pd.DataFrame):
d = df.describe(include='all', datetime_is_numeric=True)
import math
import numbers
metrics = {}
filterf = lambda x: isinstance(x[1], numbers.Number) and not math.isnan(x[1])
mapperf = lambda x: (field + "." + x[0], x[1])
for field in d.columns[1:]:
msd = dict(filter(filterf, map(mapperf, d[field].to_dict().items())))
metrics.update(msd)
# metrics looks like:
# {"Symbol.count": 20, "Symbol.unique": 1, "Symbol.freq": 20,
# "Open.count": 20.0, "Open.mean": 3.315075, "Open.min": 1.68,
# "Open.25%": 1.88425"Open.75%": 2.37, "Open.max": 14.725,
# "Open.std": 3.7648500643766463, ...}
return list(metrics.items())
这里选择了这种简化形式的指标,仅表示数字描述统计信息,以一对(指标名称,数值)的列表形式。这些观察结果将直接从应用程序在包含数据值的DataFrame上调用describe函数的结果转换而来。
现在我们已经将所有实体定义为类,我们可以更新我们的代码(见示例4-13)以确保它适当地生成这些观察结果。
app = Application(Application.fetch_file_name())
app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location(), app)
AppTech = pd.read_csv(
"data/AppTech.csv",
parse_dates=["Date"],
dtype={"Symbol": "category"},
)
AppTech_DS = DataSource("data/AppTech", "csv")
AppTech_SC = Schema(Schema.extract_fields_from_dataframe(AppTech), AppTech_DS)
AppTech_M = DataMetrics(DataMetrics.extract_metrics_from_dataframe(AppTech),
AppTech_SC)
Buzzfeed = pd.read_csv(
"data/Buzzfeed.csv",
parse_dates=["Date"],
dtype={"Symbol": "category"},
)
Buzzfeed_DS = DataSource("data/Buzzfeed", "csv")
Buzzfeed_SC = Schema(Schema.extract_fields_from_dataframe(Buzzfeed), Buzzfeed_DS)
Buzzfeed_M = DataMetrics(DataMetrics.extract_metrics_from_dataframe(Buzzfeed),
Buzzfeed_SC)
monthly_assets = pd.concat([AppTech, Buzzfeed]) \
.astype({"Symbol": "category"})
monthly_assets.to_csv(
"data/monthly_assets.csv", index=False
)
monthly_assets_DS = DataSource("data/monthly_assets", "csv")
monthly_assets_SC = Schema(Schema.extract_fields_from_dataframe(monthly_assets),
monthly_assets_DS)
monthly_assets_M = DataMetrics(
DataMetrics.extract_metrics_from_dataframe(monthly_assets),
monthly_assets_SC)
现在我们有了一个生成观察结果所需的代码,用于观察运行摄取应用程序时的数据行为。然而,大部分代码几乎是相同的。这个示例中的函数可以用来消除大部分的噪音。
def observations_for_df(df_name: str, df_format: str, df: pd.DataFrame) -> None:
ds = DataSource(df_name, df_format)
sc = Schema(Schema.extract_fields_from_dataframe(df), ds)
ms = DataMetrics(DataMetrics.extract_metrics_from_dataframe(df), sc)
在这一部分,我们处理的是文件,但如果你需要处理表格,使用SQL来读取、转换和写入数据呢?如果直接使用SQL执行这些操作,它包含与Python代码相同的操作。也就是说,它读取表格,进行转换,最终写入它们;例如,插入子查询。然而,SQL并不提供在中间生成信息的多种能力,比如元数据提取或度量。SQL的另一个应用程序(应该由数据库服务器负责)运行SQL。
请记住,SQL包含了大量信息,你的应用程序可以利用它。它包括表格、列、类型和执行的转换的名称;因此,解析查询会生成类似的观察结果,如示例4-14所示。实际上,我建议对SQL查询使用这种提取策略,因为SQL查询很可能是多次迭代和实验的结果,直到它完全按照预期运行(实际上,与任何代码一样)。因此,SQL查询实现了每次迭代中的所有假设,例如包括以下内容:
在添加了生成数据和应用程序观察结果的摄取功能后,我们可以着手处理另一个领域,这将提供更多的信息——即在部署应用程序时生成有关分析部分的观察结果。
为了生成关于管道中应用程序之间在数据级别的相互作用以及列级别的观察结果,我将在本节中介绍如何生成血统实体,如图4-5所示。
正如在第2章中讨论的那样,连接血统(数据流和转换,在本例中)的模型部分将数据可观察性领域的主要组件的观察结果与应用程序组件连接在一起。它位于所有三个领域的交汇处,即分析可观察性领域。
为了编码这一部分,我们必须生成关于使用哪些数据源(输入)来生成其他数据源(输出)的观察结果。对于摄取来说,股票CSV文件有助于生成月度资产CSV文件。再深入一层,摄取将来自所有列的所有值连接起来,然后将结果写入文件。在字段级别的血统是直接的,这意味着输入和输出列名相同,并且输出仅从每个输入数据源的此列中获取其值。
示例4-15展示了如何在报告应用程序中包含血统(OutputDataLineage)的生成,使用一个专门的类来定义输出与其输入的依赖关系。
class OutputDataLineage:
schema: Schema
input_schemas: list[tuple[Schema, dict]]
id: str
def __init__(self, schema: Schema,
input_schemas_mapping: list[tuple[Schema, dict]]) -> None:
self.schema = schema
self.input_schemas_mapping = input_schemas_mapping
self.id = hashlib.md5(",".join([self.schema.id]).encode("utf-8") \
+ self.linearize().encode("utf-8")).hexdigest()
def to_json(self):
return {"id": self.id, "schema": self.schema.id,
"input_schemas_mapping": self.input_schemas_mapping}
示例4-16展示了OutputDataLineage的一部分静态函数,该函数将在字段级别将每个输入与输出进行映射,以以下方式连接它们:output_field -> 每个输入的输入字段列表。generate_direct_mapping助手使用了一个过于简单的启发式方法来绑定数据源,当字段名匹配时映射输出数据源。 这种策略在大多数实际用例中都会失败,特别是在需要更谨慎的跟踪以管理所有连接的聚合情况下。 通过使用下一章讨论的策略之一,您可以避免出现这种情况。
@staticmethod
def generate_direct_mapping(output_schema: Schema, input_schemas: list[Schema]):
input_schemas_mapping = []
output_schema_field_names = [f[0] for f in output_schema.fields]
for schema in input_schemas:
mapping = {}
for field in schema.fields:
if field[0] in output_schema_field_names:
mapping[field[0]] = [field[0]]
if len(mapping):
input_schemas_mapping.append((schema, mapping))
return input_schemas_mapping
define linearize(self):
[...]
return linearized
然后,在示例4-17中,我定义了与上下文(与应用程序相关)观测相关联的谱系执行ApplicationExecution。
class DataLineageExecution:
lineage: OutputDataLineage
application_execution: ApplicationExecution
start_time: str
id: str
def __init__(self, lineage: OutputDataLineage,
application_execution: ApplicationExecution) -> None:
self.lineage = lineage
self.application_execution = application_execution
self.start_time = datetime.datetime.now().isoformat()
self.id = hashlib.md5(
",".join([self.lineage.id, self.application_execution.id,
self.start_time]).encode("utf-8")).hexdigest()
def to_json(self):
return {"id": self.id, "lineage": self.lineage.id,
"application_execution": self.application_execution.id,
"start_time": self.start_time}
另外,第二章介绍的数据可观性核心模型将数据度量实体与谱系的执行相连接,以便在数据被使用或修改时提供可见性。因此,我们通过添加lineage_execution属性来调整DataMetrics模型,以表示这种关联,如示例4-18所示。
class DataMetrics:
schema: Schema
lineage_execution: DataLineageExecution
metrics: list[tuple[str, float]]
id: str
def __init__(self, metrics: list[tuple[str, float]], schema: Schema,
lineage_execution: DataLineageExecution) -> None:
self.metrics = metrics
self.schema = schema
self.lineage_execution = lineage_execution
id_content = ",".join([self.schema.id, self.lineage_execution.id]
self.id = hashlib.md5(id_content).encode("utf-8")).hexdigest()
def to_json(self):
from functools import reduce
jfields = reduce(lambda x, y: dict(**x, **y),
map(lambda f: {f[0]: f[1]}, self.metrics))
return {"id": self.id, "metrics": jfields, "schema": self.schema.id,
"lineage_execution": self.lineage_execution.id}
@staticmethod
def extract_metrics_from_df(df: pd.DataFrame):
d = df.describe(include='all', datetime_is_numeric=True)
import math
import numbers
metrics = {}
filterf = lambda x: isinstance(x[1], numbers.Number) and not math.isnan(x[1])
mapperf = lambda x: (field + "." + x[0], x[1])
for field in d.columns[1:]:
msd = dict(filter(filterf, map(mapperf, d[field].to_dict().items())))
metrics.update(msd)
return list(metrics.items())
现在所有的组件都准备好生成图4-5中显示的观测数据了。最终的数据摄取脚本可以在GitHub仓库中查看。
在继续分析观察如何帮助解决本节开头介绍的明显和悄悄失败之前,我们将迄今为止所做的工作重新用于报告应用程序的数据可观察性。请参见示例 4-19。
import ApplicationRepository.fetch_git_location
import ApplicationVersion.fetch_git_version
app = Application(Application.fetch_file_name())
app_repo = ApplicationRepository(fetch_git_location(), app)
git_user = User(ApplicationVersion.fetch_git_author())
app_version = ApplicationVersion(fetch_git_version(), git_user, app_repo)
current_user = User("Emanuele Lucchini")
app_exe = ApplicationExecution(app_version, current_user)
all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date'])
apptech = all_assets[all_assets['Symbol'] == 'APCX']
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD']
buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open']
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']
kept_values = ['Open', 'Adj Close', 'Intraday_Delta']
buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False)
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)
all_assets_ds = DataSource("data/monthly_assets.csv", "csv")
all_assets_sc = Schema(Schema.extract_fields_from_dataframe(all_assets),
all_assets_ds)
buzzfeed_ds = DataSource("data/report_buzzfeed.csv", "csv")
buzzfeed_sc = Schema(Schema.extract_fields_from_dataframe(buzzfeed), buzzfeed_ds)
apptech_ds = DataSource("data/report_appTech.csv", "csv")
apptech_sc = Schema(Schema.extract_fields_from_dataframe(apptech), apptech_ds)
# First lineage
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
OutputDataLineage.generate_direct_mapping
(buzzfeed_sc, [all_assets_sc]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets),
all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), buzzfeed_sc,
lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
OutputDataLineage.generate_direct_mapping
(apptech_sc, [all_assets_sc]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets),
all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc,
lineage_apptech_exe)
通过以这种方式添加观察,我们使修改与我们在摄取应用程序中所做的类似。这种方法使我们能够建立习惯和抽象,比如一个框架,可以减少所需的更改数量 - 几乎是开发中的一种隐式规律。
在示例 4-19 中,请注意为输入生成的观察已移至末尾。我们为示例的简单性而做出了这种实现选择。一个好处是附加计算在末尾完成,而不会影响业务流程。一个不足之处在于,如果中间发生了故障,将不会发送关于数据源及其架构的观察。当然,通过对代码进行一些调整,可以避免这种情况。另外,对于这个低级API的介绍,我们必须添加一些模板代码以生成正确的信息,这可能听起来像是噪音,与业务代码成比例。但是,请记住,该脚本最初没有日志。一般来说,日志是零散添加的,以生成有关脚本自身行为的一些信息,这就是我们所做的,但是应用于数据。
还要记住以下几点: 我们按原样重用了 OutputDataLineage.generate_direct_mapping 助手来创建输出和输入之间的系谱映射。但是,它不起作用,因为我们从 monthly_assets.csv 文件中聚合了 Adj Close 和 Open 到新的 Intraday_Delta 列中。因为字段名称不相同,"直接" 启发法不会捕捉到这种依赖关系。 关于报告每次都再次报告 monthly_assets 的相同观察结果显示了警告消息。我们之所以这样做,是因为我们对每个输出编码了系谱。现在我们有两个系谱,每个都产生报告的 report_buzzfeed.csv 和 report_AppTech.csv 文件。因为输出重用与输入(经过筛选)相同的数据,所以我们必须报告每个输出的输入是什么样的,以避免它们看起来像是重复的。作为替代方案,我们可以重用观察结果或调整模型以解决此重复情况。您可以考虑以下替代选项:
让我们解决第一个问题,确保系谱能够表示数据源之间的真实连接。为了以一种简化的方式做到这一点,我们将更新示例 4-19 中引入的助手函数,其中现在包括一个参数,用于提供每个输入的非直接映射。稍后的章节将介绍处理此常见用例的更简单、高效、易于维护和准确的策略,例如使用猴子补丁。
@staticmethod
def generate_direct_mapping(output_schema: Schema,
input_schemas: list[tuple[Schema, dict]]):
input_schemas_mapping = []
output_schema_field_names = [f[0] for f in output_schema.fields]
for (schema, extra_mapping) in input_schemas:
mapping = {}
for field in schema.fields:
if field[0] in output_schema_field_names:
mapping[field[0]] = [field[0]]
mapping.update(extra_mapping)
if len(mapping):
input_schemas_mapping.append((schema, mapping))
return input_schemas_mapping
示例 4-21 展示了报告应用程序的最终观察部分。
# First lineage
intraday_delta_mapping = {"Intraday_Delta": ['Adj Close', 'Open']}
a = (all_assets_sc, intraday_delta_mapping)
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
OutputDataLineage.generate_direct_mapping(
buzzfeed_sc, [(all_assets_sc,
intraday_delta_mapping)]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets),
all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed),
buzzfeed_sc, lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
OutputDataLineage.generate_direct_mapping(
apptech_sc, [(all_assets_sc,
intraday_delta_mapping)]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets),
all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc,
lineage_apptech_exe)
现在,我们可以部署、运行和监视我们的管道,使用它们每次运行时生成的观察结果。低级别的API需要相当多的额外工作和坚持才能实现。然而,我们对结果感到满意。在遇到生产问题时,花在这些任务上的每一分钟都会使我们受益百倍,以避免收入损失—遵循总质量成本法则,1-10-100—当问题发生时。
让我们从本节提到的可能出现的问题开始,首先是摄取应用程序的故障:
接下来,我们必须考虑报告应用程序可能会将摄取应用程序视为失败的情况,以及在适用的情况下,观察者可以使用的报告应用程序。这些情况包括:
现在,我们已经准备好处理各种数据是问题来源的情况。没有这些知识,这些情况将需要长时间的高压会议来理解它们,以及消耗几个小时甚至几天的调试工作,可能会变成几个月,因为我们无法访问生产中的数据。
到目前为止讨论的问题是我们知道可能会发生的问题。然而,许多其他问题可能会在整个管道中传播,我们对此知之甚少—未知的未知。例如,由于在CSV导出过程中引入了数据错误,某只股票的一个月前的值是不正确的。这个问题至少会发生在你的一个应用程序和其他类似的应用程序中。
由于这些未知的未知因素,数据观察不能仅限于仅覆盖预定义的情况,而是必须尽可能地报告尽可能多的观察结果—可能在计算资源和时间上有一些约束—以生成关于预期或未满足的情况的可见性。在这个示例中,月度股票价值的分布将有助于以后与其他月份进行比较,并且它们可能提供关于这些值是否相等或相似的提示。
使用低级别的日志记录具有完全灵活性,可以生成任何您可以生成为可见的内容。例如,自定义度量标准和关键绩效指标(KPI)。
所有作业并不相同;每家公司、项目和应用程序都有其自己的具体细节。您可能会控制特定的度量标准,无论它们与消耗的数据还是生成的数据有关。例如,对于一个表,这样的度量标准可以是项目数乘以每单位成本减去从Web服务获得的金额,count(items) * cost_per_unit。结果必须始终大于零。这可以很容易地添加到源代码中,但必须由工程师添加,因为这构成了与业务逻辑(和列的语义)相关的特定度量标准。
自定义观察的另一个原因是关键绩效指标(KPI)—这些是利益相关者请求的对底层业务重要的数字。KPI通常定期报告或根据需要计算,并在随机或固定的时间间隔内使用。然而,它们的含义非常强烈,利益相关者对其寄予了很高的期望,几乎没有时间等待修正。因此,您还必须创建有关KPI随时间如何发展的可见性,因为如果利益相关者对其产生疑虑,他们提问其正确性的时刻一开始,时间就开始流逝。为了增加响应性,您通常必须了解KPI如何发展,识别它们在发生之前如何变化,以及根据其历史值和血统了解它们变化的原因。
正如您在报告应用程序更新过程中所预见的,定义API—模型、编码和函数—不是每个应用程序的任务,而是必须在应用程序之间进行标准化和重用。标准化减少了每个应用程序的工作量。更重要的是,它使观察结果成为均匀的,独立于应用程序,以简化观察者在匹配参与其分析的其他应用程序的行为时的工作。
标准化还有助于跨应用程序重用实体,例如资产月度DataSource,这是摄取应用程序的输出和报告应用程序的输入。通过观察结果的均匀表示,可以通过跨应用程序重用实体来巩固整个管道的状态。
支持数据可观测性的架构的一部分必须包括创建一个外部系统,以系统地聚合观察结果,以构建全局视图。通过拥有依赖于这个聚合视图并对其采取行动的观察者,系统可以发展到执行一些当前由观察者执行的操作,这就是机器学习发挥作用的地方。
本章对数据可观察性在数据源方面的全面探讨及其在提高数据质量和运营卓越方面的重要性。我们深入探讨了在数据应用程序代码中生成数据观察结果的概念,强调了在应用程序的各个组件中合并观察结果生成代码的重要性。这些组件包括应用程序本身、正在使用的数据、它们之间的关系以及它们包含的内容。
此外,我们讨论了在低级别创建数据可观察性Python API的概念,该API为开发人员提供了将数据观察功能无缝集成到其应用程序中的强大工具集。通过这个API,从业者可以生成数据观察结果,跟踪数据流动,并确保其数据的可靠性和准确性。
为了加强这些概念,我们提供了一个完全可工作的示例,展示了将一个非数据可观察性的Python数据管道转化为稳健的、以数据可观察性为驱动的管道。通过利用专用的数据可观察性Python API,我们演示了如何生成、捕获和利用数据观察结果,以增强可见性、识别问题并推动持续改进。
随着我们继续前进,本章探讨的原则和策略成为将数据可观察性纳入数据应用程序的基础。通过采用这些实践,组织可以确保其数据管道稳健、可靠,并能够以高度信任的方式提供有价值的见解。
尽管低级别日志记录具有超高度可自定义性和灵活性,但其采用可能会受到所需的初始工作的阻碍。这个论点也适用于测试的采用。因此,在这个层面上简化使用复杂性至关重要。此外,我们需要探索在促进团队和个人之间广泛采用数据可观察性的同时,补充低级别日志记录的替代方法。接下来的章节将深入探讨这个主题,从探讨基于事件的系统开始。
阅读量:648
点赞量:0
收藏量:0