

发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,这篇文章主要介绍RocketMQ设计之同步刷盘的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog
千家信息网最后更新 2025年01月18日RocketMQ设计之同步刷盘的示例分析




public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {    // Synchronization flush    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;        if (messageExt.isWaitStoreMsgOK()) {            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());            service.putRequest(request);            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());            if (!flushOK) {                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                    + " client address: " + messageExt.getBornHostString());                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);            }        } else {            service.wakeup();        }    }    // Asynchronous flush    else {        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {            flushCommitLogService.wakeup();        } else {            commitLogService.wakeup();        }    }}class GroupCommitService extends FlushCommitLogService {        private volatile List requestsWrite = new ArrayList();        private volatile List requestsRead = new ArrayList();        //提交刷盘任务到任务列表        public synchronized void putRequest(final GroupCommitRequest request) {            synchronized (this.requestsWrite) {                this.requestsWrite.add(request);            }            if (hasNotified.compareAndSet(false, true)) {                waitPoint.countDown(); // notify            }        }        private void swapRequests() {            List tmp = this.requestsWrite;            this.requestsWrite = this.requestsRead;            this.requestsRead = tmp;        }        private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }        public void run() {            CommitLog.log.info(this.getServiceName() + " service started");            while (!this.isStopped()) {                try {                    this.waitForRunning(10);                    this.doCommit();                } catch (Exception e) {                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                }            }            // Under normal circumstances shutdown, wait for the arrival of the            // request, and then flush            try {                Thread.sleep(10);            } catch (InterruptedException e) {                CommitLog.log.warn("GroupCommitService Exception, ", e);            }            synchronized (this) {                this.swapRequests();            }            this.doCommit();            CommitLog.log.info(this.getServiceName() + " service end");        }        @Override        protected void onWaitEnd() {            this.swapRequests();        }        @Override        public String getServiceName() {            return GroupCommitService.class.getSimpleName();        }        @Override        public long getJointime() {            return 1000 * 60 * 5;        }    }



  • putRequest(request) 提交刷盘任务到任务列表

  • request.waitForFlush同步等待GroupCommitService将任务列表中的任务刷盘完成。



private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            //根据offset确定是否已经刷盘                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    //清空已刷盘的列表                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }
  • 刷盘的时候依次读取requestsRead中的数据写入磁盘,

  • 写入完成后清空requestsRead



public boolean flush(final int flushLeastPages) {    boolean result = true;    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);    if (mappedFile != null) {        long tmpTimeStamp = mappedFile.getStoreTimestamp();        int offset = mappedFile.flush(flushLeastPages);        long where = mappedFile.getFileFromOffset() + offset;        result = where == this.flushedWhere;        this.flushedWhere = where;        if (0 == flushLeastPages) {            this.storeTimestamp = tmpTimeStamp;        }    }    return result;}


