4 Commits

8 changed files with 209 additions and 157 deletions

View File

@@ -111,29 +111,37 @@ public class JObjectManager {
}
writes.putAll(currentIteration);
} while (somethingChanged);
if (writes.isEmpty()) {
Log.trace("Committing transaction - no changes");
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
runnable.run();
}
};
}
} finally {
readSet = tx.reads();
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
.sorted(Comparator.comparing(JObjectKey::toString))
.forEach(addDependency);
for (var read : readSet.entrySet()) {
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
}
}
throw e;
}
readSet = tx.reads();
if (!writes.isEmpty()) {
Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
.sorted(Comparator.comparing(JObjectKey::toString))
.forEach(addDependency);
}
for (var read : readSet.entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
toUnlock.add(locked.lock());
}
}
if (writes.isEmpty()) {
Log.trace("Committing transaction - no changes");
return new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
runnable.run();
}
};
}
Log.trace("Committing transaction start");

View File

@@ -2,13 +2,21 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class LockManager {
private final DataLocker _objLocker = new DataLocker();
@Nonnull
public AutoCloseableNoThrow lockObject(JObjectKey key) {
return _objLocker.lock(key);
}
@Nullable
public AutoCloseableNoThrow tryLockObject(JObjectKey key) {
return _objLocker.tryLock(key);
}
}

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
@@ -25,7 +24,9 @@ public class CachingObjectPersistentStore {
private long _cacheVersion = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final DataLocker _readerLocker = new DataLocker();
@Inject
LockManager lockManager;
@Inject
SerializingObjectPersistentStore delegate;
@@ -93,11 +94,22 @@ public class CachingObjectPersistentStore {
} finally {
_lock.readLock().unlock();
}
try (var lock = _readerLocker.lock(name)) {
// TODO: This is possibly racy
// var got = delegate.readObject(name);
// put(name, got);
return delegate.readObject(name);
// Global object lock, prevent putting the object into cache
// if its being written right now by another thread
try (var lock = lockManager.tryLockObject(name)) {
var got = delegate.readObject(name);
if (lock == null)
return got;
_lock.writeLock().lock();
try {
put(name, got);
// No need to increase cache version, the objects didn't change
} finally {
_lock.writeLock().unlock();
}
return got;
}
}
@@ -125,79 +137,6 @@ public class CachingObjectPersistentStore {
Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private final long _curCacheVersion;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate, long cacheVersion) {
_delegate = delegate;
_curCacheVersion = cacheVersion;
}
@Override
public JObjectKey peekNextKey() {
return _delegate.peekNextKey();
}
@Override
public void skip() {
_delegate.skip();
}
@Override
public void close() {
_delegate.close();
}
@Override
public boolean hasNext() {
return _delegate.hasNext();
}
@Override
public JObjectKey peekPrevKey() {
return _delegate.peekPrevKey();
}
private void maybeCache(Pair<JObjectKey, JDataVersionedWrapper> prev) {
_lock.writeLock().lock();
try {
if (_cacheVersion != _curCacheVersion) {
Log.tracev("Not caching: {0}", prev);
} else {
Log.tracev("Caching: {0}", prev);
put(prev.getKey(), Optional.of(prev.getValue()));
}
} finally {
_lock.writeLock().unlock();
}
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev();
maybeCache(prev);
return prev;
}
@Override
public boolean hasPrev() {
return _delegate.hasPrev();
}
@Override
public void skipPrev() {
_delegate.skipPrev();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
maybeCache(next);
return next;
}
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
TreePMap<JObjectKey, CacheEntry> curSortedCache;
Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
@@ -210,6 +149,7 @@ public class CachingObjectPersistentStore {
try {
curSortedCache = _sortedCache;
cacheVersion = _cacheVersion;
// TODO: Could this be done without lock?
backing = delegate.getSnapshot();
} finally {
_lock.readLock().unlock();
@@ -219,7 +159,81 @@ public class CachingObjectPersistentStore {
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final TreePMap<JObjectKey, CacheEntry> _curSortedCache = curSortedCache;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
private final long _cacheVersion = cacheVersion;
private final long _snapshotCacheVersion = cacheVersion;
private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
if (_snapshotCacheVersion != _cacheVersion)
return;
_lock.writeLock().lock();
try {
if (_snapshotCacheVersion != _cacheVersion) {
Log.tracev("Not caching: {0}", key);
} else {
Log.tracev("Caching: {0}", key);
put(key, obj);
}
} finally {
_lock.writeLock().unlock();
}
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
_delegate = delegate;
}
@Override
public JObjectKey peekNextKey() {
return _delegate.peekNextKey();
}
@Override
public void skip() {
_delegate.skip();
}
@Override
public void close() {
_delegate.close();
}
@Override
public boolean hasNext() {
return _delegate.hasNext();
}
@Override
public JObjectKey peekPrevKey() {
return _delegate.peekPrevKey();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> prev() {
var prev = _delegate.prev();
maybeCache(prev.getKey(), Optional.of(prev.getValue()));
return prev;
}
@Override
public boolean hasPrev() {
return _delegate.hasPrev();
}
@Override
public void skipPrev() {
_delegate.skipPrev();
}
@Override
public Pair<JObjectKey, JDataVersionedWrapper> next() {
var next = _delegate.next();
maybeCache(next.getKey(), Optional.of(next.getValue()));
return next;
}
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
@@ -232,7 +246,7 @@ public class CachingObjectPersistentStore {
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key), _cacheVersion), Data::new));
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
}
@Nonnull
@@ -248,6 +262,8 @@ public class CachingObjectPersistentStore {
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
}

View File

@@ -9,26 +9,22 @@ import java.util.Collection;
import java.util.Set;
public record File(JObjectKey key, long mode, long cTime, long mTime,
boolean symlink, long size
boolean symlink
) implements JDataRemote, JMapHolder<JMapLongKey> {
public File withSymlink(boolean symlink) {
return new File(key, mode, cTime, mTime, symlink, size);
}
public File withSize(long size) {
return new File(key, mode, cTime, mTime, symlink, size);
return new File(key, mode, cTime, mTime, symlink);
}
public File withMode(long mode) {
return new File(key, mode, cTime, mTime, symlink, size);
return new File(key, mode, cTime, mTime, symlink);
}
public File withCTime(long cTime) {
return new File(key, mode, cTime, mTime, symlink, size);
return new File(key, mode, cTime, mTime, symlink);
}
public File withMTime(long mTime) {
return new File(key, mode, cTime, mTime, symlink, size);
return new File(key, mode, cTime, mTime, symlink);
}
@Override

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.files.service;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.objects.JObjectKey;
import org.apache.commons.lang3.tuple.Pair;
@@ -29,9 +28,7 @@ public interface DhfsFileService {
Iterable<String> readDir(String name);
void updateFileSize(File file);
Long size(JObjectKey f);
long size(JObjectKey fileUuid);
Optional<ByteString> read(JObjectKey fileUuid, long offset, int length);

View File

@@ -75,10 +75,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
JMapHelper jMapHelper;
private JKleppmannTreeManager.JKleppmannTree getTree() {
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(new JObjectKey("fs"));
}
private JKleppmannTreeManager.JKleppmannTree getTreeR() {
return jKleppmannTreeManager.getTree(new JObjectKey("fs"), LockingStrategy.OPTIMISTIC);
}
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putData(newChunk);
@@ -87,18 +91,25 @@ public class DhfsFileServiceImpl implements DhfsFileService {
void init(@Observes @Priority(500) StartupEvent event) {
Log.info("Initializing file service");
getTree();
getTreeW();
}
private JKleppmannTreeNode getDirEntry(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
private JKleppmannTreeNode getDirEntryW(String name) {
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private JKleppmannTreeNode getDirEntryR(String name) {
var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
return ret;
}
private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
if (res == null) return Optional.empty();
var ret = curTx.get(JKleppmannTreeNode.class, res);
return ret;
@@ -130,7 +141,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public Optional<JObjectKey> open(String name) {
return jObjectTxManager.executeTx(() -> {
try {
var ret = getDirEntry(name);
var ret = getDirEntryR(name);
return switch (ret.meta()) {
case JKleppmannTreeNodeMetaFile f -> Optional.of(f.getFileIno());
case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
@@ -154,7 +165,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public Optional<JObjectKey> create(String name, long mode) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntry(path.getParent().toString());
var parent = getDirEntryW(path.getParent().toString());
ensureDir(parent);
@@ -162,11 +173,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var fuuid = UUID.randomUUID();
Log.debug("Creating file " + fuuid);
File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), false, 0);
File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), false);
remoteTx.putData(f);
try {
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
} catch (Exception e) {
// fobj.getMeta().removeRef(newNodeId);
throw e;
@@ -179,7 +190,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
return jObjectTxManager.executeTx(() -> {
return getTree().findParent(w -> {
return getTreeW().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
return f.getFileIno().equals(ino);
return false;
@@ -191,14 +202,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public void mkdir(String name, long mode) {
jObjectTxManager.executeTx(() -> {
Path path = Path.of(name);
var parent = getDirEntry(path.getParent().toString());
var parent = getDirEntryW(path.getParent().toString());
ensureDir(parent);
String dname = path.getFileName().toString();
Log.debug("Creating directory " + name);
getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
});
}
@@ -210,21 +221,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (!allowRecursiveDelete && !node.children().isEmpty())
throw new DirectoryNotEmptyException();
}
getTree().trash(node.meta(), node.key());
getTreeW().trash(node.meta(), node.key());
});
}
@Override
public Boolean rename(String from, String to) {
return jObjectTxManager.executeTx(() -> {
var node = getDirEntry(from);
var node = getDirEntryW(from);
JKleppmannTreeNodeMeta meta = node.meta();
var toPath = Path.of(to);
var toDentry = getDirEntry(toPath.getParent().toString());
var toDentry = getDirEntryW(toPath.getParent().toString());
ensureDir(toDentry);
getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
return true;
});
}
@@ -253,7 +264,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public Iterable<String> readDir(String name) {
return jObjectTxManager.executeTx(() -> {
var found = getDirEntry(name);
var found = getDirEntryW(name);
if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
@@ -541,7 +552,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
return (long) data.size();
});
@@ -571,7 +581,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
// file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
remoteTx.putData(file);
// cleanupChunks(file, oldChunks.values());
updateFileSize(file);
return true;
}
@@ -675,7 +684,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
updateFileSize(file);
return true;
});
}
@@ -699,7 +707,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
public JObjectKey symlink(String oldpath, String newpath) {
return jObjectTxManager.executeTx(() -> {
Path path = Path.of(newpath);
var parent = getDirEntry(path.getParent().toString());
var parent = getDirEntryW(path.getParent().toString());
ensureDir(parent);
@@ -709,12 +717,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
Log.debug("Creating file " + fuuid);
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), true, 0);
File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), true);
jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
updateFileSize(f);
getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
remoteTx.putData(f);
getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
return f.key();
});
}
@@ -733,12 +740,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
@Override
public void updateFileSize(File file) {
jObjectTxManager.executeTx(() -> {
public long size(JObjectKey fileUuid) {
return jObjectTxManager.executeTx(() -> {
long realSize = 0;
var file = remoteTx.getData(File.class, fileUuid)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
Log.tracev("Getting last");
try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.max())) {
last = it.hasNext() ? it.next() : null;
}
@@ -747,19 +755,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
realSize = last.getKey().key() + getChunkSize(last.getValue().ref());
}
if (realSize != file.size()) {
remoteTx.putData(file.withSize(realSize));
}
});
}
@Override
public Long size(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {
var read = remoteTx.getData(File.class, uuid)
.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
return read.size();
return realSize;
});
}
}

View File

@@ -35,9 +35,9 @@ public class JKleppmannTreeManager {
@Inject
PeerInfoService peerInfoService;
public JKleppmannTree getTree(JObjectKey name) {
public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
return txManager.executeTx(() -> {
var data = curTx.get(JKleppmannTreePersistentData.class, name, LockingStrategy.WRITE).orElse(null);
var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
if (data == null) {
data = new JKleppmannTreePersistentData(
name,
@@ -59,6 +59,10 @@ public class JKleppmannTreeManager {
});
}
public JKleppmannTree getTree(JObjectKey name) {
return getTree(name, LockingStrategy.WRITE);
}
public class JKleppmannTree {
private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private final JKleppmannTreeStorageInterface _storageInterface;

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.utils;
import io.quarkus.logging.Log;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap;
@@ -10,6 +12,7 @@ public class DataLocker {
};
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
@Nonnull
public AutoCloseableNoThrow lock(Object data) {
while (true) {
try {
@@ -36,6 +39,30 @@ public class DataLocker {
}
}
@Nullable
public AutoCloseableNoThrow tryLock(Object data) {
while (true) {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
return null;
}
continue;
}
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
}
}
private static class LockTag {
final Thread owner = Thread.currentThread();
// final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();