Java源码示例:com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData
示例1
public void doRebalance() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic);
}
catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
示例2
@Override
public Set<SubscriptionData> subscriptions() {
Set<SubscriptionData> result = new HashSet<SubscriptionData>();
Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
if (topics != null) {
synchronized (topics) {
for (String t : topics) {
SubscriptionData ms = null;
try {
ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
} catch (Exception e) {
log.error("parse subscription error", e);
}
ms.setSubVersion(0L);
result.add(ms);
}
}
}
return result;
}
示例3
@Override
public boolean isMessageMatched(SubscriptionData subscriptionData, long tagsCode) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode())
return true;
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
//这里的匹配机制很简单 ,只要tags 的hashcode一致, 就认为匹配; 然后把消息发送到client以后做msg tag的精确匹配。
return subscriptionData.getCodeSet().contains((int) tagsCode);
}
示例4
/**
* 注册消费者 返回是否有变化
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
// ConsumerId列表变化,通知所有Consumer
this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
}
return r1 || r2;
}
示例5
private void copySubscription() throws MQClientException {
try {
Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
if (registerTopics != null) {
for (final String topic : registerTopics) {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
示例6
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
subscriptionData.setFilterClassSource(filterClassSource);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
示例7
public void subscribe(String topic, String fullClassName, String filterClassSource)
throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
subscriptionData.setFilterClassSource(filterClassSource);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
示例8
@Override
public Set<SubscriptionData> subscriptions() {
Set<SubscriptionData> result = new HashSet<SubscriptionData>();
Set<String> topics = this.defaultMQPullConsumer.getRegisterTopics();
if (topics != null) {
synchronized (topics) {
for (String t : topics) {
SubscriptionData ms = null;
try {
ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);
} catch (Exception e) {
log.error("parse subscription error", e);
return null;
}
ms.setSubVersion(0L);
result.add(ms);
}
}
}
return result;
}
示例9
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
示例10
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
}
return r1 || r2;
}
示例11
@Override
public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
if (tagsCode == null) {
return true;
}
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode())
return true;
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
示例12
@Override
public Set<SubscriptionData> subscriptions() {
Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
//SubscriptionData 集合
subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
return subSet;
}
示例13
/**
* 注册client
*
* @param addr
* @param heartbeat
* @param timeoutMillis
* @return
* @throws RemotingException
* @throws InterruptedException
*/
public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
throws RemotingException, InterruptedException {
if (!UtilAll.isBlank(projectGroupPrefix)) {
Set<ConsumerData> consumerDatas = heartbeat.getConsumerDataSet();
for (ConsumerData consumerData : consumerDatas) {
consumerData.setGroupName(
VirtualEnvUtil.buildWithProjectGroup(consumerData.getGroupName(), projectGroupPrefix));
Set<SubscriptionData> subscriptionDatas = consumerData.getSubscriptionDataSet();
for (SubscriptionData subscriptionData : subscriptionDatas) {
subscriptionData.setTopic(VirtualEnvUtil
.buildWithProjectGroup(subscriptionData.getTopic(), projectGroupPrefix));
}
}
Set<ProducerData> producerDatas = heartbeat.getProducerDataSet();
for (ProducerData producerData : producerDatas) {
producerData.setGroupName(
VirtualEnvUtil.buildWithProjectGroup(producerData.getGroupName(), projectGroupPrefix));
}
}
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setBody(heartbeat.encode());
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
return response.getCode() == ResponseCode.SUCCESS;
}
示例14
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
if (subTable.containsKey(topic)) {
return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
}
return false;
}
示例15
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
}
}
}
示例16
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
示例17
@Override
public ConsumerRunningInfo consumerRunningInfo() {
ConsumerRunningInfo info = new ConsumerRunningInfo();
Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);
prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));
prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE,
String.valueOf(this.consumeMessageService.getCorePoolSize()));
prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP,
String.valueOf(this.consumerStartTimestamp));
info.setProperties(prop);
Set<SubscriptionData> subSet = this.subscriptions();
info.getSubscriptionSet().addAll(subSet);
Iterator<Entry<MessageQueue, ProcessQueue>> it =
this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
ProcessQueueInfo pqinfo = new ProcessQueueInfo();
pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
pq.fillProcessQueueInfo(pqinfo);
info.getMqTable().put(mq, pqinfo);
}
for (SubscriptionData sd : subSet) {
ConsumeStatus consumeStatus =
this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(),
sd.getTopic());
info.getStatusTable().put(sd.getTopic(), consumeStatus);
}
return info;
}
示例18
@Override
public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
if (subTable != null) {
if (subTable.containsKey(topic)) {
this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
}
}
}
示例19
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
if (subTable != null) {
if (subTable.containsKey(topic)) {
return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
}
return false;
}
示例20
private void subscriptionAutomatically(final String topic) {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),//
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
} catch (Exception e) {
}
}
}
示例21
/**
* 通过topic查找对应的消费组名
*
* @param topic
* @return
*/
public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<String>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next();
ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
entry.getValue().getSubscriptionTable();
if (subscriptionTable.containsKey(topic)) {
groups.add(entry.getKey());
}
}
return groups;
}
示例22
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
}
catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
示例23
private void truncateMessageQueueNotMyTopic() {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
for (MessageQueue mq : this.processQueueTable.keySet()) {
if (!subTable.containsKey(mq.getTopic())) {
ProcessQueue pq = this.processQueueTable.remove(mq);
if (pq != null) {
pq.setDropped(true);
log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}",
consumerGroup, mq);
}
}
}
}
示例24
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
Set<String> topics;
if (UtilAll.isBlank(requestHeader.getTopic())) {
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());
}
else {
topics = new HashSet<String>();
topics.add(requestHeader.getTopic());
}
for (String topic : topics) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
log.warn("[cloneGroupOffset], topic config not exist, {}", topic);
continue;
}
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
&& findSubscriptionData == null) {
log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
continue;
}
}
this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
requestHeader.getTopic());
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
示例25
@Override
public boolean isMessageMatched(SubscriptionData subscriptionData, long tagsCode) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode())
return true;
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
return subscriptionData.getCodeSet().contains((int) tagsCode);
}
示例26
public SubscriptionData findSubscriptionData(final String group, final String topic) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
if (consumerGroupInfo != null) {
return consumerGroupInfo.findSubscriptionData(topic);
}
return null;
}
示例27
@Override
public boolean isSubscribeTopicNeedUpdate(String topic) {
Map<String, SubscriptionData> subTable = this.rebalanceImpl.getSubscriptionInner();
if (subTable != null) {
if (subTable.containsKey(topic)) {
return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
}
}
return false;
}
示例28
@Test
public void testSubscriptionData() throws Exception {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
subscriptionData.setFilterClassSource("java hello");
String json = RemotingSerializable.toJson(subscriptionData, true);
System.out.println(json);
}
示例29
@Test
public void testSubscriptionData() throws Exception {
SubscriptionData subscriptionData =
FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
subscriptionData.setFilterClassSource("java hello");
String json = RemotingSerializable.toJson(subscriptionData, true);
System.out.println(json);
}
示例30
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}