Java源码示例:org.apache.kafka.common.acl.AclPermissionType

示例1
public void clearAcls(TopologyAclBinding aclBinding) throws IOException {
  Collection<AclBindingFilter> filters = new ArrayList<>();

  LOGGER.debug("clearAcl = " + aclBinding);
  ResourcePatternFilter resourceFilter =
      new ResourcePatternFilter(
          aclBinding.getResourceType(),
          aclBinding.getResourceName(),
          PatternType.valueOf(aclBinding.getPattern()));

  AccessControlEntryFilter accessControlEntryFilter =
      new AccessControlEntryFilter(
          aclBinding.getPrincipal(),
          aclBinding.getHost(),
          AclOperation.valueOf(aclBinding.getOperation()),
          AclPermissionType.ANY);

  AclBindingFilter filter = new AclBindingFilter(resourceFilter, accessControlEntryFilter);
  filters.add(filter);
  clearAcls(filters);
}
 
示例2
private void verifyControlCenterAcls(Platform platform)
    throws ExecutionException, InterruptedException {

  List<ControlCenter> c3List = platform.getControlCenter();

  for (ControlCenter c3 : c3List) {
    ResourcePatternFilter resourceFilter =
        new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY);

    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            c3.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);

    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(16, acls.size());
  }
}
 
示例3
@Test
public void testFromCrdToKafkaAclBinding()   {
    AclRule rule = new AclRuleBuilder()
        .withType(AclRuleType.ALLOW)
        .withResource(aclRuleTopicResource)
        .withHost("127.0.0.1")
        .withOperation(AclOperation.READ)
        .build();

    AclBinding expectedKafkaAclBinding = new AclBinding(
            kafkaResourcePattern,
            new AccessControlEntry(kafkaPrincipal.toString(), "127.0.0.1",
                    org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW)
    );

    assertThat(SimpleAclRule.fromCrd(rule).toKafkaAclBinding(kafkaPrincipal), is(expectedKafkaAclBinding));
}
 
示例4
public List<AclBinding> setAclsForControlCenter(String principal, String appId)
    throws IOException {
  List<AclBinding> bindings = new ArrayList<>();

  bindings.add(buildGroupLevelAcl(principal, appId, PatternType.PREFIXED, AclOperation.READ));
  bindings.add(
      buildGroupLevelAcl(principal, appId + "-command", PatternType.PREFIXED, AclOperation.READ));

  Arrays.asList("_confluent-monitoring", "_confluent-command", " _confluent-metrics")
      .forEach(
          topic ->
              Stream.of(
                      AclOperation.WRITE,
                      AclOperation.READ,
                      AclOperation.CREATE,
                      AclOperation.DESCRIBE)
                  .map(
                      aclOperation ->
                          buildTopicLevelAcl(principal, topic, PatternType.LITERAL, aclOperation))
                  .forEach(aclBinding -> bindings.add(aclBinding)));

  Stream.of(AclOperation.WRITE, AclOperation.READ, AclOperation.CREATE, AclOperation.DESCRIBE)
      .map(
          aclOperation ->
              buildTopicLevelAcl(principal, appId, PatternType.PREFIXED, aclOperation))
      .forEach(aclBinding -> bindings.add(aclBinding));

  ResourcePattern resourcePattern =
      new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
  AccessControlEntry entry =
      new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
  bindings.add(new AclBinding(resourcePattern, entry));

  entry =
      new AccessControlEntry(
          principal, "*", AclOperation.DESCRIBE_CONFIGS, AclPermissionType.ALLOW);
  bindings.add(new AclBinding(resourcePattern, entry));
  createAcls(bindings);
  return bindings;
}
 
示例5
public List<AclBinding> setAclsForConnect(
    String principal, String topicPrefix, List<String> readTopics, List<String> writeTopics)
    throws IOException {

  List<AclBinding> acls = new ArrayList<>();

  List<String> topics = Arrays.asList("connect-status", "connect-offsets", "connect-configs");
  for (String topic : topics) {
    acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ));
    acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE));
  }

  ResourcePattern resourcePattern =
      new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
  AccessControlEntry entry =
      new AccessControlEntry(principal, "*", AclOperation.CREATE, AclPermissionType.ALLOW);
  acls.add(new AclBinding(resourcePattern, entry));

  resourcePattern = new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL);
  entry = new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW);
  acls.add(new AclBinding(resourcePattern, entry));

  if (readTopics != null) {
    readTopics.forEach(
        topic -> {
          acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.READ));
        });
  }

  if (writeTopics != null) {
    writeTopics.forEach(
        topic -> {
          acls.add(buildTopicLevelAcl(principal, topic, PatternType.LITERAL, AclOperation.WRITE));
        });
  }

  createAcls(acls);
  return acls;
}
 
示例6
private AclBinding buildTopicLevelAcl(
    String principal, String topic, PatternType patternType, AclOperation op) {
  return new AclBuilder(principal)
      .addResource(ResourceType.TOPIC, topic, patternType)
      .addControlEntry("*", op, AclPermissionType.ALLOW)
      .build();
}
 
示例7
private AclBinding buildGroupLevelAcl(
    String principal, String group, PatternType patternType, AclOperation op) {
  return new AclBuilder(principal)
      .addResource(ResourceType.GROUP, group, patternType)
      .addControlEntry("*", op, AclPermissionType.ALLOW)
      .build();
}
 
示例8
private void verifyProducerAcls(List<Producer> producers, String topic)
    throws InterruptedException, ExecutionException {

  for (Producer producer : producers) {
    ResourcePatternFilter resourceFilter = ResourcePatternFilter.ANY;
    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            producer.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);
    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(2, acls.size());

    List<ResourceType> types =
        acls.stream()
            .map(aclBinding -> aclBinding.pattern().resourceType())
            .collect(Collectors.toList());

    Assert.assertTrue(types.contains(ResourceType.TOPIC));

    List<AclOperation> ops =
        acls.stream()
            .map(aclsBinding -> aclsBinding.entry().operation())
            .collect(Collectors.toList());

    Assert.assertTrue(ops.contains(AclOperation.DESCRIBE));
    Assert.assertTrue(ops.contains(AclOperation.WRITE));
  }
}
 
示例9
private void verifyConsumerAcls(List<Consumer> consumers, String topic)
    throws InterruptedException, ExecutionException {

  for (Consumer consumer : consumers) {
    ResourcePatternFilter resourceFilter = ResourcePatternFilter.ANY;
    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            consumer.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);
    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(3, acls.size());

    List<ResourceType> types =
        acls.stream()
            .map(aclBinding -> aclBinding.pattern().resourceType())
            .collect(Collectors.toList());

    Assert.assertTrue(types.contains(ResourceType.GROUP));
    Assert.assertTrue(types.contains(ResourceType.TOPIC));

    List<AclOperation> ops =
        acls.stream()
            .map(aclsBinding -> aclsBinding.entry().operation())
            .collect(Collectors.toList());

    Assert.assertTrue(ops.contains(AclOperation.DESCRIBE));
    Assert.assertTrue(ops.contains(AclOperation.READ));
  }
}
 
示例10
private void givenAllowAcl(final Credentials credentials,
                           final ResourceType resourceType,
                           final String resourceName,
                           final Set<AclOperation> ops) {
  SECURE_CLUSTER.addUserAcl(credentials.username, AclPermissionType.ALLOW,
                            new Resource(resourceType, resourceName), ops);
}
 
示例11
/**
 * Writes the supplied ACL information to ZK, where it will be picked up by the brokes authorizer.
 *
 * @param username    the who.
 * @param permission  the allow|deny.
 * @param resource    the thing
 * @param ops         the what.
 */
public void addUserAcl(final String username,
                       final AclPermissionType permission,
                       final Resource resource,
                       final Set<AclOperation> ops) {

  final KafkaPrincipal principal = new KafkaPrincipal("User", username);
  final PermissionType scalaPermission = PermissionType$.MODULE$.fromJava(permission);

  final Set<Acl> javaAcls = ops.stream()
      .map(Operation$.MODULE$::fromJava)
      .map(op -> new Acl(principal, scalaPermission, "*", op))
      .collect(Collectors.toSet());

  final scala.collection.immutable.Set<Acl> scalaAcls =
      JavaConversions.asScalaSet(javaAcls).toSet();

  kafka.security.auth.ResourceType scalaResType =
      ResourceType$.MODULE$.fromJava(resource.resourceType());

  final kafka.security.auth.Resource scalaResource =
      new kafka.security.auth.Resource(scalaResType, resource.name());

  authorizer.addAcls(scalaAcls, scalaResource);

  addedAcls.add(scalaResource);
}
 
示例12
/**
 * Returns Set of ACLs applying to single user.
 *
 * @param username  Name of the user.
 * @return The Set of ACLs applying to single user.
 */
public Set<SimpleAclRule> getAcls(String username)   {
    log.debug("Searching for ACL rules of user {}", username);
    Set<SimpleAclRule> result = new HashSet<>();
    KafkaPrincipal principal = new KafkaPrincipal("User", username);

    AclBindingFilter aclBindingFilter = new AclBindingFilter(ResourcePatternFilter.ANY,
        new AccessControlEntryFilter(principal.toString(), null, AclOperation.ANY, AclPermissionType.ANY));

    Collection<AclBinding> aclBindings = null;
    try {
        aclBindings = adminClient.describeAcls(aclBindingFilter).values().get();
    } catch (InterruptedException | ExecutionException e) {
        // Admin Client API needs authorizer enabled on the Kafka brokers
        if (e.getCause() instanceof SecurityDisabledException) {
            throw new InvalidResourceException("Authorization needs to be enabled in the Kafka custom resource", e.getCause());
        } else if (e.getCause() instanceof UnknownServerException && e.getMessage().contains("Simple ACL delegation not enabled")) {
            throw new InvalidResourceException("Simple ACL delegation needs to be enabled in the Kafka custom resource", e.getCause());
        }
    }

    if (aclBindings != null) {
        log.debug("ACL rules for user {}", username);
        for (AclBinding aclBinding : aclBindings) {
            log.debug("{}", aclBinding);
            result.add(SimpleAclRule.fromAclBinding(aclBinding));
        }
    }

    return result;
}
 
示例13
/**
 * Create Kafka's AclBinding instance from current SimpleAclRule instance for the provided principal
 *
 * @param principal KafkaPrincipal instance for the current SimpleAclRule
 * @return Kafka AclBinding instance
 */
public AclBinding toKafkaAclBinding(KafkaPrincipal principal) {
    ResourcePattern resourcePattern = resource.toKafkaResourcePattern();
    AclPermissionType kafkaType = toKafkaAclPermissionType(type);
    org.apache.kafka.common.acl.AclOperation kafkaOperation = toKafkaAclOperation(operation);
    return new AclBinding(resourcePattern, new AccessControlEntry(principal.toString(), getHost(), kafkaOperation, kafkaType));
}
 
示例14
private AclPermissionType toKafkaAclPermissionType(AclRuleType aclRuleType) {
    switch (aclRuleType) {
        case DENY:
            return AclPermissionType.DENY;
        case ALLOW:
            return AclPermissionType.ALLOW;
        default:
            throw new IllegalArgumentException("Invalid Acl type: " + aclRuleType);
    }
}
 
示例15
private static AclRuleType fromKafkaAclPermissionType(AclPermissionType aclPermissionType) {
    switch (aclPermissionType) {
        case DENY:
            return AclRuleType.DENY;
        case ALLOW:
            return AclRuleType.ALLOW;
        default:
            throw new IllegalArgumentException("Invalid AclRule type: " + aclPermissionType);
    }
}
 
示例16
@Test
public void testGetUsersFromAcls(VertxTestContext context)  {
    Admin mockAdminClient = mock(AdminClient.class);
    SimpleAclOperator aclOp = new SimpleAclOperator(vertx, mockAdminClient);

    ResourcePattern res1 = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);
    ResourcePattern res2 = new ResourcePattern(ResourceType.GROUP, "my-group", PatternType.LITERAL);

    KafkaPrincipal foo = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "CN=foo");
    AclBinding fooAclBinding = new AclBinding(res1, new AccessControlEntry(foo.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal bar = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "CN=bar");
    AclBinding barAclBinding = new AclBinding(res1, new AccessControlEntry(bar.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal baz = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "baz");
    AclBinding bazAclBinding = new AclBinding(res2, new AccessControlEntry(baz.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal all = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
    AclBinding allAclBinding = new AclBinding(res1, new AccessControlEntry(all.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    KafkaPrincipal anonymous = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
    AclBinding anonymousAclBinding = new AclBinding(res2, new AccessControlEntry(anonymous.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));

    Collection<AclBinding> aclBindings =
            asList(fooAclBinding, barAclBinding, bazAclBinding, allAclBinding, anonymousAclBinding);

    assertDoesNotThrow(() -> mockDescribeAcls(mockAdminClient, AclBindingFilter.ANY, aclBindings));
    assertThat(aclOp.getUsersWithAcls(), is(new HashSet<>(asList("foo", "bar", "baz"))));
    context.completeNow();
}
 
示例17
@Test
public void testReconcileInternalDelete(VertxTestContext context) {
    Admin mockAdminClient = mock(AdminClient.class);
    SimpleAclOperator aclOp = new SimpleAclOperator(vertx, mockAdminClient);

    ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);

    KafkaPrincipal foo = new KafkaPrincipal("User", "CN=foo");
    AclBinding readAclBinding = new AclBinding(resource, new AccessControlEntry(foo.toString(), "*", org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));

    ArgumentCaptor<Collection<AclBindingFilter>> aclBindingFiltersCaptor = ArgumentCaptor.forClass(Collection.class);
    assertDoesNotThrow(() -> {
        mockDescribeAcls(mockAdminClient, null, Collections.singleton(readAclBinding));
        mockDeleteAcls(mockAdminClient, Collections.singleton(readAclBinding), aclBindingFiltersCaptor);
    });

    Checkpoint async = context.checkpoint();
    aclOp.reconcile("CN=foo", null)
            .onComplete(context.succeeding(rr -> context.verify(() -> {

                Collection<AclBindingFilter> capturedAclBindingFilters = aclBindingFiltersCaptor.getValue();
                assertThat(capturedAclBindingFilters, hasSize(1));
                assertThat(capturedAclBindingFilters, hasItem(readAclBinding.toFilter()));

                Set<ResourcePatternFilter> capturedResourcePatternFilters =
                        capturedAclBindingFilters.stream().map(AclBindingFilter::patternFilter).collect(Collectors.toSet());
                assertThat(capturedResourcePatternFilters, hasSize(1));
                assertThat(capturedResourcePatternFilters, hasItem(resource.toFilter()));

                async.flag();
            })));
}
 
示例18
@Test
public void testToKafkaAclBindingForSpecifiedKafkaPrincipalReturnsKafkaAclBindingForKafkaPrincipal() {
    SimpleAclRule kafkaTopicSimpleAclRule = new SimpleAclRule(AclRuleType.ALLOW, resource, "127.0.0.1", AclOperation.READ);
    AclBinding expectedAclBinding = new AclBinding(
            kafkaResourcePattern,
            new AccessControlEntry(kafkaPrincipal.toString(), "127.0.0.1",
                    org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW)
    );
    assertThat(kafkaTopicSimpleAclRule.toKafkaAclBinding(kafkaPrincipal), is(expectedAclBinding));
}
 
示例19
@Test
public void testFromAclBindingReturnsSimpleAclRule() {
    AclBinding aclBinding = new AclBinding(
            kafkaResourcePattern,
            new AccessControlEntry(kafkaPrincipal.toString(), "127.0.0.1",
                    org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW)
    );
    SimpleAclRule expectedSimpleAclRule = new SimpleAclRule(AclRuleType.ALLOW, resource, "127.0.0.1", AclOperation.READ);
    assertThat(SimpleAclRule.fromAclBinding(aclBinding), is(expectedSimpleAclRule));
}
 
示例20
@Test
public void testFromKafkaAclBindingToKafkaAclBindingRoundtrip()   {
    AclBinding kafkaAclBinding = new AclBinding(
            kafkaResourcePattern,
            new AccessControlEntry(kafkaPrincipal.toString(), "127.0.0.1",
                    org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW)
    );
    assertThat(SimpleAclRule.fromAclBinding(kafkaAclBinding).toKafkaAclBinding(kafkaPrincipal), is(kafkaAclBinding));
}
 
示例21
public AclBuilder addControlEntry(
    String host, AclOperation op, AclPermissionType permissionType) {
  entry = new AccessControlEntry(principal, host, op, permissionType);
  return this;
}
 
示例22
private void verifySchemaRegistryAcls(Platform platform)
    throws ExecutionException, InterruptedException {

  List<SchemaRegistry> srs = platform.getSchemaRegistry();

  for (SchemaRegistry sr : srs) {

    ResourcePatternFilter resourceFilter =
        new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY);

    AccessControlEntryFilter entryFilter =
        new AccessControlEntryFilter(
            sr.getPrincipal(), null, AclOperation.ANY, AclPermissionType.ALLOW);

    AclBindingFilter filter = new AclBindingFilter(resourceFilter, entryFilter);

    Collection<AclBinding> acls = kafkaAdminClient.describeAcls(filter).values().get();

    Assert.assertEquals(3, acls.size());
  }
}
 
示例23
@Test
public void testReconcileInternalCreateAddsAclsToAuthorizer(VertxTestContext context) {
    Admin mockAdminClient = mock(AdminClient.class);
    SimpleAclOperator aclOp = new SimpleAclOperator(vertx, mockAdminClient);

    ResourcePattern resource1 = new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
    ResourcePattern resource2 = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);

    KafkaPrincipal foo = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "CN=foo");
    AclBinding describeAclBinding = new AclBinding(resource1, new AccessControlEntry(foo.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.DESCRIBE, AclPermissionType.ALLOW));
    AclBinding readAclBinding = new AclBinding(resource2, new AccessControlEntry(foo.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    AclBinding writeAclBinding = new AclBinding(resource2, new AccessControlEntry(foo.toString(), "*",
            org.apache.kafka.common.acl.AclOperation.WRITE, AclPermissionType.ALLOW));

    SimpleAclRuleResource ruleResource1 = new SimpleAclRuleResource("kafka-cluster", SimpleAclRuleResourceType.CLUSTER, AclResourcePatternType.LITERAL);
    SimpleAclRuleResource ruleResource2 = new SimpleAclRuleResource("my-topic", SimpleAclRuleResourceType.TOPIC, AclResourcePatternType.LITERAL);
    SimpleAclRule resource1DescribeRule = new SimpleAclRule(AclRuleType.ALLOW, ruleResource1, "*", AclOperation.DESCRIBE);
    SimpleAclRule resource2ReadRule = new SimpleAclRule(AclRuleType.ALLOW, ruleResource2, "*", AclOperation.READ);
    SimpleAclRule resource2WriteRule = new SimpleAclRule(AclRuleType.ALLOW, ruleResource2, "*", AclOperation.WRITE);

    ArgumentCaptor<Collection<AclBinding>> aclBindingsCaptor = ArgumentCaptor.forClass(Collection.class);
    assertDoesNotThrow(() -> {
        mockDescribeAcls(mockAdminClient, null, emptyList());
        mockCreateAcls(mockAdminClient, aclBindingsCaptor);
    });

    Checkpoint async = context.checkpoint();
    aclOp.reconcile("CN=foo", new LinkedHashSet<>(asList(resource2ReadRule, resource2WriteRule, resource1DescribeRule)))
            .onComplete(context.succeeding(rr -> context.verify(() -> {
                Collection<AclBinding> capturedAclBindings = aclBindingsCaptor.getValue();
                assertThat(capturedAclBindings, hasSize(3));
                assertThat(capturedAclBindings, hasItems(describeAclBinding, readAclBinding, writeAclBinding));

                Set<ResourcePattern> capturedResourcePatterns =
                        capturedAclBindings.stream().map(AclBinding::pattern).collect(Collectors.toSet());
                assertThat(capturedResourcePatterns, hasSize(2));
                assertThat(capturedResourcePatterns, hasItems(resource1, resource2));

                async.flag();
            })));
}
 
示例24
@Test
public void testReconcileInternalUpdateCreatesNewAclsAndDeletesOldAcls(VertxTestContext context) {
    Admin mockAdminClient = mock(AdminClient.class);
    SimpleAclOperator aclOp = new SimpleAclOperator(vertx, mockAdminClient);

    ResourcePattern resource1 = new ResourcePattern(ResourceType.TOPIC, "my-topic", PatternType.LITERAL);
    ResourcePattern resource2 = new ResourcePattern(ResourceType.TOPIC, "my-topic2", PatternType.LITERAL);

    KafkaPrincipal foo = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "CN=foo");
    AclBinding readAclBinding = new AclBinding(resource1, new AccessControlEntry(foo.toString(), "*", org.apache.kafka.common.acl.AclOperation.READ, AclPermissionType.ALLOW));
    AclBinding writeAclBinding = new AclBinding(resource2, new AccessControlEntry(foo.toString(), "*", org.apache.kafka.common.acl.AclOperation.WRITE, AclPermissionType.ALLOW));

    SimpleAclRuleResource resource = new SimpleAclRuleResource("my-topic2", SimpleAclRuleResourceType.TOPIC, AclResourcePatternType.LITERAL);
    SimpleAclRule rule1 = new SimpleAclRule(AclRuleType.ALLOW, resource, "*", AclOperation.WRITE);

    ArgumentCaptor<Collection<AclBinding>> aclBindingsCaptor = ArgumentCaptor.forClass(Collection.class);
    ArgumentCaptor<Collection<AclBindingFilter>> aclBindingFiltersCaptor = ArgumentCaptor.forClass(Collection.class);
    assertDoesNotThrow(() -> {
        mockDescribeAcls(mockAdminClient, null, Collections.singleton(readAclBinding));
        mockCreateAcls(mockAdminClient, aclBindingsCaptor);
        mockDeleteAcls(mockAdminClient, Collections.singleton(readAclBinding), aclBindingFiltersCaptor);
    });

    Checkpoint async = context.checkpoint();
    aclOp.reconcile("CN=foo", new LinkedHashSet(asList(rule1)))
            .onComplete(context.succeeding(rr -> context.verify(() -> {

                // Create Write rule for resource 2
                Collection<AclBinding> capturedAclBindings = aclBindingsCaptor.getValue();
                assertThat(capturedAclBindings, hasSize(1));
                assertThat(capturedAclBindings, hasItem(writeAclBinding));
                Set<ResourcePattern> capturedResourcePatterns =
                        capturedAclBindings.stream().map(AclBinding::pattern).collect(Collectors.toSet());
                assertThat(capturedResourcePatterns, hasSize(1));
                assertThat(capturedResourcePatterns, hasItem(resource2));

                // Delete read rule for resource 1
                Collection<AclBindingFilter> capturedAclBindingFilters = aclBindingFiltersCaptor.getValue();
                assertThat(capturedAclBindingFilters, hasSize(1));
                assertThat(capturedAclBindingFilters, hasItem(readAclBinding.toFilter()));

                Set<ResourcePatternFilter> capturedResourcePatternFilters =
                        capturedAclBindingFilters.stream().map(AclBindingFilter::patternFilter).collect(Collectors.toSet());
                assertThat(capturedResourcePatternFilters, hasSize(1));
                assertThat(capturedResourcePatternFilters, hasItem(resource1.toFilter()));

                async.flag();
            })));
}