提问者:小点点

并行性:rdd.并行化(……)vsdataSet.map(…)?


我已经使用DataFrame/DataSet和RDD实现了一个Spark应用程序。我将应用程序提交到我的本地开发环境Spark 2.1.1。我的PC有八个CPU核心。

数据框/数据集

val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._ 
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  val (a, b) = runJob.run(d, date) // returns a tuple of (int, java.sql.Date), which are the passed parameters.
  s"$a, $b"
})

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") 
    .as[Int] 
  }
}
processed.write.text("c:\\temp\\mpa")

RDD

val itemList = itemListJob.run(rc, priority).select("id").rdd.map(r => r(0).asInstanceOf[Int]).collect()

val processed = sc.parallelize(itemList).map(d => {
  runJob.run(d, rc) // returns a tuple of (int, LocalDate), which are the passed parameters.
})
processed.saveAsTextFile("c:\\temp\\mpa")

RDD应用程序拆分生成了八个文本文件,而Dataframe/DataSet只生成了一个文件。这是否意味着RDD并行运行八个runJob.run(),而DataFrame/DataSet方法一次只运行一个而没有并发?

我希望runJob.run(),它做主要工作负载,也将进行jdbc调用,分布式并行运行。


共2个答案

匿名用户

是的。但是将数据收集回驱动程序以进行并行化实际上是不必要的。您可以调用Dataset.重新分区(…)将您的一个分区拆分为多个分区。更好的方法是使用其他jdbc重载从JDBC加载数据,例如http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url: String,table:String,谓词:Array[String],ConnectionProperties:java.util.Properties):org.apache.park.sql.DataFrame或http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,谓词:Array[String],ConnectionProperties:java.util.Properties):org.apache.park.sql.DataFrame以避免混洗。

匿名用户

是的,生成的文件数量是最后一步并行度的一个很好的指标。(我可以想到一些可能不是这种情况的边角情况,但这在这里无关紧要)

本地运行时应根据核数进行拆分。

但是,在这两种情况下,您都将只使用1个内核来读取jdbc连接,在RDD情况下,您还需要收集()数据返回驱动程序,然后并行化返回任务。

首选的方法是使用重新分区而不是收集,然后并行化。更好的方法是始终并行执行操作。在通过jdbc加载数据帧的情况下,请查看使用参数分隔栏、lowerBound、upperBound、num分区(链接)是否适用,以便从一开始就并行运行。