Java源码示例:org.apache.flink.table.factories.TableSourceFactory

示例1
/**
 * Wraps a catalog table into a {@code TableSourceTable} backed by a newly created table source.
 *
 * <p>The source is created by the catalog's own {@code TableFactory} when the catalog provides
 * one, and through {@code TableFactoryUtil.findAndCreateTableSource(table)} otherwise.
 *
 * @param tablePath path handed to the catalog-provided factory
 * @param table catalog table to create the source for
 * @return a {@code TableSourceTable} built from the source, the negation of the source's
 *     {@code isBounded()} result and {@code FlinkStatistic.UNKNOWN()}
 * @throws TableException if the catalog-provided factory is not a {@code TableSourceFactory},
 *     or if the created source is not a {@code StreamTableSource}
 */
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
	final Optional<TableFactory> catalogFactory = catalog.getTableFactory();
	final TableSource<?> source;

	if (!catalogFactory.isPresent()) {
		// The catalog ships no factory of its own: fall back to the generic lookup.
		source = TableFactoryUtil.findAndCreateTableSource(table);
	} else {
		final TableFactory factory = catalogFactory.get();
		if (!(factory instanceof TableSourceFactory)) {
			throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
				catalog.getClass()));
		}
		source = ((TableSourceFactory) factory).createTableSource(tablePath, table);
	}

	if (source instanceof StreamTableSource) {
		final StreamTableSource<?> streamSource = (StreamTableSource<?>) source;
		return new TableSourceTable<>(
			source,
			!streamSource.isBounded(),
			FlinkStatistic.UNKNOWN()
		);
	}

	throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
}
 
示例2
/**
 * Creates the table source for the table carried by the given context.
 *
 * <p>When the table's {@code CatalogConfig.IS_GENERIC} property parses to {@code true}, creation
 * is delegated to {@code TableFactoryUtil.findAndCreateTableSource(context)}; in every other
 * case (including a missing property) a {@code HiveTableSource} is returned.
 *
 * @param context factory context supplying the table, configuration and object identifier
 * @return the created table source
 */
@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
	final CatalogTable catalogTable = checkNotNull(context.getTable());
	Preconditions.checkArgument(catalogTable instanceof CatalogTableImpl);

	// Boolean.parseBoolean yields false for null, so an absent flag means "not generic".
	final String genericFlag = catalogTable.getProperties().get(CatalogConfig.IS_GENERIC);
	if (Boolean.parseBoolean(genericFlag)) {
		return TableFactoryUtil.findAndCreateTableSource(context);
	}

	return new HiveTableSource(
			new JobConf(hiveConf),
			context.getConfiguration(),
			context.getObjectIdentifier().toObjectPath(),
			catalogTable);
}
 
示例3
/**
 * Tries to create a table source for the looked-up table.
 *
 * <p>A source is only returned when the looked-up table is a {@code CatalogTable}, the created
 * source is a {@code StreamTableSource}, and the source is not an unbounded one while running
 * outside streaming mode. Every exception raised inside the {@code try} block, including the
 * {@code ValidationException}s thrown below, is caught by the enclosing handler and turned into
 * an empty result.
 *
 * @return the created source, or {@code Optional.empty()} when none could be created
 */
private Optional<TableSource<?>> findAndCreateTableSource() {
	try {
		if (!(lookupResult.getTable() instanceof CatalogTable)) {
			return Optional.empty();
		}

		// The real TableConfig is not reachable from here, so an empty configuration is
		// handed to the factory context. Currently the empty config does not affect the logic.
		final ReadableConfig emptyConfig = new Configuration();
		final TableSourceFactory.Context factoryContext = new TableSourceFactoryContextImpl(
			tableIdentifier,
			(CatalogTable) lookupResult.getTable(),
			emptyConfig);

		final TableSource<?> candidate = TableFactoryUtil.findAndCreateTableSource(factoryContext);
		if (!(candidate instanceof StreamTableSource)) {
			throw new ValidationException("Catalog tables only support " +
				"StreamTableSource and InputFormatTableSource.");
		}

		// isBounded() is only consulted when not in streaming mode (short-circuit).
		if (!(isStreamingMode || ((StreamTableSource<?>) candidate).isBounded())) {
			throw new ValidationException("Cannot query on an unbounded source in batch mode, but " +
				tableIdentifier.asSummaryString() + " is unbounded.");
		}

		return Optional.of(candidate);
	} catch (Exception ignored) {
		// Any failure is reported to the caller as "no source available".
		return Optional.empty();
	}
}
 
示例4
/**
 * Wraps a catalog table into a {@code TableSourceTable} backed by a newly created table source.
 *
 * <p>The source is created by the catalog's own {@code TableFactory} when the catalog provides
 * one, and through {@code TableFactoryUtil.findAndCreateTableSource(table)} otherwise.
 *
 * @param tablePath path handed to the catalog-provided factory
 * @param table catalog table to create the source for
 * @return a {@code TableSourceTable} built from the source, {@code isStreamingMode} and
 *     {@code FlinkStatistic.UNKNOWN()}
 * @throws TableException if the catalog-provided factory is not a {@code TableSourceFactory},
 *     or if the created source is not a {@code StreamTableSource}
 */
private Table convertCatalogTable(ObjectPath tablePath, CatalogTable table) {
	final Optional<TableFactory> catalogFactory = catalog.getTableFactory();
	final TableSource<?> source;

	if (!catalogFactory.isPresent()) {
		// The catalog ships no factory of its own: fall back to the generic lookup.
		source = TableFactoryUtil.findAndCreateTableSource(table);
	} else {
		final TableFactory factory = catalogFactory.get();
		if (!(factory instanceof TableSourceFactory)) {
			throw new TableException(String.format("Cannot query a sink-only table. TableFactory provided by catalog %s must implement TableSourceFactory",
				catalog.getClass()));
		}
		source = ((TableSourceFactory) factory).createTableSource(tablePath, table);
	}

	if (source instanceof StreamTableSource) {
		return new TableSourceTable<>(
			source,
			// The source is known to extend StreamTableSource at this point; the flag is
			// needed for the legacy planner. The Blink planner should rely on the information
			// provided by the TableSource itself to tell streaming and batch sources apart.
			isStreamingMode,
			FlinkStatistic.UNKNOWN()
		);
	}

	throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
}
 
示例5
/**
 * Runs a query against {@code db1.src} through a spied Hive catalog whose table factory hands
 * out a {@code TestConfigSource}, and checks the collected rows.
 *
 * @param fallbackMR value set for {@code HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER} and
 *     passed on to the {@code TestConfigSource}
 * @param inferParallelism value set for
 *     {@code HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM} and passed on to the
 *     {@code TestConfigSource}
 * @throws Exception if any step of the test setup or query execution fails
 */
private void testSourceConfig(boolean fallbackMR, boolean inferParallelism) throws Exception {
	// Spy on the catalog's factory so that source creation returns the test source.
	HiveTableFactory factorySpy = spy((HiveTableFactory) hiveCatalog.getTableFactory().get());
	doAnswer(call -> {
		TableSourceFactory.Context sourceContext = call.getArgument(0);
		return new TestConfigSource(
				new JobConf(hiveCatalog.getHiveConf()),
				sourceContext.getConfiguration(),
				sourceContext.getObjectIdentifier().toObjectPath(),
				sourceContext.getTable(),
				fallbackMR,
				inferParallelism);
	}).when(factorySpy).createTableSource(any(TableSourceFactory.Context.class));

	// Make the spied catalog hand out the spied factory.
	HiveCatalog spiedCatalog = spy(hiveCatalog);
	doReturn(Optional.of(factorySpy)).when(spiedCatalog).getTableFactory();

	TableEnvironment env = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
	env.getConfig().getConfiguration().setBoolean(
			HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, fallbackMR);
	env.getConfig().getConfiguration().setBoolean(
			HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, inferParallelism);
	env.getConfig().getConfiguration().setInteger(
			ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

	String catalogName = spiedCatalog.getName();
	env.registerCatalog(catalogName, spiedCatalog);
	env.useCatalog(catalogName);

	List<Row> rows = Lists.newArrayList(
			env.sqlQuery("select * from db1.src order by x").execute().collect());
	assertEquals("[1,a, 2,b]", rows.toString());
}
 
示例6
/**
 * Creates a {@code FileSystemTableSource} for the table carried by the given context.
 *
 * <p>The table's options are first copied into a new {@code Configuration}, which is then used
 * to resolve the path and the {@code PARTITION_DEFAULT_NAME} value.
 *
 * @param context factory context supplying the table
 * @return the created file system table source
 */
@Override
public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
	final Configuration tableOptions = new Configuration();
	context.getTable().getOptions().forEach((key, value) -> tableOptions.setString(key, value));

	return new FileSystemTableSource(
			context.getTable().getSchema(),
			getPath(tableOptions),
			context.getTable().getPartitionKeys(),
			tableOptions.get(PARTITION_DEFAULT_NAME),
			context.getTable().getProperties());
}
 
示例7
/**
 * Wraps a catalog table into a {@code TableSourceTable} backed by a newly created table source.
 *
 * <p>The source is created by {@code tableFactory} when one is given, and through
 * {@code TableFactoryUtil.findAndCreateTableSource(context)} otherwise. Both paths receive the
 * same factory context, built from the identifier, the table and the table configuration.
 *
 * @param identifier identifier of the table, placed into the factory context
 * @param table catalog table to create the source for
 * @param resolvedSchema schema passed on to the resulting {@code TableSourceTable}
 * @param tableFactory factory to use, or {@code null} to fall back to the generic lookup
 * @return a {@code TableSourceTable} built from the schema, the source, {@code isStreamingMode}
 *     and {@code FlinkStatistic.UNKNOWN()}
 * @throws TableException if {@code tableFactory} is non-null but not a
 *     {@code TableSourceFactory}, or if the created source is not a {@code StreamTableSource}
 */
private Table convertCatalogTable(
		ObjectIdentifier identifier,
		CatalogTable table,
		TableSchema resolvedSchema,
		@Nullable TableFactory tableFactory) {
	final TableSourceFactory.Context context = new TableSourceFactoryContextImpl(
			identifier, table, tableConfig.getConfiguration());

	final TableSource<?> tableSource;
	if (tableFactory == null) {
		tableSource = TableFactoryUtil.findAndCreateTableSource(context);
	} else if (tableFactory instanceof TableSourceFactory) {
		tableSource = ((TableSourceFactory<?>) tableFactory).createTableSource(context);
	} else {
		throw new TableException(
			"Cannot query a sink-only table. TableFactory provided by catalog must implement TableSourceFactory");
	}

	if (tableSource instanceof StreamTableSource) {
		return new TableSourceTable<>(
			resolvedSchema,
			tableSource,
			// The source is known to extend StreamTableSource at this point; the flag is
			// needed for the legacy planner. The Blink planner should rely on the information
			// provided by the TableSource itself to tell streaming and batch sources apart.
			isStreamingMode,
			FlinkStatistic.UNKNOWN()
		);
	}

	throw new TableException("Catalog tables support only StreamTableSource and InputFormatTableSource");
}
 
示例8
/**
 * Creates a {@code TestTableSource} for the table carried by the given context.
 *
 * <p>The processing-time attribute and the rowtime attributes are derived with
 * {@code SchemaValidator} from the table's {@code toProperties()} output; a missing
 * processing-time attribute is passed to the source as {@code null}.
 *
 * @param context factory context supplying the table
 * @return the created test table source
 */
@Override
public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
	final TableSchema tableSchema = context.getTable().getSchema();

	final DescriptorProperties descriptor = new DescriptorProperties(true);
	descriptor.putProperties(context.getTable().toProperties());

	final Optional<String> proctimeAttribute = SchemaValidator.deriveProctimeAttribute(descriptor);
	final List<RowtimeAttributeDescriptor> rowtimeAttributes =
		SchemaValidator.deriveRowtimeAttributes(descriptor);

	return new TestTableSource(
		tableSchema,
		context.getTable().getProperties().get(testProperty),
		proctimeAttribute.orElse(null),
		rowtimeAttributes);
}