Java Source Code Examples: org.apache.parquet.avro.AvroReadSupport
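AvroReadSupport is the ReadSupport implementation in parquet-avro that materializes Parquet records as Avro objects. It is configured through the Hadoop Configuration: setAvroReadSchema(Configuration, Schema) sets the Avro schema that records are decoded into, and setRequestedProjection(Configuration, Schema) restricts which columns are read from the file. Both patterns appear throughout the examples below.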
Example 1
public static void applyCommonConfig(Configuration conf, ParquetConfig parquetConfig) {
  if (parquetConfig.getAvroReadCompatibility() != null) {
    conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY,
        parquetConfig.getAvroReadCompatibility().booleanValue());
  }
  if (parquetConfig.getAvroAddListElementRecords() != null) {
    conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS,
        parquetConfig.getAvroAddListElementRecords().booleanValue());
  }
  if (parquetConfig.getAvroWriteOldListStructure() != null) {
    conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
        parquetConfig.getAvroWriteOldListStructure().booleanValue());
  }
}
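For reference, the constants used above resolve to string property keys: AvroReadSupport.AVRO_COMPATIBILITY is "parquet.avro.compatible", AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS is "parquet.avro.add-list-element-records", and AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE is "parquet.avro.write-old-list-structure". Examples 4 and 11 below set the latter two directly as string properties.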
Example 2
@Override
public void initialize() {
  Preconditions.checkState(state.equals(ReaderWriterState.NEW),
      "A reader may not be opened more than once - current state:%s", state);
  LOG.debug("Opening reader on path:{}", path);
  try {
    final Configuration conf = fileSystem.getConf();
    AvroReadSupport.setAvroReadSchema(conf, readerSchema);
    reader = new AvroParquetReader<E>(
        conf, fileSystem.makeQualified(path));
  } catch (IOException e) {
    throw new DatasetIOException("Unable to create reader path:" + path, e);
  }
  advance();
  state = ReaderWriterState.OPEN;
}
Example 3
private ParquetReader<GenericRecord> initReader() throws IOException {
  Configuration configuration = getFs().getConf();
  if (this.schema != null) {
    AvroReadSupport.setAvroReadSchema(configuration, this.schema);
  }
  if (this.projection != null) {
    AvroReadSupport.setRequestedProjection(configuration, this.projection);
  }
  return AvroParquetReader
      .<GenericRecord>builder(HadoopInputFile.fromPath(getFilePath(), configuration))
      .build();
}
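Example 3 only builds the reader, so here is a minimal consumption sketch using the initReader method above and standard ParquetReader behavior: the reader is Closeable, and read() returns null at end of input.

try (ParquetReader<GenericRecord> reader = initReader()) {
  GenericRecord record;
  while ((record = reader.read()) != null) {
    // process the record, e.g. record.get("someField")
  }
}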
Example 4
@Override
@SuppressWarnings("deprecation")
public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
  // Columns are selected from the Parquet file by taking the read context's message type and
  // matching to the file's columns by full path, so this must select columns by using the path
  // in the file's schema.
  MessageType projection = hasIds(fileSchema) ?
      pruneColumns(fileSchema, expectedSchema) :
      pruneColumnsFallback(fileSchema, expectedSchema);
  // override some known backward-compatibility options
  configuration.set("parquet.strict.typing", "false");
  configuration.set("parquet.avro.add-list-element-records", "false");
  configuration.set("parquet.avro.write-old-list-structure", "false");
  // set Avro schemas in case the reader is Avro
  AvroReadSupport.setRequestedProjection(configuration,
      AvroSchemaUtil.convert(expectedSchema, projection.getName()));
  org.apache.avro.Schema avroReadSchema = AvroSchemaUtil.buildAvroProjection(
      AvroSchemaUtil.convert(ParquetSchemaUtil.convert(projection), projection.getName()),
      expectedSchema, ImmutableMap.of());
  AvroReadSupport.setAvroReadSchema(configuration, ParquetAvro.parquetAvroSchema(avroReadSchema));
  // let the context set up read support metadata, but always use the correct projection
  ReadContext context = null;
  if (callInit) {
    try {
      context = wrapped.init(configuration, keyValueMetaData, projection);
    } catch (UnsupportedOperationException e) {
      // try the InitContext version
      context = wrapped.init(new InitContext(
          configuration, makeMultimap(keyValueMetaData), projection));
    }
  }
  return new ReadContext(projection,
      context != null ? context.getReadSupportMetadata() : ImmutableMap.of());
}
Example 5
@Override
public Schema getSchema(Configuration conf, Path path) throws IOException {
  AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<>();
  ParquetReader.Builder<GenericRecord> builder = ParquetReader.builder(readSupport, path);
  ParquetReader<GenericRecord> parquetReader = builder.withConf(conf).build();
  GenericRecord record;
  Schema schema = null;
  // reads through the whole file; the Connect schema is derived from the last record
  while ((record = parquetReader.read()) != null) {
    schema = avroData.toConnectSchema(record.getSchema());
  }
  parquetReader.close();
  return schema;
}
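Note that the loop above reads every record and derives the Connect schema from the last one. Since all records in a single file decode with the same Avro schema here, a shorter variant could stop after the first record. This is a sketch reusing the parquetReader and avroData helpers from the example, not a drop-in from the library:

GenericRecord first = parquetReader.read();
// null schema means the file was empty
Schema schema = (first == null) ? null : avroData.toConnectSchema(first.getSchema());
parquetReader.close();
return schema;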
Example 6
@Override
public Collection<Object> readData(Configuration conf, Path path) throws IOException {
  Collection<Object> result = new ArrayList<>();
  AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<>();
  ParquetReader.Builder<GenericRecord> builder = ParquetReader.builder(readSupport, path);
  ParquetReader<GenericRecord> parquetReader = builder.withConf(conf).build();
  GenericRecord record;
  while ((record = parquetReader.read()) != null) {
    result.add(record);
  }
  parquetReader.close();
  return result;
}
Example 7
/**
 * Read the row keys matching the given filter from the given parquet file. If the filter is empty, this
 * returns all row keys.
 *
 * @param configuration configuration used to build the fs object
 * @param filePath      the parquet file path
 * @param filter        record keys filter
 * @param readSchema    schema of the columns to be read
 * @return set of row keys matching candidateRecordKeys
 */
private static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter,
                                                Schema readSchema) {
  Option<RecordKeysFilterFunction> filterFunction = Option.empty();
  if (filter != null && !filter.isEmpty()) {
    filterFunction = Option.of(new RecordKeysFilterFunction(filter));
  }
  Configuration conf = new Configuration(configuration);
  conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
  AvroReadSupport.setAvroReadSchema(conf, readSchema);
  AvroReadSupport.setRequestedProjection(conf, readSchema);
  Set<String> rowKeys = new HashSet<>();
  try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build()) {
    Object obj = reader.read();
    while (obj != null) {
      if (obj instanceof GenericRecord) {
        String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
        if (!filterFunction.isPresent() || filterFunction.get().apply(recordKey)) {
          rowKeys.add(recordKey);
        }
      }
      obj = reader.read();
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read row keys from Parquet " + filePath, e);
  }
  return rowKeys;
}
Example 8
/**
 * Fetch {@link HoodieKey}s from the given parquet file.
 *
 * @param configuration configuration used to build the fs object
 * @param filePath      the parquet file path
 * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
 */
public static List<HoodieKey> fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) {
  List<HoodieKey> hoodieKeys = new ArrayList<>();
  try {
    if (!filePath.getFileSystem(configuration).exists(filePath)) {
      return new ArrayList<>();
    }
    Configuration conf = new Configuration(configuration);
    conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
    Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
    AvroReadSupport.setAvroReadSchema(conf, readSchema);
    AvroReadSupport.setRequestedProjection(conf, readSchema);
    try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build()) {
      Object obj = reader.read();
      while (obj != null) {
        if (obj instanceof GenericRecord) {
          String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
          String partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
          hoodieKeys.add(new HoodieKey(recordKey, partitionPath));
        }
        // advance unconditionally so a non-GenericRecord value cannot stall the loop
        obj = reader.read();
      }
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
  }
  return hoodieKeys;
}
Example 9
protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc,
    String schemaStr) throws IOException {
  Job job = Job.getInstance(jsc.hadoopConfiguration());
  // Allow recursive directories to be found
  job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
  // To parallelize reading file status.
  job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
  AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr)));
  ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));
  return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
          job.getConfiguration())
      // To reduce large number of tasks.
      .coalesce(16 * cfg.parallelism).map(entry -> {
        GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
        Object partitionField = genericRecord.get(cfg.partitionKey);
        if (partitionField == null) {
          throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
        }
        Object rowField = genericRecord.get(cfg.rowKey);
        if (rowField == null) {
          throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
        }
        String partitionPath = partitionField.toString();
        LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
        if (partitionField instanceof Number) {
          try {
            long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
            partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
          } catch (NumberFormatException nfe) {
            LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
          }
        }
        return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),
            new HoodieJsonPayload(genericRecord.toString()));
      });
}
Example 10
private static void setConfigProperties(Configuration conf, Format format,
    Schema schema, Class<?> type) {
  GenericData model = DataModelUtil.getDataModelForType(type);
  if (Formats.AVRO.equals(format)) {
    setModel.invoke(conf, model.getClass());
    conf.set(AVRO_SCHEMA_INPUT_KEY, schema.toString());
  } else if (Formats.PARQUET.equals(format)) {
    // TODO: update to a version of Parquet with setAvroDataSupplier
    //AvroReadSupport.setAvroDataSupplier(conf,
    //    DataModelUtil.supplierClassFor(model));
    AvroReadSupport.setAvroReadSchema(conf, schema);
  }
}
Example 11
@Override
@SuppressWarnings("deprecation")
public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
  // Columns are selected from the Parquet file by taking the read context's message type and
  // matching to the file's columns by full path, so this must select columns by using the path
  // in the file's schema.
  MessageType projection;
  if (ParquetSchemaUtil.hasIds(fileSchema)) {
    projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
  } else if (nameMapping != null) {
    MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
    projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
  } else {
    projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
  }
  // override some known backward-compatibility options
  configuration.set("parquet.strict.typing", "false");
  configuration.set("parquet.avro.add-list-element-records", "false");
  configuration.set("parquet.avro.write-old-list-structure", "false");
  // set Avro schemas in case the reader is Avro
  AvroReadSupport.setRequestedProjection(configuration,
      AvroSchemaUtil.convert(expectedSchema, projection.getName()));
  org.apache.avro.Schema avroReadSchema = AvroSchemaUtil.buildAvroProjection(
      AvroSchemaUtil.convert(ParquetSchemaUtil.convert(projection), projection.getName()),
      expectedSchema, ImmutableMap.of());
  AvroReadSupport.setAvroReadSchema(configuration, ParquetAvro.parquetAvroSchema(avroReadSchema));
  // let the context set up read support metadata, but always use the correct projection
  ReadContext context = null;
  if (callInit) {
    try {
      context = wrapped.init(configuration, keyValueMetaData, projection);
    } catch (UnsupportedOperationException e) {
      // try the InitContext version
      context = wrapped.init(new InitContext(
          configuration, makeMultimap(keyValueMetaData), projection));
    }
  }
  return new ReadContext(projection,
      context != null ? context.getReadSupportMetadata() : ImmutableMap.of());
}
Example 12
@Override
public Iterator<R> getRecordIterator(Schema schema) throws IOException {
  AvroReadSupport.setAvroReadSchema(conf, schema);
  ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
  return new ParquetReaderIterator(reader);
}
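A hedged consumption sketch for the iterator above, assuming R is IndexedRecord; fileReader and readSchema are hypothetical names for an instance of the enclosing class and an Avro schema:

// fileReader and readSchema are hypothetical; ParquetReaderIterator wraps the ParquetReader
Iterator<IndexedRecord> records = fileReader.getRecordIterator(readSchema);
while (records.hasNext()) {
  IndexedRecord record = records.next();
  // consume the record
}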