
How to use StampedLock


This article looks at how StampedLock is used in practice, using BookKeeper's SingleDirectoryDbLedgerStorage as the example: its main member fields, the entry write path, the flush/checkpoint flow, and the entry read path.

Main member fields

public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {
    // Where entry data is actually stored
    private final EntryLogger entryLogger;

    // -----------------
    // Index related
    // -----------------

    // Records fence, exists, masterKey and other per-ledger metadata
    private final LedgerMetadataIndex ledgerIndex;
    // Index of entry locations
    private final EntryLocationIndex entryLocationIndex;

    // Transient per-ledger info cache
    private final ConcurrentLongHashMap transientLedgerInfoCache;

    // -----------------
    // Write path
    // -----------------

    // Protects the rotation (swap) of the two write memtables below
    private final StampedLock writeCacheRotationLock = new StampedLock();
    // Write cache where all new entries are inserted into
    protected volatile WriteCache writeCache;
    // Write cache that is used to swap with writeCache during flushes
    protected volatile WriteCache writeCacheBeingFlushed;

    // Cache where we insert entries for speculative reading
    private final ReadCache readCache;

    // Checkpoint related
    private final CheckpointSource checkpointSource;
    private Checkpoint lastCheckpoint = Checkpoint.MIN;
}
Main responsibilities
  1. Reads and writes ledger entries, and maintains the location index for each ledger

  2. Stores ledger-related metadata

  3. Supports checkpoints

Writing an entry

A write goes straight into the WriteCache. A StampedLock is used here to protect the cache-swap operation: StampedLock is a read-write lock with optimistic reads, which allows higher concurrency.
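Before looking at the BookKeeper code, here is a minimal, self-contained sketch of the general StampedLock optimistic-read pattern (a hypothetical Point class, not BookKeeper code): try an optimistic read, validate the stamp afterwards, and fall back to a real read lock only if a writer slipped in. addEntry below follows exactly this shape, with the cache swap playing the role of the writer.

import java.util.concurrent.locks.StampedLock;

// Illustration of the optimistic-read pattern; Point is a made-up example class.
class Point {
    private final StampedLock lock = new StampedLock();
    private double x, y;

    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();   // non-blocking, does not contend with other readers
        double curX = x, curY = y;               // read shared state optimistically
        if (!lock.validate(stamp)) {             // a writer acquired the lock in the meantime
            stamp = lock.readLock();             // fall back to a pessimistic read lock
            try {
                curX = x;
                curY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(curX * curX + curY * curY);
    }

    void move(double dx, double dy) {
        long stamp = lock.writeLock();           // exclusive section, analogous to the cache swap
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}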

public long addEntry(ByteBuf entry) throws IOException, BookieException {
    long startTime = MathUtils.nowInNano();

    long ledgerId = entry.getLong(entry.readerIndex());
    long entryId = entry.getLong(entry.readerIndex() + 8);
    long lac = entry.getLong(entry.readerIndex() + 16);

    // This is the standard StampedLock optimistic-read template.
    // The conflicting exclusive operation is the cache swap.

    // First we try to do an optimistic locking to get access to the current write cache.
    // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
    // rest of the time, we can have multiple thread using the optimistic lock here without interfering.

    // Optimistic read lock
    long stamp = writeCacheRotationLock.tryOptimisticRead();
    boolean inserted = false;

    inserted = writeCache.put(ledgerId, entryId, entry);

    // If a cache swap happened while we were inserting, insert again
    if (!writeCacheRotationLock.validate(stamp)) {
        // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
        // the operation because we might have inserted in a write cache that was already being flushed and cleared,
        // without being sure about this last entry being flushed or not.

        // We may have inserted into the cache that was being swapped out.
        // If the insert returned true: TODO
        // If it returned false, there is no impact.
        stamp = writeCacheRotationLock.readLock();
        try {
            inserted = writeCache.put(ledgerId, entryId, entry);
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }
    }

    // If the insert into the write cache failed, trigger a flush of the write cache.
    // Reaching this point probably means both buffers are full.
    if (!inserted) {
        triggerFlushAndAddEntry(ledgerId, entryId, entry);
    }

    // Update the cached LAC.
    // after successfully insert the entry, update LAC and notify the watchers
    updateCachedLacIfNeeded(ledgerId, lac);

    return entryId;
}
Flush triggering when the writeCache is full

The logic here is straightforward: keep looping and retrying the insert into the writeCache; if the timeout expires, break out of the loop and fail the write.

If a flush has not already been triggered, a flush task is submitted.

private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
        throws IOException, BookieException {
    // Metrics
    dbLedgerStorageStats.getThrottledWriteRequests().inc();
    ...

    // Keep retrying until the maximum write timeout expires
    while (System.nanoTime() < absoluteTimeoutNanos) {
        // Write cache is full, we need to trigger a flush so that it gets rotated
        // If the flush has already been triggered or flush has already switched the
        // cache, we don't need to trigger another flush

        // Submit a flush task, unless one has already been submitted
        if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
            // Trigger an early flush in background
            log.info("Write cache is full, triggering flush");
            executor.execute(() -> {
                try {
                    flush();
                } catch (IOException e) {
                    log.error("Error during flush", e);
                }
            });
        }

        long stamp = writeCacheRotationLock.readLock();
        try {
            if (writeCache.put(ledgerId, entryId, entry)) {
                // We succeeded in putting the entry in the write cache
                return;
            }
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }

        // Wait some time and try again
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
        }
    }

    // Timeout expired and we weren't able to insert in write cache
    dbLedgerStorageStats.getRejectedWriteRequests().inc();
    throw new OperationRejectedException();
}
The flush flow

The flush flow is in fact the logic that triggers a checkpoint.

Main steps

  • Swap the two writeCaches; the cache currently being written to becomes the batch to flush

  • Iterate over the swapped-out writeCache and write its contents to the EntryLogger

  • Sync the EntryLogger so that the data written in the previous step is persisted to disk

  • Update the entryLocationIndex and flush that index to RocksDB

public void flush() throws IOException {
    // journal
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    checkpointSource.checkpointComplete(cp, true);
}

public void checkpoint(Checkpoint checkpoint) throws IOException {
    // journal
    Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();

    // Check whether a checkpoint has already been taken past this point
    if (lastCheckpoint.compareTo(checkpoint) > 0) {
        return;
    }

    long startTime = MathUtils.nowInNano();

    // Only a single flush operation can happen at a time
    flushMutex.lock();

    try {
        // Swap the write cache so that writes can continue to happen while the flush is
        // ongoing
        // Straightforward: swap the active writeCache with the standby one,
        // holding the StampedLock's write lock
        swapWriteCache();

        long sizeToFlush = writeCacheBeingFlushed.size();

        // Write all the pending entries into the entry logger and collect the offset
        // position for each entry

        // Flush the cache to its actual storage location.
        // Build a RocksDB batch.
        Batch batch = entryLocationIndex.newBatch();
        writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
            try {
                // Write the entry to the entryLogger;
                // the returned value is the entry's offset
                long location = entryLogger.addEntry(ledgerId, entry, true);

                // This splits the three longs into key/value pairs and adds them to the RocksDB batch
                entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

        // Not expanded here: this flushes && fsyncs the entryLogger data written above to disk.
        entryLogger.flush();

        // Trigger the RocksDB batch flush; this write is synchronous
        long batchFlushStarTime = System.nanoTime();
        batch.flush();
        batch.close();

        // Flush the ledgerIndex.
        // Its contents change rarely since it only records metadata.
        ledgerIndex.flush();

        // Schedule a cleanup task
        cleanupExecutor.execute(() -> {
            // There can only be one single cleanup task running because the cleanupExecutor
            // is single-threaded
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Removing deleted ledgers from db indexes");
                }

                entryLocationIndex.removeOffsetFromDeletedLedgers();
                ledgerIndex.removeDeletedLedgers();
            } catch (Throwable t) {
                log.warn("Failed to cleanup db indexes", t);
            }
        });

        // Record the checkpoint
        lastCheckpoint = thisCheckpoint;

        // Clear the flushed cache.
        // Discard all the entry from the write cache, since they're now persisted
        writeCacheBeingFlushed.clear();
    } catch (IOException e) {
        // Leave IOException as it is
        throw e;
    } catch (RuntimeException e) {
        // Wrap unchecked exceptions
        throw new IOException(e);
    } finally {
        try {
            isFlushOngoing.set(false);
        } finally {
            flushMutex.unlock();
        }
    }
}
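One piece not shown above is swapWriteCache(), which is the only place where the StampedLock's write lock is taken. The sketch below is a simplified illustration based on the comments in the flush code, not verbatim BookKeeper code: take the exclusive lock, exchange the two cache references, and reset the flush flags.

private void swapWriteCache() {
    // Exclusive section: this is what the optimistic/pessimistic reads in addEntry/getEntry guard against
    long stamp = writeCacheRotationLock.writeLock();
    try {
        // Exchange the active cache and the standby cache; new writes now go into the (empty) standby
        WriteCache tmp = writeCacheBeingFlushed;
        writeCacheBeingFlushed = writeCache;
        writeCache = tmp;

        // The cache has been rotated, so another early flush may be triggered later if needed
        hasFlushBeenTriggered.set(false);
    } finally {
        try {
            isFlushOngoing.set(true);
        } finally {
            writeCacheRotationLock.unlockWrite(stamp);
        }
    }
}

Because the write lock is held only for the duration of swapping two references, writers are blocked for a very short time even though the flush itself may take much longer.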

That completes the write path.

Reading an entry

A read consults three places, in order:

  1. The write caches, both the one being flushed and the one currently being written to

  2. The readCache, which holds read-ahead entries

  3. The entryLogger, i.e. reading from files whose data has already been persisted to disk

After a successful read from the entry log, it also tries to fill the read-ahead buffer with the following entries.


What happens if a read comes in while a flush is in progress?

The flush flow above clears the writeCache being flushed only after all of its contents have been persisted to disk.

So even with concurrent reads, a read that eventually falls through to the file-read step will always be able to find the entry.


Another open question is ordering: it is not clear whether requests with the same ledgerId and entryId but different contents can occur.

If they can, this feels like it could be a problem.

public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
    long startTime = MathUtils.nowInNano();

    // Reading the LAC
    if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
        return getLastEntry(ledgerId);
    }

    // We need to try to read from both write caches, since recent entries could be found in either of the two. The
    // write caches are already thread safe on their own, here we just need to make sure we get references to both
    // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
    long stamp = writeCacheRotationLock.tryOptimisticRead();
    WriteCache localWriteCache = writeCache;
    WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
    if (!writeCacheRotationLock.validate(stamp)) {
        // Fallback to regular read lock approach
        stamp = writeCacheRotationLock.readLock();
        try {
            localWriteCache = writeCache;
            localWriteCacheBeingFlushed = writeCacheBeingFlushed;
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }
    }

    // First try to read from the write cache of recent entries
    ByteBuf entry = localWriteCache.get(ledgerId, entryId);
    if (entry != null) {
        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

    // If there's a flush going on, the entry might be in the flush buffer
    entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
    if (entry != null) {
        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

    // Try reading from read-ahead cache
    entry = readCache.get(ledgerId, entryId);
    if (entry != null) {
        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

    // Read from main storage
    long entryLocation;
    try {
        entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
        if (entryLocation == 0) {
            throw new NoEntryException(ledgerId, entryId);
        }
        entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
    } catch (NoEntryException e) {
        recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        throw e;
    }

    readCache.put(ledgerId, entryId, entry);

    // Try to read more entries
    long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
    fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);

    recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
    recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
    return entry;
}
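fillReadAheadCache itself is not shown in this article. The sketch below is only a rough illustration of the idea (the loop structure and the readAheadCacheBatchSize field are assumptions, not the actual BookKeeper implementation): keep reading entries sequentially from the entry log starting at the given location and put them into the readCache, until a batch limit is reached or a read fails.

// Illustrative sketch only; not the real implementation.
private void fillReadAheadCache(long ledgerId, long entryId, long location) {
    try {
        int count = 0;
        while (count < readAheadCacheBatchSize) {
            // Read the entry stored immediately after the previous one
            ByteBuf entry = entryLogger.readEntry(ledgerId, entryId, location);

            // Cache it for the sequential reads that are likely to follow
            readCache.put(ledgerId, entryId, entry);

            // Advance past the 4-byte size header plus the payload, as in getEntry above
            location += 4 + entry.readableBytes();
            entryId++;
            count++;
        }
    } catch (Exception e) {
        // Read-ahead is only an optimization; any failure simply stops the prefetch
    }
}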

"怎么使用StampLock"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
