How Spark handles objects

Question

To test the serialization exception in Spark, I wrote a task in two ways.
First way:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)
    val result = rdd.map(elem => {
      funcs.func_1(elem)  // call the method on object funcs directly inside the closure
    })
    println(result.count())
  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

This way, Spark works fine.
But when I change it to the following way, it does not work and throws a NotSerializableException.
Second way:

package examples
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object dd {
  def main(args: Array[String]):Unit = {
    val sparkConf = new SparkConf
    val sc = new SparkContext(sparkConf)

    val data = List(1,2,3,4,5)
    val rdd = sc.makeRDD(data)

    val handler = funcs  // assign the funcs object to a local variable
    val result = rdd.map(elem => {
      handler.func_1(elem)  // the closure captures `handler`, so funcs itself must be serialized
    })

    println(result.count())

  }
}

object funcs{
  def func_1(i:Int): Int = {
    i + 1
  }
}

I know the reason I got the error "task is not serializable" is that I am trying to send an unserializable object, funcs, from the driver node to the worker nodes in the second example. In the second example, if I make object funcs extend Serializable, the error goes away.
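A minimal sketch of that workaround, applied to the definition used above (this exact change is not shown in the original post):

object funcs extends Serializable {
  def func_1(i: Int): Int = {
    i + 1
  }
}

With this change, the handler reference in the second example can be serialized along with the closure and shipped to the executors.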

But in my view, because funcs is an object rather than a class, it is a singleton and is supposed to be serialized and shipped from the driver to the workers instead of being instantiated on a worker node itself. In this scenario, although the way object funcs is used differs, I would guess the unserializable object funcs is shipped from the driver node to the worker nodes in both of these two examples.

My question is: why does the first example run successfully while the second one fails with a 'task not serializable' exception?

Answer

When you run code in an RDD closure (map, filter, etc...), everything necessary to execute that code will be packaged up, serialized, and sent to the executors to be run. Any objects that are referenced (or whose fields are referenced) will be serialized in this task, and this is where you'll sometimes get a NotSerializableException.
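As an illustration (a hypothetical sketch, not from the original answer), the same failure appears whenever a closure references an instance of a plain class that does not extend Serializable:

// Hypothetical helper class, for illustration only; it does not extend Serializable.
class Adder(amount: Int) {
  def add(i: Int): Int = i + amount
}

val adder = new Adder(1)
// The closure below references `adder`, so Spark tries to serialize it and fails with
// org.apache.spark.SparkException: Task not serializable
// rdd.map(elem => adder.add(elem))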

Your use case is a little more complicated, though, and involves the scala compiler. Typically, calling a function on a scala object is the equivalent of calling a java static method. That object never really exists -- it's basically like writing the code inline. However, if you assign an object to a variable, then you're actually creating a reference to that object in memory, and the object behaves more like a class, and can have serialization issues.

scala> object A { 
  def foo() { 
    println("bar baz")
  }
}
defined module A

scala> A.foo()  // static method
bar baz

scala> val a = A  // now we're actually assigning a memory location
a: A.type = A$@7e0babb1

scala> a.foo()  // dereferences a before calling foo
bar baz
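So in the second example, the closure captures the local variable handler, which holds a reference to the funcs object, and that reference is what Spark tries to serialize. Besides making funcs extend Serializable, two ways to avoid the capture (a sketch; the function-value variant is an assumption on my part, not part of the original answer):

// Option 1: call the object method directly inside the closure, as in the first
// example; this compiles down to a static-style call and captures nothing extra.
val result1 = rdd.map(elem => funcs.func_1(elem))

// Option 2 (assumed workaround): lift the method to a function value. The resulting
// lambda is serializable and looks up funcs on the executor instead of capturing it.
val addOne: Int => Int = funcs.func_1
val result2 = rdd.map(addOne)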
