提问者:小点点

将数据帧转换为强类型数据集?


我有以下类,run从数据库表返回整数列表。

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
  }
}

以下代码

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  runJob.run(d) // d expected to be int
})
processed.saveAsTextFile("c:\\temp\\mpa")

得到的错误

[error] ...\src\main\scala\main.scala:39: type mismatch;
[error]  found   : org.apache.spark.sql.Row
[error]  required: Int
[error]       runJob.run(d)
[error]                  ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

我试过了

  1. val处理=itemListJob.run(rc,优先级). select("id").as[Int].map(d=

他们俩都犯了错误

找不到存储在数据集中的类型的编码器。通过导入火花.电码,支持原始类型(Int、String等)和产品类型(大小写类)。_将在未来的版本中添加对序列化其他类型的支持。

更新:我正在尝试添加导入隐含语句

>

  • import sc. int._有错误

    值隐含不是org. apache.park.SparkContext的成员

    import sqlContext. impl的。_是可以的。但是,的后面的语句进行了处理。saveAsTextFile("c:\\temp\\mpa")得到了错误

    value saveAsTextFile不是org. apache.park.sql.Dataset[(Int,java.time.LocalDate)]的成员


  • 共1个答案

    匿名用户

    您应该简单地将带有select("id")的行更改为如下:

    select("id").as[Int]
    

    您应该导入将Row转换为Ints的含义。

    import sqlContext.implicits._ // <-- import implicits that add the "magic"
    

    您还可以更改run以包含以下转换(注意我添加的行的注释):

    class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
      def run(date: LocalDate) = {
        import sqlContext.implicits._ // <-- import implicits that add the "magic"
        sqlContext.read.format("jdbc").options(Map(
          "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
          "url" -> jdbcSqlConn,
          "dbtable" -> s"dbo.GetList('$date')"
        )).load()
        .select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster
        .as[Int] // <-- convert Row into Int
      }
    }
    

    value saveAsTextFile不是org. apache.park.sql.Dataset[(Int,java.time.LocalDate)]的成员

    编译错误是因为您尝试对不可用的Dataset使用saveAsTextFile操作。

    在SparkSQL中写入是通过使用写运算符可用的DataFrameWriter:

    write: DataFrameWriter[T]接口,用于将非流数据集的内容保存到外部存储中。

    因此,您应该执行以下操作:

    processed.write.text("c:\\temp\\mpa")
    

    完成!