Java源码示例:org.apache.flink.runtime.state.ttl.TtlStateFactory
示例1
/**
* @see KeyedStateBackend
*/
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(keySerializerProvider, "State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
示例2
@Override
@SuppressWarnings("unchecked")
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
stateSnapshotFilters.put(stateDesc.getName(),
(StateSnapshotTransformer<Object>) getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
.computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
.computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
return state;
}
示例3
/**
* @see KeyedStateBackend
*/
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
示例4
@Override
@SuppressWarnings("unchecked")
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
stateSnapshotFilters.put(stateDesc.getName(),
(StateSnapshotTransformer<Object>) getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
.computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
.computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
return state;
}
示例5
/**
* @see KeyedStateBackend
*/
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}
示例6
@Override
@SuppressWarnings("unchecked")
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
stateSnapshotFilters.put(stateDesc.getName(),
(StateSnapshotTransformer<Object>) getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
.computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
.computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
return state;
}
示例7
public void setAndRegisterCompactFilterIfStateTtl(
@Nonnull RegisteredStateMetaInfoBase metaInfoBase,
@Nonnull ColumnFamilyOptions options) {
if (enableTtlCompactionFilter && metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) {
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
}
}
}
示例8
public void setAndRegisterCompactFilterIfStateTtl(
@Nonnull RegisteredStateMetaInfoBase metaInfoBase,
@Nonnull ColumnFamilyOptions options) {
if (enableTtlCompactionFilter && metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) {
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
}
}
}
示例9
public void setAndRegisterCompactFilterIfStateTtl(
@Nonnull RegisteredStateMetaInfoBase metaInfoBase,
@Nonnull ColumnFamilyOptions options) {
if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) {
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
}
}
}