Java Source Code Examples: org.apache.parquet.VersionParser
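
The examples below use VersionParser to turn the created_by string stored in a Parquet file footer into a ParsedVersion. As a minimal, illustrative sketch (the method name printWriterVersion and the created_by value are made up, not taken from the Parquet sources), parse() produces a ParsedVersion whose semantic version can then be inspected:

static void printWriterVersion() {
  // Illustrative created_by string in the usual "<app> version <semver> (build <hash>)" layout.
  String createdBy = "parquet-mr version 1.8.1 (build abcdef0123456789abcdef0123456789abcdef01)";
  try {
    VersionParser.ParsedVersion version = VersionParser.parse(createdBy);
    if (version.hasSemanticVersion()) {
      SemanticVersion semVer = version.getSemanticVersion();
      System.out.println(semVer.major + "." + semVer.minor + "." + semVer.patch);
    }
  } catch (VersionParser.VersionParseException e) {
    // Strings the parser does not recognize throw VersionParseException;
    // callers such as Example 1 below fall back to a null version in that case.
    System.out.println("unparseable created_by: " + createdBy);
  }
}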

Example 1
/**
 * @param pageReadStore underlying page storage
 * @param recordConverter the user provided converter to materialize records
 * @param schema the schema we are reading
 * @param createdBy writer version string from the Parquet file being read
 */
public ColumnReadStoreImpl(PageReadStore pageReadStore,
                           GroupConverter recordConverter,
                           MessageType schema, String createdBy) {
  super();
  this.pageReadStore = pageReadStore;
  this.recordConverter = recordConverter;
  this.schema = schema;

  ParsedVersion version;
  try {
    version = VersionParser.parse(createdBy);
  } catch (RuntimeException | VersionParseException e) {
    version = null;
  }
  this.writerVersion = version;
}
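
As the javadoc above notes, createdBy normally comes straight from the file footer. A hedged sketch of how this constructor might be wired up when reading a single row group (the file path and myMaterializer, a user-provided RecordMaterializer, are hypothetical, and reader setup details can vary between Parquet versions):

Configuration conf = new Configuration();
Path file = new Path("/tmp/example.parquet");           // hypothetical input file
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
  ParquetMetadata footer = reader.getFooter();
  MessageType schema = footer.getFileMetaData().getSchema();
  String createdBy = footer.getFileMetaData().getCreatedBy();

  PageReadStore rowGroup = reader.readNextRowGroup();    // pages of the first row group
  ColumnReadStoreImpl readStore = new ColumnReadStoreImpl(
      rowGroup, myMaterializer.getRootConverter(), schema, createdBy);
  ColumnReader fooReader = readStore.getColumnReader(schema.getColumns().get(0));
  // ... iterate the column through fooReader, as Examples 2 and 3 do with ColumnReaderImpl ...
}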
 
Example 2
@Test
public void test() throws Exception {
  MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
  ColumnDescriptor col = schema.getColumns().get(0);
  MemPageWriter pageWriter = new MemPageWriter();
  ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter,
      ParquetProperties.builder()
          .withDictionaryPageSize(1024).withWriterVersion(PARQUET_2_0)
          .withPageSize(2048).build());
  for (int i = 0; i < rows; i++) {
    columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
    if ((i + 1) % 1000 == 0) {
      columnWriterV2.writePage();
    }
  }
  columnWriterV2.writePage();
  columnWriterV2.finalizeColumnChunk();
  List<DataPage> pages = pageWriter.getPages();
  int valueCount = 0;
  int rowCount = 0;
  for (DataPage dataPage : pages) {
    valueCount += dataPage.getValueCount();
    rowCount += ((DataPageV2)dataPage).getRowCount();
  }
  assertEquals(rows, rowCount);
  assertEquals(rows, valueCount);
  MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
  ValidatingConverter converter = new ValidatingConverter();
  ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
  for (int i = 0; i < rows; i++) {
    assertEquals(0, columnReader.getCurrentRepetitionLevel());
    assertEquals(0, columnReader.getCurrentDefinitionLevel());
    columnReader.writeCurrentValueToConverter();
    columnReader.consume();
  }
  assertEquals(rows, converter.count);
}
 
Example 3
@Test
public void testOptional() throws Exception {
  MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
  ColumnDescriptor col = schema.getColumns().get(0);
  MemPageWriter pageWriter = new MemPageWriter();
  ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter,
      ParquetProperties.builder()
          .withDictionaryPageSize(1024).withWriterVersion(PARQUET_2_0)
          .withPageSize(2048).build());
  for (int i = 0; i < rows; i++) {
    columnWriterV2.writeNull(0, 0);
    if ((i + 1) % 1000 == 0) {
      columnWriterV2.writePage();
    }
  }
  columnWriterV2.writePage();
  columnWriterV2.finalizeColumnChunk();
  List<DataPage> pages = pageWriter.getPages();
  int valueCount = 0;
  int rowCount = 0;
  for (DataPage dataPage : pages) {
    valueCount += dataPage.getValueCount();
    rowCount += ((DataPageV2)dataPage).getRowCount();
  }
  assertEquals(rows, rowCount);
  assertEquals(rows, valueCount);
  MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
  ValidatingConverter converter = new ValidatingConverter();
  ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
  for (int i = 0; i < rows; i++) {
    assertEquals(0, columnReader.getCurrentRepetitionLevel());
    assertEquals(0, columnReader.getCurrentDefinitionLevel());
    columnReader.consume();
  }
  assertEquals(0, converter.count);
}
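
Examples 2 and 3 refer to a rows field and a ValidatingConverter that are not shown in this excerpt. A sketch of what they might look like (assumed for illustration; the real test class may differ):

// Assumed helpers for Examples 2 and 3; illustrative only.
private final int rows = 10000;

// Counts the binary values the column reader hands back.
private static final class ValidatingConverter extends PrimitiveConverter {
  int count;

  @Override
  public void addBinary(Binary value) {
    // Example 2 writes "bar0".."bar9" repeatedly, so every value should start with "bar".
    assertTrue(value.toStringUsingUTF8().startsWith("bar"));
    ++count;
  }
}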
 
Example 4
/**
 * Check for corrupted dates in a parquet file. See DRILL-4203
 */
public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
                                         List<SchemaPath> columns,
                                         boolean autoCorrectCorruptDates) {
  // old drill files have "parquet-mr" as the created-by string and no drill version; we need to check min/max values
  // to see if they look corrupt
  //  - option to disable this auto-correction based on the date values, in case users are storing these
  //    dates intentionally

  // migrated parquet files have 1.8.1 parquet-mr version with drill-r0 in the part of the name usually containing "SNAPSHOT"

  // new parquet files generated with the "is.date.correct" property have no corrupted dates

  String createdBy = footer.getFileMetaData().getCreatedBy();
  String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
  String writerVersionValue = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.WRITER_VERSION_PROPERTY);
  // This flag can be present in parquet files which were generated with 1.9.0-SNAPSHOT and 1.9.0 drill versions.
  // If this flag is present it means that the version of the drill parquet writer is 2
  final String isDateCorrectFlag = "is.date.correct";
  String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(isDateCorrectFlag);
  if (drillVersion != null) {
    int writerVersion = 1;
    if (writerVersionValue != null) {
      writerVersion = Integer.parseInt(writerVersionValue);
    }
    else if (Boolean.valueOf(isDateCorrect)) {
      writerVersion = DRILL_WRITER_VERSION_STD_DATE_FORMAT;
    }
    return writerVersion >= DRILL_WRITER_VERSION_STD_DATE_FORMAT ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
        // loop through parquet column metadata to find date columns, check for corrupt values
        : checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
  } else {
    // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
    // only applies if there is a date column selected
    if (createdBy == null || createdBy.equals("parquet-mr")) {
      return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
    } else {
      // check the created by to see if it is a migrated Drill file
      try {
        VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy);
        // check if this is a migrated Drill file, lacking a Drill version number, but with
        // "drill" in the parquet created-by string
        if (parsedCreatedByVersion.hasSemanticVersion()) {
          SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
          String pre = semVer.pre + "";
          if (semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
            return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
          }
        }
        // written by a tool that wasn't Drill, the dates are not corrupted
        return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
      } catch (VersionParser.VersionParseException e) {
        // If we couldn't parse "created by" field, check column metadata of date columns
        return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
      }
    }
  }
}
 
Example 5
/**
 * Check for corrupted dates in a parquet file. See DRILL-4203
 */
public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
                                         List<SchemaPath> columns,
                                         boolean autoCorrectCorruptDates) {
  // old drill files have "parquet-mr" as the created-by string and no drill version; we need to check min/max values
  // to see if they look corrupt
  //  - option to disable this auto-correction based on the date values, in case users are storing these
  //    dates intentionally

  // migrated parquet files have 1.8.1 parquet-mr version with drill-r0 in the part of the name usually containing "SNAPSHOT"

  // new parquet files generated with the "is.date.correct" property have no corrupted dates

  String createdBy = footer.getFileMetaData().getCreatedBy();
  String dremioVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DREMIO_VERSION_PROPERTY);
  String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
  String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.IS_DATE_CORRECT_PROPERTY);
  String writerVersionValue = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.WRITER_VERSION_PROPERTY);
  logger.debug("Detecting corrupt dates for file created by {}, dremio version {}, writer version value {}, auto correct dates {}",
    createdBy, dremioVersion, writerVersionValue, autoCorrectCorruptDates);
  if (dremioVersion != null || drillVersion != null) {
    // File is generated by either Drill >= 1.3.0 or Dremio (all versions)

    if (writerVersionValue != null && Integer.parseInt(writerVersionValue) >= 2) {
      // If Drill parquet writer version is >=2 -> No date corruption.
      //   1. All parquet files written by Drill version >= 1.10.0 (DRILL-4980)
      return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
    }

    if (Boolean.valueOf(isDateCorrect)) {
      // If the footer contains "is.date.correct" -> No date corruption.
      //   1. File generated by Drill 1.9.0 (DRILL-4203) - This property got removed in 1.10.0 (DRILL-4980)
      //   2. All parquet files generated by Dremio
      return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
    }

    // File is generated using Drill >= 1.3.0 and Drill <= 1.9.0
    return DateCorruptionStatus.META_SHOWS_CORRUPTION;
  } else {
    // Possibly an old, un-migrated Drill file, check the column statistics to see if min/max values look corrupt
    // only applies if there is a date column selected
    if (createdBy == null || createdBy.equals("parquet-mr")) {
      // loop through parquet column metadata to find date columns, check for corrupt values
      return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
    } else {
      // check the created by to see if it is a migrated Drill file
      try {
        VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy);
        // check if this is a migrated Drill file, lacking a Drill version number, but with
        // "drill" in the parquet created-by string
        if (parsedCreatedByVersion.hasSemanticVersion()) {
          SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
          String pre = semVer.pre + "";
          if (semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
            return DateCorruptionStatus.META_SHOWS_CORRUPTION;
          }
        }
        // written by a tool that wasn't Drill, the dates are not corrupted
        return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
      } catch (VersionParser.VersionParseException e) {
        // If we couldn't parse "created by" field, check column metadata of date columns
        return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
      }
    }
  }
}
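
Example 5 appears to be the Dremio-side variant of Example 4: it additionally checks DREMIO_VERSION_PROPERTY and, for files written by Drill 1.3.0 through 1.9.0, reports META_SHOWS_CORRUPTION from the metadata alone instead of re-checking column statistics. Either way, callers branch on the returned status; a hypothetical caller sketch (selectedColumns and the surrounding context are assumed, not taken from the Drill or Dremio sources):

DateCorruptionStatus status = detectCorruptDates(footer, selectedColumns, autoCorrectCorruptDates);
switch (status) {
  case META_SHOWS_CORRUPTION:
    // shift the corrupted date values back into range while reading
    break;
  case META_SHOWS_NO_CORRUPTION:
    // read date values as-is
    break;
  default:
    // metadata was inconclusive; inspect the date values themselves
    break;
}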