在火花性能的背后或为什么查询编译很重要

    科技2022-08-01  108

    Criteo is a data-driven company. Every day we digest dozens of terabytes of new data to train recommendation models that serve requests at the scale of the internet. Spark is our tool of choice for processing big data. It is a powerful and flexible instrument, but it has a pretty steep learning curve, and effective usage often requires reading source codes of the framework.

    Criteo是一家数据驱动的公司。 每天,我们都会消化数十兆兆字节的新数据,以训练可满足Internet规模请求的推荐模型。 Spark是我们处理大数据的首选工具。 它是一种功能强大且灵活的工具,但是它的学习曲线相当陡峭,有效使用常常需要阅读框架的源代码。

    The fast processing of big data has a critical business impact for us:

    大数据的快速处理对我们具有至关重要的业务影响:

    we refresh our models often, which brings extra performance for our clients

    我们会经常刷新模型,从而为客户带来额外的性能 we have a low time-to-market for new ML-powered products because we can iterate quickly

    我们无法快速迭代新的机器学习驱动产品的上市时间

    it impacts our infrastructure cost

    它会影响我们的基础设施成本

    In this post, I will discuss writing efficient Spark code and demonstrate on toy examples common pitfalls. I show that Spark SQL (Datasets) should generally be preferred to Spark Core API (RDD) and that by making the right choice, you can win 2 to 10 times in the performance of your big data jobs, which matters.

    在这篇文章中,我将讨论编写有效的Spark代码,并在玩具示例中演示常见的陷阱。 我表明,Spark SQL(数据集)通常应优先于Spark Core API(RDD),并且通过做出正确的选择,您可以在大数据作业的性能中胜出2到10倍,这很重要。

    实验设置 (Experiments setup)

    Spark 2.4.6, Macbook Pro 2017 with 3,5 GHz Intel Core i7

    Spark 2.4.6,配备3.5 GHz Intel Core i7的Macbook Pro 2017

    All time measurements are made on a hot JVM (100 runs of code performed, the average of last 90 is taken). The code in this post uses Scala, but the same conclusions should hold for Python.

    所有时间测量都是在热JVM上进行的(已执行100次代码运行,取最后90次的平均值)。 这篇文章中的代码使用Scala,但是对于Python来说,同样的结论也适用。

    大数据处理的谬误 (The fallacies of big data processing)

    It is a common belief that there are two main performance bottlenecks in the big data processing workflows:

    人们普遍认为,大数据处理工作流存在两个主要的性能瓶颈:

    data shuffling since it requires sending data over network

    数据改组,因为它需要通过网络发送数据 disk I/O since accessing the disk is always much slower than RAM access

    磁盘I / O,因为访问磁盘总是比RAM访问慢得多

    There are historical reasons behind these beliefs — in 2006 when the first release of Hadoop was made, commodity HDDs were slow and unreliable and MapReduce was the main big data processing framework. It is the slowness on the HDDs that triggered the development of in-memory processing tools like Spark. Since then, the hardware has improved a lot.

    这些信念背后有历史原因-在2006年Hadoop的第一个发行版发布时,商品HDD缓慢且不可靠,而MapReduce是主要的大数据处理框架。 HDD的缓慢触发了诸如Spark之类的内存处理工具的开发。 从那时起,硬件有了很大的进步。

    In 2015 Ousterhout, Kay et al.¹ set a goal of analyzing the bottlenecks of Spark jobs and discovered that these jobs are rather CPU bound, than disk I/O and network bound. In particular, they ran a wide selection of queries on three benchmark datasets including TPC-DS² and concluded that:

    2015年, Ousterhout,Kay等人。 ¹设定了分析Spark作业的瓶颈的目标,并发现这些作业是CPU约束的,而不是磁盘I / O和网络约束的。 特别是,他们对包括TPC- DS²在内的三个基准数据集进行了广泛的查询,得出的结论是:

    If the network bandwidth was infinite, the job completion time could be reduced by 2% (median value)

    如果网络带宽是无限的,则作业完成时间可以减少2%(中值) If the disk I/O had infinite bandwidth, the job completion time for a typical analytical workflow could be reduced by 19% (median value)

    如果磁盘I / O具有无限带宽,则典型分析工作流程的作业完成时间可以减少19%(中值)

    A lot is surprising about this result! Disk I/O turns out to have much more impact than network transfers. There are several reasons for this:

    这个结果令人惊讶! 事实证明,磁盘I / O的影响远大于网络传输。 有几个原因:

    Spark uses disk I/O not only when reading the input dataset and writing the result, but also during the job execution for caching and spilling to disk the data that does not fit into RAM.

    Spark不仅在读取输入数据集和写入结果时使用磁盘I / O,还在作业执行期间使用磁盘I / O将不适合RAM的数据缓存和溢出到磁盘上。 Analytic jobs often include aggregations, so the data transferred over the network generally is smaller than the data originally read from disk.

    分析作业通常包括聚合,因此通过网络传输的数据通常小于最初从磁盘读取的数据。

    Interestingly, Databricks arrived at similar conclusions around 2016³, which impacted the vector of Spark development, shifting the focus to optimizing the CPU usage by Spark, which resulted in the introduction of SQL support, DataFrames, and later Datasets APIs.

    有趣的是,Databricks在2016年3月左右得出了类似的结论,这影响了Spark开发的方向,将重点转移到通过Spark优化CPU使用率,从而引入了SQL支持,DataFrames和更高版本的Datasets API。

    Spark有多快? (How fast is Spark?)

    Let’s consider a simple task — counting naively even numbers from 0 to 10⁹. We don’t need Spark for such a job, so let’s write a simple Scala program which does exactly this:

    让我们考虑一个简单的任务-天真地计算从0到10 even的偶数。 我们不需要Spark来完成这样的工作,因此让我们编写一个简单的Scala程序来执行此操作:

    Listing 1 — Count naively 清单1 —天真地计数

    Let’s also compute the same result with Spark RDD and with Spark Datasets. To make the experiment fair, I run Spark in local[1] mode:

    我们还使用Spark RDD和Spark Datasets计算相同的结果。 为了使实验公平,我在local [1]模式下运行Spark:

    Listing 2 — Count with RDDs 清单2 —计算RDD Listing 3 — Count with Datasets 清单3 —数据集计数

    The running time of all snippets is shown below. Not surprisingly, the hand-written code is the most efficient solution. But what is surprising — RDD is five times slower, whereas Dataset computation time is close to the hand-written code.

    所有摘要的运行时间如下所示。 毫不奇怪,手写代码是最有效的解决方案。 但是令人惊讶的是,RDD的速度慢了五倍,而数据集的计算时间却接近手写代码。

    数据集的悖论 (The paradox of Datasets)

    Here is a paradox: Datasets are built on top of RDDs, but they are much faster, almost as fast as the custom task-specific code that you would write. It is surprising. How is this even possible? A new execution model explains this.

    这是一个悖论:数据集建立在RDD之上,但是它们却要快得多,几乎与您将要编写的自定义任务特定代码一样快。 令人惊讶。 这怎么可能? 一个新的执行模型对此进行了解释。

    过去-火山模型 (The past — Volcano model)

    Code written with RDDs is executed with Volcano execution model. In practice, this means that each RDD follows a standard interface:

    用RDD编写的代码通过Volcano执行模型执行。 实际上,这意味着每个RDD都遵循一个标准接口:

    knows its parent RDD

    知道其父RDD

    exposes over a method compute an Iterator[T] which iterates over the elements of this RDD (is private and should only be used by Spark developers)

    公开一个计算 Iterator [T]的方法,该Iter迭代此RDD的元素(是私有的,只应由Spark开发人员使用)

    Listing 4 — RDD.scala 清单4 — RDD.scala

    Given these properties, here is a simplified version of the count function implementation for RDDs. ignoring partitioning:

    给定这些属性,这是用于RDD的count函数实现的简化版本。 忽略分区:

    Listing 5 — Pseudocode for RDD count action 清单5 —用于RDD计数操作的伪代码

    Why is this code so much slower than the hand-written code in Listing 1? Multiple reasons:

    为什么此代码比清单1中的手写代码慢得多? 多个原因:

    Virtual function dispatches for iterators — calls to Iterator.next() have an overhead over non-virtual functions which can be inlined by the compiler or JIT

    迭代器的虚函数分派—对Iterator.next()的调用比非虚函数具有开销,该虚函数可以由编译器或JIT内联

    Lack of CPU optimizations — JVM and JIT are not able to optimize the bytecode produced in listing five as good as the bytecode produced in listing 1. In particular, hand-written code enables JVM and JIT to keep intermediate computation results in CPU register in contrast to putting them into the main memory

    缺乏CPU优化-JVM和JIT无法优化清单5中产生的字节码,而不是清单1中产生的字节码。特别地,手写代码使JVM和JIT能够将中间计算结果保留在CPU寄存器中将它们放入主存储器

    现在—整个阶段的代码生成 (The present — Whole Stage Code Generation)

    The code written with Spark SQL⁵ is executed differently from the code written with RDDs. When an action is triggered, Spark generates code that collapses multiple data transformations into a single function. This process is called “Whole stage code generation⁶”. Spark is trying to reproduce the process of writing custom task-specific code, which avoids virtual function calls and can be more effectively executed by JVM/JIT. In reality, Spark generates quite a bit of code, e.g., for Listing 3, Spark here.

    用Spark SQL written编写的代码与用RDD编写的代码执行方式不同。 触发动作后,Spark会生成将多个数据转换折叠为一个函数的代码。 此过程称为“ 整个阶段代码生成 ⁶”。 Spark正在尝试重现编写自定义任务特定代码的过程,该过程避免了虚函数调用,并且可以由JVM / JIT更有效地执行。 实际上,Spark生成了很多代码,例如清单3的Spark 在此处 。

    Technically, Spark only generates the code and relies on the Janino⁴ compiler for bytecode generation. That is what makes Spark SQL so fast as compared to RDDs.

    从技术上讲,Spark仅生成代码,并依赖Janino⁴编译器来生成字节码。 这就是Spark SQL与RDD相比如此之快的原因。

    有效地做你的火花 (Doing your Spark efficiently)

    Spark currently exposes 3 Scala/Java APIs: RDD, Datasets, and DataFrames (which are now unified with Datasets). RDDs are still a first-class citizen in Spark, and legacy jobs are mostly using RDDs, which makes it tempting to continue using RDDs. However, as the performance benchmarks show, switching to Datasets API can give a massive gain in performance due to optimized CPU usage.

    Spark当前公开了3种Scala / Java API:RDD,数据集和数据帧(现在已与数据集统一)。 RDD仍然是Spark中的一等公民,而遗留工作大多使用RDD,这使它很想继续使用RDD。 但是,如性能基准所示,由于优化了CPU使用率,切换到Datasets API可以大大提高性能。

    做错了–经典方式 (Doing it wrong — the classical way)

    The most common pitfall I encounter in using Spark SQL is an explicit switch to RDDs API. The reason is, it is sometimes more straightforward for a developer to express the computation with Java objects than with restrained Spark SQL language:

    我在使用Spark SQL时遇到的最常见陷阱是显式切换到RDDs API。 原因是,对于开发人员而言,用Java对象来表达计算有时要比使用受约束的Spark SQL语言更为直接:

    Listing 6 — Switching to from Dataset to RDD 清单6 —从数据集切换到RDD

    This code runs in 43 seconds instead of the original 2.1 and does not do anything more useful. An explicit switch to RDD stops the Whole Stage Codegen and triggers a conversion of dataset elements from primitive types to Java objects, which turns out very costly. Notice an extra step in the stage representation for the code in Listing 3 and Listing 6.

    此代码在43秒内运行,而不是原始的2.1秒,并且没有任何其他用处。 显式切换到RDD会停止整个阶段Codegen,并触发将数据集元素从原始类型转换为Java对象,这将导致成本很高。 请注意清单3和清单6中代码的阶段表示中的额外步骤。

    Stage visual representations for Left — Listing 2 and Right — Listing 6 舞台的视觉表示-清单2和右-清单6

    做错了—精美的方法 (Doing it wrong — exquisite way)

    The performance of Spark SQL is surprisingly fragile. This tiny change makes the query time increase three times (to 6 seconds):

    Spark SQL的性能异常脆弱。 这个微小的变化使查询时间增加了三倍(到6秒):

    Listing 7 — Replace Spark SQL Expression with Scala function 清单7 —用Scala函数替换Spark SQL表达式

    Spark is not able to generate efficient code for the condition in the filter. The condition is an anonymous Scala function instead of a Spark SQL expression, and Spark will deserialize each record from the optimized internal representation order to call this function. What is remarkable — this change makes no difference in the visual stage representation, so it is not possible to detect by looking at the job DAG in Spark UI.

    Spark无法为过滤器中的条件生成有效的代码。 条件是一个匿名Scala函数,而不是Spark SQL表达式,Spark将从优化的内部表示顺序中反序列化每个记录,以调用此函数。 值得注意的是-此更改在视觉舞台表示中没有任何区别,因此无法通过查看Spark UI中的作业DAG进行检测。

    Spark SQL’s performance comes at the cost of limiting the types of operations that you can perform. There must be a trade-off somewhere! To get the maximal performance, you should use transformations that operate on Columns: use filter(condition: Column) instead of filter(T => Boolean), and select(…) instead of map(…). This way Spark does not have to reconstruct the Object represented by a single line of the Dataset. And of cause avoid switching to RDDs.

    Spark SQL的性能是以限制您可以执行的操作类型为代价的。 必须在某处进行权衡! 为了获得最佳性能,您应该使用对Columns进行操作的转换:使用filter(condition: Column)代替filter(T => Boolean) ,并select(…)代替map(…) 。 这样,Spark不必重构由数据集的单行表示的对象。 因此,请避免切换到RDD。

    结论和最后评论 (Conclusions and final remarks)

    Simple examples in this post demonstrate that most of the time big data processing jobs are not doing any useful work. A good remedy for this is query compilation, which is available through Spark SQL and enables more efficient usage of modern hardware.

    这篇文章中的简单示例表明,大多数时候大数据处理工作并没有做任何有用的工作。 一个很好的补救方法是查询编译,该编译可通过Spark SQL获得,并可以更有效地使用现代硬件。

    A careful application of query compilation can reduce the processing time by 2 to 10 times, which means faster experiments, lower infrastructure costs, and enjoying the awesomeness of doing things elegantly!

    仔细地应用查询编译可以将处理时间减少2到10倍,这意味着更快的实验,更低的基础结构成本以及优雅的做事享受!

    The code samples used in this post are available here. You can use this repository to benchmark different Spark queries.

    这篇文章中使用的代码示例可在此处获得 。 您可以使用此存储库对不同的Spark查询进行基准测试。

    翻译自: https://medium.com/@zayaz/under-the-hood-of-spark-performance-or-why-query-compilation-matters-c084e749be87

    相关资源:jdk-8u281-windows-x64.exe
    Processed: 0.019, SQL: 9