Java源码示例:org.apache.parquet.avro.AvroReadSupport

示例1
/**
 * Copies the optional Avro/Parquet flags held by {@code parquetConfig} onto {@code conf}.
 *
 * <p>Each flag is written only when its getter returns a non-null value; otherwise the
 * corresponding key in {@code conf} is left as it was.
 *
 * @param conf          configuration that receives the flags
 * @param parquetConfig source of the optional flag values
 */
public static void applyCommonConfig(Configuration conf, ParquetConfig parquetConfig) {
    final Boolean readCompatibility = parquetConfig.getAvroReadCompatibility();
    if (readCompatibility != null) {
        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, readCompatibility.booleanValue());
    }

    final Boolean addListElementRecords = parquetConfig.getAvroAddListElementRecords();
    if (addListElementRecords != null) {
        conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, addListElementRecords.booleanValue());
    }

    final Boolean writeOldListStructure = parquetConfig.getAvroWriteOldListStructure();
    if (writeOldListStructure != null) {
        conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, writeOldListStructure.booleanValue());
    }
}
 
示例2
/**
 * Opens the underlying Parquet reader and moves this reader into the {@code OPEN} state.
 *
 * <p>The reader must still be in the {@code NEW} state. The reader schema is registered on the
 * file system's configuration before the Parquet reader is created on the qualified path.
 *
 * @throws DatasetIOException if the Parquet reader cannot be created
 */
@Override
public void initialize() {
  Preconditions.checkState(
      state.equals(ReaderWriterState.NEW),
      "A reader may not be opened more than once - current state:%s",
      state);

  LOG.debug("Opening reader on path:{}", path);

  try {
    final Configuration readerConf = fileSystem.getConf();
    AvroReadSupport.setAvroReadSchema(readerConf, readerSchema);
    reader = new AvroParquetReader<E>(readerConf, fileSystem.makeQualified(path));
  } catch (IOException ioe) {
    throw new DatasetIOException("Unable to create reader path:" + path, ioe);
  }

  // advance() runs before the state flips to OPEN (presumably it pre-loads the first record).
  advance();
  state = ReaderWriterState.OPEN;
}
 
示例3
/**
 * Builds a Parquet reader of {@link GenericRecord}s for the file returned by {@code getFilePath()}.
 *
 * <p>When set, the read schema and the requested projection are registered on the file system's
 * configuration before the reader is built.
 *
 * @return a new reader over the file
 * @throws IOException if the input file cannot be resolved or the reader cannot be built
 */
private ParquetReader<GenericRecord> initReader() throws IOException {
    final Configuration conf = getFs().getConf();

    if (schema != null) {
        AvroReadSupport.setAvroReadSchema(conf, schema);
    }
    if (projection != null) {
        AvroReadSupport.setRequestedProjection(conf, projection);
    }

    final HadoopInputFile inputFile = HadoopInputFile.fromPath(getFilePath(), conf);
    return AvroParquetReader.<GenericRecord>builder(inputFile).build();
}
 
示例4
/**
 * Computes the Parquet projection for {@code expectedSchema} against the file's schema, registers
 * the matching Avro schemas on {@code configuration}, and returns a read context that always
 * carries that projection.
 *
 * @param configuration    configuration that is updated with compatibility flags and Avro schemas
 * @param keyValueMetaData file key/value metadata forwarded to the wrapped read support
 * @param fileSchema       schema stored in the Parquet file
 * @return a read context using the computed projection and, when the wrapped read support was
 *         initialized, its read-support metadata (otherwise an empty map)
 */
@Override
@SuppressWarnings("deprecation")
public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
  // Parquet selects columns by matching the read context's message type against the file's
  // columns by full path, so the projection has to be expressed with the file schema's paths.
  final MessageType projection;
  if (hasIds(fileSchema)) {
    projection = pruneColumns(fileSchema, expectedSchema);
  } else {
    projection = pruneColumnsFallback(fileSchema, expectedSchema);
  }

  // Force known backward-compatibility options to fixed values.
  configuration.set("parquet.strict.typing", "false");
  configuration.set("parquet.avro.add-list-element-records", "false");
  configuration.set("parquet.avro.write-old-list-structure", "false");

  // Register Avro schemas in case the wrapped reader is Avro-based.
  org.apache.avro.Schema requestedProjection =
      AvroSchemaUtil.convert(expectedSchema, projection.getName());
  AvroReadSupport.setRequestedProjection(configuration, requestedProjection);

  org.apache.avro.Schema fileAvroSchema =
      AvroSchemaUtil.convert(ParquetSchemaUtil.convert(projection), projection.getName());
  org.apache.avro.Schema avroReadSchema =
      AvroSchemaUtil.buildAvroProjection(fileAvroSchema, expectedSchema, ImmutableMap.of());
  AvroReadSupport.setAvroReadSchema(configuration, ParquetAvro.parquetAvroSchema(avroReadSchema));

  // Let the wrapped read support produce its metadata; the projection above is used regardless.
  ReadContext wrappedContext = null;
  if (callInit) {
    try {
      wrappedContext = wrapped.init(configuration, keyValueMetaData, projection);
    } catch (UnsupportedOperationException e) {
      // Fall back to the InitContext-based overload.
      InitContext initContext =
          new InitContext(configuration, makeMultimap(keyValueMetaData), projection);
      wrappedContext = wrapped.init(initContext);
    }
  }

  final Map<String, String> readSupportMetadata;
  if (wrappedContext != null) {
    readSupportMetadata = wrappedContext.getReadSupportMetadata();
  } else {
    readSupportMetadata = ImmutableMap.of();
  }
  return new ReadContext(projection, readSupportMetadata);
}
 
示例5
/**
 * Derives a schema for the Parquet file at {@code path} by reading its records as Avro
 * {@link GenericRecord}s and converting the record schema with {@code avroData.toConnectSchema}.
 *
 * <p>The reader is opened in a try-with-resources block so that it is closed even when reading
 * or schema conversion throws.
 *
 * @param conf configuration used to build the Parquet reader
 * @param path location of the Parquet data
 * @return the converted schema of the last record read, or {@code null} if no record was read
 * @throws IOException if the reader cannot be built, read, or closed
 */
@Override
public Schema getSchema(Configuration conf, Path path) throws IOException {
  AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<>();
  ParquetReader.Builder<GenericRecord> builder = ParquetReader.builder(readSupport, path);
  Schema schema = null;
  try (ParquetReader<GenericRecord> parquetReader = builder.withConf(conf).build()) {
    GenericRecord record;
    while ((record = parquetReader.read()) != null) {
      schema = avroData.toConnectSchema(record.getSchema());
    }
  }
  return schema;
}
 
示例6
/**
 * Reads every record of the Parquet data at {@code path} as an Avro {@link GenericRecord}.
 *
 * <p>The reader is opened in a try-with-resources block so that it is closed even when a read
 * fails part-way through.
 *
 * @param conf configuration used to build the Parquet reader
 * @param path location of the Parquet data
 * @return the records in the order they were read; empty if there are none
 * @throws IOException if the reader cannot be built, read, or closed
 */
@Override
public Collection<Object> readData(Configuration conf, Path path) throws IOException {
  Collection<Object> result = new ArrayList<>();
  AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<>();
  ParquetReader.Builder<GenericRecord> builder = ParquetReader.builder(readSupport, path);
  try (ParquetReader<GenericRecord> parquetReader = builder.withConf(conf).build()) {
    GenericRecord record;
    while ((record = parquetReader.read()) != null) {
      result.add(record);
    }
  }
  return result;
}
 
示例7
/**
 * Collects the record keys stored in a parquet file, optionally restricted to a candidate set.
 * When {@code filter} is {@code null} or empty, every record key in the file is returned.
 *
 * @param configuration configuration to build fs object
 * @param filePath      The parquet file path.
 * @param filter        record keys filter
 * @param readSchema    schema of columns to be read
 * @return the set of record keys found in the file that pass the filter
 */
private static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter,
                                                Schema readSchema) {
  Option<RecordKeysFilterFunction> keyFilter;
  if (filter == null || filter.isEmpty()) {
    keyFilter = Option.empty();
  } else {
    keyFilter = Option.of(new RecordKeysFilterFunction(filter));
  }

  // Work on a copy so the caller's configuration is not modified by the schema settings below.
  Configuration readConf = new Configuration(configuration);
  readConf.addResource(FSUtils.getFs(filePath.toString(), readConf).getConf());
  AvroReadSupport.setAvroReadSchema(readConf, readSchema);
  AvroReadSupport.setRequestedProjection(readConf, readSchema);

  Set<String> matchedKeys = new HashSet<>();
  try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(readConf).build()) {
    for (Object next = reader.read(); next != null; next = reader.read()) {
      if (!(next instanceof GenericRecord)) {
        // Anything that is not a GenericRecord is skipped.
        continue;
      }
      String recordKey = ((GenericRecord) next).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      if (!keyFilter.isPresent() || keyFilter.get().apply(recordKey)) {
        matchedKeys.add(recordKey);
      }
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read row keys from Parquet " + filePath, e);
  }
  return matchedKeys;
}
 
示例8
/**
 * Fetch {@link HoodieKey}s from the given parquet file.
 *
 * <p>Only the record-key and partition-path columns are read. The reader is closed when reading
 * finishes or fails, and objects that are not {@link GenericRecord}s are skipped.
 *
 * @param configuration configuration to build fs object
 * @param filePath      The parquet file path.
 * @return {@link List} of {@link HoodieKey}s fetched from the parquet file; empty if the file
 *         does not exist
 * @throws HoodieIOException if checking or reading the file fails
 */
public static List<HoodieKey> fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) {
  List<HoodieKey> hoodieKeys = new ArrayList<>();
  try {
    if (!filePath.getFileSystem(configuration).exists(filePath)) {
      return new ArrayList<>();
    }

    // Work on a copy so the caller's configuration is not modified by the schema settings below.
    Configuration conf = new Configuration(configuration);
    conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
    Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
    AvroReadSupport.setAvroReadSchema(conf, readSchema);
    AvroReadSupport.setRequestedProjection(conf, readSchema);
    try (ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build()) {
      Object obj = reader.read();
      while (obj != null) {
        if (obj instanceof GenericRecord) {
          GenericRecord record = (GenericRecord) obj;
          String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
          String partitionPath = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
          hoodieKeys.add(new HoodieKey(recordKey, partitionPath));
        }
        // Always advance; advancing only inside the instanceof branch would loop forever on
        // any object that is not a GenericRecord.
        obj = reader.read();
      }
    }
  } catch (IOException e) {
    throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
  }
  return hoodieKeys;
}
 
示例9
/**
 * Builds an RDD of {@link HoodieRecord}s from the parquet files under {@code cfg.srcPath}.
 *
 * <p>Each record's key is taken from the {@code cfg.rowKey} field and its partition path from the
 * {@code cfg.partitionKey} field. A numeric partition value is multiplied by 1000, treated as
 * epoch milliseconds and formatted with {@code PARTITION_FORMATTER} (presumably the source value
 * is epoch seconds; confirm against the data).
 *
 * @param jsc       Spark context whose Hadoop configuration seeds the read job
 * @param schemaStr Avro schema, as JSON, used as the Avro read schema
 * @return one {@link HoodieRecord} with a {@link HoodieJsonPayload} per input record
 * @throws IOException if the Hadoop job cannot be created
 */
protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(JavaSparkContext jsc,
    String schemaStr) throws IOException {
  Job job = Job.getInstance(jsc.hadoopConfiguration());
  // Allow recursive directories to be found
  job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
  // To parallelize reading file status.
  job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, "1024");
  // Job.getInstance copies the configuration it is given, so the read schema must be set on the
  // job's own configuration (the one handed to newAPIHadoopFile below) to take effect. Setting
  // it on jsc.hadoopConfiguration() at this point would not reach the job and would also alter
  // the shared context configuration.
  AvroReadSupport.setAvroReadSchema(job.getConfiguration(), (new Schema.Parser().parse(schemaStr)));
  ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class));

  return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
          job.getConfiguration())
      // To reduce large number of tasks.
      .coalesce(16 * cfg.parallelism).map(entry -> {
        GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
        Object partitionField = genericRecord.get(cfg.partitionKey);
        if (partitionField == null) {
          throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
        }
        Object rowField = genericRecord.get(cfg.rowKey);
        if (rowField == null) {
          throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
        }
        String partitionPath = partitionField.toString();
        LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
        if (partitionField instanceof Number) {
          try {
            long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
            partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
          } catch (NumberFormatException nfe) {
            // Keep the raw string form as the partition path when it cannot be parsed.
            LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
          }
        }
        return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),
            new HoodieJsonPayload(genericRecord.toString()));
      });
}
 
示例10
/**
 * Registers the format-specific read settings for {@code schema} on {@code conf}.
 *
 * <p>For Avro, the data model class resolved for {@code type} is passed to {@code setModel} and
 * the schema is stored under {@code AVRO_SCHEMA_INPUT_KEY}. For Parquet, the schema is set as
 * the Avro read schema. Any other format leaves {@code conf} unchanged.
 *
 * @param conf   configuration to update
 * @param format storage format being read
 * @param schema Avro schema to read with
 * @param type   entity class used to resolve the data model
 */
private static void setConfigProperties(Configuration conf, Format format,
                                        Schema schema, Class<?> type) {
  // Resolved up front for every format, exactly as before, even though only Avro uses it today.
  GenericData dataModel = DataModelUtil.getDataModelForType(type);

  if (Formats.AVRO.equals(format)) {
    setModel.invoke(conf, dataModel.getClass());
    conf.set(AVRO_SCHEMA_INPUT_KEY, schema.toString());
    return;
  }

  if (Formats.PARQUET.equals(format)) {
    // TODO: update to a version of Parquet with setAvroDataSupplier, then also call
    //   AvroReadSupport.setAvroDataSupplier(conf, DataModelUtil.supplierClassFor(model));
    AvroReadSupport.setAvroReadSchema(conf, schema);
  }
}
 
示例11
/**
 * Computes the Parquet projection for {@code expectedSchema} against the file's schema, registers
 * the matching Avro schemas on {@code configuration}, and returns a read context that always
 * carries that projection.
 *
 * <p>Columns are pruned by field id when the file schema has ids. Otherwise ids are first applied
 * through {@code nameMapping} when one is set, and the fallback pruning is used when it is not.
 *
 * @param configuration    configuration that is updated with compatibility flags and Avro schemas
 * @param keyValueMetaData file key/value metadata forwarded to the wrapped read support
 * @param fileSchema       schema stored in the Parquet file
 * @return a read context using the computed projection and, when the wrapped read support was
 *         initialized, its read-support metadata (otherwise an empty map)
 */
@Override
@SuppressWarnings("deprecation")
public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
  // Parquet selects columns by matching the read context's message type against the file's
  // columns by full path, so the projection has to be expressed with the file schema's paths.
  final MessageType projection;
  if (ParquetSchemaUtil.hasIds(fileSchema)) {
    projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
  } else if (nameMapping != null) {
    projection = ParquetSchemaUtil.pruneColumns(
        ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping), expectedSchema);
  } else {
    projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
  }

  // Force known backward-compatibility options to fixed values.
  configuration.set("parquet.strict.typing", "false");
  configuration.set("parquet.avro.add-list-element-records", "false");
  configuration.set("parquet.avro.write-old-list-structure", "false");

  // Register Avro schemas in case the wrapped reader is Avro-based.
  org.apache.avro.Schema requestedProjection =
      AvroSchemaUtil.convert(expectedSchema, projection.getName());
  AvroReadSupport.setRequestedProjection(configuration, requestedProjection);

  org.apache.avro.Schema fileAvroSchema =
      AvroSchemaUtil.convert(ParquetSchemaUtil.convert(projection), projection.getName());
  org.apache.avro.Schema avroReadSchema =
      AvroSchemaUtil.buildAvroProjection(fileAvroSchema, expectedSchema, ImmutableMap.of());
  AvroReadSupport.setAvroReadSchema(configuration, ParquetAvro.parquetAvroSchema(avroReadSchema));

  // Let the wrapped read support produce its metadata; the projection above is used regardless.
  ReadContext wrappedContext = null;
  if (callInit) {
    try {
      wrappedContext = wrapped.init(configuration, keyValueMetaData, projection);
    } catch (UnsupportedOperationException e) {
      // Fall back to the InitContext-based overload.
      InitContext initContext =
          new InitContext(configuration, makeMultimap(keyValueMetaData), projection);
      wrappedContext = wrapped.init(initContext);
    }
  }

  final Map<String, String> readSupportMetadata;
  if (wrappedContext != null) {
    readSupportMetadata = wrappedContext.getReadSupportMetadata();
  } else {
    readSupportMetadata = ImmutableMap.of();
  }
  return new ReadContext(projection, readSupportMetadata);
}
 
示例12
/**
 * Opens a Parquet reader on {@code path} using {@code schema} as the Avro read schema and wraps
 * it in a {@code ParquetReaderIterator}.
 *
 * @param schema Avro schema to read the records with; it is registered on {@code conf}
 * @return an iterator backed by the newly opened reader
 * @throws IOException if the reader cannot be built
 */
@Override
public Iterator<R> getRecordIterator(Schema schema) throws IOException {
  AvroReadSupport.setAvroReadSchema(conf, schema);
  return new ParquetReaderIterator(
      AvroParquetReader.<IndexedRecord>builder(path)
          .withConf(conf)
          .build());
}