我正在研究pyspark(3. x)和delta Lake。我在数据类型方面面临一些挑战。我们将数据作为JSON数据类型接收,我们正在对JSON数据集进行一些扁平化,并将其保存为delta表,选项为“合并模式”,如下所示。我们没有在表上强加任何模式。
df.write\
.format("delta")\
.partitionBy("country","city")\
.option("mergeSchema","true")\
.mode("append")\
.save(delta_path)\
我们面临的问题是-JSON字段的数据类型经常发生变化,例如在增量表中,“field_1”的数据类型存储为StringType,但新JSON的“field_1”的数据类型是LongType。因此,我们得到了合并不兼容的异常。
ERROR : Failed to merge fields 'field_1' and 'field_1'. Failed to merge incompatible data types StringType and LongType
如何在增量表中处理这种数据类型的演变,我不想在字段级别处理数据类型的变化,因为我们有300多个字段作为json的一部分。
根据文章《潜入三角洲湖:图式执行》
这篇文章还提示了在你的情况下可以做些什么:
"其他不符合模式演变条件的更改要求通过添加. option("overwrite eSchema","true")
来覆盖模式和数据。例如,如果列"Foo"最初是整数数据类型,而新模式将是字符串数据类型,则需要重新编写所有Parquet(数据)文件。这些更改包括:"
我也采取了类似于nilesh1212的方法,即手动合并模式。
在我的例子中,我的脚本可以处理嵌套类型,可以在这里找到:https://github.com/miguellobato84/spark-delta-schema-evolution
另外,我写了这篇关于这个问题的文章https://medium.com/@miguellobato84/改进-delta-lake-schema-eversion-2cce8db2f0f5
为了解决我的问题,我编写了一个新函数,它本质上合并了增量表的模式(如果增量表存在)和JSON模式。
在高层次上,我创建了一个新模式——这个新模式本质上是增量湖表中的公共列和JSON字段中的新列的组合,通过创建这个新模式,我通过应用这个新模式重新创建了一个数据帧。这解决了我的问题。
def get_merged_schema(delta_table_schema, json_data_schema):
print('str(len(delta_table_schema.fields)) -> ' + str(len(delta_table_schema.fields)))
print('str(len(json_data_schema.fields)) -> '+ str(len(json_data_schema.fields)))
no_commom_elements=False
no_new_elements=False
import numpy as np
struct_field_array=[]
if len(set(delta_table_schema.names).intersection(set(json_data_schema.names))) > 0:
common_col=set(delta_table_schema.names).intersection(set(json_data_schema.names))
print('common_col len: -> '+ str(len(common_col)))
for name in common_col:
for f in delta_table_schema.fields:
if(f.name == name):
struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
else:
no_commom_elements=True
print("no common elements")
if len(np.setdiff1d(json_data_schema.names,delta_table_schema.names)) > 0:
diff_list = np.setdiff1d(json_data_schema.names,delta_table_schema.names)
print('diff_list len: -> '+ str(len(diff_list)))
for name in diff_list:
for f in json_data_schema.fields:
if(f.name == name):
struct_field_array.append(StructField(f.name, f.dataType, f.nullable))
else:
no_new_elements=True
print("no new elements")
print('len(StructType(struct_field_array)) -> '+str(len(StructType(struct_field_array))))
df=spark.createDataFrame(spark.sparkContext.emptyRDD(),StructType(struct_field_array))
if no_commom_elements and no_new_elements:
return StructType(None)
else:
return df.select(sorted(df.columns)).schema