在本章中,你将学习如何使用数据框架。你将了解到数据框架在Spark应用中是如此重要,因为它通过一个模式包含类型化的数据,并提供了一个强大的API。
正如你在前面的章节中看到的,Spark是一个了不起的分布式分析引擎。维基百科将操作系统(OS)定义为 "管理计算机硬件[和]软件资源的系统软件,并为计算机程序提供通用服务"。在第1章中,我甚至将Spark定性为操作系统,因为它提供了构建应用程序和管理资源所需的所有服务。要以编程的方式使用Spark,你需要了解它的一些关键API。为了执行分析和数据操作,Spark需要存储,包括逻辑存储(在应用层面)和物理存储(在硬件层面)。
在逻辑层面,最喜欢的存储容器是DataFrame,这是一种类似于关系型数据库世界中表的数据结构。在本章中,你将深入了解DataFrame的结构,并学习如何通过其API使用DataFrame。
变换是你对数据进行的操作,例如从日期中提取年份,合并两个字段,对数据进行归一化等等。在本章中,您将学习如何使用DataFrame的特定函数来执行转换,以及直接附加到DataFrame API 的方法。你将通过使用类似 SQL union 的操作将两个DataFrame合并成一个。你还会看到数据集和数据框架之间的区别,以及如何从一个数据集到另一个DataFrame。
最后,你会看到弹性分布式数据集(RDD),它是Spark中的第一代存储。DataFrame是建立在RDD概念之上的,你可能会在讨论和项目中遇到RDD。
本章中的例子分为实验室。在本章的最后,您将在两个DataFrame中摄取两个文件,修改它们的模式使其匹配,并将结果联合起来。当你完成这些操作时,你将看到Spark是如何处理存储的。在不同的步骤中,您将检查DataFrame。
LAB 本章的实例可以在GitHub上找到,网址是:https://github.com/jgperrin/net.jgp.books.spark.ch03
DataFrame在Spark中的重要作用
在本节中,你将学习什么是DataFrame以及它是如何组织的。你还将学习到不可更改性。
一个DataFrame既是一个数据结构,也是一个API,如图3.1所示。
Spark的DataFrame API在Spark SQL、Spark Streaming、MLlib(用于机器学习)和GraphX中使用,以在Spark中操作基于图的数据结构。使用这个统一的API可以极大地简化对这些技术的访问。你将不必为每个子库学习一个API。
将DataFrame描述为雄伟的DataFrame可能很奇怪,但这个限定词非常适合它。就像雄伟的艺术品吸引着人们的好奇心,雄伟的橡树主宰着森林,雄伟的城墙保护着城堡一样,DataFrame在Spark的世界里也是雄伟的。
DataFrame的组织
在本节中,你将学习DataFrame是如何组织数据的。一个DataFrame是一组记录,组织成命名的列。它相当于关系型数据库中的一个表或Java中的一个结果集。图3.2展示了一个DataFrame。
DataFrame可以从广泛的来源中构建,如文件、数据库或自定义数据源。数据框的关键概念是它的API,它在Java、Python、Scala和R中都可以使用,在Java中,数据框由行的数据集来表示。Dataset<Row>。
根据Spark目前的策略,存储可以是在内存中,也可以是在磁盘上,但它会尽可能多地使用内存。
Dataframes以StructType的形式包含了模式,它可以用于反省。Dataframes还包括一个printSchema()方法,可以更快速地调试你的数据框。足够的理论--让我们来实践吧。
不变不是一句脏话
DataFrame,以及数据集和RDD(在3.4节中讨论),被认为是不可改变的存储。不可变性被定义为不可改变。当应用于一个对象时,它意味着它的状态在创建后不能被修改。
我认为这个术语是反直觉的。当我刚开始使用Spark时,我很难接受这个概念。让我们使用这个为数据处理而设计的出色技术 但数据是不可改变的。你希望我处理数据,但它不能改变?
图3.3给出了一个解释:在最初的状态下,数据是不可变的;然后你开始修改它,::但Spark只存储了你转换的步骤,而不是转换后数据的每一步。::让我重新表述一下。Spark以不可改变的方式,存储数据的初始状态,然后保留配方(转换的列表)。中间的数据不被存储。第4章会更深入地挖掘变换的内容。
当你增加节点时,原因就变得容易理解了。图3.3展示的是一个典型的Spark流程,只有一个节点,而图3.4展示的是更多节点。
当你以分布式的方式思考时,不可更改性就变得非常重要。在存储方面,你有两种选择。
* 你存储数据,每一次修改都会在每个节点上立即完成,就像在关系型数据库中一样。
* 你在各个节点上保持数据同步,只与各个节点共享改造配方。
Spark使用的是第二种解决方案,因为在每个节点上同步一个recipe比同步所有数据要快。第4章介绍了通过Catalyst进行优化。Catalyst是Spark处理中负责优化的酷小孩。不变性和配方是这个优化引擎的基石。
虽然不可变性被Spark出色地用作优化数据处理的基础,但在你开发应用时,你将不必考虑太多。Spark,就像任何一个好的操作系统一样,将为你处理资源。
通过实例使用DataFrame
没有什么比一个小例子更适合开始了。你摄取了第1章和第2章的文件。但之后会发生什么呢?
在本节中,你将执行两个简单的摄取。然后,你将研究它们的模式和存储,以便了解DataFrame在应用程序中使用时的行为。第一个摄取是北卡罗来纳州维克县的餐馆列表。第二个数据集由北卡罗来纳州达勒姆县的餐馆组成。然后,您将转换数据集,以便您可以通过联合来合并它们。
这些都是你作为Spark开发者将要执行的关键操作,所以了解它们背后的原理将为你提供所需的基础。图3.5说明了这个过程。
联合操作后的目标(也是最终)DataFrame,在两次转换后需要有相同的模式,如图3.6所示。
一个简单的CSV摄取后的DataFrame
在本节中,您将首先摄取数据,然后您将查看DataFrame中的数据,以了解模式。这个过程是你理解Spark工作方式的重要一步。
这个例子的目标是规范化一个数据集,使它符合特定的标准,就像你刚才在图3.6中看到的那样。我打赌你喜欢去餐馆。也许不是每天都去,也许不是每一种,但你们每个人都有偏好:食物的类型、离家的距离、公司、噪音水平等等。Yelp或OpenTable等网站有丰富的数据集,但我们还是来探讨一些开放的数据。图3.7说明了这个例子中的过程。
你的第一个数据集来自北卡罗来纳州的Wake县,网址是http://mng.bz/5AM7。它包含了该县的餐馆列表。这些数据可以直接从 http://mng.bz/Jz2P 下载。
现在您将进行数据框架的摄取和转换,使其与输出相匹配(通过重命名和丢弃列);然后您将对数据分区进行范围划分。当你在摄取和转换数据时,你还会统计记录的数量。图3.8说明了映射的情况。
LAB 你可以从GitHub上下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03中的实验室#200.lab200_ingestion_schema_manipulation。
您要实现的可视化结果是一个餐厅列表,与图3.8中定义的映射相匹配。请注意,以下输出已被修改,以适合本页面:
因为分布在多行的记录有点难读,我把记录作为截图添加到图3.9中。
要显示这些数据集(也就是DataFrame),你的代码会像下面这样:
到目前为止,这种摄取与第1章摄取简单的书单,以及第2章摄取作者名单的方法类似。摄取的方式总是一样的,第7、8、9章提供了进一步的细节。让我们更深入地了解一下数据框架。你可以通过使用printSchema()将模式打印到标准输出(stdout)。结果如下。
附录H提供了更多关于类型的细节。您的简单调用如下。
df.printSchema();
有一个简单的方法来计算你的数据框架中的记录数。说你想显示这个。
We have 3440 records.
你只需使用以下方法:
System.out.println("We have " + df.count() + " records.");
本节的目标是让你合并两个DataFrame,就像你可以执行两个表的SQL联合一样。为了使联合有效,你需要在两个DataFrame中使用类似命名的列。为了达到这个目的,你可以很容易地想象,你的第一个数据集的模式也被修改了。这就是它的样子。
让我们来走一遍转换的过程。注意方法链的强烈使用。正如第2章所定义的那样,Java API可以使用方法链,如SparkSession .builder().appName(...).master(...).getOrCreate(),而不是在每一步都创建一个对象并将其传递给下一个操作。
你将使用数据框架的四个方法和两个静态函数。你可能很熟悉静态函数:它们是那些被 "分组 "在一个类中的函数,但不需要实例化该类。
方法很容易理解:它们被附加到对象本身。当你直接对列中的值进行操作时,静态函数是很有用的。在阅读本书的过程中,你会看到越来越多地使用这些静态函数,在第13章和附录G中会有更详细的描述。
如果你没有找到一个能完成你想要的功能的函数(例如,一个特定的转换形成或对你可能拥有的现有库的调用),你可以编写自己的函数。这些函数被称为用户定义函数(UDF),你将在第16章学习。
现在让我们看看你需要的方法和函数。
* withColumn() 方法-从一个表达式或一个列中创建一个新的列。
* withColumnRenamed() 方法-重命名一个列。
* withColumnRenamed()方法--从列名中获取一个列。有些方法会使用列名作为参数,有些方法需要一个Column对象。
* drop()方法--从数据框架中删除一列。这个方法接受一个列对象的实例或列名。
* lit()函数-创建一个带有值的列;字面意思是一个文字值。
* concat()函数--将一组列中的值进行汇编。
你可以看看代码了:
你可能需要为每条记录提供一个唯一的标识符。你可以调用这个列id,并通过连接以下内容来建立它:
1 国家
2 一个下划线(_)
3 县
4 一个下划线(_)
5 数据集中的标识符
代码是这样的:
df = df.withColumn("id", concat( df.col("state"), lit("_"), df.col("county"), lit("_"), df.col("datasetId")));
最后,你可以显示五条记录并打印模式:
System.out.println("*** Dataframe transformed");
df.show(5);
df.printSchema();
数据存储在分区中
现在你已经加载了数据,你可以看到数据的存储位置。这将向你展示Spark内部是如何存储数据的。数据不是物理地存储在数据框架中,而是存储在分区中,如图3.1和简化的图3.10所示。
分区不能直接从数据框架中访问,您需要通过RDDs查看分区。你将在稍后的 3.4 节中了解更多关于 RDDs 的信息。
分区会被创建,数据会根据你的基础设施(节点数量和数据集的大小)自动分配到每个分区。由于数据集的大小和我使用的笔记本电脑,在这个方案中只使用一个分区。你可以通过下面的代码找出你有多少个分区。
你可以通过使用repartition()方法对DataFrame进行重新分区,以使用四个分区。重新分区可以提高性能。
df = df.repartition(4);
System.out.println("Partition count after repartition: " +
df.rdd().partitions().length);
挖掘模式
在上一节中,你学习了通过使用printSchema()来访问模式。了解数据的结构,特别是Spark如何看待数据是很重要的。你可以通过调用schema()方法了解更多关于模式的细节。
查看net.jgp.books.spark.ch03.lab210_schema _introspection包中的SchemaIntrospectionApp,可以了解schema()使用的细节。为了简化阅读,我把下一个lab的输出-放限制在每种情况下的前三个字段。
比如说你想输出以下内容。
*** Schema as a tree:
root
|-- OBJECTID: string (nullable = true)
|-- datasetId: string (nullable = true)
|-- name: string (nullable = true)
...
你可以像之前那样使用数据框架的printSchema()方法,或者使用StructType的printTreeString()方法。
你也可以将模式显示为一个简单的字符串。
*** Schema as string:
StructField(OBJECTID,StringType,true)
StructField(datasetId,StringType,true) StructField(name,StringType,true)...
要做到这一点,你使用以下代码。
而且你甚至可以将模式显示为JSON结构。
你可以使用以下代码。
高级模式操作是可能的,你将在第17章看到。
一个JSON摄取后的DataFrame
JSON文件可能比CSV更复杂一些,因为它们的嵌套结构。你要做的实验室与之前类似,但这次餐厅数据的来源是一个JSON文件。本节主要介绍与之前实验室的不同之处,并假设你已经阅读过。
使用Spark,你将读取一个JSON文件,其中包含的餐厅数据与3.2.1节中的数据集结构相似。你将对摄入的数据进行转换,以匹配之前数据集的转换结构。你这样做是为了让你可以通过联合过程合并它们。图3.11说明了这个过程的这一部分。
您的第二个数据集来自北卡罗来纳州的另一个县,达勒姆。达勒姆县是威克的邻居,其数据集可以在https://live-durhamnc .opendata.arcgis.com/ 找到。
LAB 你可以从GitHub上下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03中的实验室#220.lab220_json_ingestion_schema_manipulation。
因为JSON比CSV更难可视化,所以接下来的列表显示了一个只有两家餐厅的数据集节选。JSON绝对更啰嗦,不是吗?我删除了第二条记录的一些字段。
和CSV数据集一样,让我们来走一遍JSON转换。第一部分是JSON摄取,将产生以下内容(以及图3.12)。
DataFrame包含嵌套的字段和数组。使用show()方法是有用的,但结果不是很可读。模式会给你带来更多的信息。
当然,这个模式树的结构与清单3.1中的JSON文档的结构类似。而且这个结构现在看起来肯定比CSV文件更像一棵树。制作这个的代码也与摄取和转换CSV数据集的代码类似。
SparkSession spark = SparkSession.builder() .appName("Restaurants in Durham County, NC") .master("local")
.getOrCreate();
Dataset<Row> df = spark.read().format("json").load("data/Restaurants_in_Durham_County_NC.json");
System.out.println("*** Right after ingestion");
df.show(5);
df.printSchema();
一旦数据在DataFrame中,操作数据的API是一样的。你可以开始转换DataFrame了。你的目标结构是扁平的,所以映射(如图 3.13 所示)必须包含嵌套字段。
下面是你将产生的内容(图3.14以截图形式显示了DataFrame的内容)。
其嵌套结构的模式如下:
要访问结构中的字段,可以在路径中使用点(.)符号。要访问一个数组中的元素,可以使用getItem()方法。下面是实际操作的代码。
就像你创建了所有的字段和列一样,创建id字段的操作和你对CSV文件的操作是一样的:
df = df.withColumn("id", concat(df.col("state"), lit("_"),
df.col("county"), lit("_"),
df.col("datasetId")));
System.out.println("*** Dataframe transformed");
df.show(5);
df.printSchema();
最后,看分区也是同等的,就像你的CSV文件一样。
这里是代码:
System.out.println("*** Looking at partitions");
Partition[] partitions = df.rdd().partitions();
int partitionCount = partitions.length;
System.out.println("Partition count before repartition: " + partitionCount);
df = df.repartition(4);
System.out.println("Partition count after repartition: " + df.rdd().partitions().length);
现在你有了两个数据框,具有相同的核心列集。下一步是将它们组合起来。
合并两个数据帧
在本节中,您将学习如何在类似于SQL的联合中结合两个数据集,以建立一个更大的数据集。这将允许你对更多的数据点进行分析。
在上一节中,你摄取了两个数据集,对它们进行了转换,并对它们进行了分析。与关系数据库中的表一样,你可以在它们之间进行很多操作:加入它们、组合它们等等。
现在,你要把两个数据集合并起来,这样以后就可以对合并后的数据集进行分析。图3.15说明了这个过程的细节。
LAB 你可以从GitHub上下载代码:https://github.com/jgperrin/net.jgp.books.spark.ch03。这是包net.jgp.books.spark.ch03.lab230_dataframe_union中的实验室#230。
你当然可以想象,你可以重用你为摄取和转换所写的大部分代码。然而,要执行联合,你必须确保模式是严格相同的。否则,Spark将无法执行联合。
图 3.16 展示了你要做的映射。
您的应用程序的最终输出如下(图3.17为完整的截图)。
与图3.17对应的模式如下。
我们先来看看代码。导入是一样的。为了更简单一些,SparkSession的实例是一个私有成员,在start()方法中初始化。代码的其余部分被隔离在三个方法中。
* buildWakeRestaurantsDataframe() 构建包含Wake县餐馆的数据框架。
* buildDurhamRestaurantsDataframe()建立包含Durham县餐馆的数据框。
* combineDataframes()通过使用类似SQL的联合来合并两个数据框。现在,不用担心生成的数据帧的内存使用问题。在第4章,你会看到数据帧是自优化的。
我们来分析一下这段代码:
这是最简单的部分,对吧?让我们来分析一下这些方法,首先是buildWakeRestaurantsDataframe(),它从CSV文件中读取数据集。这个你应该很熟悉,因为你之前在3.2.1节看到过这个。
现在你已经准备好处理第二个数据集了。
请注意,当你放弃一个父列时,所有的嵌套列也会被放弃。字段和几何字段下的嵌套列被删除是因为你删除了父列。因此,当你放弃字段列时,所有的子字段如风险、座位、污水处理等都会同时被放弃。
现在,你有两个具有相同列数的数据框。因此,你可以在 combineDataframes() 方法中把它们联合起来。在类似于SQL的联合中,有两种方法可以将两个数据框组合起来:你可以使用union()或unionByName()方法。
union()方法不关心列的名称,只关心它们的顺序。这个方法总是会把第一个数据框中的第1列和第二个数据框中的第1列联合起来,然后移动到第2列,再移动到第3列,而不管它们的名字是什么。在经过几次转换操作(在这些操作中,你创建了新的列,重命名它们,转储它们,或者合并它们)之后,可能很难记住列的顺序是否正确。如果字段不匹配,在最坏的情况下,你可能会有不一致的数据,最好的情况是程序停止。另一方面,unionByName()通过名称来匹配列,这样更安全。
这两种方法都要求你在数据集的两边有相同数量的列。下面的代码显示了联合操作,并查看了结果的分区。
你可以联合更多的数据集,但不能同时联合。
当你在一个数据框架中加载一个小的(通常是128MB以下)数据集时,Spark会将只创建一个分区。然而,在这种情况下,Spark为基于CSV的数据集创建一个分区,为基于JSON的数据集创建一个分区。在两个不同的数据框中的两个数据集会导致至少两个分区(每个数据集至少一个)。连接它们将创建一个独特的数据帧,但它将依赖于两个原始分区(或更多)。你可以尝试通过玩repartition()来修改这个例子,看看Spark将如何创建数据集和分区。玩分区不会带来很大的好处。在第17章中,你会看到分区可以提高分割在几个节点上的较大数据集的性能,特别是(但不只是)在join操作中。
DataFrame是一个Dataset<Row>
在本节中,你将了解更多关于DataFrame的实现。你几乎可以拥有任何Plain Old Java对象(POJO)的数据集,但只有行的数据集(Dataset<Row>)才被称为DataFrame。让我们来探讨一下数据框的好处,并仔细看看如何操作这些特定的数据集。
重要的是要明白,你可以拥有其他POJO的数据集,因为你可以重用你的库中可能已经有的或对你的应用更特殊的POJO。第9章甚至说明了如何基于现有的POJOs来摄取数据。
然而,作为行的数据集(Dataset<Row>)实现的数据框架具有更丰富的API。你将看到如何在需要时来回转换,从数据框到数据集。
LAB 你可以从GitHub下载代码,网址是https://github.com/jgperrin/net.jgp.books.spark.ch03。你将从包net.jgp .books.spark.ch03.lab300_dataset中的实验室#300开始。
重复使用您的POJOs
让我们来探讨一下在你的数据集API中直接重用POJOs的好处,并多了解一些关于Spark存储的知识。使用数据集而不是数据框架的主要好处是,你可以直接在Spark中重用你的POJOs。使用数据集与你的POJOs可以让你使用你熟悉的对象,而没有Row可能带来的任何限制,比如从中提取数据。
这完全不是一个问题,但应该是一个预期的特征。例如,考虑一个基于书籍的数据集。如果你通过分组来统计按年份出版的书籍数量,你的书籍POJO中不会有一个计数字段,所以Spark会自动创建一个数据框架来存储结果。
最后,Row使用了名为Tungsten的高效存储。这不是你的POJO的情况。
Tungsten:疯狂快速的Java存储
性能优化是一个永无止境的故事。Project Tungsten是Apache Spark的一个集成部分,它专注于增强三个关键领域:内存管理和二进制处理,缓存感知计算,以及代码生成。让我们快速看看第一个领域和Java存储对象的方式。
在Java中,我喜欢的第一件事(来自C++)是你不必跟踪内存使用和对象生命周期:所有这些都由垃圾收集器(GC)完成。虽然GC在大多数情况下都表现得很好,但当你玩数据集时,它可能很快就会因为创建数百万个对象而不堪重负。
在Java(8及以下)中存储一个四字符的字符串,如Java,将需要48个字节;在使用UTF-8/ASCII编码时,存储这个字符串应该只需要4个字节。Java虚拟机(JVM)原生String实现的存储方式不同,它用UTF-16编码对每个字符使用2个字节进行编码,每个String对象还包含一个12字节的头和8字节的哈希码。当你调用Java(或其他任何基于JVM的语言).length()操作时,JVM仍然会返回4,因为那是以字符为单位的字符串长度,而不是它在内存中的物理表示。查看Java对象布局(JOL)工具,http://openjdk.java.net/projects/code-tools/jol/,了解更多关于物理存储的信息。
GC和对象存储本身都不差。然而,在高性能和可预测的工作负载中,本可以取得进步。因此,一个更高效的存储系统诞生了。Tungsten直接管理内存块,压缩数据,并有新的数据容器,使用与操作系统的低级交互,提供16倍到100倍的性能提升。
你可以在http://mng.bz/7zyg 阅读更多关于Project Tungsten的信息。
创建一个字符串的数据集
为了理解如何使用数据集而不是DataFrame,让我们看看如何创建一个简单的String数据集。这将通过使用一个我们都熟悉的简单对象--字符串来说明数据集的用法。然后你将能够创建更复杂对象的数据集。
你的应用程序将从一个简单的包含字符串的Java数组中创建一个String的数据集,然后显示结果--没有任何花哨的东西。下面是预期的输出。
而你可以通过使用以下应用程序来重现这个输出:
要使用数据框架的扩展方法来代替数据集,你可以通过调用toDF()方法轻松地将一个数据集转换为一个数据框架。请看实验室#310 (net.jgp.books.spark.ch03.lab310_dataset_to_dataframe.ArrayToDatasetToDataframeApp)。它在start()方法的结尾处添加了以下片段。
Dataset<Row> df = ds.toDF();
df.show();
df.printSchema();
输出与本节中的前一个实验室(实验室#300)完全相同,但现在你有了一个DataFrame!你可以在这里找到一个DataFrame。
来回转换
在本节中,你将学习如何将一个DataFrame转换为一个数据集并返回。如果你想操作你现有的POJOs和只适用于DataFrame的扩展API,这种转换是有用的。
你将读取一个CSV文件,其中包含一个DataFrame中的书籍。你将把DataFrame转换为书籍的数据集,然后再回到数据框架。虽然这听起来是一个令人讨厌的流程,但作为Spark工程师,你可能会参与其中的部分或全部操作。
想象一下下面的用例。你在你的库中有一个现有的bookProcessor()方法。这个方法接收一个图书POJO,并通过API将其发布到一个商家网站上,如Amazon、Fnac或Flipkart。你绝对不希望重写这个方法只在Spark上工作。你要继续发送图书POJO。你可以加载成千上万的书籍,将它们存储在书籍的数据集中,当你要对它们进行迭代时,你可以使用分布式处理来调用你现有的bookProcessor()方法,无需修改。
创建数据集
让我们专注于第一部分:摄取文件,并将DataFrame变成书籍的数据集。输出结果如下。
在你将你的数据框架转换为数据集后,字段会被排序。这不是你要求应用程序做的事情;这是一个额外的奖励(或者说 "malus",取决于日期)。然而,如果你打算在这之后合并数据集,记得使用 unionByName()(而不是 union()),因为字段可能在起飞过程中发生了移动(类似于飞行时东西在头顶的舱室里被移动,这意味着你不知道当你打开盖子时你会得到什么)。
你的申请在以下列表中。
map()方法是一种有趣的动物,一开始看起来有点吓人,但却像小狗一样可爱。map()方法之所以让人望而生畏,是因为它需要更多的编码,而且它并不总是一个容易理解的概念。这个方法将
* 翻阅数据集的每一条记录
* 在MapFunction类的call()方法中做一些事情。
* 返回一个数据集
让我们深入了解一下map()方法的签名。在Java中,泛型并不总是简单的。
Dataset<U> map(MapFunction<T, U>, Encoder<U>)
调用时,map()方法将
* 遍历数据框架的每一条记录。
* 调用一个实现MapFunction<Row, Book>的类的实例;在你的例子中,就是BookMapper。注意,无论你要处理多少条记录,它都只被实例化一次。
* 返回一个Dataset<Book>(你的目标)。
当你实现你的方法时,确保你有正确的签名和实现,因为这可能很棘手。包括签名和所需方法在内的骨架如下:
下面的列表将此骨架应用到您正在构建的BookMapper mapper类中。附录I列出了这些类型的转换的参考资料,包括类签名。
你还需要一个简单的POJO来代表一本书(Book POJO),这是下一个列表。我去掉了大部分的getters和setters,以简化可读性;我很确定你可以在心理上添加缺少的方法。我将所有常见的工件存储在一个x子包中,以增加项目的可读性;在Eclipse中,x代表额外的。
创建数据帧
现在你已经有了数据集,你可以把它转换回数据框架,这样你就可以,例如,执行连接或聚合操作。
所以,让我们把数据集转换回数据框架来研究这部分机制。你将研究一个有趣的日期案例,因为日期被分割成一个嵌套结构。下面的列表显示了输出结果。
现在,你已经准备好将数据集转换为数据框架,然后执行一些转换,比如将日期从这个可恶的结构改为数据框架中的日期列。
Dataset<Row> df2 = bookDs.toDF();
好吧,这并不难,对吧?要将一个数据集转换为数据框架,你只需使用toDF()方法。然而,你仍然有这种奇怪的日期格式,所以我们来纠正一下。第一步是将日期转换为一个具有日期表示的字符串。在这种情况下,你将使用ANSI/ISO格式。YYYY-MM-DD, 如1971-10-05.
请记住,Java中的年份是从1900年开始的,所以1971年是71,而2004年是104。同样,月份从0开始,所以10月,也就是一年中的第10个月,是第9个月。 使用Java方法来构建日期需要使用一个映射函数,就像你在清单3.3中做的那样。这是通过对数据的迭代来构建数据集或数据框架的方法。你也可以使用UDF,定义在第16章。
expr()静态函数将计算一个类似于SQL的表达式并返回一个列。它可以使用字段名。表达式releaseDate.year + 1900将在这个转换过程中被Spark评估,并变成一个包含该值的列。releaseDate.year中的点符号表示数据的路径,你可以在清单3.5的模式中看到。通过实例你将看到更多的静态函数,并将在第13章以及附录G中研究转换。
一旦你有了一个作为字符串的日期,你可以使用to_date()静态函数将其转换为作为日期的日期。
你也可以drop()releaseDate列,它的结构很奇怪,它不是很有用。现在你应该可以建立一个包含任何POJO的数据集,并将其转换为一个数据框架。
Dataframe的祖先:RDD
在前面的章节中,你广泛地学习了数据集和数据框架。然而,Spark并不是天生就有这些组件的。让我们来了解一下为什么要记住弹性分布式数据集的作用。
在数据帧之前,Spark专门使用RDD。不幸的是,你仍然会发现一些老卫士只信奉RDD,而无视或忽略数据帧。为了避免疯狂的讨论,你应该知道什么是RDD,以及为什么数据帧在大多数应用中肯定更容易使用,但如果没有RDD,它们将无法工作。
Spark最著名的创始人之一Matei Zaharia将RDD定义为一种分布式内存抽象,让程序员以容错的方式在大型集群上进行内存内计算。
RDDs的第一个实现是在Spark中。这个想法是通过一组可靠(弹性)的节点来实现内存内计算:如果一个节点发生故障,没有什么大不了的;另一个节点会接力,就像RAID 5磁盘架构一样。RDD的诞生,就是考虑到了不可变的概念(定义在3.1.2节)。
尽管围绕着数据框架做出了重大努力,但RDD并没有消失。而且没有人希望它们消失,它们仍然是Spark使用的低级存储层。看看我的朋友Jules Damji的文章,比较Spark的各种存储结构,网址是http://mng.bz/omdD 但要小心,他是偏向Scala的。
你可以看到数据框和RDDs的一种方式是,数据框是RDDs的扩展。
如果说数据框是雄伟的,那么RDDs绝对不是丑陋和懦弱的。RDDs把所有的存在感都带到了存储层。你应该考虑RDDs,当
* 你不需要模式。
* 你正在开发低级别的转换和动作。
* 你有遗留的代码。
RDD是数据框的基础。正如你所看到的,在许多用例中,dataframe比RDD更容易使用,性能也更优化,但不要因为数据框的雄伟品质而戏弄RDD的粉丝,好吗?
概要
* dataframe是一个不可改变的分布式数据集合,组织成命名的列。基本上,一个dataframe就是一个带有schema的RDD。
* dataframe是Dataset及Row的泛型的一个特例,代码形式为:Dataset<Row>
* 一个dataset除了包含Row形式的数据集,还可以是任何的类型,如Dataset<String>、Dataset<Book>或Dataset<SomePojo> 。
* Dataframe可以存储列式信息,就像CSV文件一样,也可以存储嵌套字段和数组,就像JSON文件一样。无论你是在使用CSV文件、JSON文件还是其他格式,dataframe API都是一样的。
* 在JSON文件中,可以使用点号(.)访问嵌套字段。
* dataframe的 API 可以在 http://mng.bz/qXYE 找到;关于如何使用dataframe的细节,请参见参考章节。
* 静态方法的API可以在http://mng.bz/5AQD(和附录G)中找到;关于如何使用静态方法的细节,请参见参考章节。
* 如果你在联合两个dataframe时不关心列名,请使用 union()。
* 如果你在联合两个dataframe时不关心列名,请使用 unionByName()。
* 在Spark中,你可以直接在一个数据集中重用你的POJO。
* 如果你想让一个对象作为数据集的一部分,它必须是可序列化的。
* 数据集的drop()方法可以删除dataframe中的一列。
* 数据集的col()方法根据名称返回数据集的列。
* to_date() 静态函数将字符串中的日期转换为日期。
* expr() 静态函数使用字段名来计算表达式的结果。
* lit() 静态函数返回具有文字值的列。
* 弹性分布式数据集(RDD)是数据元素的不可改变的分布式集合。
* 当性能至关重要时,你应该在RDD上使用dataframe。
* Tungsten存储依赖于dataframe。
* Catalyst 是转换优化器(见第 4 章)。它依靠dataframe来优化操作和转换。
* 跨Spark库的API(graph, SQL, machine learning, 和streaming)正在变得统一在dataframe API下。