Java源码示例:com.alibaba.rocketmq.store.config.FlushDiskType

示例1
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mapedFileQueue =
            new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
                defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
                defaultMessageStore.getAllocateMapedFileService());
    this.defaultMessageStore = defaultMessageStore;
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 同步刷盘
        this.flushCommitLogService = new GroupCommitService();
    }
    else {
        // 异步刷盘
        this.flushCommitLogService = new FlushRealTimeService();
    }

    this.appendMessageCallback = new DefaultAppendMessageCallback(
        defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
 
示例2
/**
 * Creates the commit log for the given message store.
 *
 * <p>The mapped-file queue is built from the store path, the mapped-file size
 * and the allocation service obtained from {@code defaultMessageStore}; the
 * flush service is selected from the configured {@link FlushDiskType}.
 *
 * @param defaultMessageStore the owning store that provides configuration and services
 */
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mapedFileQueue = new MapedFileQueue(
        defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
        defaultMessageStore.getAllocateMapedFileService());
    this.defaultMessageStore = defaultMessageStore;

    if (FlushDiskType.SYNC_FLUSH != defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // Asynchronous flush. Per the original author's note this is the default
        // configuration, so this is the branch normally taken (not verifiable here).
        this.flushCommitLogService = new FlushRealTimeService();
    } else {
        // Synchronous flush.
        this.flushCommitLogService = new GroupCommitService();
    }

    this.appendMessageCallback = new DefaultAppendMessageCallback(
        defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
 
示例3
/**
 * Warms up this mapped file by writing one zero byte at every
 * {@code MapedFile.OS_PAGE_SIZE} offset up to {@code fileSize}, then locks it
 * via {@code mlock()}.
 *
 * <p>When {@code type} is {@link FlushDiskType#SYNC_FLUSH}, the mapped buffer is
 * forced each time at least {@code pages} pages have been written since the last
 * force, and once more after the loop completes.
 *
 * <p>If the thread is interrupted while yielding, warm-up still runs to
 * completion; the interrupt is logged and the thread's interrupt status is
 * restored before this method returns.
 *
 * @param type  flush mode; only {@code SYNC_FLUSH} triggers {@code force()} calls
 * @param pages number of pages written between two forced flushes in sync mode
 */
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    // Remember an interrupt instead of swallowing it; re-asserted in the finally block.
    boolean interrupted = false;
    try {
        for (int i = 0, j = 0; i < this.fileSize; i += MapedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // Original note: "prevent gc". Presumably the zero-length sleep gives the
            // JVM a chance to pause this long-running loop - TODO confirm intent.
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    // Restoring the flag right here would make every later sleep throw
                    // immediately, so record it and restore once the work is done.
                    interrupted = true;
                    log.warn("warmMappedFile interrupted, continue warming up", e);
                }
            }
        }

        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
                    this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
                System.currentTimeMillis() - beginTime);

        this.mlock();
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
 
示例4
/**
 * Creates the commit log for the given message store.
 *
 * <p>Initializes, in order: the mapped-file queue (path, file size and
 * allocation service taken from the store), the back-reference to the store,
 * the flush service ({@code GroupCommitService} for
 * {@link FlushDiskType#SYNC_FLUSH}, {@code FlushRealTimeService} otherwise),
 * and the append callback sized by the configured maximum message size.
 *
 * @param defaultMessageStore the owning store that provides configuration and services
 */
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mapedFileQueue = new MapedFileQueue(
            defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
            defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(),
            defaultMessageStore.getAllocateMapedFileService());
    this.defaultMessageStore = defaultMessageStore;

    // Sync flush gets the group-commit service; any other mode flushes in real time.
    this.flushCommitLogService =
            FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()
                    ? new GroupCommitService()
                    : new FlushRealTimeService();

    this.appendMessageCallback = new DefaultAppendMessageCallback(
            defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
 
示例5
/**
 * Warms up this mapped file: writes one zero byte at every
 * {@code MapedFile.OS_PAGE_SIZE} offset up to {@code fileSize} so that each
 * page is touched (dirtied), and in sync-flush mode forces the dirty pages to
 * disk once enough of them have accumulated.
 *
 * <p>NOTE(review): the original author's comment states that warm-up targets
 * files above a configured size (default 1G), that the page interval is 4k, and
 * that the default flush threshold works out to 16M. None of these values are
 * visible in this method - confirm against the configuration.
 *
 * <p>If the thread is interrupted while yielding, warm-up still runs to
 * completion; the interrupt is logged and the thread's interrupt status is
 * restored before this method returns.
 *
 * @param type  flush mode; only {@link FlushDiskType#SYNC_FLUSH} triggers {@code force()} calls
 * @param pages number of pages written between two forced flushes in sync mode
 */
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    // slice() returns a view that shares content with mappedByteBuffer, starting at
    // its current position; the view's own position is 0 and its capacity is the
    // number of bytes that remained in the source buffer.
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    // Remember an interrupt instead of swallowing it; re-asserted in the finally block.
    boolean interrupted = false;
    try {
        for (int i = 0, j = 0; i < this.fileSize; i += MapedFile.OS_PAGE_SIZE, j++) {
            // Absolute put: touch the first byte of this page through the slice.
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                // Pages written since the last force have reached the requested count.
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // Original note: "prevent gc". Presumably the zero-length sleep gives the
            // JVM a chance to pause this long-running loop - TODO confirm intent.
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    // Restoring the flag right here would make every later sleep throw
                    // immediately, so record it and restore once the work is done.
                    interrupted = true;
                    log.warn("warmMappedFile interrupted, continue warming up", e);
                }
            }
        }

        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file worm up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file worm up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}