第四章:生成数据观察结果-灵析社区

攻城狮无远

正如第三章所解释的,数据可观察性结合了技术和人员的作用,从数据角度收集系统状态的信息以及对该状态的期望。然后,它利用这些信息来使系统更具适应性或更加弹性。

本章将解释如何应用数据可观察性实践。我将从“数据源的数据可观察性”开始,这是一种引入收集策略到日常数据工作的方法,并向您展示如何最大程度地减少对效率的影响。然后,本章将详细说明如何实现订阅软件交付生命周期的期望,例如持续集成和持续部署(CI/CD)。

与任何新兴的实践和技术一样,为了增加数据可观察性的采用率,您需要降低参与的门槛;这样,人们就没有太多理由反对这种变化。然而,人员也是解决方案的一部分,因为他们在该过程中的参与对于确定他们的期望并对规则进行编码至关重要。为此,您将学习减少生成观察结果所需的工作量的几种方法,并了解如何在开发生命周期的适当阶段引入它们。

在数据源头

第二章解释了帮助观察者的信息的来源和类型。但是,如何从这些来源生成和收集信息呢? 这始于数据源头的数据可观察性。术语“源头”指的是负责读取、转换和写入数据的应用程序。正如第3章所解释的,这些应用程序可能是问题的根本原因,也可能是解决问题的手段。此外,与数据本身不同,这些应用程序在工程师和组织的控制之下。

因此,数据源头的数据可观察性方法依赖于应用程序能够生成遵循第2章中介绍的模型的数据观察,换句话说,这些应用程序被制作成数据可观察的。 纯数据和分析观察,如数据源、模式、血统和指标,与读取、转换和写入活动相关。本节中描述的策略通过解释运行这些活动时要考虑什么来解决这些活动。

在数据源头生成数据观察

在源头生成数据观察始于生成额外的信息,捕捉数据的特定活动行为:读取、转换和写入。例如,开发人员可以添加包含生成有关其应用程序执行的可见性所需信息的日志行。指南使用第2章中介绍的通道,即日志、指标、跟踪或血统,来传达可以集中在日志系统中的观察结果。

在下一节中,您将学习如何创建JSON格式的数据观察,这些观察可以收集(发布)到本地文件、本地服务、远程(网络)服务或类似的目标。例如,根据第2章中的数据可观察核心模型,Postgres表的数据源和模式实体将如示例4-1所示。

{
  "id": "f1813697-339f-5576-a7ce-6eff6eb63249",
  "name": "gold.crm.customer",
  "location": "main-pg:5432/gold/crm/table",
  "format": "postgres"
}
{
  "id": "f1813697-339f-5576-a7ce-6eff6eb63249",
  "data_source_ref": {"by_id": "e21ce0a8-a01e-5225-8a30-5dd809c0952e"},
  "fields": [
  { "name": "lastname", "type": "string", "nullable": true }, 
  { "name": "id", "type": "int", "nullable": false }
  ]
}

能够在一个集中的平台上以相同的模型收集数据观察结果,对于在规模上生成数据可观察性的价值至关重要,比如跨应用程序、团队和部门。这就是为什么使用数据可观察核心模型对于轻松汇总数据观察结果,尤其是沿着血统线的汇总,非常重要。

让我们看看如何使用Python生成数据观察结果,使用低级API,这将用于介绍更高级的抽象(在接下来的章节中介绍)。

Python中的低级API

使用低级API的策略需要大量的时间和参与,因为您需要明确地创建每个观察结果。但是,这种策略也为您提供了最大的灵活性,因为它不涉及任何高级抽象。 另一方面,在这个级别支持数据可观察性,特别是在探索和维护期间,需要开发人员保持一致,并始终考虑他们可能在生产环境中想要观察的内容(例如,任何高级开发人员都应该为日志和检查生成与业务逻辑一样多的行)。在开发过程中,开发人员必须通过生成相关的观察结果来为应用程序的逻辑或行为修改创建可见性。此类观察的示例包括与新表的连接、新文件的创建或带有新字段的结构更改。 在接下来的几节中,您将通过一个完整的示例,了解使用Python编写的数据应用程序,这些应用程序在处理数据时生成数据观察,并完成以下操作:

  1. 了解没有数据可观察性能力的应用程序。
  2. 添加生成数据观察的指令以及它们的目的。
  3. 深入了解使用这种策略的利弊。

数据管道的描述

在本章的其余部分,我将使用一种用Python编写的数据管道,我们将使其具有数据可观测性。GitHub上的管道代码允许您运行本章中的示例。它使用pandas库来处理CSV文件,由两个应用程序组成,即入库和报告,如图4-1所示。


两个应用程序(入库和报告)都使用Python和pandas,并从“每日股票价格”数据源中共享数据,以创建两个下游报告(BuzzFeed股票和AppTech)。

入库应用程序读取股市团队每月提供的每日股票价格的CSV文件。团队按年份和月份对文件进行了分区,然后将价格合并到存储为单独文件的月度视图中,如示例4-2中的入库所示。
import pandas as pd

AppTech = pd.read_csv( 
   "data/AppTech.csv",
   parse_dates=["Date"], 
   dtype={"Symbol": "category"}, 
)
Buzzfeed = pd.read_csv(
   "data/Buzzfeed.csv",
   parse_dates=["Date"],
   dtype={"Symbol": "category"},
)

monthly_assets = pd.concat([AppTech, Buzzfeed]) \ 
   .astype({"Symbol": "category"})
monthly_assets.to_csv(
   "data/monthly_assets.csv", index=False
)

在运行了第一个脚本并且文件可用之后,可以运行其余的流水线脚本来生成BuzzFeed和AppTech股票报告。这个示例只有一个脚本,即报告的Python文件,如示例4-3所示。


import pandas as pd

all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date']) 

apptech = all_assets[all_assets['Symbol'] == 'APCX'] 
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD']

buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open'] 
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']

kept_values = ['Open', 'Adj Close', 'Intraday_Delta']

buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False) 
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)

为了正确执行流水线,尊重报告和摄取应用程序之间的依赖关系非常重要。换句话说,必须在运行报告应用程序之前成功运行摄取应用程序。否则,流水线将失败。实现这一目标的方法是使用一个编排器,在报告之前运行摄取应用程序,比如Dagster和Airflow。事实上,编排器是另一个需要配置的应用程序,需要在数据级别明确硬编码应用程序之间的依赖关系。但是,应用程序本身仍然不了解它们的下游依赖关系。

在编排器中硬编码依赖关系的反面是,这是一个需要维护的新资产(例如,明确依赖关系的准确性)。此外,在创建流水线时会施加额外的约束,因为团队必须保持独立,因此不能简单地更新现有的流水线以满足他们自己的需求。因此,必须在更高的级别创建一个扩展,通过添加一个新的分离的DAG来实现,这可能会断开明确的依赖关系。

回到我们的流水线,让我们讨论应用程序之间的功能依赖关系;也就是说,在运行报告之前,必须成功运行摄取。但成功是什么意思呢?

数据流水线的状态定义

确定数据流水线的执行是否成功,我将分析相反的问题:摄取过程中可能发生哪些失败?明确的失败会导致应用程序崩溃。如果您使用编排器,这种类型的故障很容易处理,因为它是一个标志,用于不触发下一个应用程序,例如我们的示例中的报告应用程序。

另一方面,静默失败是指应用程序在没有错误代码或日志的情况下完成。因为它没有按预期运行,所以您必须考虑第二章介绍的期望的概念。

摄取应用程序的观察者可能会遇到以下明确的失败:

  • 文件未找到错误:如果文件夹中的任何数据文件(例如Buzzfeed.csv)不可用,因为其名称已更改为小写,或者在运行摄取应用程序之前未创建文件,则会发生此类错误。
  • 类型错误(TypeError):如果某些值不能强制转换以匹配read_csv函数提供的类型,例如,当符号应该是类别时,出现此类错误。
  • 固定名称错误:当代码中明确用于访问值的任何字段(例如示例4-3中的列名Date和此案例中的Symbol)不存在或名称已更改时,会发生此类错误。
  • 文件系统错误:如果文件不可读取或文件夹对于运行应用程序的用户来说不可写,则会发生此类错误。
  • 内存错误:当文件增大到分配给应用程序的内存不再足够时,会发生此类错误。
  • 系统错误:如果磁盘没有足够的空间来写入汇总结果,则会触发此类错误。

但从工程师观察的角度来看,以下示例表明了静默失败:

  • 日期列无法解析为日期,因为其格式错误、模式更改或时区不一致。在这种情况下,该列不再是日期时间,而是一个对象。
  • 日期列包含值,但不是当前月份的值。所有值都是过去或未来的日期。
  • 日期列包含未来的值,因为生成器可能稍后运行并生成与其正在处理的月份进行比较的未来信息。这种情况可能会导致后续产生重复项,或者相同日期的不一致值,并且可能会导致某些聚合失败。
  • 报告应用程序也可能将摄取视为失败,因为它在报告开始时未写入月度汇总文件,使得在按照给定间隔运行报告工具时不可用。
  • 用于在算术中筛选的任何字段不可用或其名称已更改。

由于这些故障中的任何一个都可能发生,因此必须在它们发生时具备可见性,并且更好的做法是在摄取应用程序中及早预防它们的传播(请参阅“快速失败和安全失败”)。已经将明确的失败制作成了可见性,作为开发实践,以明确捕获这些错误(在Python中使用try...except)。但是,为了使观察者能够识别和发现静默失败,他们需要应用程序生成适当的观察结果。

数据流水线的数据观察

在本节中,我将概述数据流水线必须生成的数据观察结果。为此,让我们快速查看图4-2,该图显示了低级API如何实现第2章中提出的模型。有趣的是,它们具有相似的结构,甚至一些实体(标记为)也相同;在接下来的段落中,我将逐个详细介绍每个部分,以突出这些事实。

在此图中,您会注意到用大写字母A、B、C和D标记的实体,它们位于圆圈内。 "A" 数据源突出显示了由摄取应用程序生成的观察结果,这些观察结果涉及其生成的数据,以及由报告应用程序读取数据时生成的观察结果,因此明确了隐式依赖关系。 实际上,这两个应用程序都生成了多个类似的观察结果,这些观察结果代表将它们联系在一起的所有依赖关系。在图4-2中,还突出显示了以下类似的观察结果:

  • “B” 实体观察检索数据的服务器。
  • “C” 实体观察执行命令的用户。
  • “D” 实体观察由摄取生成的数据的模式,由报告应用程序读取。 让我们深入探讨需要添加到应用程序代码中以生成图4-2中显示的观察结果的内容。由于代码是用Python编写的,我们将使用logging模块来打印编码为JSON的观察结果。

生成上下文数据观察

在这一部分,我将介绍生成关于摄取应用程序执行上下文的观察所需的代码,如图4-3所示(请注意,报告可以重用相同的代码)。

将示例4-4中的代码插入到文件开头,以生成摄取应用程序的观察。

app_user = getpass.getuser() 
repo = git.Repo(os.getcwd(), search_parent_directories=True) 
code_repo = repo.remote().url
commit = repo.head.commit
code_version = commit.hexsha
code_author = commit.author.name
application_name = os.path.basename(os.path.realpath(__file__)) 
application_start_time = datetime.datetime.now().isoformat()

示例4-5中的额外指令创建了用于观察的变量,但目前还没有使用它们。如前所述,要记录信息,我们使用与示例4-5中编码的信息模型的JSON表示。

application_observations = { 
   "name": application_name,
   "code": {
       "repo": code_repo,
       "version": code_version,
       "author": code_author
   },
   "execution": {
       "start": application_start_time,
       "user": app_user
   }
}

此代码创建了一个包含到目前为止创建的所有观察结果的JSON。但是,本节是关于使用数据可观察性的低级API。随着我们的继续,我们将遇到类似的模式,这为我们提供了创建函数的机会,以简化代码并在未来的数据收集和报告应用程序中共享它们。为了创建API,我们创建一个模型,模仿JSON中的观察核心模型,将每个实体转换为一个类,并将关系转换为引用(请参见示例4-6)。


class Application: 
       name: str
    
       def __init__(self, name: str, repository: ApplicationRepository) -> None:
           self.name = name
           self.repository = repository
    
       def to_json(self):
           return {"name": self.name, "repository": self.repository.to_json()}
    
    class ApplicationRepository: 
       location: str
    
       def __init__(self, location: str) -> None:
           self.location = location
    
       def to_json(self):
           return {"location": self.location}
    
    app_repo = ApplicationRepository(code_repo)
    app = Application(application_name, app_repo)

这意味着应用程序实体必须具有一个Application类,具有一个能够保存文件名的属性name,该属性可以将文件名存储为一个application_name变量,并引用一个ApplicationRepository实例。这个ApplicationRepository实体将被编码为一个ApplicationRepository类,具有属性location,设置为git远程位置。

这个结构将有助于构建模型并生成更容易重用且能够导致标准化的JSON表示。 将概念编码成API类的一个附加好处是,它们有责任提供助手来提取相关的观察结果,就像示例4-7中所示。

location: str

   # [...]

   @staticmethod
   def fetch_git_location(): 
       import git
       code_repo = git.Repo(os.getcwd(), search_parent_directories=True).remote().url
       return code_repo


class Application:
   name: str

   # [...]
   @staticmethod
   def fetch_file_name(): 
       import os
       application_name = os.path.basename(os.path.realpath(__file__))
       return application_name


app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location())
app = Application(Application.fetch_file_name(), app_repo)

这种策略可能是实现该模型的一种直接方式。然而,我们更喜欢另一种方法,它减弱了实体之间的联系。在示例4-8中,所有信息都将记录在一个JSON中,实体分散在信息树中,其中Application是根实体。这种编码方式迫使我们在记录根实体之前创建所有观察结果,而根实体就是Application实例。Application构造函数将变成类似示例4-8的内容。

class Application:
    name: str
    def __init__(self, name: str, version: ApplicationVersion, 
                 repo: ApplicationRepository, execution: ApplicationExecution, 
                 server: Server, author: User) -> None:
        pass

为了避免这种复杂性和约束,更好的方法是颠倒实体之间的依赖关系。不再让Application包含其ApplicationVersion或ApplicationRepository,而是创建单独的Application,然后在ApplicationVersion和ApplicationRepository内部添加对它的弱引用。示例4-9展示了这种模型的外观。

class ApplicationRepository:
   location: str
   application: Application
   id: str

   def __init__(self, location: str, application: Application) -> None:
       self.location = location
       self.application = application
       id_content = ",".join([self.location, self.application.id])
       self.id = md5(content.encode("utf-8")).hexdigest() 

   def to_json(self):
       return { "id": self.id, 
                "location": self.location, 
                "application": self.application.id }

   @staticmethod
   def fetch_git_location():
       import git
       code_repo = git.Repo(os.getcwd(), 
                            search_parent_directories=True).remote().url
       return code_repo

有了这个模型,我们可以单独记录每个观察结果——logging.info的两个调用——从而减少需要保存的信息量。因为我们需要重新构建实体之间的关系,所以引入了id变量,以减少需要记录的信息和需要链接的观察结果数量。使用这些日志,id可以通过它们的id来重建模型中的链接,例如ApplicationRepositoryApplication之间的依赖关系,因为它们已经被记录下来了。

在这个示例中,应用程序在本地生成了id,导致了一个设计上的问题,使其在多次执行中不一致。为了解决这个问题,我们必须定义一个功能性的id,可以在多次执行、部署和应用程序之间标识实体。这个概念在建模中称为主键。您可以将主键用作散列算法的输入,例如使用hashlib以确定性方式生成id,在本例中使用md5

示例4-9说明了如何使用主键来一致生成id,例如通过使用md5。在本章的后续部分,我们将使用这种策略来生成实体。

生成与数据相关的观察结果

现在让我们讨论主要的数据可观测性组件的观察结果:数据源、模式和数据指标。图4-4显示了我们必须记录的观察结果。


要生成的观察结果与所读取和写入的数据源相关。然后,处理转换(谱系)的问题。在摄入代码中,许多源使用pandas read_csv函数读取。

数据源的观察主要是提供给read_csv函数作为参数的文件路径。另一个观察是数据源格式,由于read_csv没有使用任何特定的解析器属性,如sep,因此数据是“真正的”CSV。

此外,read_csv没有提供关于标头的任何信息。因此,第一个非空行应该是列名,这将有助于创建关于摄入应用程序访问的数据的观察。

列的类型是可以观察的另一项信息。 pandas推断出大多数数值类型。但是,字符串、类别和日期保留为对象。 read_csv函数提供了两个提示:Date是日期信息,Symbol是分类数据,或者在这种情况下是字符串。

通过这些值,pandas创建了一个DataFrame,一种我们可以使用来捕获到目前为止列出的信息的表格。此外,DataFrame还允许我们计算关于值本身的附加内在信息,例如描述性统计信息。我们使用describe函数来计算这些基本统计信息,包括数字值和其他类型(如分类或日期值)的统计信息,使用参数:include='all'。

最后,摄入应用程序使用to_csv函数将月度资产CSV文件写入到保存要写入的值的内存中的DataFrame中。此函数提供的信息几乎与read_csv函数相同。因此,我们可以得出文件路径、列和类型。

让我们看看如何在可重用的API中建模这些观察结果。第一个类是DataSource,相对来说比较容易建模,如示例4-10所示。我们主要想知道格式以及应用程序正在读取或写入的数据源的位置。因此,它将模拟到文件的路径。

class DataSource:
   location: str
   format: str
   id: str

   def __init__(self, location: str, format: str = None) -> None: 
       self.location = location
       self.format = format
       id_content = ",".join([self.location, self.application.id])
       self.id = md5(content.encode("utf-8")).hexdigest() 

   def to_json(self):
       return {"id": self.id, "location": self.location, "format": self.format}

此外,id是基于提供的路径生成的,该路径是相对路径(以./开头),应扩展为使用绝对路径。此外,如果我们使用数据库表,例如除read_csv之外的其他方法,我们需要添加其他辅助方法来处理连接字符串。

接下来,示例4-11可以对应用程序在数据源中操作的模式进行建模。

class Schema:
   fields: list[tuple[str, str]]
   data_source: DataSource
   id: str

   def __init__(self, fields: list[tuple[str, str]], data_source: DataSource) -> None:
       self.fields = fields
       self.data_source = data_source
       linearized_fields = ",".join(list(map(lambda x: x[0] + "-" + x[1], 
                                    sorted(self.fields))))
       id_content = ",".join([linearized_fields, self.data_source.id])
       self.id = hashlib.md5(id_content.encode("utf-8")).hexdigest()

   def to_json(self):
       from functools import reduce
       jfields = reduce(lambda x, y: dict(**x, **y), 
                        map(lambda f: {f[0]: f[1]}, 
                        self.fields))
       return {"id": self.id, "fields": jfields, "data_source": self.data_source.id}

   @staticmethod
   def extract_fields_from_dataframe(df: pd.DataFrame): 
       fs = list(zip(df.columns.values.tolist(), map(lambda x: str(x), 
             df.dtypes.values.tolist())))
       return fs

Schema类具有fields属性,用于模拟CSV文件的列。在这里,我们可以从pandas DataFrame的元数据中提取它:columns和dtypes。我们选择将字段表示为一对(字段名称,字段类型)的列表。

这部分的最后一个类是DataMetrics,它将模拟摄取应用程序读取和写入的文件的指标。但是,这是该类的不完整版本,因为它只编码了与Schema的关系。必须扩展它以确保它提供了特定用途的数据指标的可见性。这一目标需要一个后续的lineage(参见示例4-15)。

当前的类看起来像示例4-12。

class DataMetrics:
   schema: Schema
   metrics: list[tuple[str, float]]
   id: str

   def __init__(self, metrics: list[tuple[str, float]], schema: Schema) -> None:
       self.metrics = metrics
       self.schema = schema
       self.id = hashlib.md5(",".join([self.schema.id]).encode("utf-8")).hexdigest()

   def to_json(self):
       from functools import reduce
       jfields = reduce(lambda x, y: dict(**x, **y), 
                        map(lambda f: {f[0]: f[1]}, 
                        self.metrics))
       return {"id": self.id, "metrics": jfields, "schema": self.schema.id}

   @staticmethod
   def extract_metrics_from_dataframe(df: pd.DataFrame): 
       d = df.describe(include='all', datetime_is_numeric=True) 
       import math
       import numbers
       metrics = {}
       filterf = lambda x: isinstance(x[1], numbers.Number) and not math.isnan(x[1])
       mapperf = lambda x: (field + "." + x[0], x[1])
       for field in d.columns[1:]:
           msd = dict(filter(filterf, map(mapperf, d[field].to_dict().items())))
           metrics.update(msd)
       # metrics looks like: 
       # {"Symbol.count": 20, "Symbol.unique": 1, "Symbol.freq": 20, 
       #   "Open.count": 20.0, "Open.mean": 3.315075, "Open.min": 1.68, 
       #   "Open.25%": 1.88425"Open.75%": 2.37, "Open.max": 14.725, 
       #   "Open.std": 3.7648500643766463, ...}
       return list(metrics.items())

这里选择了这种简化形式的指标,仅表示数字描述统计信息,以一对(指标名称,数值)的列表形式。这些观察结果将直接从应用程序在包含数据值的DataFrame上调用describe函数的结果转换而来。

现在我们已经将所有实体定义为类,我们可以更新我们的代码(见示例4-13)以确保它适当地生成这些观察结果。

app = Application(Application.fetch_file_name())
app_repo = ApplicationRepository(ApplicationRepository.fetch_git_location(), app)

AppTech = pd.read_csv(
   "data/AppTech.csv",
   parse_dates=["Date"],
   dtype={"Symbol": "category"},
)
AppTech_DS = DataSource("data/AppTech", "csv")
AppTech_SC = Schema(Schema.extract_fields_from_dataframe(AppTech), AppTech_DS)
AppTech_M = DataMetrics(DataMetrics.extract_metrics_from_dataframe(AppTech), 
             AppTech_SC)

Buzzfeed = pd.read_csv(
   "data/Buzzfeed.csv",
   parse_dates=["Date"],
   dtype={"Symbol": "category"},
)
Buzzfeed_DS = DataSource("data/Buzzfeed", "csv")
Buzzfeed_SC = Schema(Schema.extract_fields_from_dataframe(Buzzfeed), Buzzfeed_DS)
Buzzfeed_M = DataMetrics(DataMetrics.extract_metrics_from_dataframe(Buzzfeed), 
                          Buzzfeed_SC)

monthly_assets = pd.concat([AppTech, Buzzfeed]) \
   .astype({"Symbol": "category"})
monthly_assets.to_csv(
   "data/monthly_assets.csv", index=False
)
monthly_assets_DS = DataSource("data/monthly_assets", "csv")
monthly_assets_SC = Schema(Schema.extract_fields_from_dataframe(monthly_assets), 
                           monthly_assets_DS)
monthly_assets_M = DataMetrics(
                    DataMetrics.extract_metrics_from_dataframe(monthly_assets),
                                monthly_assets_SC)

现在我们有了一个生成观察结果所需的代码,用于观察运行摄取应用程序时的数据行为。然而,大部分代码几乎是相同的。这个示例中的函数可以用来消除大部分的噪音。

def observations_for_df(df_name: str, df_format: str, df: pd.DataFrame) -> None:
   ds = DataSource(df_name, df_format)
   sc = Schema(Schema.extract_fields_from_dataframe(df), ds)
   ms = DataMetrics(DataMetrics.extract_metrics_from_dataframe(df), sc)

在这一部分,我们处理的是文件,但如果你需要处理表格,使用SQL来读取、转换和写入数据呢?如果直接使用SQL执行这些操作,它包含与Python代码相同的操作。也就是说,它读取表格,进行转换,最终写入它们;例如,插入子查询。然而,SQL并不提供在中间生成信息的多种能力,比如元数据提取或度量。SQL的另一个应用程序(应该由数据库服务器负责)运行SQL。

请记住,SQL包含了大量信息,你的应用程序可以利用它。它包括表格、列、类型和执行的转换的名称;因此,解析查询会生成类似的观察结果,如示例4-14所示。实际上,我建议对SQL查询使用这种提取策略,因为SQL查询很可能是多次迭代和实验的结果,直到它完全按照预期运行(实际上,与任何代码一样)。因此,SQL查询实现了每次迭代中的所有假设,例如包括以下内容:

  • IS NOT NULL 表示列可以为null,并且过滤掉它们是可以接受的(假设它们的数量不会影响最终逻辑)。
  • cast("Amount" as "INT64") 表示金额始终可以转换为整数的假设。

在添加了生成数据和应用程序观察结果的摄取功能后,我们可以着手处理另一个领域,这将提供更多的信息——即在部署应用程序时生成有关分析部分的观察结果。

生成与数据血统相关的数据观察结果

为了生成关于管道中应用程序之间在数据级别的相互作用以及列级别的观察结果,我将在本节中介绍如何生成血统实体,如图4-5所示。

正如在第2章中讨论的那样,连接血统(数据流和转换,在本例中)的模型部分将数据可观察性领域的主要组件的观察结果与应用程序组件连接在一起。它位于所有三个领域的交汇处,即分析可观察性领域。

为了编码这一部分,我们必须生成关于使用哪些数据源(输入)来生成其他数据源(输出)的观察结果。对于摄取来说,股票CSV文件有助于生成月度资产CSV文件。再深入一层,摄取将来自所有列的所有值连接起来,然后将结果写入文件。在字段级别的血统是直接的,这意味着输入和输出列名相同,并且输出仅从每个输入数据源的此列中获取其值。

示例4-15展示了如何在报告应用程序中包含血统(OutputDataLineage)的生成,使用一个专门的类来定义输出与其输入的依赖关系。

class OutputDataLineage: 
   schema: Schema
   input_schemas: list[tuple[Schema, dict]]
   id: str

   def __init__(self, schema: Schema, 
                input_schemas_mapping: list[tuple[Schema, dict]]) -> None:
       self.schema = schema
       self.input_schemas_mapping = input_schemas_mapping
       self.id = hashlib.md5(",".join([self.schema.id]).encode("utf-8") \
                  + self.linearize().encode("utf-8")).hexdigest()

   def to_json(self):
       return {"id": self.id, "schema": self.schema.id, 
              "input_schemas_mapping": self.input_schemas_mapping}

示例4-16展示了OutputDataLineage的一部分静态函数,该函数将在字段级别将每个输入与输出进行映射,以以下方式连接它们:output_field -> 每个输入的输入字段列表。generate_direct_mapping助手使用了一个过于简单的启发式方法来绑定数据源,当字段名匹配时映射输出数据源。 这种策略在大多数实际用例中都会失败,特别是在需要更谨慎的跟踪以管理所有连接的聚合情况下。 通过使用下一章讨论的策略之一,您可以避免出现这种情况。

@staticmethod
   def generate_direct_mapping(output_schema: Schema, input_schemas: list[Schema]): 
       input_schemas_mapping = []
       output_schema_field_names = [f[0] for f in output_schema.fields]
       for schema in input_schemas:
           mapping = {}
           for field in schema.fields:
               if field[0] in output_schema_field_names:
                   mapping[field[0]] = [field[0]]
           if len(mapping):
               input_schemas_mapping.append((schema, mapping))
       return input_schemas_mapping

   define linearize(self):
       [...]
       return linearized

然后,在示例4-17中,我定义了与上下文(与应用程序相关)观测相关联的谱系执行ApplicationExecution。

class DataLineageExecution:
   lineage: OutputDataLineage
   application_execution: ApplicationExecution 
   start_time: str
   id: str

   def __init__(self, lineage: OutputDataLineage, 
                application_execution: ApplicationExecution) -> None:
       self.lineage = lineage
       self.application_execution = application_execution
       self.start_time = datetime.datetime.now().isoformat()
       self.id = hashlib.md5(
           ",".join([self.lineage.id, self.application_execution.id, 
                    self.start_time]).encode("utf-8")).hexdigest()

   def to_json(self):
       return {"id": self.id, "lineage": self.lineage.id, 
              "application_execution": self.application_execution.id,
               "start_time": self.start_time}

另外,第二章介绍的数据可观性核心模型将数据度量实体与谱系的执行相连接,以便在数据被使用或修改时提供可见性。因此,我们通过添加lineage_execution属性来调整DataMetrics模型,以表示这种关联,如示例4-18所示。

class DataMetrics:
   schema: Schema
   lineage_execution: DataLineageExecution 
   metrics: list[tuple[str, float]]
   id: str

   def __init__(self, metrics: list[tuple[str, float]], schema: Schema,
                lineage_execution: DataLineageExecution) -> None:
       self.metrics = metrics
       self.schema = schema
       self.lineage_execution = lineage_execution
       id_content = ",".join([self.schema.id, self.lineage_execution.id]
       self.id = hashlib.md5(id_content).encode("utf-8")).hexdigest()

   def to_json(self):
       from functools import reduce
       jfields = reduce(lambda x, y: dict(**x, **y), 
                        map(lambda f: {f[0]: f[1]}, self.metrics))
       return {"id": self.id, "metrics": jfields, "schema": self.schema.id,
               "lineage_execution": self.lineage_execution.id}

   @staticmethod
   def extract_metrics_from_df(df: pd.DataFrame):
       d = df.describe(include='all', datetime_is_numeric=True)
       import math
       import numbers
       metrics = {}
       filterf = lambda x: isinstance(x[1], numbers.Number) and not math.isnan(x[1])
       mapperf = lambda x: (field + "." + x[0], x[1])
       for field in d.columns[1:]:
           msd = dict(filter(filterf, map(mapperf, d[field].to_dict().items())))
           metrics.update(msd)
       return list(metrics.items())

现在所有的组件都准备好生成图4-5中显示的观测数据了。最终的数据摄取脚本可以在GitHub仓库中查看。

总结:可观察数据流水线

在继续分析观察如何帮助解决本节开头介绍的明显和悄悄失败之前,我们将迄今为止所做的工作重新用于报告应用程序的数据可观察性。请参见示例 4-19。

import ApplicationRepository.fetch_git_location
import ApplicationVersion.fetch_git_version
app = Application(Application.fetch_file_name())
app_repo = ApplicationRepository(fetch_git_location(), app)
git_user = User(ApplicationVersion.fetch_git_author())
app_version = ApplicationVersion(fetch_git_version(), git_user, app_repo)
current_user = User("Emanuele Lucchini")
app_exe = ApplicationExecution(app_version, current_user)

all_assets = pd.read_csv("data/monthly_assets.csv", parse_dates=['Date'])

apptech = all_assets[all_assets['Symbol'] == 'APCX']
buzzfeed = all_assets[all_assets['Symbol'] == 'BZFD']

buzzfeed['Intraday_Delta'] = buzzfeed['Adj Close'] - buzzfeed['Open']
apptech['Intraday_Delta'] = apptech['Adj Close'] - apptech['Open']

kept_values = ['Open', 'Adj Close', 'Intraday_Delta']

buzzfeed[kept_values].to_csv("data/report_buzzfeed.csv", index=False)
apptech[kept_values].to_csv("data/report_appTech.csv", index=False)

all_assets_ds = DataSource("data/monthly_assets.csv", "csv")
all_assets_sc = Schema(Schema.extract_fields_from_dataframe(all_assets), 
                 all_assets_ds)
buzzfeed_ds = DataSource("data/report_buzzfeed.csv", "csv")
buzzfeed_sc = Schema(Schema.extract_fields_from_dataframe(buzzfeed), buzzfeed_ds)
apptech_ds = DataSource("data/report_appTech.csv", "csv")
apptech_sc = Schema(Schema.extract_fields_from_dataframe(apptech), apptech_ds)
# First lineage
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
                                    OutputDataLineage.generate_direct_mapping
                                    (buzzfeed_sc, [all_assets_sc]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                    all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), buzzfeed_sc, 
               lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
                                    OutputDataLineage.generate_direct_mapping
                                    (apptech_sc, [all_assets_sc]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                    all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc, 
              lineage_apptech_exe)

通过以这种方式添加观察,我们使修改与我们在摄取应用程序中所做的类似。这种方法使我们能够建立习惯和抽象,比如一个框架,可以减少所需的更改数量 - 几乎是开发中的一种隐式规律。

在示例 4-19 中,请注意为输入生成的观察已移至末尾。我们为示例的简单性而做出了这种实现选择。一个好处是附加计算在末尾完成,而不会影响业务流程。一个不足之处在于,如果中间发生了故障,将不会发送关于数据源及其架构的观察。当然,通过对代码进行一些调整,可以避免这种情况。另外,对于这个低级API的介绍,我们必须添加一些模板代码以生成正确的信息,这可能听起来像是噪音,与业务代码成比例。但是,请记住,该脚本最初没有日志。一般来说,日志是零散添加的,以生成有关脚本自身行为的一些信息,这就是我们所做的,但是应用于数据。

还要记住以下几点: 我们按原样重用了 OutputDataLineage.generate_direct_mapping 助手来创建输出和输入之间的系谱映射。但是,它不起作用,因为我们从 monthly_assets.csv 文件中聚合了 Adj Close 和 Open 到新的 Intraday_Delta 列中。因为字段名称不相同,"直接" 启发法不会捕捉到这种依赖关系。 关于报告每次都再次报告 monthly_assets 的相同观察结果显示了警告消息。我们之所以这样做,是因为我们对每个输出编码了系谱。现在我们有两个系谱,每个都产生报告的 report_buzzfeed.csv 和 report_AppTech.csv 文件。因为输出重用与输入(经过筛选)相同的数据,所以我们必须报告每个输出的输入是什么样的,以避免它们看起来像是重复的。作为替代方案,我们可以重用观察结果或调整模型以解决此重复情况。您可以考虑以下替代选项:

  • 如果我们改变策略,每次访问数据时读取数据,而不是将其加载到内存中,那么如果在两次写操作之间更改了数据,则观察结果将不再相同。如果一个输出存在问题,我们更喜欢将输入的观察与此系谱同步。当考虑到每个写操作可能需要数小时而不是数秒时,出现这种情况的概率会增加。
  • 报告应用程序在每次运行时生成所有输出,但以后可以进行重构以更改此操作并进行参数化。例如,只生成一个输出,例如 BuzzFeed。因此,每个报告数据集都由独立运行生成。在这种情况下,观察结果已适当地表示这一点,因此我们无需调整逻辑。换句话说,将给定输入的观察结果发送与创建输出的次数一样经常,以表示现实,而不是尝试优化以减少可能看起来像是重复的数据。

让我们解决第一个问题,确保系谱能够表示数据源之间的真实连接。为了以一种简化的方式做到这一点,我们将更新示例 4-19 中引入的助手函数,其中现在包括一个参数,用于提供每个输入的非直接映射。稍后的章节将介绍处理此常见用例的更简单、高效、易于维护和准确的策略,例如使用猴子补丁。

@staticmethod
def generate_direct_mapping(output_schema: Schema, 
                            input_schemas: list[tuple[Schema, dict]]):
   input_schemas_mapping = []
   output_schema_field_names = [f[0] for f in output_schema.fields]
   for (schema, extra_mapping) in input_schemas:
       mapping = {}
       for field in schema.fields:
           if field[0] in output_schema_field_names:
               mapping[field[0]] = [field[0]]
       mapping.update(extra_mapping) 
       if len(mapping):
           input_schemas_mapping.append((schema, mapping))
   return input_schemas_mapping

示例 4-21 展示了报告应用程序的最终观察部分。

# First lineage
intraday_delta_mapping = {"Intraday_Delta": ['Adj Close', 'Open']}
a = (all_assets_sc, intraday_delta_mapping)
lineage_buzzfeed = OutputDataLineage(buzzfeed_sc,
                                    OutputDataLineage.generate_direct_mapping(
                                    buzzfeed_sc, [(all_assets_sc, 
                                    intraday_delta_mapping)]))
lineage_buzzfeed_exe = DataLineageExecution(lineage_buzzfeed, app_exe)
all_assets_ms_1 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   all_assets_sc, lineage_buzzfeed_exe)
buzzfeed_ms = DataMetrics(DataMetrics.extract_metrics_from_df(buzzfeed), 
               buzzfeed_sc, lineage_buzzfeed_exe)
# Second lineage
lineage_apptech = OutputDataLineage(apptech_sc,
                                   OutputDataLineage.generate_direct_mapping(
                                    apptech_sc, [(all_assets_sc, 
                                                 intraday_delta_mapping)]))
lineage_apptech_exe = DataLineageExecution(lineage_apptech, app_exe)
all_assets_ms_2 = DataMetrics(DataMetrics.extract_metrics_from_df(all_assets), 
                   all_assets_sc, lineage_apptech_exe)
apptech_ms = DataMetrics(DataMetrics.extract_metrics_from_df(apptech), apptech_sc, 
              lineage_apptech_exe)

利用数据观察来解决数据管道的故障问题

现在,我们可以部署、运行和监视我们的管道,使用它们每次运行时生成的观察结果。低级别的API需要相当多的额外工作和坚持才能实现。然而,我们对结果感到满意。在遇到生产问题时,花在这些任务上的每一分钟都会使我们受益百倍,以避免收入损失—遵循总质量成本法则,1-10-100—当问题发生时。

让我们从本节提到的可能出现的问题开始,首先是摄取应用程序的故障:

  1. 找不到输入文件 DataSource 观察结果每次运行都会发送。因此,当文件缺失时,它们将不会被发送。即使对于没有关于应用程序逻辑的任何了解的人来说,目前使用的文件已经缺失是很明显的。
  2. 读取时出现类型错误 Schema 观察结果被发送,并包含与其类型相关联的字段名称。因此,观察者可以清楚地知道所期望的类型,而无需查看应用程序或之前月份的文件。
  3. 由于缺少字段而导致的错误 与“读取时出现类型错误”相同的观察结果有助于观察者迅速识别出当前运行中缺少的字段。
  4. 文件系统错误 pandas库通常会抛出异常,异常通常提供导致错误的路径。对于观察者来说,用于标识问题的其余信息是使用的服务器。与DataSource相关的服务器观察结果可以立即提供关于与该路径相关的服务器的可见性。
  5. 内存错误 这个问题通常发生在数据突然增加的情况下。虽然有许多情况可以考虑,但它们大多数是根据DataMetrics观察结果直观地由观察者处理的,其中包含了行数、字段数或DataSource数量的模式。但是,可能需要在脚本结束之前更早地发送观察结果,例如在以下两种情况下: 一个输入文件的大小大于以前。该文件很容易被检测到,因为它没有DataMetrics可用。 输出已经大大增加,因为所有文件都已经增加。可以通过缺少输出的DataMetrics来检测大小差异。此外,输入的DataMetrics显示了行数的增加。
  6. 文件系统空间错误 这些错误最有可能发生在输出时进行写入,考虑到我们在此处处理数据可观测性情况。因此,与“内存错误”相同的信息会立即向观察者提供可见性,说明为什么可用空间不再足够以及哪些文件无法写入。
  7. 日期无法解析为日期 在这种情况下,模式观察结果的日期字段类型将从日期更改为其他类型,例如字符串或对象。
  8. 日期列不包含当前年/月的值 DataMetrics观察结果包括时间戳的最小值和最大值,从而立即提供了可用数据的执行时间与当前数据之间的差异。例如,假设时间戳的最大值是数据源读取的两天前,那么如果可接受的周期只有一天,那么数据可以被认为过时了。
  9. 日期列包含未来的值 这是容易的,因为与先前因为当前月份的值缺失而失败的情况相同的观察结果将为您提供此可见性。
  10. 符号类别更改 如果我们仅考虑数值型DataMetrics,我们可以通过增加输出文件中的类别数来快速识别此情况。其中一个或一些文件将不再一致,因为它们将引用不同的类别。

接下来,我们必须考虑报告应用程序可能会将摄取应用程序视为失败的情况,以及在适用的情况下,观察者可以使用的报告应用程序。这些情况包括:

  1. 月度汇总文件不可用 摄取应用程序没有发送DataSource观察结果或DataMetrics观察结果。
  2. 聚合使用了缺失的字段,如 Close 摄取应用程序发送的月度数据的模式缺少这些字段。
  3. 读写访问、大小和空间方面的错误 对于报告观察者,与摄取观察者相同的解决方案适用。没有跨团队或团队成员之间的信息偏见。
  4. APCX 和 ENFA 符号 摄取观察者报告的类别数已经发生了变化,为某些情况提供了线索。然而,我们可以扩展DataMetrics以报告非数值观察结果,并报告类别。
  5. Adj Close 或 Open 缺少值导致异常值 DataMetrics "null数"可以覆盖这种情况,因为当null数大于零时,Intraday_Delta的计算将返回NAs。
  6. 月度资产中的日期错误 与日期故障相关的摄取观察者使用的解决方案也适用于此处。例如,应用程序可以使用与当前报告月份的最小值和最大值相比来检查日期列的值。

现在,我们已经准备好处理各种数据是问题来源的情况。没有这些知识,这些情况将需要长时间的高压会议来理解它们,以及消耗几个小时甚至几天的调试工作,可能会变成几个月,因为我们无法访问生产中的数据。

到目前为止讨论的问题是我们知道可能会发生的问题。然而,许多其他问题可能会在整个管道中传播,我们对此知之甚少—未知的未知。例如,由于在CSV导出过程中引入了数据错误,某只股票的一个月前的值是不正确的。这个问题至少会发生在你的一个应用程序和其他类似的应用程序中。

由于这些未知的未知因素,数据观察不能仅限于仅覆盖预定义的情况,而是必须尽可能地报告尽可能多的观察结果—可能在计算资源和时间上有一些约束—以生成关于预期或未满足的情况的可见性。在这个示例中,月度股票价值的分布将有助于以后与其他月份进行比较,并且它们可能提供关于这些值是否相等或相似的提示。

使用低级别的日志记录具有完全灵活性,可以生成任何您可以生成为可见的内容。例如,自定义度量标准和关键绩效指标(KPI)。

所有作业并不相同;每家公司、项目和应用程序都有其自己的具体细节。您可能会控制特定的度量标准,无论它们与消耗的数据还是生成的数据有关。例如,对于一个表,这样的度量标准可以是项目数乘以每单位成本减去从Web服务获得的金额,count(items) * cost_per_unit。结果必须始终大于零。这可以很容易地添加到源代码中,但必须由工程师添加,因为这构成了与业务逻辑(和列的语义)相关的特定度量标准。

自定义观察的另一个原因是关键绩效指标(KPI)—这些是利益相关者请求的对底层业务重要的数字。KPI通常定期报告或根据需要计算,并在随机或固定的时间间隔内使用。然而,它们的含义非常强烈,利益相关者对其寄予了很高的期望,几乎没有时间等待修正。因此,您还必须创建有关KPI随时间如何发展的可见性,因为如果利益相关者对其产生疑虑,他们提问其正确性的时刻一开始,时间就开始流逝。为了增加响应性,您通常必须了解KPI如何发展,识别它们在发生之前如何变化,以及根据其历史值和血统了解它们变化的原因。

正如您在报告应用程序更新过程中所预见的,定义API—模型、编码和函数—不是每个应用程序的任务,而是必须在应用程序之间进行标准化和重用。标准化减少了每个应用程序的工作量。更重要的是,它使观察结果成为均匀的,独立于应用程序,以简化观察者在匹配参与其分析的其他应用程序的行为时的工作。

标准化还有助于跨应用程序重用实体,例如资产月度DataSource,这是摄取应用程序的输出和报告应用程序的输入。通过观察结果的均匀表示,可以通过跨应用程序重用实体来巩固整个管道的状态。

支持数据可观测性的架构的一部分必须包括创建一个外部系统,以系统地聚合观察结果,以构建全局视图。通过拥有依赖于这个聚合视图并对其采取行动的观察者,系统可以发展到执行一些当前由观察者执行的操作,这就是机器学习发挥作用的地方。

总结

本章对数据可观察性在数据源方面的全面探讨及其在提高数据质量和运营卓越方面的重要性。我们深入探讨了在数据应用程序代码中生成数据观察结果的概念,强调了在应用程序的各个组件中合并观察结果生成代码的重要性。这些组件包括应用程序本身、正在使用的数据、它们之间的关系以及它们包含的内容。

此外,我们讨论了在低级别创建数据可观察性Python API的概念,该API为开发人员提供了将数据观察功能无缝集成到其应用程序中的强大工具集。通过这个API,从业者可以生成数据观察结果,跟踪数据流动,并确保其数据的可靠性和准确性。

为了加强这些概念,我们提供了一个完全可工作的示例,展示了将一个非数据可观察性的Python数据管道转化为稳健的、以数据可观察性为驱动的管道。通过利用专用的数据可观察性Python API,我们演示了如何生成、捕获和利用数据观察结果,以增强可见性、识别问题并推动持续改进。

随着我们继续前进,本章探讨的原则和策略成为将数据可观察性纳入数据应用程序的基础。通过采用这些实践,组织可以确保其数据管道稳健、可靠,并能够以高度信任的方式提供有价值的见解。

尽管低级别日志记录具有超高度可自定义性和灵活性,但其采用可能会受到所需的初始工作的阻碍。这个论点也适用于测试的采用。因此,在这个层面上简化使用复杂性至关重要。此外,我们需要探索在促进团队和个人之间广泛采用数据可观察性的同时,补充低级别日志记录的替代方法。接下来的章节将深入探讨这个主题,从探讨基于事件的系统开始。

阅读量:648

点赞量:0

收藏量:0