Java源码示例:com.alibaba.rocketmq.store.config.StorePathConfigHelper

示例1
private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {
        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId = Integer.parseInt(fileQueueId.getName());
                    ConsumeQueue logic = new ConsumeQueue(//
                        topic,//
                        queueId,//
                        StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
                        lSize,//
                        null);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }
    System.out.println("load logics queue all over, OK");
    return true;
}
 
示例2
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentHashMap<Integer, ConsumeQueue> newMap =
                new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        }
        else {
            map = newMap;
        }
    }
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(//
            topic,//
            queueId,//
            StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
            lSize,//
            null);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        }
        else {
            logic = newLogic;
        }
    }
    return logic;
}
 
示例3
private String diskUtil() {
    String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);

    String storePathLogis =
            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);

    String storePathIndex =
            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
}
 
示例4
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
 
示例5
public void shutdown() {
    if (!this.shutdown) {
        this.shutdown = true;

        this.scheduledExecutorService.shutdown();

        try {
            Thread.sleep(1000 * 3);
        }
        catch (InterruptedException e) {
            log.error("shutdown Exception, ", e);
        }

        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.shutdown();
        }

        this.haService.shutdown();

        this.storeStatsService.shutdown();
        this.indexService.shutdown();
        this.flushConsumeQueueService.shutdown();
        this.commitLog.shutdown();
        this.reputMessageService.shutdown();
        this.allocateMapedFileService.shutdown();
        this.storeCheckpoint.flush();
        this.storeCheckpoint.shutdown();

        if (this.runningFlags.isWriteable()) {
            this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
        }
        else {
            log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
        }
    }
}
 
示例6
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();
    this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
    this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
}
 
示例7
@Override
public HashMap<String, String> getRuntimeInfo() {
    HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        //获取commitlog文件夹占用磁盘资源百分百
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));

    }

    {

        String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
        //consumeQueue文件夹占用磁盘资源百分比
        result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
    }

    {   //获取存在于延时队列offset中还未到时间的消息的offset,以及把到时间到消息移到commitlog和consumeQueue队列后的offset获取出来
        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.buildRunningStats(result);
        }
    }

    //commilog目录文件的最小offset和最大offset
    result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
    result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));

    return result;
}
 
示例8
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        }
        else {
            map = newMap;
        }
    }

    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(//
            topic,//
            queueId,//
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        }
        else {
            logic = newLogic;
        }
    }

    return logic;
}
 
示例9
private void createTempFile() throws IOException {
    String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    File file = new File(fileName);
    MapedFile.ensureDirOK(file.getParent());
    boolean result = file.createNewFile();
    log.info(fileName + (result ? " create OK" : " already exists"));
}
 
示例10
/**
 *
 [[email protected] topic-prod-xxxxxservice-xxxxx]# pwd
 /data/store/consumequeue/topic-prod-xxxxxservice-xxxxx
 [[email protected] topic-prod-xxxxxservice-xxxxx]# ls
 0  1  2  3  4  5  6  7  8  9
 [[email protected] topic-prod-xxxxxservice-xxxxx]# ls 0/
 00000000000048000000  00000000000054000000  00000000000060000000  00000000000066000000  00000000000072000000  00000000000078000000  00000000000084000000  00000000000090000000  00000000000096000000  00000000000102000000
 [[email protected] topic-prod-xxxxxservice-xxxxx]#
 * 消费队列的目录结构
 * /topic/queueid/mapfile 其中mapfile的文件名是?
 * Consume Queue存储消息在Commit Log中的位置信息
 * @return
 */
// DefaultMessageStore.loadConsumeQueue broker起来后,从/data/store/consumequeue路径读取对应topic中各个队列的commit log索引信息
private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {
        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId = Integer.parseInt(fileQueueId.getName());
                    ConsumeQueue logic = new ConsumeQueue(//
                        topic,//
                        queueId,//
                        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//
                        this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//
                        this);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }

    log.info("load logics queue all over, OK");

    return true;
}
 
示例11
private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {

        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();

            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId = Integer.parseInt(fileQueueId.getName());
                    ConsumeQueue logic = new ConsumeQueue(//
                            topic,//
                            queueId,//
                            StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
                            lSize,//
                            null);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }
    System.out.println("load logics queue all over, OK");
    return true;
}
 
示例12
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentHashMap<Integer, ConsumeQueue> newMap =
                new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(//
                topic,//
                queueId,//
                StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),//
                lSize,//
                null);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }
    return logic;
}
 
示例13
private String diskUtil() {
    String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);

    String storePathLogis =
            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);

    String storePathIndex =
            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
}
 
示例14
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
 
示例15
/**

     */
    public void shutdown() {
        if (!this.shutdown) {
            this.shutdown = true;

            this.scheduledExecutorService.shutdown();

            try {

                Thread.sleep(1000 * 3);
            } catch (InterruptedException e) {
                log.error("shutdown Exception, ", e);
            }

            if (this.scheduleMessageService != null) {
                this.scheduleMessageService.shutdown();
            }

            this.haService.shutdown();

            this.storeStatsService.shutdown();
            this.indexService.shutdown();
            this.flushConsumeQueueService.shutdown();
            this.commitLog.shutdown();
            this.reputMessageService.shutdown();
            this.allocateMapedFileService.shutdown();
            this.storeCheckpoint.flush();
            this.storeCheckpoint.shutdown();

            if (this.runningFlags.isWriteable()) {
                this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
            } else {
                log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
            }
        }
    }
 
示例16
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();
    this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
    this.deleteFile(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
}
 
示例17
@Override
public HashMap<String, String> getRuntimeInfo() {
    HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();

    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));

    }


    {

        String storePathLogics = StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
        result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
    }


    {
        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.buildRunningStats(result);
        }
    }

    result.put(RunningStats.commitLogMinOffset.name(), String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
    result.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));

    return result;
}
 
示例18
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(//
                topic, //
                queueId, //
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}
 
示例19
/**

     *
     * @throws IOException
     */
    private void createTempFile() throws IOException {
        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
        File file = new File(fileName);
        MapedFile.ensureDirOK(file.getParent());
        boolean result = file.createNewFile();
        log.info(fileName + (result ? " create OK" : " already exists"));
    }
 
示例20
private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {

        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();

            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId;
                    try {
                        queueId = Integer.parseInt(fileQueueId.getName());
                    }catch (NumberFormatException e) {
                        continue;
                    }
                    ConsumeQueue logic = new ConsumeQueue(//
                            topic, //
                            queueId, //
                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                            this);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }

    log.info("load logics queue all over, OK");

    return true;
}
 
示例21
private boolean loadConsumeQueue() {
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(lStorePath));
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) {
        // TOPIC 遍历
        for (File fileTopic : fileTopicList) {
            String topic = fileTopic.getName();
            // TOPIC 下队列遍历
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) {
                for (File fileQueueId : fileQueueIdList) {
                    int queueId = Integer.parseInt(fileQueueId.getName());
                    ConsumeQueue logic = new ConsumeQueue(//
                        topic, //
                        queueId, //
                        StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), //
                        lSize, //
                        null);
                    this.putConsumeQueue(topic, queueId, logic);
                    if (!logic.load()) {
                        return false;
                    }
                }
            }
        }
    }
    System.out.println("load logics queue all over, OK");
    return true;
}
 
示例22
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentHashMap<Integer, ConsumeQueue> newMap =
                new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        }
        else {
            map = newMap;
        }
    }
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(//
            topic, //
            queueId, //
            StorePathConfigHelper.getStorePathConsumeQueue(lStorePath), //
            lSize, //
            null);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        }
        else {
            logic = newLogic;
        }
    }
    return logic;
}
 
示例23
private String diskUtil() {
    String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);

    String storePathLogis = StorePathConfigHelper
        .getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);

    String storePathIndex = StorePathConfigHelper
        .getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
    double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
}
 
示例24
public IndexService(final DefaultMessageStore store) {
    this.defaultMessageStore = store;
    this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
    this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
    this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
}
 
示例25
/**
 * 关闭存储服务
 */
public void shutdown() {
    if (!this.shutdown) {
        this.shutdown = true;

        this.scheduledExecutorService.shutdown();

        try {
            // 等待其他调用停止
            Thread.sleep(1000 * 3);
        }
        catch (InterruptedException e) {
            log.error("shutdown Exception, ", e);
        }

        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.shutdown();
        }

        this.haService.shutdown();

        this.storeStatsService.shutdown();
        this.dispatchMessageService.shutdown();
        this.indexService.shutdown();
        this.flushConsumeQueueService.shutdown();
        this.commitLog.shutdown();
        this.allocateMapedFileService.shutdown();
        if (this.reputMessageService != null) {
            this.reputMessageService.shutdown();
        }
        this.storeCheckpoint.flush();
        this.storeCheckpoint.shutdown();
        // 正常关闭 删除存储根目录下的abort文件
        this.deleteFile(
            StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
    }
}
 
示例26
public void destroy() {
    this.destroyLogics();
    this.commitLog.destroy();
    this.indexService.destroy();
    this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
    this.deleteFile(
        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
}
 
示例27
@Override
public HashMap<String, String> getRuntimeInfo() {
    HashMap<String, String> result = this.storeStatsService.getRuntimeInfo();
    // 检测物理文件磁盘空间
    {
        String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
        result.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(physicRatio));

    }

    // 检测逻辑文件磁盘空间
    {

        String storePathLogics = StorePathConfigHelper
            .getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
        double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
        result.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(logicsRatio));
    }

    // 延时进度
    {
        if (this.scheduleMessageService != null) {
            this.scheduleMessageService.buildRunningStats(result);
        }
    }

    result.put(RunningStats.commitLogMinOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMinPhyOffset()));
    result.put(RunningStats.commitLogMaxOffset.name(),
        String.valueOf(DefaultMessageStore.this.getMaxPhyOffset()));

    return result;
}
 
示例28
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentHashMap<Integer, ConsumeQueue> newMap =
                new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        }
        else {
            map = newMap;
        }
    }

    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(//
            topic, //
            queueId, //
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        }
        else {
            logic = newLogic;
        }
    }

    return logic;
}
 
示例29
/**
 * 启动服务后,在存储根目录创建临时文件,类似于 UNIX VI编辑工具
 * 
 * @throws IOException
 */
private void createTempFile() throws IOException {
    String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    File file = new File(fileName);
    MapedFile.ensureDirOK(file.getParent());
    boolean result = file.createNewFile();
    log.info(fileName + (result ? " create OK" : " already exists"));
}
 
示例30
private boolean loadConsumeQueue() {

        File dirLogic = new File(
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
        File[] fileTopicList = dirLogic.listFiles();
        if (fileTopicList != null) {
            // TOPIC 遍历
            for (File fileTopic : fileTopicList) {
                String topic = fileTopic.getName();
                // TOPIC 下队列遍历
                File[] fileQueueIdList = fileTopic.listFiles();
                if (fileQueueIdList != null) {
                    for (File fileQueueId : fileQueueIdList) {
                        int queueId = Integer.parseInt(fileQueueId.getName());
                        ConsumeQueue logic = new ConsumeQueue(//
                            topic, //
                            queueId, //
                            StorePathConfigHelper
                                .getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
                            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
                            this);
                        this.putConsumeQueue(topic, queueId, logic);
                        if (!logic.load()) {
                            return false;
                        }
                    }
                }
            }
        }

        log.info("load logics queue all over, OK");

        return true;
    }