Spark in Action第二版翻译 第十四章 使用用户自定义的函数来扩展转换

发布于 2021年10月02日

无论你是耐心地阅读了本书的前13章,还是用直升机式的阅读方法从一章跳到另一章,你肯定会认为Spark很好,但是......Spark是可扩展的吗?你可能会问,"我怎样才能把我现有的库带入到这个组合中?我是否必须只使用DataFrame API和Spark SQL来实现我想要的所有转换?"

从本章的标题中,你可以想象到第一个问题的答案是肯定的:Spark是可扩展的。本章的其余部分通过教你如何使用用户定义的函数(UDF)来完成这些任务来回答其他问题。让我们来看看这一章阐述了什么。

你首先会看到Spark是如何通过涉及UDF的架构以及UDF对你的部署的影响来实现可扩展的。

然后,在第14.2节,你将深入到使用UDF来解决一个问题:寻找南都柏林(爱尔兰)的图书馆何时开放。你将注册、调用和实现这个UDF。扩展Spark的一种方法是使用你现有的库,以UDF为手段,就像水管工把你的热水器和你的淋浴器连接起来一样。本节还包含了一个关于做一个好水管工的提醒,我相信你也是如此(当然,作为一个软件水管工,我们将跳过你可能需要水管工胶带的部分)。

UDFs是执行数据质量规则的绝佳选择,无论你是自己建立规则还是使用外部资源,如库。在第14.3节,我将教你如何使用UDFs来提高数据质量。

最后,在第14.4节中,你将学习与UDFs相关的约束条件,虽然不是很多,而且约束条件也不是很激烈。但是,在你设计和实现这些新的用户定义函数之前,牢记这些约束条件总是好的。

LAB 本章的例子可在GitHub上找到:https://github.com/jgperrin/net.jgp.books.spark.ch14。

扩展Apache Spark

在本节中,让我们回顾一下扩展Spark的必要性以及这些扩展是如何实现的。当然,和所有的事情一样,你会看一下扩展带来的一些缺点。本节在深入研究代码之前提供了一点理论。

Apache Spark提供了相当丰富的函数集,我们在前面的章节中几乎没有触及这些函数的表面。然而,正如你所想象的,这些函数并没有涵盖所有可能的使用情况。因此,Spark有一个规定,可以添加你自己的函数:用户定义的函数。

UDFs在你的数据框架的列上工作。一个自定义函数仍然是一个函数:它有参数和一个返回类型。它们可以接受0到22个参数,并且总是返回一个参数。返回值将是存储在列中的值。

图14.1说明了从Spark调用UDF的基本机制。

你可能记得的第6章,代码在工作节点上执行。这意味着你的UDF代码必须是可序列化的(以便被传输到节点上)。Spark为你处理了传输部分。如果你有一个需要外部库的UDF,你必须确保它们被部署在工作节点上,如图14.2所示。

正如你在第4章中所学到的,在你调用一个动作之前,Spark在一个有向无环图(DAG)中堆叠了你所有的转换。在你调用Action的时候,Spark要求Catalyst(Spark的一个内部组件)在执行任务之前优化DAG。

由于UDF的内部结构对Catalyst来说是不可见的,所以UDF被视为优化器的一个黑匣子。Spark将无法优化UDF。同样,Spark也无法分析UDF被调用的上下文;如果你在之前或之后调用了数据框架API,Catalyst就无法优化整个转换过程。

当你考虑性能的时候,要谨慎对待这个问题。我建议,在可能的情况下,让你的UDF在你的转换的开始或结束。

注册和调用一个UDF

在上一节中,你了解了什么是UDF。现在,让我们来看看一个典型的用例 涉及代码的典型案例。在这一节中,我描述了要解决的问题并展示了数据。

然后你将使用一个UDF来实现解决方案。要做到这一点,你要注册该UDF。 通过数据框架API使用它,通过Spark SQL使用它,实现UDF,并编写 服务代码。

LAB 这是实验室#200,OpenedLibrariesApp,来自net.jgp.books.spark.ch14.lab200_library_open软件包。

图书馆的数据集来自于智能都柏林的开放数据门户网站;更确切地说,南都柏林郡议会。你可以从以下网站下载该数据集:https://data .smartdublin.ie/dataset/libraries。

下面的列表显示了所需的输出:一个带有日期的图书馆列表和图书馆是否在该日开放。

清单14.2和14.3显示了你将在本方案中使用的两个数据集的摘录。

情景中使用的两个数据集。清单14.2显示了南都柏林图书馆的开放时间。每一天 都有自己的栏目,从周一的开放时间到周六的开放时间(南都柏林人没有机会接触到图书馆)南都柏林人在周日没有机会接触文化)) 这一栏的数值可以是这样的

09:45-20:00,14:00-17:00和18:00-20:00,或者10:00-17:00(7月和8月为16:00)--午餐时间不开放 12:00-17:00。8月)--午餐时间12:30-13:00休息。正如你所看到的,确定一个图书馆是否图书馆是否开放并不是一个超级明显的操作。

时间戳数据集不是来自一个文件,而是以编程方式建立的,如下表所示 在下面的列表中说明了。createDataframe()方法直接返回一个要分析的时间戳的DataFrame。

现在你已经有了时间戳数据集,你可以准备进入应用程序了。图14.3显示了本实验的过程。

在Spark注册UDF

当你想使用一个UDF的时候,第一个操作就是注册它。这就是本小节的全部内容:通知Spark你要使用一个UDF。所以,让我们开始吧。

图14.4说明了注册的过程,具体如下。

1 你首先需要通过从Spark会话中调用udf()方法来访问UDF注册函数。方法来访问UDF注册函数。

2 然后,你通过使用函数的名称、实现UDF的类的实例、以及注册()来注册。实现UDF的类的实例,以及返回类型。调用UDF后,任何需要创建的 需要创建的新列都有这个数据类型。该名称应该是 是一个有效的Java方法名称。

你可以在附录L中找到有效数据类型的列表,函数的参数。

UDFs没有正式的参考文档。然而,这些Javadoc链接

提供了关于UDF的更多细节。

在接下来的列表中,你将看到会话的创建(在这个阶段有点像一个已知的东西 在这个阶段)和UDF的注册。像往常一样,我把所有的导入都留在了 代码中,以避免任何混淆。

在DataFrame API中使用UDF

在本小节中,你将在DataFrame API中使用UDF。你将首先加载和清理要使用的数据,然后开始进行一些转换。在你的数据上调用UDF 数据调用UDF是这些转换的一部分。

现在Spark已经知道了你的新UDF,让我们来接收数据并执行转换,如清单14.5所示。关于调用UDF的部分是用粗体字表示的。

下一步是摄取库中的数据集。在摄取数据集的过程中,在同一个操作中,你可以删除很多你不打算使用的列,包括 城镇、电话或邮政编码(就是美国以外的邮政编码)。你将 通过createDataframe()方法创建第二个数据集,如清单14.3所示。

一旦你有了两个数据集,你可以交叉连接(也称为笛卡尔连接 )它们。你在第12章中学习了连接,附录M中详细介绍了连接的方法。产生的数据集将包含与所有时间戳相关的所有库。这里有7个库和3个时间戳;这意味着新的DataFrame,由连接产生的含有21条记录(如21=7×3)。是的,笛卡尔连接可以变得很大。最后,你可以创建一个名为open的列,使用withColumn()方法和 callUDF()静态函数创建一个名为open的列。

这个UDF函数需要八个参数:前七列是周一至周日的营业时间 时间,最后一个是时间戳。

请注意,图书馆的数据集没有星期天的时间,但UDF处理的是一周中的任何一天。因此,你可以用一个特定的(或字面的)值来使用lit()。将值传递给函数。这对保持你的函数尽可能的通用是很重要的。尽可能的通用。

用SQL操纵UDFs 在上一节中,你用DataFrame API使用了一个UDF。现在,在本节中,你将使用SQL来操作一个UDF。这个实验室是# 210,几乎是#200实验室的分叉。它使用了很多相同的资源。

这里是SQL语句本身,没有任何双引号或格式化,这样你就可以 可以更好地理解你要做什么。

```
SELECT 
Council_ID, Name, date, 
isOpen( 
Opening_Hours_Monday, Opening_Hours_Tuesday, Opening_Hours_Wednesday, 
Opening_Hours_Thursday, Opening_Hours_Friday, Opening_Hours_Saturday, 
'closed', date) AS open 
FROM libraries
```

请注意,你不必对周日的时间使用lit(),因为你必须在实验室中使用 # 200中使用DataFrame API。在SQL中,你只需传递值(作为一个字面意思)。下面的列表显示了如何在代码中的SQL语句中使用你的函数,以便 利用UDF函数。

实施UDF

你刚刚注册并开始使用UDF;你甚至开始使用UDF与DataFrame API和SQL。但是,你还没有实现它。在本小节中,你将建立一个实现UDF的类。

正如你在Spark的会话中注册函数时看到的,UDF是作为一个类来实现的。该类实现了一个UDF基类,如清单14.7中所示。

当你构建一个UDF时,你写一个实现org.apache.spark.sql.api.java.UDF0到 org.apache.spark.sql.api.java.UDF22的类,这取决于你需要的参数数量。

根据你需要的参数的数量。在这个实验室中,你将需要八个参数。

一周七天的时间和时间戳。记住,营业时间 小时只是一个包含开门和关门时间的字符串。

由于Java是强类型的,你需要指定每个参数的类型和 返回类型,如图14.5所示。

你需要实现的关键方法是call(),它接收参数并返回声明的类型。在这个实验室中,这意味着七个字符串类型的参数,一个时间戳类型的参数。参数是时间戳类型,而返回类型是布尔型。

你可能还记得,在第九章中,我谈到了做一个好水管工。一个好的水管工要确保处理代码与 "胶水"(或水管代码是不同的代码) 因此,检查库(或任何业务)是否开放的实际代码是开放的,被隔离在它自己的服务类中。作为这种设计的一个直接后果,如果变化,你的业务逻辑代码就会被屏蔽掉。

此外,你可以在其他地方重复使用这个服务,并能够更容易地使用单元测试。这就是为什么这个UDF的业务逻辑不在UDF中,而在IsOpenService类中(以及清单14.8中)。这种管道范式是更广泛的关注点分离(SoC)设计原则的一部分。

编写服务本身

现在,在完成了所有的管道工程后,你仍然需要编写服务代码本身。

希望你能从你当前组织的现有库中重用服务代码(如果没有,现在是开始这样做的好时机)。本小节更多的是属于

而不是 "必须阅读 "的类别(尽管这种分类是主观的)。

IsOpenService服务是一个基本解析器,支持以下语法:

  • 09:45-20:00

  • 14:00-17:00 and 18:00-20:00

  • closed

请记住,爱尔兰的时间是以24小时为基础的,在美国则称为军事时间。IsOpenService服务不支持更复杂的语法,比如10:00-17:00(7月和8月16:00)--12:30-13:0关闭午餐。

IsOpenService服务在列表14.8中描述。

isOpen()方法把所有的开放时间和时间戳作为参数。

然后该方法做了以下工作。

1 从时间戳中找到一周中的哪一天

2 只关注该日的开放时间

3 检查该日是否为非营业日

4 试图解析字符串以检查时间是否在范围之内

对于那些时间和日期的操作,我依靠的是Java的日历对象。你可以如果你愿意,也可以使用LocalDate。

你现在已经建立了你的服务,将其连接到Apache Spark,并通过DataFrame API调用。你也可以通过SQL调用你的UDF。

使用UDFs来确保高水平的数据质量

我最喜欢的用户定义函数的用例之一是实现更好的数据质量。

在本节中,你将了解你的分析之旅的这一重要步骤,即使用 Apache Spark进行分析的重要步骤。

从第1章开始,你可能记得图14.6。在你开始转换或任何形式的分析之前,包括机器学习(ML)和人工智能(AI),你需要确保你的原始数据通过数据质量的净化过程。

实现数据质量的方式有很多,包括脚本和外部应用程序。图14.7说明了一个聚合过程,其中数据质量规则是在Spark外部执行的,图14.8显示了Apache Spark内部的流程。

当数据质量过程是在Spark外部进行时,你需要管理文件和 当你与他们互动时,他们的潜在问题是窃听。在图14.7的例子中 图14.7中的例子,你将需要45GB的存储空间(20+3+19+3),只是为了在摄取数据前准备好 摄取前的数据。你知道你什么时候可以删除这些文件吗?出于安全考虑,你知道谁可以访问这些文件吗?

当你先摄入数据时,如图14.8,你仍然可以使用外部数据质量流程。你也可以通过UDF拥有你的数据质量(DQ)流程,这样做有很大的好处。

  • 避免了潜在的复杂和繁重的文件生命周期(复制、删除、备份和额外的安全)。

  • 使用并行化/分布式计算,因为数据质量流程将在每个节点上运行

  • 重复使用现有的Java(或Scala)脚本和库,你可能已经在你的组织中拥有。

考虑到UDFs的约束

在前面的章节中,你了解了Spark的用户定义函数的好处和用途。然而,UDF并不是每个问题的完美解决方案,所以让我们

来看看UDF最重要的约束条件。

  • 可序列化-实现函数的类本身必须是可序列化的。

  • 一些Java工件,如静态变量,是不可序列化的。

  • 在工作节点上执行--函数本身在工作节点上执行,这意味着函数运行所需的资源、环境和依赖关系必须位于工作节点上。举个例子,如果你的函数对照数据库检查数值,工作节点必须能够访问这个数据库。如果他们预装了一个值表来比较,每个节点都会这样做;这意味着如果你有10,000个工作节点,就会有10,000个连接到数据库。

  • 优化器的黑匣子-催化剂是关键组件,它对DAG进行优化。催化剂不知道你的函数在做什么。

  • 没有多态性(特别是方法/函数重载)-你不能有两个具有相同名称和不同签名的函数,即使其返回类型相同。类型是一样的。在源代码库中,你可以学习实验# 910、#911和#912,我想在其中添加两个字符串或两个整数,但它们都失败了 因为方法的签名不匹配。

  • 一个UDF不能有超过22个参数。然而,我们可以将一个列到一个UDF。仓库中的实验#920说明了这一点。

摘要

  • 你可以通过使用UDFs来扩展Spark的函数集。

  • Spark在工作节点上执行UDFs。

  • 要使用一个UDF,需要用一个唯一的名字、一个实现UDF的类的实例和一个返回类型来注册你的UDF。

  • 一个UDF可以有0到22个参数。

  • 你可以通过DataFrame API的callUDF()方法或直接在Spark SQL中调用一个UDF。

  • 你可以通过org.apache.spark.sql.api.java包中的一个实现接口的Java类来实现一个UDF,该类基于参数的数量,从UDF0到UDF22。

  • 一个好的做法是避免在UDF本身有任何业务逻辑代码,把实现留在服务中:通过屏蔽服务代码,把服务和Spark代码之间的接口留在UDF中,成为一个好水管工。

  • UDF是在Spark中实现数据质量的好方法,因为通过使用UDF,你可以避免操作文件,使用分布式计算,并有可能重复使用现有的库和工具。

  • 一个UDF的实现必须是可序列化的。

  • UDF对于Catalyst(Spark的优化器)来说是一个黑盒子。

  • 你不能将多态性应用于UDFs。



评论