大数据 频道

企业大数据平台MapReduce应用之Join实践!

  【IT168 评论】在《Hadoop从入门到精通》大型专题的上半部分(专题链接见文末),我们讲解了Hadoop基本原理并且知道如何在Hadoop中组织、移动和存储数据。接下来,我们将探讨如何简化企业大数据技术应用。本章主要研究大数据模式,针对优化MapReduce操作技术,例如对大型数据集进行连接和排序,这些技术将让任务运行更快,并允许更有效地使用计算资源。

  MapReduce包含许多强大的功能,本章重点介绍连接、排序和采样。这三种模式很重要,因为它们是被频繁使用的大数据操作,并且满足企业集群获取更优性能的目标。

  6.1 Joining

  Join是用于将关系组合在一起的操作。在MapReduce中,Join适用于组合两个或更多数据集的情况,例如,希望将用户(从OLTP数据库中提取)与日志文件(包含用户活动详细信息)组合在一起。在以下情况中,将数据集组合在一起是有用的,例如:

  希望根据用户人口统计信息(例如用户习惯差异,比较青少年和30多岁用户的差异)来汇总数据。

  希望向未在指定天数内使用该网站的用户发送电子邮件。

  希望创建反馈循环来检查用户浏览习惯,允许系统向用户推荐以前未探索的站点功能。

  所有这些场景都要求将数据集连接在一起,两种最常见的Join类型是内连接和外连接。内连接比较关系L和R中的所有元组,并在满足连接谓词时生成结果。相反,外连接不需要基于连接谓词匹配元组,即使不存在匹配也可保留L或R的记录。

  图6.1 不同类型的Join策略,显示为维恩图。阴影区域显示在连接中保留的数据。

  本节,我们将介绍MapReduce的三种Join策略,支持两种最常见的Join类型(内部和外部)。通过利用MapReduce排序—合并架构,可以在map或reduce端进行连接:

  Repartition join——将两个或多个大型数据集连接在一起的情况下减少侧连接;

  Replication join ——map端连接,适用于其中一个数据集足够小可进行缓存的情况;

  Semi-join——map侧连接,其中一个数据集太大而无法容纳到内存中,但经过某些过滤后可以减少到适合内存的大小

  Join data

  这些技术都将利用两个数据集来连接用户和日志。用户数据包含用户名,年龄和状态。完整数据集如下:

  日志数据集显示可以从应用程序或Web服务器日志中提取一些基于用户的活动。数据包括用户名,操作和源IP地址,以下是完整的数据集:

  让我们先看一下应该根据数据选择哪种Join方法。

  实践:为数据选择非常好的Join策略

  本节介绍的每种Join策略都有不同的优点和缺点,确定哪种策略最适合正在使用的数据可能具有挑战性。该技术可以查看数据中的不同特征,并使用该信息选择Join数据的非常好的方法。

  问题

  选择Join数据的非常好的方法。

  解决方案

  使用数据驱动的决策树来选择非常好的的Join策略。

  讨论

  图6.2显示了可以使用的决策树。

  图6.2 用于选择Join策略的决策树

  决策树可归纳为以下三点:

  如果某个数据集小到足以放入mapper内存中,则maponly复制的Join是有效的。

  如果两个数据集都很大,并且其中一个可以通过预过滤缩小与另一个数据集的不匹配,则Semi-join效果更好。

  如果无法预处理数据且数据太大而无法缓存 - 这意味着必须在reducer中执行Join

  无论选择哪种策略,应该在Join中执行的最基本活动之一是使用过滤器和投影。

  过滤器,投影和下推

  在mapper中有效使用过滤器和投影可以减少MapReduce中使用和溢出的数据量。此技术还包括一种称为下推的更高级优化,这可以进一步改善数据管道。

  问题

  正在处理大量数据,并且希望有效管理输入数据以优化作业。

  解决方案

  过滤并投影数据,仅包含将在工作中使用的数据。

  讨论

  过滤和投影数据是在Join数据时以及在处理数据时可以进行的最大优化。这是一种适用于任何OLAP活动的技术,它在Hadoop中同样有效。

  为什么过滤和投影如此重要? 他们减少了处理管道需要处理的数据量。使用较少的数据非常重要,尤其是当跨网络和磁盘边界推送数据时。MapReduce中随机播放步骤非常昂贵,因为数据正在写入本地磁盘和整个网络,因此更少的字节数就意味着作业和MapReduce框架的工作量更少,这意味着CPU,磁盘和网络设备上有更少的负载。

  图6.3 使用过滤器和投影来减少数据大小

  过滤器和投影应尽可能靠近数据源执行,在MapReduce中,这项工作最好在mapper中执行。以下代码显示了过滤器示例,该过滤器排除了30岁以下的用户,并且只投影其名称和状态: 

 

  在连接中使用过滤器的挑战是:所加入的所有数据集都可能包含要过滤的字段。如果是这种情况,请查看使用Bloom过滤器解决此挑战。

  投影和谓词下推

  在使用可以基于下推来跳过记录或整个块的存储格式时,将投影和谓词下推存储格式进一步过滤非常有效。

 

  表6.1存储格式及其下推支持

  关于下推

  很明显,Parquet的一大优势是能够支持两种类型的下推。如果正在处理大型数据集并且只定期处理记录和字段的子集,那么应该将Parquet视为存储格式。

  6.1.1 map端Join

  首先介绍在mapper中执行Join。如果数据可以支持map侧Join,那么将是非常好的Join策略。对比mapper和reducer之间重排数据的开销,减小数据集的大小以便可以在map端Join是明智的。

  本节将介绍三种不同风格的map侧Join。一是其中一个数据集已足够小可以在存储器高速缓存的情况下工作良好;二是要求在过滤掉两个数据集中存在连接键的记录之后,一个数据集可以适合存储器;三是适用于以特定方式对文件进行排序和分布的情况。

  实践:Join数据,其中一个数据集可以适合内存

  replicated join是一个map端连接,它从其函数中获取名称。最小的数据集将复制到所有map主机,其操作取决于一个事实,即Join数据集之一足够小,可以缓存在内存中。

  问题

  希望对数据执行Join,其中一个数据集适合mapper内存。

  解决方案

  使用分布式缓存较小数据集,并在较大数据集流式传输到mapper时执行Join。

  讨论

  使用分布式缓存将小数据集复制到运行map tasks2的节点,并使用每个map任务的初始化方法将其加载到哈希表中。使用从大数据集反馈到map函数的每个记录中的键来查找小数据集哈希表,并在大数据集和小数据集中与Join值匹配的所有记录之间执行Join。 图6.4显示了该方法在MapReduce中的工作原理。

  图6.4仅map的复制Join

  以下代码执行此Join:

 

 

  要执行此Join,首先需要将要加入的两个文件复制到HDFS中的主目录:

  接下来,运行作业并在完成后检查其输出:

 

  Hive

  通过在执行之前配置作业,可以将Hive连接转换为map端连接。重要的是最大的表是查询中的最后一个表,因为这是Hive将在mapper中流式传输的表(其他表将被缓存):

  不再强调map-join提示Hive 0.11实现了一些更改,这些更改表面上删除了map-join提示作为SELECT语句的一部分,但是不清楚在哪些情况下不再需要提示。全连接或右外连接不支持map端Join,它们将作为重新分区Join执行(reduce端连接)。

  总结

  内部和外部Join都可以使用复制Join来支持。此技术实现了内部Join,因为只发出了两个数据集中具有相同键的记录。要将其转换为外部Join,可以流式传输到mapper中的值,这些值在哈希表中没有相应的条目,可以类似地跟踪与流式map记录匹配的哈希表条目并使用方法在map任务结束时从哈希表中发出与任何map输入都不匹配的记录。

  在数据集足够小以便在内存中缓存的情况下,有没有办法进一步优化map侧Join?现在是时候看看Semi-join了。

  实践:在大型数据集上执行Semi-join

  想象一下,正在使用两个大型数据集,例如用户日志和来自OLTP数据库的用户数据。这些数据集中的任何一个都不足够小以在map内存中缓存,因此必须自己执行reducer端连接。问自己一个问题:如果要删除与其他数据集中记录不匹配的所有记录,其中一个数据集是否适合map端内存?

  在示例中,日志中显示的用户很可能只占OLTP数据库整体用户集的一小部分,因此通过删除日志中未显示的所有OLTP用户,可以将数据集缩小到适合内存的大小。如果是这种情况,Semi-join就是解决方案。 图6.5显示了执行Semi-join所需执行的三个MapReduce作业。

  问题

  希望将大型数据集Join在一起,同时避免混洗和排序阶段的开销。

  解决方案

  在此技术中,我们将使用三个MapReduce作业将两个数据集连接在一起,以避免reducer端连接的开销。此技术在处理大型数据集的情况下非常有用,过滤掉与其他数据集不匹配的记录,可以将作业减小到可以放入任务内存的大小。

  讨论

  该技术将分解图6.5中所示的三个作业。

  Job1

  第一个MapReduce作业的功能是生成一组存在于日志文件中的唯一用户名,可以通过让map函数执行用户名的投影来执行此操作,然后使用reducer发出用户名。要减少map和reduce阶段之间传输的数据量,将使map任务缓存HashSet中的所有用户名,并在清理方法中发出HashSet值。图6.6显示了此作业的流程。

  图6.5构成Semi-join的三个MapReduce作业

  图6.6 Semi-join中的第一个作业生成一组存在于日志文件中的唯一用户名。

  以下代码显示了MapReduce作业:

 

  第一个作业的结果是出现在日志文件中的一组唯一用户。

  Job2

  第二步是精心过滤MapReduce作业,其目标是从用户数据集中删除日志数据中不存在的用户。 这是一个仅map端作业,使用复制的Join来缓存日志文件中显示的用户名,并将其与用户数据集Join起来。Job1的唯一用户输出将远远小于整个用户数据集,这使其成为缓存的自然选择。图6.7显示了此作业的流程。

  图6.7 Semi-join中的第二个作业从日志数据中缺少的用户数据集中删除用户。

  Job 3

  在最后一步中,我们将把Job 2生成的过滤用户与原始用户日志结合起来。过滤后的用户现在应该足够少以留在内存中,允许将它们放入分布式缓存中,图6.8显示了此作业的流程。

  图6.8 Semi-join中的第三个作业将Job2生成的用户与原始用户日志组合在一起。

  运行代码并查看前面每个步骤生成的输出:

  输出显示Semi-join和最终Join输出中作业的逻辑进展。

  总结

  在这种技术中,我们研究了如何使用Semi-join将两个数据集组合在一起。Semi-join构造涉及比其他Join更多的步骤,但即使在处理大型数据集时,它也是使用map侧Join的有效方式(需要注意的是,其中一个数据集必须缩小到适合内存的大小))。

  实践:加入预分类和预分区数据

  Map侧Join是最有效的技术,前两个map侧策略都要求其中一个数据集可以加载到内存中。 如果正在使用无法按照先前技术的要求缩小到较小尺寸的大型数据集,该怎么办? 在这种情况下,复合map端连接可能是可行的,但仅当满足以下所有要求时:

  没有任何数据集可以完整地加载到内存中。

  数据集全部按Join键排序。

  每个数据集具有相同数量的文件。

  每个数据集中的文件N包含相同的连接密钥K.

  每个文件都小于HDFS块的大小,因此不会拆分分区。 或者,数据的输入拆分不会拆分文件。

  图6.9显示了有助于复合Join的已排序和分区文件的示例,此技术将介绍如何在作业中使用复合Join。

  问题

  希望对已排序的分区数据执行map端Join。

  图6.9 用作复合Join的输入已排序文件示例

  解决方案

  使用与MapReduce捆绑在一起的CompositeInputFormat。

  讨论

  CompositeInputFormat非常强大,支持内部和外部联接。 以下示例显示如何对数据执行内部联接:

  复合Join要求输入文件按键排序(在我们的示例中是用户名),因此在运行示例之前,需要对这两个文件进行排序并将它们上传到HDFS:

 

  接下来,运行作业并在完成后检查其输出:

 

  Hive

  Hive支持称为排序合并Join的map端连接,其操作方式与此技术的操作方式大致相同。要求在两个表中对所有键进行排序,并且必须将表格划分为相同数量的bucket。需要指定一些配置,并使用MAPJOIN提示来启用此行为:

 

  总结

  复合Join实际上支持N路连接,因此可以Join两个以上的数据集。但是,所有数据集必须符合同一组限制,由于每个mapper使用两个或多个数据输入,因此数据位置只能与其中一个数据集一起存在,因此其余数据集必须从其他数据节点流式传输。对于在运行Join之前数据必须存在的方式,此Join肯定是限制性的,但如果数据已经按照这种方式布局,那么这是一种加入数据并避免基于reducer的shuffle开销的好方法。

0
相关文章