Java源码示例:org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider

示例1
/**
 * Builds a {@code JDBCInputFormat} whose parameters come from a
 * {@code GenericParameterValuesProvider} holding two single-element rows (the authors of
 * {@code TEST_DATA[3]} and {@code TEST_DATA[0]}), asserts that one input split is created per
 * parameter row, and hands each split to {@code verifySplit} for checking.
 */
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
	// One parameter row per author; every row is expected to produce its own split.
	Serializable[][] authorRows = new String[][] {
		{TEST_DATA[3].author},
		{TEST_DATA[0].author}
	};
	ParameterValuesProvider authorProvider = new GenericParameterValuesProvider(authorRows);

	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(authorProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);

	// This query exploits parallelism: one split for every row of query parameters.
	final int expectedSplitCount = authorRows.length;
	Assert.assertEquals(expectedSplitCount, splits.length);

	// NOTE(review): the second argument is presumably the expected sum of ids in the split;
	// confirm against verifySplit, which is defined elsewhere.
	verifySplit(splits[0], TEST_DATA[3].id);
	verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id);

	jdbcInputFormat.closeInputFormat();
}
 
示例2
/**
 * Verifies generic (value-list) splitting: a {@code GenericParameterValuesProvider} is given two
 * one-element parameter rows (authors taken from {@code TEST_DATA[3]} and {@code TEST_DATA[0]}),
 * the number of created splits must equal the number of rows, and each split is then checked
 * through {@code verifySplit}.
 */
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
	// Each inner array is one set of query parameters, i.e. one expected split.
	Serializable[][] parameterRows = new String[][] {
		{TEST_DATA[3].author},
		{TEST_DATA[0].author}
	};
	ParameterValuesProvider genericProvider = new GenericParameterValuesProvider(parameterRows);

	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(genericProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);

	// This query exploits parallelism: one split for every row of query parameters.
	final int expectedSplits = parameterRows.length;
	Assert.assertEquals(expectedSplits, splits.length);

	// NOTE(review): verifySplit is defined elsewhere; its second argument looks like an
	// expected sum of ids for the split — confirm.
	verifySplit(splits[0], TEST_DATA[3].id);
	verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id);

	jdbcInputFormat.closeInputFormat();
}
 
示例3
/**
 * Verifies generic (value-list) splitting using the driver class and URL reported by
 * {@code DERBY_EBOOKSHOP_DB}: two one-element parameter rows (authors of {@code TEST_DATA[3]}
 * and {@code TEST_DATA[0]}) must yield exactly two splits, each of which is then checked
 * through {@code verifySplit}.
 */
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
	// Each inner array is one set of query parameters, i.e. one expected split.
	Serializable[][] authorRows = new String[][] {
		{TEST_DATA[3].author},
		{TEST_DATA[0].author}
	};
	ParameterValuesProvider authorProvider = new GenericParameterValuesProvider(authorRows);

	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
			.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
			.setQuery(SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(authorProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);

	// This query exploits parallelism: one split for every row of query parameters.
	final int expectedSplitCount = authorRows.length;
	Assert.assertEquals(expectedSplitCount, splits.length);

	// NOTE(review): verifySplit is defined elsewhere; its second argument looks like an
	// expected sum of ids for the split — confirm.
	verifySplit(splits[0], TEST_DATA[3].id);
	verifySplit(splits[1], TEST_DATA[0].id + TEST_DATA[1].id);

	jdbcInputFormat.closeInputFormat();
}
 
示例4
/**
 * Verifies numeric-range splitting with a fetch size of one: the id range spanning the first
 * and last entries of {@code TEST_DATA} is handed to a {@code NumericBetweenParametersProvider},
 * the test expects one split per entry of {@code TEST_DATA}, and the records read across all
 * splits must match {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
	final int fetchSize = 1;
	final long min = TEST_DATA[0].id;
	// The last entry is always at length - 1; it must not depend on the fetch size,
	// otherwise changing fetchSize would silently shrink the id range under test.
	final long max = TEST_DATA[TEST_DATA.length - 1].id;
	ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	// This query exploits parallelism: one split for every id.
	Assert.assertEquals(TEST_DATA.length, splits.length);
	int recordCount = 0;
	// A single Row instance is passed to every nextRecord call as the reuse object.
	Row row = new Row(5);
	for (InputSplit split : splits) {
		jdbcInputFormat.open(split);
		while (!jdbcInputFormat.reachedEnd()) {
			Row next = jdbcInputFormat.nextRecord(row);

			// Records are expected in the same order as TEST_DATA across consecutive splits.
			assertEquals(TEST_DATA[recordCount], next);

			recordCount++;
		}
		jdbcInputFormat.close();
	}
	jdbcInputFormat.closeInputFormat();
	// Every test entry must have been read exactly once.
	Assert.assertEquals(TEST_DATA.length, recordCount);
}
 
示例5
/**
 * Verifies numeric-range splitting when the fetch size exceeds the largest id: the test expects
 * exactly one split, and the records read from it must match {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
	final long firstId = TEST_DATA[0].id;
	final long lastId = TEST_DATA[TEST_DATA.length - 1].id;
	// Chosen larger than the last id so that a single split is generated (asserted below).
	final long singleSplitFetchSize = lastId + 1;
	ParameterValuesProvider paramProvider =
			new NumericBetweenParametersProvider(singleSplitFetchSize, firstId, lastId);

	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	// Exactly one split must have been generated.
	Assert.assertEquals(1, splits.length);

	int seen = 0;
	// The same Row instance is handed to every nextRecord call as the reuse object.
	final Row reuse = new Row(5);
	for (InputSplit split : splits) {
		jdbcInputFormat.open(split);
		for (; !jdbcInputFormat.reachedEnd(); seen++) {
			Row record = jdbcInputFormat.nextRecord(reuse);
			assertEquals(TEST_DATA[seen], record);
		}
		jdbcInputFormat.close();
	}
	jdbcInputFormat.closeInputFormat();

	// Every test entry must have been read exactly once.
	Assert.assertEquals(TEST_DATA.length, seen);
}
 
示例6
/**
 * Verifies numeric-range splitting with a batch size of one: the id range spanning the first
 * and last entries of {@code TEST_DATA} is handed to a {@code NumericBetweenParametersProvider}
 * configured through {@code ofBatchSize}, the test expects one split per entry of
 * {@code TEST_DATA}, and the records read across all splits must match {@code TEST_DATA} in
 * order.
 */
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
	final int fetchSize = 1;
	final long min = TEST_DATA[0].id;
	// The last entry is always at length - 1; it must not depend on the fetch size,
	// otherwise changing fetchSize would silently shrink the id range under test.
	final long max = TEST_DATA[TEST_DATA.length - 1].id;
	ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	// This query exploits parallelism: one split for every id.
	Assert.assertEquals(TEST_DATA.length, splits.length);
	int recordCount = 0;
	// A single Row instance is passed to every nextRecord call as the reuse object.
	Row row = new Row(5);
	for (InputSplit split : splits) {
		jdbcInputFormat.open(split);
		while (!jdbcInputFormat.reachedEnd()) {
			Row next = jdbcInputFormat.nextRecord(row);

			// Records are expected in the same order as TEST_DATA across consecutive splits.
			assertEquals(TEST_DATA[recordCount], next);

			recordCount++;
		}
		jdbcInputFormat.close();
	}
	jdbcInputFormat.closeInputFormat();
	// Every test entry must have been read exactly once.
	Assert.assertEquals(TEST_DATA.length, recordCount);
}
 
示例7
/**
 * Verifies numeric-range splitting when the batch size (set through {@code ofBatchSize})
 * exceeds the largest id: the test expects exactly one split, and the records read from it
 * must match {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
	final long firstId = TEST_DATA[0].id;
	final long lastId = TEST_DATA[TEST_DATA.length - 1].id;
	// Chosen larger than the last id so that a single split is generated (asserted below).
	final long singleSplitBatchSize = lastId + 1;
	ParameterValuesProvider paramProvider =
			new NumericBetweenParametersProvider(firstId, lastId).ofBatchSize(singleSplitBatchSize);

	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DRIVER_CLASS)
			.setDBUrl(DB_URL)
			.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	// Exactly one split must have been generated.
	Assert.assertEquals(1, splits.length);

	int seen = 0;
	// The same Row instance is handed to every nextRecord call as the reuse object.
	final Row reuse = new Row(5);
	for (InputSplit split : splits) {
		jdbcInputFormat.open(split);
		for (; !jdbcInputFormat.reachedEnd(); seen++) {
			Row record = jdbcInputFormat.nextRecord(reuse);
			assertEquals(TEST_DATA[seen], record);
		}
		jdbcInputFormat.close();
	}
	jdbcInputFormat.closeInputFormat();

	// Every test entry must have been read exactly once.
	Assert.assertEquals(TEST_DATA.length, seen);
}
 
示例8
/**
 * Verifies numeric-range splitting with a batch size of one, using the driver class and URL
 * reported by {@code DERBY_EBOOKSHOP_DB}: the test expects one split per entry of
 * {@code TEST_DATA}, and the records read across all splits must match {@code TEST_DATA} in
 * order.
 */
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
	final int fetchSize = 1;
	final long min = TEST_DATA[0].id;
	// The last entry is always at length - 1; it must not depend on the fetch size,
	// otherwise changing fetchSize would silently shrink the id range under test.
	final long max = TEST_DATA[TEST_DATA.length - 1].id;
	ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
			.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
			.setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	// This query exploits parallelism: one split for every id.
	Assert.assertEquals(TEST_DATA.length, splits.length);
	int recordCount = 0;
	// A single Row instance is passed to every nextRecord call as the reuse object.
	Row row = new Row(5);
	for (InputSplit split : splits) {
		jdbcInputFormat.open(split);
		while (!jdbcInputFormat.reachedEnd()) {
			Row next = jdbcInputFormat.nextRecord(row);

			// Records are expected in the same order as TEST_DATA across consecutive splits.
			assertEquals(TEST_DATA[recordCount], next);

			recordCount++;
		}
		jdbcInputFormat.close();
	}
	jdbcInputFormat.closeInputFormat();
	// Every test entry must have been read exactly once.
	Assert.assertEquals(TEST_DATA.length, recordCount);
}
 
示例9
/**
 * Verifies numeric-range splitting against the database described by
 * {@code DERBY_EBOOKSHOP_DB} when the batch size exceeds the largest id: the test expects
 * exactly one split, and the records read from it must match {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
	final long firstId = TEST_DATA[0].id;
	final long lastId = TEST_DATA[TEST_DATA.length - 1].id;
	// Chosen larger than the last id so that a single split is generated (asserted below).
	final long singleSplitBatchSize = lastId + 1;
	ParameterValuesProvider paramProvider =
			new NumericBetweenParametersProvider(firstId, lastId).ofBatchSize(singleSplitBatchSize);

	jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
			.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
			.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
			.setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID)
			.setRowTypeInfo(ROW_TYPE_INFO)
			.setParametersProvider(paramProvider)
			.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
			.finish();

	jdbcInputFormat.openInputFormat();
	InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
	// Exactly one split must have been generated.
	Assert.assertEquals(1, splits.length);

	int seen = 0;
	// The same Row instance is handed to every nextRecord call as the reuse object.
	final Row reuse = new Row(5);
	for (InputSplit split : splits) {
		jdbcInputFormat.open(split);
		for (; !jdbcInputFormat.reachedEnd(); seen++) {
			Row record = jdbcInputFormat.nextRecord(reuse);
			assertEquals(TEST_DATA[seen], record);
		}
		jdbcInputFormat.close();
	}
	jdbcInputFormat.closeInputFormat();

	// Every test entry must have been read exactly once.
	Assert.assertEquals(TEST_DATA.length, seen);
}
 
示例10
/**
 * Stores the parameter values produced by the given provider on the format being built.
 *
 * <p>{@code getParameterValues()} is invoked immediately, at the time this setter is called;
 * the provider itself is not retained by this method.
 *
 * @param parameterValuesProvider source of the parameter values; it is dereferenced
 *        unconditionally, so passing {@code null} results in a {@link NullPointerException}
 * @return this builder, to allow call chaining
 */
public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
	// Evaluate the provider now and keep only its result on the format.
	format.parameterValues = parameterValuesProvider.getParameterValues();
	return this;
}
 
示例11
/**
 * Stores the parameter values produced by the given provider on the format being built.
 *
 * <p>{@code getParameterValues()} is invoked immediately, at the time this setter is called;
 * the provider itself is not retained by this method.
 *
 * @param parameterValuesProvider source of the parameter values; it is dereferenced
 *        unconditionally, so passing {@code null} results in a {@link NullPointerException}
 * @return this builder, to allow call chaining
 */
public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
	// Evaluate the provider now and keep only its result on the format.
	format.parameterValues = parameterValuesProvider.getParameterValues();
	return this;
}