Java源码示例:com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData

示例1
/**
 * Rebalances every topic found in the subscription table, then calls
 * {@code truncateMessageQueueNotMyTopic()}.
 *
 * <p>An exception thrown while rebalancing one topic does not stop the loop. It is
 * logged as a warning unless the topic name starts with
 * {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}, in which case it is dropped silently.
 */
public void doRebalance() {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions != null) {
        // Only the topic names are needed, so walk the keys directly.
        for (final String topic : subscriptions.keySet()) {
            try {
                this.rebalanceByTopic(topic);
            } catch (Exception e) {
                final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
                if (!retryTopic) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}
 
示例2
/**
 * Builds one subscribe-all {@code SubscriptionData} (sub version 0) for each topic
 * registered on the pull consumer.
 *
 * <p>A topic whose subscription cannot be built is logged and skipped; the remaining
 * topics are still processed.
 *
 * @return a new set of subscriptions; empty when no topics are registered
 */
@Override
public Set<SubscriptionData> subscriptions() {
    Set<SubscriptionData> result = new HashSet<SubscriptionData>();

    Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
    if (topics != null) {
        // Iterate while holding the topic set's monitor, as before.
        synchronized (topics) {
            for (String t : topics) {
                SubscriptionData ms;
                try {
                    ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
                } catch (Exception e) {
                    log.error("parse subscription error", e);
                    // Without this skip, ms would be null below and setSubVersion would
                    // throw a NullPointerException, aborting the whole call.
                    continue;
                }
                ms.setSubVersion(0L);
                result.add(ms);
            }
        }
    }

    return result;
}
 
示例3
@Override
/**
 * Tells whether a message with the given tags code matches the subscription.
 *
 * <p>Returns {@code true} without looking at the tags code when there is no
 * subscription, when the subscription is in class-filter mode, or when its
 * sub-string equals {@code SubscriptionData.SUB_ALL}.
 *
 * @param subscriptionData the subscription to match against; may be {@code null}
 * @param tagsCode the message's tags code; narrowed to {@code int} before the lookup
 * @return whether the message is considered a match
 */
public boolean isMessageMatched(SubscriptionData subscriptionData, long tagsCode) {
    if (subscriptionData == null || subscriptionData.isClassFilterMode()) {
        return true;
    }

    final boolean subscribesAll = subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL);
    if (subscribesAll) {
        return true;
    }

    // The matching here is deliberately simple: equal tag hash codes count as a match.
    // Exact matching on the message tag is done after the message is sent to the client.
    final int code = (int) tagsCode;
    return subscriptionData.getCodeSet().contains(code);
}
 
示例4
/**
 * Registers a consumer channel and its subscriptions under a consumer group,
 * creating the group entry in {@code consumerTable} if it does not exist yet.
 *
 * <p>When either the channel update or the subscription update reports a change,
 * {@code consumerIdsChangeListener} is notified with all channels of the group.
 *
 * @return {@code true} if the channel update or the subscription update reported a change
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
        final Set<SubscriptionData> subList) {
    ConsumerGroupInfo groupInfo = this.consumerTable.get(group);
    if (groupInfo == null) {
        final ConsumerGroupInfo created =
                new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        final ConsumerGroupInfo existing = this.consumerTable.putIfAbsent(group, created);
        // Keep whichever instance actually ended up in the table.
        groupInfo = (existing == null) ? created : existing;
    }

    // Both updates must always run, so they are evaluated before being combined.
    final boolean channelChanged =
            groupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
    final boolean subscriptionChanged = groupInfo.updateSubscription(subList);
    final boolean changed = channelChanged || subscriptionChanged;

    if (changed) {
        // The consumer id list changed: notify all consumers.
        this.consumerIdsChangeListener.consumerIdsChanged(group, groupInfo.getAllChannel());
    }

    return changed;
}
 
示例5
/**
 * Puts a subscribe-all {@code SubscriptionData} into the rebalance implementation's
 * subscription table for every topic registered on the pull consumer.
 *
 * @throws MQClientException wrapping any exception raised while building or storing
 *         a subscription
 */
private void copySubscription() throws MQClientException {
    try {
        final Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
        if (topics == null) {
            return;
        }

        for (final String topic : topics) {
            final String group = this.defaultMQPullConsumer.getConsumerGroup();
            final SubscriptionData allTags =
                    FilterAPI.buildSubscriptionData(group, topic, SubscriptionData.SUB_ALL);
            this.rebalanceImpl.getSubscriptionInner().put(topic, allTags);
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
 
示例6
/**
 * Subscribes to a topic in class-filter mode.
 *
 * <p>The subscription is first built with the {@code "*"} expression, then switched to
 * class-filter mode with {@code fullClassName} as its sub-string and
 * {@code filterClassSource} as its filter source. It is stored in the rebalance
 * implementation's subscription table, and a heartbeat is sent to all brokers when the
 * client factory is present.
 *
 * @param topic the topic to subscribe to
 * @param fullClassName stored as the subscription's sub-string
 * @param filterClassSource stored as the subscription's filter class source
 * @throws MQClientException wrapping any exception raised along the way
 */
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
    try {
        final String group = this.defaultMQPushConsumer.getConsumerGroup();
        final SubscriptionData classFilter = FilterAPI.buildSubscriptionData(group, topic, "*");
        classFilter.setSubString(fullClassName);
        classFilter.setClassFilterMode(true);
        classFilter.setFilterClassSource(filterClassSource);

        this.rebalanceImpl.getSubscriptionInner().put(topic, classFilter);

        if (this.mQClientFactory != null) {
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
 
示例7
/**
 * Registers a class-filter subscription for {@code topic}.
 *
 * <p>Builds a {@code "*"} subscription for the consumer group, overrides its sub-string
 * with {@code fullClassName}, enables class-filter mode, attaches
 * {@code filterClassSource}, and stores it under {@code topic}. If a client factory
 * exists, a heartbeat is then sent to all brokers.
 *
 * @throws MQClientException wrapping any exception thrown while subscribing
 */
public void subscribe(String topic, String fullClassName, String filterClassSource)
        throws MQClientException {
    try {
        final SubscriptionData filterSubscription = FilterAPI.buildSubscriptionData(
                this.defaultMQPushConsumer.getConsumerGroup(), topic, "*");

        // Turn the wildcard subscription into a class-filter one.
        filterSubscription.setSubString(fullClassName);
        filterSubscription.setClassFilterMode(true);
        filterSubscription.setFilterClassSource(filterClassSource);
        this.rebalanceImpl.getSubscriptionInner().put(topic, filterSubscription);

        final boolean factoryReady = this.mQClientFactory != null;
        if (factoryReady) {
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
 
示例8
/**
 * Builds one subscribe-all {@code SubscriptionData} (sub version 0) for each topic
 * registered on the pull consumer.
 *
 * @return the built subscriptions; an empty set when no topics are registered; or
 *         {@code null} as soon as building any single subscription fails (the failure
 *         is logged)
 */
@Override
public Set<SubscriptionData> subscriptions() {
    final Set<SubscriptionData> built = new HashSet<SubscriptionData>();

    final Set<String> registered = this.defaultMQPullConsumer.getRegisterTopics();
    if (registered == null) {
        return built;
    }

    // Iterate while holding the topic set's monitor.
    synchronized (registered) {
        for (final String topic : registered) {
            SubscriptionData data;
            try {
                data = FilterAPI.buildSubscriptionData(this.groupName(), topic, SubscriptionData.SUB_ALL);
            } catch (Exception e) {
                log.error("parse subscription error", e);
                return null;
            }
            data.setSubVersion(0L);
            built.add(data);
        }
    }

    return built;
}
 
示例9
/**
 * Rebalances every topic found in the subscription table, then calls
 * {@code truncateMessageQueueNotMyTopic()}.
 *
 * <p>Failures are isolated per topic and logged as warnings, except for topics whose
 * name starts with {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}, which fail silently.
 *
 * @param isOrder passed through unchanged to {@code rebalanceByTopic}
 */
public void doRebalance(final boolean isOrder) {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions != null) {
        for (final String topic : subscriptions.keySet()) {
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Exception e) {
                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    // Retry-topic failures are not reported.
                    continue;
                }
                log.warn("rebalanceByTopic Exception", e);
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}
 
示例10
/**
 * Registers a consumer channel and its subscriptions under a consumer group,
 * creating the group entry in {@code consumerTable} on first use.
 *
 * <p>If the channel update or the subscription update reports a change, the
 * {@code consumerIdsChangeListener} is notified with all channels of the group.
 *
 * @return {@code true} if either update reported a change
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
                                ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
                                final Set<SubscriptionData> subList) {

    ConsumerGroupInfo info = this.consumerTable.get(group);
    if (info == null) {
        final ConsumerGroupInfo fresh = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        final ConsumerGroupInfo raced = this.consumerTable.putIfAbsent(group, fresh);
        if (raced != null) {
            // An entry was already present when putIfAbsent ran; use that one.
            info = raced;
        } else {
            info = fresh;
        }
    }

    // Evaluate both updates unconditionally before combining the results.
    final boolean channelUpdated =
            info.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
    final boolean subscriptionUpdated = info.updateSubscription(subList);
    final boolean anyChange = channelUpdated || subscriptionUpdated;

    if (anyChange) {
        this.consumerIdsChangeListener.consumerIdsChanged(group, info.getAllChannel());
    }

    return anyChange;
}
 
示例11
/**
 * Tells whether a message with the given tags code matches the subscription.
 *
 * <p>Returns {@code true} when the tags code or the subscription is {@code null}, when
 * the subscription is in class-filter mode, or when its sub-string equals
 * {@code SubscriptionData.SUB_ALL}. Otherwise the result is whether the subscription's
 * code set contains the tags code narrowed to {@code int}.
 *
 * @param subscriptionData the subscription to match against; may be {@code null}
 * @param tagsCode the message's tags code; may be {@code null}
 * @return whether the message is considered a match
 */
@Override
public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
    if (tagsCode == null || subscriptionData == null) {
        return true;
    }

    if (subscriptionData.isClassFilterMode()) {
        return true;
    }

    final String expression = subscriptionData.getSubString();
    if (expression.equals(SubscriptionData.SUB_ALL)) {
        return true;
    }

    final int code = tagsCode.intValue();
    return subscriptionData.getCodeSet().contains(code);
}
 
示例12
/**
 * Returns a new set holding the values of the rebalance implementation's
 * subscription table.
 *
 * @return a fresh {@code HashSet} of the current {@code SubscriptionData} values
 */
@Override
public Set<SubscriptionData> subscriptions() {
    // Copy the SubscriptionData collection into a new set.
    return new HashSet<SubscriptionData>(this.rebalanceImpl.getSubscriptionInner().values());
}
 
示例13
/**
 * Registers a client by sending its heartbeat to the given address synchronously.
 *
 * <p>When {@code projectGroupPrefix} is not blank, every consumer group name,
 * subscription topic and producer group name in the heartbeat is first rewritten in
 * place through {@code VirtualEnvUtil.buildWithProjectGroup}.
 *
 * @param addr address the heartbeat request is sent to
 * @param heartbeat heartbeat payload; mutated in place when a project group prefix is set
 * @param timeoutMillis timeout passed to the synchronous invocation
 * @return {@code true} if the response code equals {@code ResponseCode.SUCCESS}
 * @throws RemotingException declared for the remote invocation
 * @throws InterruptedException declared for the remote invocation
 */
public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
        throws RemotingException, InterruptedException {
    final boolean prefixConfigured = !UtilAll.isBlank(projectGroupPrefix);
    if (prefixConfigured) {
        for (ConsumerData consumer : heartbeat.getConsumerDataSet()) {
            final String consumerGroup =
                    VirtualEnvUtil.buildWithProjectGroup(consumer.getGroupName(), projectGroupPrefix);
            consumer.setGroupName(consumerGroup);

            for (SubscriptionData subscription : consumer.getSubscriptionDataSet()) {
                final String prefixedTopic =
                        VirtualEnvUtil.buildWithProjectGroup(subscription.getTopic(), projectGroupPrefix);
                subscription.setTopic(prefixedTopic);
            }
        }

        for (ProducerData producer : heartbeat.getProducerDataSet()) {
            final String producerGroup =
                    VirtualEnvUtil.buildWithProjectGroup(producer.getGroupName(), projectGroupPrefix);
            producer.setGroupName(producerGroup);
        }
    }

    final RemotingCommand heartbeatRequest =
            RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
    heartbeatRequest.setBody(heartbeat.encode());

    final RemotingCommand reply = this.remotingClient.invokeSync(addr, heartbeatRequest, timeoutMillis);
    return ResponseCode.SUCCESS == reply.getCode();
}
 
示例14
/**
 * Tells whether subscribe info for {@code topic} still needs to be fetched.
 *
 * @param topic the topic to check
 * @return {@code true} only when the topic is in the subscription table but absent
 *         from {@code topicSubscribeInfoTable}; {@code false} otherwise, including
 *         when the subscription table is {@code null}
 */
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions == null || !subscriptions.containsKey(topic)) {
        return false;
    }

    final boolean infoKnown = this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
    return !infoKnown;
}
 
示例15
/**
 * Asks the client factory to update route info from the name server for every topic
 * in the subscription table. Does nothing when the table is {@code null}.
 */
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
    if (subscriptions == null) {
        return;
    }

    // Only the topic names are used, so iterate over the keys.
    for (final String topic : subscriptions.keySet()) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    }
}
 
示例16
/**
 * Subscribes the push consumer to {@code topic} with the given expression.
 *
 * <p>The built subscription is stored under {@code topic} in the rebalance
 * implementation's subscription table; when a client factory exists, a heartbeat is
 * then sent to all brokers.
 *
 * @param topic the topic to subscribe to
 * @param subExpression the subscription expression handed to {@code FilterAPI}
 * @throws MQClientException wrapping any exception raised while subscribing
 */
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        final String group = this.defaultMQPushConsumer.getConsumerGroup();
        final SubscriptionData built = FilterAPI.buildSubscriptionData(group, topic, subExpression);
        this.rebalanceImpl.getSubscriptionInner().put(topic, built);

        if (this.mQClientFactory == null) {
            return;
        }
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
 
示例17
/**
 * Assembles a snapshot of this consumer's running state.
 *
 * <p>The snapshot carries: the push consumer's properties plus the orderly flag, the
 * consume service's core pool size and the consumer start timestamp; the current
 * subscriptions; one {@code ProcessQueueInfo} per entry of the process queue table;
 * and one {@code ConsumeStatus} per subscribed topic.
 *
 * @return the populated {@code ConsumerRunningInfo}
 */
@Override
public ConsumerRunningInfo consumerRunningInfo() {
    final ConsumerRunningInfo runningInfo = new ConsumerRunningInfo();

    // Consumer configuration, extended with a few runtime values.
    final Properties properties = MixAll.object2Properties(this.defaultMQPushConsumer);
    properties.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
    properties.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE,
            String.valueOf(this.consumeMessageService.getCorePoolSize()));
    properties.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP,
            String.valueOf(this.consumerStartTimestamp));
    runningInfo.setProperties(properties);

    final Set<SubscriptionData> subscriptions = this.subscriptions();
    runningInfo.getSubscriptionSet().addAll(subscriptions);

    // Per-queue state: commit offset plus whatever the process queue fills in.
    for (Entry<MessageQueue, ProcessQueue> queueEntry : this.rebalanceImpl.getProcessQueueTable().entrySet()) {
        final MessageQueue queue = queueEntry.getKey();
        final ProcessQueue processQueue = queueEntry.getValue();

        final ProcessQueueInfo queueInfo = new ProcessQueueInfo();
        queueInfo.setCommitOffset(this.offsetStore.readOffset(queue, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
        processQueue.fillProcessQueueInfo(queueInfo);
        runningInfo.getMqTable().put(queue, queueInfo);
    }

    // Per-topic consume statistics.
    for (SubscriptionData subscription : subscriptions) {
        final ConsumeStatus status = this.mQClientFactory.getConsumerStatsManager()
                .consumeStatus(this.groupName(), subscription.getTopic());
        runningInfo.getStatusTable().put(subscription.getTopic(), status);
    }

    return runningInfo;
}
 
示例18
/**
 * Stores the message queues of {@code topic} in the rebalance implementation's
 * topic-subscribe-info table, but only when the topic is currently subscribed.
 *
 * @param topic the topic the queues belong to
 * @param info the message queues to record for the topic
 */
@Override
public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
    final Map<String, SubscriptionData> subscriptions = this.rebalanceImpl.getSubscriptionInner();
    final boolean subscribed = subscriptions != null && subscriptions.containsKey(topic);
    if (subscribed) {
        this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
    }
}
 
示例19
/**
 * Tells whether subscribe info for {@code topic} still needs to be fetched.
 *
 * @param topic the topic to check
 * @return {@code true} only when the topic is subscribed but has no entry in
 *         {@code topicSubscribeInfoTable}; {@code false} otherwise
 */
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
    final Map<String, SubscriptionData> subscriptions = this.rebalanceImpl.getSubscriptionInner();
    final boolean subscribed = subscriptions != null && subscriptions.containsKey(topic);
    return subscribed && !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
 
示例20
/**
 * Adds a subscribe-all subscription for {@code topic} to the rebalance implementation
 * if the topic is not subscribed yet. Any exception raised while doing so is ignored.
 *
 * @param topic the topic to subscribe to when missing
 */
private void subscriptionAutomatically(final String topic) {
    if (this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
        return;
    }

    try {
        final String group = this.defaultMQPullConsumer.getConsumerGroup();
        final SubscriptionData allTags =
                FilterAPI.buildSubscriptionData(group, topic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, allTags);
    } catch (Exception ignored) {
        // NOTE(review): the failure is swallowed without any trace; presumably
        // best-effort, but consider logging it. Confirm the intent.
    }
}
 
示例21
/**
 * Finds the names of the consumer groups that subscribe to a topic.
 *
 * @param topic the topic to look up
 * @return a new set with the key of every {@code consumerTable} entry whose
 *         subscription table contains {@code topic}; empty when there is none
 */
public HashSet<String> queryTopicConsumeByWho(final String topic) {
    final HashSet<String> matchingGroups = new HashSet<String>();

    for (Entry<String, ConsumerGroupInfo> groupEntry : this.consumerTable.entrySet()) {
        final boolean subscribed = groupEntry.getValue().getSubscriptionTable().containsKey(topic);
        if (subscribed) {
            matchingGroups.add(groupEntry.getKey());
        }
    }

    return matchingGroups;
}
 
示例22
/**
 * Subscribes the push consumer to {@code topic} using {@code subExpression}.
 *
 * <p>Stores the built subscription in the rebalance implementation's subscription
 * table and, if a client factory exists, sends a heartbeat to all brokers.
 *
 * @throws MQClientException wrapping any exception thrown while subscribing
 */
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        final SubscriptionData subscription = FilterAPI.buildSubscriptionData(
                this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscription);

        final boolean factoryReady = this.mQClientFactory != null;
        if (factoryReady) {
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}
 
示例23
/**
 * Removes from {@code processQueueTable} every message queue whose topic is not a key
 * of the subscription table. Each removed process queue is marked as dropped and the
 * removal is logged.
 *
 * <p>NOTE(review): entries are removed while the table's key set is being iterated;
 * this presumably relies on the table being a concurrent map. Confirm.
 * <p>NOTE(review): the subscription table is dereferenced without a null check.
 */
private void truncateMessageQueueNotMyTopic() {
    final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();

    for (MessageQueue queue : this.processQueueTable.keySet()) {
        if (subscriptions.containsKey(queue.getTopic())) {
            continue;
        }

        final ProcessQueue removed = this.processQueueTable.remove(queue);
        if (removed == null) {
            // Nothing was mapped to this queue any more.
            continue;
        }

        removed.setDropped(true);
        log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}",
                consumerGroup, queue);
    }
}
 
示例24
/**
 * Handles a clone-group-offset request: copies consume offsets from the source group
 * to the destination group, one topic at a time.
 *
 * <p>When the request names a topic, only that topic is processed; when the request
 * topic is blank, every topic returned by {@code whichTopicByConsumer} for the source
 * group is processed. A topic is skipped (with a warning) when it has no topic config,
 * or when the request is not offline and the source group has subscriptions but none
 * for that topic.
 *
 * @param ctx the channel context of the request (not used here)
 * @param request the incoming command carrying a {@code CloneGroupOffsetRequestHeader}
 * @return a response whose code is {@code ResponseCode.SUCCESS}
 * @throws RemotingCommandException if the request header cannot be decoded
 */
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    CloneGroupOffsetRequestHeader requestHeader =
            (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);

    Set<String> topics;
    if (UtilAll.isBlank(requestHeader.getTopic())) {
        topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
    }
    else {
        topics = new HashSet<String>();
        topics.add(requestHeader.getTopic());
    }

    for (String topic : topics) {
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (null == topicConfig) {
            log.warn("[cloneGroupOffset], topic config not exist, {}", topic);
            continue;
        }

        if (!requestHeader.isOffline()) {
            SubscriptionData findSubscriptionData =
                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
            if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
                    && findSubscriptionData == null) {
                log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
                continue;
            }
        }

        // Clone the topic of this iteration. Using requestHeader.getTopic() here would
        // pass a blank topic on every iteration whenever the request did not name one.
        this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
            topic);
    }

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
 
示例25
@Override
/**
 * Tells whether a message with the given tags code matches the subscription.
 *
 * <p>A {@code null} subscription, a class-filter subscription, and a subscription
 * whose sub-string equals {@code SubscriptionData.SUB_ALL} all match unconditionally.
 * Otherwise the result is whether the subscription's code set contains the tags code
 * narrowed to {@code int}.
 *
 * @param subscriptionData the subscription to match against; may be {@code null}
 * @param tagsCode the message's tags code
 * @return whether the message is considered a match
 */
public boolean isMessageMatched(SubscriptionData subscriptionData, long tagsCode) {
    if (subscriptionData == null) {
        return true;
    }

    final boolean acceptsEverything = subscriptionData.isClassFilterMode()
            || subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL);
    if (acceptsEverything) {
        return true;
    }

    final int narrowedCode = (int) tagsCode;
    return subscriptionData.getCodeSet().contains(narrowedCode);
}
 
示例26
/**
 * Looks up the subscription a consumer group holds for a topic.
 *
 * @param group the consumer group name
 * @param topic the topic to look up within the group
 * @return whatever the group's {@code findSubscriptionData(topic)} returns, or
 *         {@code null} when no group info exists for {@code group}
 */
public SubscriptionData findSubscriptionData(final String group, final String topic) {
    final ConsumerGroupInfo groupInfo = this.getConsumerGroupInfo(group);
    if (groupInfo == null) {
        return null;
    }

    return groupInfo.findSubscriptionData(topic);
}
 
示例27
/**
 * Tells whether subscribe info for {@code topic} still needs to be fetched.
 *
 * @param topic the topic to check
 * @return {@code true} when the topic is subscribed and {@code topicSubscribeInfoTable}
 *         has no entry for it; {@code false} in every other case
 */
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
    final Map<String, SubscriptionData> subscriptions = this.rebalanceImpl.getSubscriptionInner();
    if (subscriptions == null) {
        return false;
    }
    if (!subscriptions.containsKey(topic)) {
        return false;
    }

    return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
 
示例28
/**
 * Builds a three-tag subscription, attaches a filter class source, and prints its
 * JSON form. The test makes no assertions; it only exercises serialization.
 */
@Test
public void testSubscriptionData() throws Exception {
    final SubscriptionData built =
            FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
    built.setFilterClassSource("java hello");

    final String serialized = RemotingSerializable.toJson(built, true);
    System.out.println(serialized);
}
 
示例29
/**
 * Serializes a tag-filtered subscription with a filter class source to JSON and
 * prints it. Nothing is asserted.
 */
@Test
public void testSubscriptionData() throws Exception {
    final SubscriptionData tagged =
            FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
    tagged.setFilterClassSource("java hello");

    // Second argument is passed as true, exactly as before.
    System.out.println(RemotingSerializable.toJson(tagged, true));
}
 
示例30
/**
 * Subscribes the push consumer to {@code topic} with {@code subExpression}.
 *
 * <p>The subscription built by {@code FilterAPI} replaces any existing entry for the
 * topic in the rebalance implementation's subscription table. A heartbeat is sent to
 * all brokers afterwards when the client factory is not {@code null}.
 *
 * @param topic the topic to subscribe to
 * @param subExpression the subscription expression
 * @throws MQClientException wrapping any exception raised while subscribing
 */
public void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        final String consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        final SubscriptionData newSubscription =
                FilterAPI.buildSubscriptionData(consumerGroup, topic, subExpression);
        this.rebalanceImpl.getSubscriptionInner().put(topic, newSubscription);

        if (null != this.mQClientFactory) {
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}