Java源码示例:org.apache.kafka.streams.TopologyDescription

示例1
@Test void should_doNothing_whenAllDisabled() {
  // Given: configs
  Duration traceTtl = Duration.ofMillis(5);
  Duration traceTtlCheckInterval = Duration.ofMinutes(1);
  List<String> autocompleteKeys = Collections.singletonList("environment");
  // When: topology provided
  Topology topology = new TraceStorageTopology(
      spansTopic,
      autocompleteKeys,
      traceTtl,
      traceTtlCheckInterval,
      0,
      false,
      false).get();
  TopologyDescription description = topology.describe();
  // Then:
  assertThat(description.subtopologies()).hasSize(0);
  // Given: streams config
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  testDriver.close();
}
 
示例2
@Test void should_doNothing_whenDisabled() {
  // Given: configs
  Duration dependenciesRetentionPeriod = Duration.ofMinutes(1);
  Duration dependenciesWindowSize = Duration.ofMillis(100);
  // When: topology created
  Topology topology = new DependencyStorageTopology(
      dependencyTopic,
      dependenciesRetentionPeriod,
      dependenciesWindowSize,
      false).get();
  TopologyDescription description = topology.describe();
  // Then: topology with 1 thread
  assertThat(description.subtopologies()).hasSize(0);
  // Given: streams configuration
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  testDriver.close();
}
 
示例3
@Test void should_doNothing_whenAggregationDisabled() {
  Duration traceTimeout = Duration.ofSeconds(1);
  Topology topology = new SpanAggregationTopology(
      spansTopic,
      traceTopic,
      dependencyTopic,
      traceTimeout,
      false).get();
  TopologyDescription description = topology.describe();
  // Then: single threaded topology
  assertThat(description.subtopologies()).hasSize(0);
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  testDriver.close();
}
 
示例4
public void start() {
    this.schemaRegistry.start();
    this.properties
            .setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl());
    try {
        this.stateDirectory = Files.createTempDirectory("fluent-kafka-streams");
    } catch (final IOException e) {
        throw new UncheckedIOException("Cannot create temporary state directory", e);
    }
    this.properties.setProperty(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath().toString());
    final Topology topology = this.topologyFactory.apply(this.properties);
    this.testDriver = new TopologyTestDriver(topology, this.properties);

    this.inputTopics.clear();
    this.outputTopics.clear();

    for (final TopologyDescription.Subtopology subtopology : topology.describe().subtopologies()) {
        for (final TopologyDescription.Node node : subtopology.nodes()) {
            if (node instanceof TopologyDescription.Source) {
                for (final String topic : ((Source) node).topicSet()) {
                    addExternalTopics(this.inputTopics, topic);
                }
            } else if (node instanceof TopologyDescription.Sink) {
                addExternalTopics(this.outputTopics, ((TopologyDescription.Sink) node).topic());
            }
        }
    }

    for (final GlobalStore store : topology.describe().globalStores()) {
        store.source().topicSet().forEach(name -> addExternalTopics(this.inputTopics, name));
    }
}
 
示例5
@Test
public void shouldBuildSourceNode() {
  setupTopicClientExpectations(1, 1);
  buildJoin();
  final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
  final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(node.predecessors(), equalTo(Collections.emptySet()));
  assertThat(successors, equalTo(Collections.singletonList(MAPVALUES_NODE)));
  assertThat(node.topics(), equalTo("[test2]"));
}
 
示例6
@Test
public void shouldHaveLeftJoin() {
  setupTopicClientExpectations(1, 1);
  buildJoin();
  final Topology topology = builder.build();
  final TopologyDescription.Processor leftJoin
      = (TopologyDescription.Processor) getNodeByName(topology, "KSTREAM-LEFTJOIN-0000000014");
  final List<String> predecessors = leftJoin.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(leftJoin.stores(), equalTo(Utils.mkSet("KSTREAM-REDUCE-STATE-STORE-0000000003")));
  assertThat(predecessors, equalTo(Collections.singletonList("KSTREAM-SOURCE-0000000013")));
}
 
示例7
@Test
public void shouldBuildSourceNode() throws Exception {
  final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(SOURCE_NODE);
  final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(node.predecessors(), equalTo(Collections.emptySet()));
  assertThat(successors, equalTo(Collections.singletonList(SOURCE_MAPVALUES_NODE)));
  assertThat(node.topics(), equalTo("[test1]"));
}
 
示例8
@Test
public void shouldBuildSourceNode() {
  final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), PlanTestUtil.SOURCE_NODE);
  final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(node.predecessors(), equalTo(Collections.emptySet()));
  assertThat(successors, equalTo(Collections.singletonList(PlanTestUtil.MAPVALUES_NODE)));
  assertThat(node.topics(), equalTo("[topic]"));
}
 
示例9
@Test

  public void shouldBuildSourceNode() {
    build();
    final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
    final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
    assertThat(node.predecessors(), equalTo(Collections.emptySet()));
    assertThat(successors, equalTo(Collections.singletonList(MAPVALUES_NODE)));
    assertThat(node.topics(), equalTo("[test1]"));
  }
 
示例10
@Test
public void shouldHaveTwoSubTopologies() {
  // We always require rekey at the moment.
  buildRequireRekey();
  final TopologyDescription description = builder.build().describe();
  assertThat(description.subtopologies().size(), equalTo(2));
}
 
示例11
@Test
public void shouldHaveSourceNodeForSecondSubtopolgy() {
  buildRequireRekey();
  final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010");
  final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(node.predecessors(), equalTo(Collections.emptySet()));
  assertThat(successors, equalTo(Collections.singletonList("KSTREAM-AGGREGATE-0000000007")));
  assertThat(node.topics(), containsString("[KSTREAM-AGGREGATE-STATE-STORE-0000000006"));
  assertThat(node.topics(), containsString("-repartition]"));
}
 
示例12
@Test
public void shouldHaveSinkNodeWithSameTopicAsSecondSource() {
  buildRequireRekey();
  TopologyDescription.Sink sink = (TopologyDescription.Sink) getNodeByName(builder.build(), "KSTREAM-SINK-0000000008");
  final TopologyDescription.Source source = (TopologyDescription.Source) getNodeByName(builder.build(), "KSTREAM-SOURCE-0000000010");
  assertThat(sink.successors(), equalTo(Collections.emptySet()));
  assertThat("[" + sink.topic() + "]", equalTo(source.topics()));
}
 
示例13
@Test
public void shouldBuildSourceNode() {
  final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(builder.build(), SOURCE_NODE);
  final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(node.predecessors(), equalTo(Collections.emptySet()));
  assertThat(successors, equalTo(Collections.singletonList(MAPVALUES_NODE)));
  assertThat(node.topics(), equalTo("[input]"));
}
 
示例14
@Test
public void shouldBuildOutputNode() {
  final TopologyDescription.Sink sink = (TopologyDescription.Sink) getNodeByName(builder.build(), OUTPUT_NODE);
  final List<String> predecessors = sink.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(sink.successors(), equalTo(Collections.emptySet()));
  assertThat(predecessors, equalTo(Collections.singletonList(MAPVALUES_OUTPUT_NODE)));
  assertThat(sink.topic(), equalTo("output"));
}
 
示例15
static TopologyDescription.Node getNodeByName(final Topology topology, final String nodeName) {
  final TopologyDescription description = topology.describe();
  final Set<TopologyDescription.Subtopology> subtopologies = description.subtopologies();
  List<TopologyDescription.Node> nodes = subtopologies.stream().flatMap(subtopology -> subtopology.nodes().stream()).collect(Collectors.toList());
  final Map<String, List<TopologyDescription.Node>> nodesByName = nodes.stream().collect(Collectors.groupingBy(TopologyDescription.Node::name));
  return nodesByName.get(nodeName).get(0);
}
 
示例16
static void verifyProcessorNode(final TopologyDescription.Processor node,
                                 final List<String> expectedPredecessors,
                                 final List<String> expectedSuccessors) {
  final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  final List<String> predecessors = node.predecessors().stream().map(TopologyDescription.Node::name).collect(Collectors.toList());
  assertThat(predecessors, equalTo(expectedPredecessors));
  assertThat(successors, equalTo(expectedSuccessors));
}
 
示例17
@Test void should_persistSpans_and_onlyQueryTraces_whenEnabled() {
  // Given: configs
  Duration traceTtl = Duration.ofMillis(5);
  Duration traceTtlCheckInterval = Duration.ofMinutes(1);
  List<String> autocompleteKeys = Collections.singletonList("environment");
  SpansSerde spansSerde = new SpansSerde();
  // When: topology provided
  Topology topology = new TraceStorageTopology(
      spansTopic,
      autocompleteKeys,
      traceTtl,
      traceTtlCheckInterval,
      0,
      true,
      false).get();
  TopologyDescription description = topology.describe();
  // Then: 1 thread prepared
  assertThat(description.subtopologies()).hasSize(1);
  // Given: streams config
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  // When: a trace is passed
  ConsumerRecordFactory<String, List<Span>> factory =
      new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
  Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
      .timestamp(10000L).duration(11L)
      .putTag("environment", "dev")
      .build();
  Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
      .timestamp(10000L).duration(10L)
      .build();
  Span c = Span.newBuilder().traceId("c").id("c").name("op_a").kind(Span.Kind.CLIENT)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
      .timestamp(10000L).duration(11L)
      .putTag("environment", "dev")
      .build();
  List<Span> spans = Arrays.asList(a, b, c);
  testDriver.pipeInput(factory.create(spansTopic, a.traceId(), spans, 10L));
  // Then: trace stores are filled
  KeyValueStore<String, List<Span>> traces = testDriver.getKeyValueStore(TRACES_STORE_NAME);
  assertThat(traces.get(a.traceId())).containsExactlyElementsOf(spans);
  KeyValueStore<Long, Set<String>> spanIdsByTs =
      testDriver.getKeyValueStore(SPAN_IDS_BY_TS_STORE_NAME);
  KeyValueIterator<Long, Set<String>> ids = spanIdsByTs.all();
  assertThat(ids).hasNext();
  assertThat(ids.next().value).containsExactly(a.traceId());
  // Then: service name stores are filled
  KeyValueStore<String, String> serviceNames =
      testDriver.getKeyValueStore(SERVICE_NAMES_STORE_NAME);
  assertThat(serviceNames).isNull();
  KeyValueStore<String, Set<String>> spanNames =
      testDriver.getKeyValueStore(SPAN_NAMES_STORE_NAME);
  assertThat(spanNames).isNull();
  KeyValueStore<String, Set<String>> autocompleteTags =
      testDriver.getKeyValueStore(AUTOCOMPLETE_TAGS_STORE_NAME);
  assertThat(autocompleteTags).isNull();
  // Finally close resources
  testDriver.close();
  spansSerde.close();
}
 
示例18
@Test void should_persistSpans_and_searchQueryTraces_whenAllEnabled() {
  // Given: configs
  Duration traceTtl = Duration.ofMillis(5);
  Duration traceTtlCheckInterval = Duration.ofMinutes(1);
  List<String> autocompleteKeys = Collections.singletonList("environment");
  SpansSerde spansSerde = new SpansSerde();
  // When: topology provided
  Topology topology = new TraceStorageTopology(
      spansTopic,
      autocompleteKeys,
      traceTtl,
      traceTtlCheckInterval,
      0,
      true,
      true).get();
  TopologyDescription description = topology.describe();
  // Then: 1 thread prepared
  assertThat(description.subtopologies()).hasSize(1);
  // Given: streams config
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  // When: a trace is passed
  ConsumerRecordFactory<String, List<Span>> factory =
      new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
  Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
      .timestamp(10000L).duration(11L)
      .putTag("environment", "dev")
      .build();
  Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
      .timestamp(10000L).duration(10L)
      .build();
  Span c = Span.newBuilder().traceId("c").id("c").name("op_a").kind(Span.Kind.CLIENT)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
      .timestamp(10000L).duration(11L)
      .putTag("environment", "dev")
      .build();
  List<Span> spans = Arrays.asList(a, b, c);
  testDriver.pipeInput(factory.create(spansTopic, a.traceId(), spans, 10L));
  // Then: trace stores are filled
  KeyValueStore<String, List<Span>> traces = testDriver.getKeyValueStore(TRACES_STORE_NAME);
  assertThat(traces.get(a.traceId())).containsExactlyElementsOf(spans);
  KeyValueStore<Long, Set<String>> spanIdsByTs =
      testDriver.getKeyValueStore(SPAN_IDS_BY_TS_STORE_NAME);
  KeyValueIterator<Long, Set<String>> ids = spanIdsByTs.all();
  assertThat(ids).hasNext();
  assertThat(ids.next().value).containsExactly(a.traceId());
  // Then: service name stores are filled
  KeyValueStore<String, String> serviceNames =
      testDriver.getKeyValueStore(SERVICE_NAMES_STORE_NAME);
  List<String> serviceNameList = new ArrayList<>();
  serviceNames.all().forEachRemaining(serviceName -> serviceNameList.add(serviceName.value));
  assertThat(serviceNameList).hasSize(2);
  assertThat(serviceNames.get("svc_a")).isEqualTo("svc_a");
  assertThat(serviceNames.get("svc_b")).isEqualTo("svc_b");
  KeyValueStore<String, Set<String>> spanNames =
      testDriver.getKeyValueStore(SPAN_NAMES_STORE_NAME);
  assertThat(spanNames.get("svc_a")).containsExactly("op_a");
  assertThat(spanNames.get("svc_b")).containsExactly("op_b");
  KeyValueStore<String, Set<String>> autocompleteTags =
      testDriver.getKeyValueStore(AUTOCOMPLETE_TAGS_STORE_NAME);
  assertThat(autocompleteTags.get("environment")).containsExactly("dev");
  // When: clock moves forward
  Span d = Span.newBuilder()
      .traceId("d")
      .id("d")
      .timestamp(
          MILLISECONDS.toMicros(traceTtlCheckInterval.toMillis()) + MILLISECONDS.toMicros(20))
      .build();
  testDriver.pipeInput(
      factory.create(spansTopic, d.traceId(), Collections.singletonList(d),
          traceTtlCheckInterval.plusMillis(1).toMillis()));
  // Then: Traces store is empty
  assertThat(traces.get(a.traceId())).isNull();
  // Finally close resources
  testDriver.close();
  spansSerde.close();
}
 
示例19
@Test void should_aggregateSpans_and_mapDependencies() {
  // Given: configuration
  Duration traceTimeout = Duration.ofSeconds(1);
  SpansSerde spansSerde = new SpansSerde();
  DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde();
  // When: topology built
  Topology topology = new SpanAggregationTopology(
      spansTopic,
      traceTopic,
      dependencyTopic,
      traceTimeout,
      true).get();
  TopologyDescription description = topology.describe();
  // Then: single threaded topology
  assertThat(description.subtopologies()).hasSize(1);
  // Given: test driver
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  // When: two related spans coming on the same Session window
  ConsumerRecordFactory<String, List<Span>> factory =
      new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer());
  Span a = Span.newBuilder().traceId("a").id("a").name("op_a").kind(Span.Kind.CLIENT)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_a").build())
      .build();
  Span b = Span.newBuilder().traceId("a").id("b").name("op_b").kind(Span.Kind.SERVER)
      .localEndpoint(Endpoint.newBuilder().serviceName("svc_b").build())
      .build();
  testDriver.pipeInput(
      factory.create(spansTopic, a.traceId(), Collections.singletonList(a), 0L));
  testDriver.pipeInput(
      factory.create(spansTopic, b.traceId(), Collections.singletonList(b), 0L));
  // When: and new record arrive, moving the event clock further than inactivity gap
  Span c = Span.newBuilder().traceId("c").id("c").build();
  testDriver.pipeInput(factory.create(spansTopic, c.traceId(), Collections.singletonList(c),
      traceTimeout.toMillis() + 1));
  // Then: a trace is aggregated.1
  ProducerRecord<String, List<Span>> trace =
      testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer());
  assertThat(trace).isNotNull();
  OutputVerifier.compareKeyValue(trace, a.traceId(), Arrays.asList(a, b));
  // Then: a dependency link is created
  ProducerRecord<String, DependencyLink> linkRecord =
      testDriver.readOutput(dependencyTopic, new StringDeserializer(),
          dependencyLinkSerde.deserializer());
  assertThat(linkRecord).isNotNull();
  DependencyLink link = DependencyLink.newBuilder()
      .parent("svc_a").child("svc_b").callCount(1).errorCount(0)
      .build();
  OutputVerifier.compareKeyValue(linkRecord, "svc_a:svc_b", link);
  //Finally close resources
  testDriver.close();
  spansSerde.close();
  dependencyLinkSerde.close();
}
 
示例20
@Test void should_storeDependencies() {
  // Given: configs
  Duration dependenciesRetentionPeriod = Duration.ofMinutes(1);
  Duration dependenciesWindowSize = Duration.ofMillis(100);
  // When: topology created
  Topology topology = new DependencyStorageTopology(
      dependencyTopic,
      dependenciesRetentionPeriod,
      dependenciesWindowSize,
      true).get();
  TopologyDescription description = topology.describe();
  // Then: topology with 1 thread
  assertThat(description.subtopologies()).hasSize(1);
  // Given: streams configuration
  TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
  // When: a trace is passed
  ConsumerRecordFactory<String, DependencyLink> factory =
      new ConsumerRecordFactory<>(dependencyTopic, new StringSerializer(),
          dependencyLinkSerde.serializer());
  DependencyLink dependencyLink = DependencyLink.newBuilder()
      .parent("svc_a").child("svc_b").callCount(1).errorCount(0)
      .build();
  String dependencyLinkId = "svc_a:svc_b";
  testDriver.pipeInput(factory.create(dependencyTopic, dependencyLinkId, dependencyLink, 10L));
  WindowStore<String, DependencyLink> links = testDriver.getWindowStore(DEPENDENCIES_STORE_NAME);
  // Then: dependency link created
  WindowStoreIterator<DependencyLink> firstLink = links.fetch(dependencyLinkId, 0L, 100L);
  assertThat(firstLink).hasNext();
  assertThat(firstLink.next().value).isEqualTo(dependencyLink);
  // When: new links appear
  testDriver.pipeInput(factory.create(dependencyTopic, dependencyLinkId, dependencyLink, 90L));
  // Then: dependency link increases
  WindowStoreIterator<DependencyLink> secondLink = links.fetch(dependencyLinkId, 0L, 100L);
  assertThat(secondLink).hasNext();
  assertThat(secondLink.next().value.callCount()).isEqualTo(2);
  // When: time moves forward
  testDriver.advanceWallClockTime(dependenciesRetentionPeriod.toMillis() + 91L);
  testDriver.pipeInput(factory.create(dependencyTopic, dependencyLinkId, dependencyLink));
  // Then: dependency link is removed and restarted
  KeyValueIterator<Windowed<String>, DependencyLink> thirdLink = links.all();
  assertThat(thirdLink).hasNext();
  assertThat(thirdLink.next().value.callCount()).isEqualTo(1);
  // Close resources
  testDriver.close();
  dependencyLinkSerde.close();
}
 
示例21
@Test
public void shouldBuildMapNode() throws Exception {
  verifyProcessorNode((TopologyDescription.Processor) getNodeByName(SOURCE_MAPVALUES_NODE),
      Collections.singletonList(SOURCE_NODE),
      Collections.singletonList(TRANSFORM_NODE));
}
 
示例22
@Test
public void shouldBuildTransformNode() {
  final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(TRANSFORM_NODE);
  verifyProcessorNode(node, Collections.singletonList(SOURCE_MAPVALUES_NODE), Collections.singletonList(FILTER_NODE));
}
 
示例23
@Test
public void shouldBuildFilterNode() {
  final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(FILTER_NODE);
  verifyProcessorNode(node, Collections.singletonList(TRANSFORM_NODE), Collections.singletonList(FILTER_MAPVALUES_NODE));
}
 
示例24
@Test
public void shouldBuildMapValuesNode() {
  final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(FILTER_MAPVALUES_NODE);
  verifyProcessorNode(node, Collections.singletonList(FILTER_NODE), Collections.singletonList(FOREACH_NODE));
}
 
示例25
@Test
public void shouldBuildForEachNode() {
  final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(FOREACH_NODE);
  verifyProcessorNode(node, Collections.singletonList(FILTER_MAPVALUES_NODE), Collections.emptyList());
}
 
示例26
private TopologyDescription.Node getNodeByName(String nodeName) {
  return PlanTestUtil.getNodeByName(builder.build(), nodeName);
}
 
示例27
@Test
public void shouldBuildMapNode() {
  verifyProcessorNode((TopologyDescription.Processor) getNodeByName(builder.build(), PlanTestUtil.MAPVALUES_NODE),
      Collections.singletonList(PlanTestUtil.SOURCE_NODE),
      Collections.singletonList(PlanTestUtil.TRANSFORM_NODE));
}
 
示例28
@Test
public void shouldBuildTransformNode() {
  final TopologyDescription.Processor node = (TopologyDescription.Processor) getNodeByName(builder.build(), PlanTestUtil.TRANSFORM_NODE);
  verifyProcessorNode(node, Collections.singletonList(PlanTestUtil.MAPVALUES_NODE), Collections.emptyList());
}
 
示例29
@Test
public void shouldHaveOneSubTopologyIfGroupByKey() {
  build();
  final TopologyDescription description = builder.build().describe();
  assertThat(description.subtopologies().size(), equalTo(1));
}
 
示例30
@Test
public void shouldBuildMapNodePriorToOutput() {
  verifyProcessorNode((TopologyDescription.Processor) getNodeByName(builder.build(), MAPVALUES_OUTPUT_NODE),
      Collections.singletonList(TRANSFORM_NODE),
      Collections.singletonList(OUTPUT_NODE));
}