Java源码示例:org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper

示例1
/**
 * Replays the KV-state data of the current state handle into the DB: every key-group listed in the
 * handle is read from the input stream and its entries are put into the matching column family.
 */
private void restoreKVStateData() throws IOException, RocksDBException {
	try (RocksDBWriteBatchWrapper batch = new RocksDBWriteBatchWrapper(db)) {
		// walk over all key-groups referenced by the current state handle
		for (Tuple2<Integer, Long> groupAndOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
			final int restoredKeyGroup = groupAndOffset.f0;

			// a key-group outside of this backend's range must never show up here
			Preconditions.checkState(keyGroupRange.contains(restoredKeyGroup),
				"The key group must belong to the backend");

			final long streamOffset = groupAndOffset.f1;
			if (streamOffset == 0L) {
				// offset 0 denotes an empty key-group, nothing to read
				continue;
			}

			currentStateHandleInStream.seek(streamOffset);
			try (InputStream kgStream = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
				DataInputViewStreamWrapper kgView = new DataInputViewStreamWrapper(kgStream);
				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
				int stateId = kgView.readShort();
				ColumnFamilyHandle targetHandle = currentStateHandleKVStateColumnFamilies.get(stateId);

				// copy the k/v pairs of this key-group into the DB
				boolean reachedEndOfKeyGroup = false;
				do {
					byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(kgView);
					byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(kgView);

					final boolean metaDataFollows = hasMetaDataFollowsFlag(key);
					if (metaDataFollows) {
						// strip the signal bit so that the key can be inserted again
						clearMetaDataFollowsFlag(key);
					}
					batch.put(targetHandle, key, value);

					if (metaDataFollows) {
						// the flagged key is followed by either the next state id or the end mark
						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
						stateId = END_OF_KEY_GROUP_MARK
							& kgView.readShort();
						if (END_OF_KEY_GROUP_MARK == stateId) {
							reachedEndOfKeyGroup = true;
						} else {
							targetHandle = currentStateHandleKVStateColumnFamilies.get(stateId);
						}
					}
				} while (!reachedEndOfKeyGroup);
			}
		}
	}
}
 
示例2
/**
 * Recovery from multi incremental states with rescaling. For rescaling, this method creates a temporary
 * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
 * real restore instance and then the temporary instance is discarded.
 *
 * <p>Note: the handle that is chosen to initialize the base DB (if any) is removed from the passed
 * collection, so the collection must support {@code remove}.
 *
 * @param restoreStateHandles the state handles to restore from; every handle that is not used to
 *                            initialize the base DB must be an {@link IncrementalRemoteKeyedStateHandle}
 * @throws IllegalStateException if one of the remaining handles has an unexpected type
 * @throws Exception any failure propagated from opening the DB, restoring a temporary instance or
 *                   writing into the base DB
 */
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

	// Prepare for restore with rescaling
	KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
		restoreStateHandles, keyGroupRange);

	// Init base DB instance
	if (initialHandle != null) {
		restoreStateHandles.remove(initialHandle);
		initDBWithRescaling(initialHandle);
	} else {
		openDB();
	}

	// Transfer remaining key-groups from temporary instance into base DB.
	// The copied key range is [start key-group prefix, prefix of (end key-group + 1)).
	byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

	byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

	for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

		if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
			throw new IllegalStateException("Unexpected state handle type, " +
				"expected " + IncrementalRemoteKeyedStateHandle.class +
				", but found " + rawStateHandle.getClass());
		}

		// Create the temporary instance as a child directory of the instance base path. The
		// parent/child constructor inserts the path separator; plain string concatenation would
		// glue the UUID onto the base directory name and place the instance next to it instead.
		Path temporaryRestoreInstancePath = new Path(
			instanceBasePath.getAbsolutePath(),
			UUID.randomUUID().toString());
		try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
			(IncrementalRemoteKeyedStateHandle) rawStateHandle,
			temporaryRestoreInstancePath);
			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

			List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
			List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

			// iterating only the requested descriptors automatically skips the default column family handle
			for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
				ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

				ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
					null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
					.columnFamilyHandle;

				try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

					iterator.seek(startKeyGroupPrefixBytes);

					// Since the iterator will visit the records according to the sorted order,
					// the first key that is not before the stop prefix ends the copy.
					while (iterator.isValid()
						&& RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
						writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
						iterator.next();
					}
				} // releases native iterator resources
			}
		} finally {
			// the temporary instance is no longer needed, whether the copy succeeded or not
			cleanUpPathQuietly(temporaryRestoreInstancePath);
		}
	}
}
 
示例3
/**
 * Restores the KV-state / ColumnFamily data of the current state handle, one key-group at a time.
 * Key-groups with a zero offset are skipped; for all others the serialized key/value pairs are read
 * from the state handle stream and written to the DB through a write batch.
 */
private void restoreKVStateData() throws IOException, RocksDBException {
	try (RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db)) {
		for (Tuple2<Integer, Long> offsetEntry : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
			int keyGroupId = offsetEntry.f0;

			// restored key-groups are required to lie inside the range of this backend
			Preconditions.checkState(keyGroupRange.contains(keyGroupId),
				"The key group must belong to the backend");

			long keyGroupOffsetInStream = offsetEntry.f1;
			// skip empty key-groups
			if (0L == keyGroupOffsetInStream) {
				continue;
			}

			currentStateHandleInStream.seek(keyGroupOffsetInStream);
			try (InputStream decompressedIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
				DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(decompressedIn);
				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
				int currentStateId = inputView.readShort();
				ColumnFamilyHandle currentColumnFamily = currentStateHandleKVStateColumnFamilies.get(currentStateId);

				// insert all k/v pairs of the key-group into the DB
				while (true) {
					byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
					byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);

					if (!hasMetaDataFollowsFlag(key)) {
						// plain entry: more entries of the same state follow
						batchWriter.put(currentColumnFamily, key, value);
						continue;
					}

					// remove the signal bit from the key before it is inserted
					clearMetaDataFollowsFlag(key);
					batchWriter.put(currentColumnFamily, key, value);

					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
					currentStateId = END_OF_KEY_GROUP_MARK
						& inputView.readShort();
					if (END_OF_KEY_GROUP_MARK == currentStateId) {
						// this key-group is fully consumed
						break;
					}
					currentColumnFamily = currentStateHandleKVStateColumnFamilies.get(currentStateId);
				}
			}
		}
	}
}
 
示例4
/**
 * Recovery from multi incremental states with rescaling. For rescaling, this method creates a temporary
 * RocksDB instance for a key-groups shard. All contents from the temporary instance are copied into the
 * real restore instance and then the temporary instance is discarded.
 *
 * <p>Note: the handle that is chosen to initialize the base DB (if any) is removed from the passed
 * collection, so the collection must support {@code remove}.
 *
 * @param restoreStateHandles the state handles to restore from; every handle that is not used to
 *                            initialize the base DB must be an {@link IncrementalRemoteKeyedStateHandle}
 * @throws IllegalStateException if one of the remaining handles has an unexpected type
 * @throws Exception any failure propagated from opening the DB, restoring a temporary instance or
 *                   writing into the base DB
 */
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

	// Prepare for restore with rescaling
	KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
		restoreStateHandles, keyGroupRange);

	// Init base DB instance
	if (initialHandle != null) {
		restoreStateHandles.remove(initialHandle);
		initDBWithRescaling(initialHandle);
	} else {
		openDB();
	}

	// Transfer remaining key-groups from temporary instance into base DB.
	// The copied key range is [start key-group prefix, prefix of (end key-group + 1)).
	byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);

	byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);

	for (KeyedStateHandle rawStateHandle : restoreStateHandles) {

		if (!(rawStateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
			throw new IllegalStateException("Unexpected state handle type, " +
				"expected " + IncrementalRemoteKeyedStateHandle.class +
				", but found " + rawStateHandle.getClass());
		}

		// Create the temporary instance as a child directory of the instance base path. The
		// parent/child constructor inserts the path separator; plain string concatenation would
		// glue the UUID onto the base directory name and place the instance next to it instead.
		Path temporaryRestoreInstancePath = new Path(
			instanceBasePath.getAbsolutePath(),
			UUID.randomUUID().toString());
		try (RestoredDBInstance tmpRestoreDBInfo = restoreDBInstanceFromStateHandle(
			(IncrementalRemoteKeyedStateHandle) rawStateHandle,
			temporaryRestoreInstancePath);
			RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db)) {

			List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
			List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;

			// iterating only the requested descriptors automatically skips the default column family handle
			for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
				ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);

				ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterStateColumnFamilyHandle(
					null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
					.columnFamilyHandle;

				try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {

					iterator.seek(startKeyGroupPrefixBytes);

					// Since the iterator will visit the records according to the sorted order,
					// the first key that is not before the stop prefix ends the copy.
					while (iterator.isValid()
						&& RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(iterator.key(), stopKeyGroupPrefixBytes)) {
						writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
						iterator.next();
					}
				} // releases native iterator resources
			}
		} finally {
			// the temporary instance is no longer needed, whether the copy succeeded or not
			cleanUpPathQuietly(temporaryRestoreInstancePath);
		}
	}
}
 
示例5
/**
 * Restores the KV-state / ColumnFamily data for each key-group that the current state handle refers to.
 * Writes go through a write batch wrapper that is created with the configured {@code writeBatchSize}.
 */
private void restoreKVStateData() throws IOException, RocksDBException {
	try (RocksDBWriteBatchWrapper batch = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
		// process the key-groups of the current state handle one after another
		for (Tuple2<Integer, Long> kgOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
			final int kg = kgOffset.f0;

			// all restored key-groups have to be part of this backend's key-group range
			Preconditions.checkState(keyGroupRange.contains(kg),
				"The key group must belong to the backend");

			final long kgStart = kgOffset.f1;
			if (kgStart == 0L) {
				// empty key-group
				continue;
			}

			currentStateHandleInStream.seek(kgStart);
			try (InputStream kgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
				DataInputViewStreamWrapper kgInputView = new DataInputViewStreamWrapper(kgIn);
				//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
				int stateId = kgInputView.readShort();
				ColumnFamilyHandle columnFamily = currentStateHandleKVStateColumnFamilies.get(stateId);

				// put every k/v pair of the key-group into the DB
				boolean moreKeys = true;
				do {
					byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(kgInputView);
					byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(kgInputView);

					final boolean lastKeyOfState = hasMetaDataFollowsFlag(key);
					if (lastKeyOfState) {
						// reset the signal bit, the key is stored without it
						clearMetaDataFollowsFlag(key);
					}
					batch.put(columnFamily, key, value);

					if (!lastKeyOfState) {
						continue;
					}

					// after a flagged key comes the id of the next state or the end-of-key-group mark
					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
					stateId = END_OF_KEY_GROUP_MARK
						& kgInputView.readShort();
					if (END_OF_KEY_GROUP_MARK == stateId) {
						moreKeys = false;
					} else {
						columnFamily = currentStateHandleKVStateColumnFamilies.get(stateId);
					}
				} while (moreKeys);
			}
		}
	}
}
 
示例6
/**
 * Restores from several incremental state handles when the job was rescaled. The base DB is either
 * initialized from the best matching handle or opened empty. Each remaining handle is then restored
 * into a temporary RocksDB instance, the entries that fall into this backend's key-group range are
 * copied into the base DB, and the temporary instance directory is cleaned up afterwards.
 */
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

	// pick the handle (if any) from which the base DB gets initialized
	KeyedStateHandle baseHandle = RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
		restoreStateHandles, keyGroupRange);

	if (baseHandle == null) {
		openDB();
	} else {
		// the base handle must not be processed a second time in the loop below
		restoreStateHandles.remove(baseHandle);
		initDBWithRescaling(baseHandle);
	}

	// serialized prefixes that delimit the key range to copy: [first, beyondLast)
	byte[] firstPrefix = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), firstPrefix);

	byte[] beyondLastPrefix = new byte[keyGroupPrefixBytes];
	RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1, beyondLastPrefix);

	for (KeyedStateHandle remainingHandle : restoreStateHandles) {

		if (!(remainingHandle instanceof IncrementalRemoteKeyedStateHandle)) {
			throw new IllegalStateException("Unexpected state handle type, " +
				"expected " + IncrementalRemoteKeyedStateHandle.class +
				", but found " + remainingHandle.getClass());
		}
		IncrementalRemoteKeyedStateHandle incrementalHandle = (IncrementalRemoteKeyedStateHandle) remainingHandle;

		// every temporary instance gets its own randomly named directory below the instance base path
		Path temporaryRestoreInstancePath = instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
		try (RestoredDBInstance tmpInstance = restoreDBInstanceFromStateHandle(
			incrementalHandle,
			temporaryRestoreInstancePath);
			RocksDBWriteBatchWrapper batch = new RocksDBWriteBatchWrapper(this.db, writeBatchSize)) {

			List<ColumnFamilyDescriptor> descriptors = tmpInstance.columnFamilyDescriptors;
			List<ColumnFamilyHandle> sourceHandles = tmpInstance.columnFamilyHandles;

			// iterating only the requested descriptors automatically skips the default column family handle
			for (int cf = 0; cf < descriptors.size(); ++cf) {
				ColumnFamilyHandle sourceHandle = sourceHandles.get(cf);

				ColumnFamilyHandle targetHandle = getOrRegisterStateColumnFamilyHandle(
					null, tmpInstance.stateMetaInfoSnapshots.get(cf))
					.columnFamilyHandle;

				try (RocksIteratorWrapper source = RocksDBOperationUtils.getRocksIterator(tmpInstance.db, sourceHandle, tmpInstance.readOptions)) {

					source.seek(firstPrefix);

					// records are visited in sorted order, so copying stops at the first key
					// that is no longer before the stop prefix
					while (source.isValid()
						&& RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(source.key(), beyondLastPrefix)) {
						batch.put(targetHandle, source.key(), source.value());
						source.next();
					}
				} // releases native iterator resources
			}
		} finally {
			cleanUpPathQuietly(temporaryRestoreInstancePath);
		}
	}
}