提问者:小点点

Spark 2.2.0AWSEMR写入Parquet会丢弃行


所以我有一个问题,在写入分区Parquet文件时,DataFrame中的某些行会被丢弃。

以下是我的步骤:

  1. 使用指定的模式从S3读取CSV数据文件
  2. 按'date'列分区(DateType)
  3. 写为Parquetmode=append

阅读的第一步按预期工作,没有解析问题。对于质量检查,我执行以下操作:

对于date='2012-11-22'的特定分区,对CSV文件、加载的DataFrame和parque文件执行计数。

以下是一些使用pyspark重现的代码:

logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema()')
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted

我也试着写HDFS,结果是一样的。这发生在多个分区上。默认/空分区中没有记录。上面的logs_df是准确无误的。

我尝试的第二个实验是编写一个未分区的parque文件。上述代码的唯一区别是省略了的分区By()

logs_df. write.parque('s3://…/logs_2012_parquet/',mode='append')

如上所述加载此镶木地板集并运行计数产生了date='2012-11-22'和其他日期的5000的正确结果。将模式设置为覆盖或不设置(使用默认值)会导致相同的数据丢失。

我的环境是:

  • EMR5.9.0
  • 火花2.2.0
  • Hadoop发行版:Amazon 2.7.3
  • 尝试了EMRFS一致性视图和不一致性视图。然而,大多数测试都是HDFS编写的,以避免任何S3一致性问题。

我将非常感谢修复或解决方法或使用Spark转换为拼花文件的其他方式。

谢谢,

编辑:我无法重现第二个实验。所以让我们说分区和未分区在写入Parquet或JSON时似乎都会丢弃记录。


共1个答案

匿名用户

所以谜团肯定在于模式定义。然而,出乎意料的是,它不是日期或时间戳。事实上是布尔值。

我从Redshift导出了CSV,它将布尔写成tf。当我检查推断的模式时,这些字段被标记为字符串类型。在CSV文件中使用truefalse的简单测试将它们识别为布尔值。

所以我以为日期和时间戳解析会像往常一样出错,但这是布尔值。吸取了教训。

谢谢你的指点。