Mirror of https://github.com/usatiuk/dhfs.git, synced 2025-10-29 04:57:48 +01:00

Compare commits: 38ab6de85b ... d483eba20d (4 commits)

| Author | SHA1 | Date |
|---|---|---|
| | d483eba20d | |
| | 4cbb4ce2be | |
| | 5f85e944e3 | |
| | 4c90a74fea | |
JObjectManager.java:

```diff
@@ -111,29 +111,37 @@ public class JObjectManager {
                 }
                 writes.putAll(currentIteration);
             } while (somethingChanged);
 
-            if (writes.isEmpty()) {
-                Log.trace("Committing transaction - no changes");
-                return new TransactionHandle() {
-                    @Override
-                    public void onFlush(Runnable runnable) {
-                        runnable.run();
-                    }
-                };
-            }
-
-        } finally {
-            readSet = tx.reads();
-
-            Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
-                    .sorted(Comparator.comparing(JObjectKey::toString))
-                    .forEach(addDependency);
-
-            for (var read : readSet.entrySet()) {
+        } catch (Throwable e) {
+            for (var read : tx.reads().entrySet()) {
+                if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
+                    toUnlock.add(locked.lock());
+                }
+            }
+            throw e;
+        }
+
+        readSet = tx.reads();
+
+        if (!writes.isEmpty()) {
+            Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
+                    .sorted(Comparator.comparing(JObjectKey::toString))
+                    .forEach(addDependency);
+        }
+
+        for (var read : readSet.entrySet()) {
+            if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
+                toUnlock.add(locked.lock());
+            }
+        }
+
+        if (writes.isEmpty()) {
+            Log.trace("Committing transaction - no changes");
+            return new TransactionHandle() {
+                @Override
+                public void onFlush(Runnable runnable) {
+                    runnable.run();
+                }
+            };
+        }
 
         Log.trace("Committing transaction start");
```
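Two things stand out in the reshaped commit path: failures now release every per-object lock collected for the read set before rethrowing, and the no-change commit returns a `TransactionHandle` whose `onFlush` runs the callback immediately, since there is nothing to wait for. A minimal sketch of the unlock-on-failure shape (hypothetical names, not the real `JObjectManager`):

```java
import java.util.List;

// Sketch of the failure path above: if anything in the commit body throws,
// every per-object lock collected so far is released before the exception
// propagates, mirroring the new catch block.
class UnlockOnFailure {
    static void commit(List<AutoCloseable> toUnlock, Runnable body) throws Exception {
        try {
            body.run(); // dependency collection + validation + write-back
        } catch (Throwable e) {
            for (var lock : toUnlock)
                lock.close(); // unlock everything taken so far, then rethrow
            throw e;
        }
    }
}
```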
LockManager.java:

```diff
@@ -2,13 +2,21 @@ package com.usatiuk.dhfs.objects;
 
 import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
 import com.usatiuk.dhfs.utils.DataLocker;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
 import jakarta.enterprise.context.ApplicationScoped;
 
 @ApplicationScoped
 public class LockManager {
     private final DataLocker _objLocker = new DataLocker();
 
+    @Nonnull
     public AutoCloseableNoThrow lockObject(JObjectKey key) {
         return _objLocker.lock(key);
     }
+
+    @Nullable
+    public AutoCloseableNoThrow tryLockObject(JObjectKey key) {
+        return _objLocker.tryLock(key);
+    }
 }
```
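`LockManager` is a thin facade over a `DataLocker` keyed by `JObjectKey`: `lockObject` blocks until the key is free, while `tryLockObject` returns `null` instead of waiting (the `DataLocker.tryLock` it delegates to is added at the bottom of this comparison). A hypothetical caller:

```java
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

// Hypothetical consumer of LockManager (name and purpose assumed, not from
// the diff): best-effort per-object locking that never blocks.
@ApplicationScoped
class CacheFiller {
    @Inject
    LockManager lockManager;

    void fillIfUncontended(JObjectKey key) {
        try (var lock = lockManager.tryLockObject(key)) {
            if (lock == null)
                return; // another thread holds the key: skip the optional work
            // per-object critical section; released by try-with-resources
        }
    }
}
```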
CachingObjectPersistentStore.java:

```diff
@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects.persistence;
 
 import com.usatiuk.dhfs.objects.*;
 import com.usatiuk.dhfs.objects.snapshot.Snapshot;
-import com.usatiuk.dhfs.utils.DataLocker;
 import io.quarkus.logging.Log;
 import io.quarkus.runtime.Startup;
 import jakarta.enterprise.context.ApplicationScoped;
@@ -25,7 +24,9 @@ public class CachingObjectPersistentStore {
     private long _cacheVersion = 0;
 
     private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
-    private final DataLocker _readerLocker = new DataLocker();
+
+    @Inject
+    LockManager lockManager;
 
     @Inject
     SerializingObjectPersistentStore delegate;
```
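The cache previously synchronized readers with its own private `DataLocker` (`_readerLocker`); injecting the `@ApplicationScoped` `LockManager` instead means every bean sees the same instance, so cache reads and object writes now contend on one set of per-object locks. A sketch of why the scope matters (bean names are illustrative, not from the diff):

```java
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

// Because LockManager is @ApplicationScoped, both injection points below
// receive the SAME instance, so locking a key in one bean is visible to the
// other. With a private DataLocker per class there was no such coordination.
@ApplicationScoped
class WritebackBean {
    @Inject
    LockManager lockManager; // same instance as in CacheBean
}

@ApplicationScoped
class CacheBean {
    @Inject
    LockManager lockManager; // same instance as in WritebackBean
}
```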
```diff
@@ -93,11 +94,22 @@ public class CachingObjectPersistentStore {
         } finally {
             _lock.readLock().unlock();
         }
-        try (var lock = _readerLocker.lock(name)) {
-            // TODO: This is possibly racy
-            // var got = delegate.readObject(name);
-            // put(name, got);
-            return delegate.readObject(name);
+        // Global object lock, prevent putting the object into cache
+        // if its being written right now by another thread
+        try (var lock = lockManager.tryLockObject(name)) {
+            var got = delegate.readObject(name);
+
+            if (lock == null)
+                return got;
+
+            _lock.writeLock().lock();
+            try {
+                put(name, got);
+                // No need to increase cache version, the objects didn't change
+            } finally {
+                _lock.writeLock().unlock();
+            }
+            return got;
         }
```
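The new read path only publishes into the cache when `tryLockObject` succeeds; a `null` lock means a writer currently holds the object's global lock, so the freshly read value might already be superseded and is returned without being cached. The same pattern in self-contained form (generic stand-ins for the store types):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: read-through that skips cache population under contention, so a
// value raced with a concurrent writer is never published as current.
class ContentionAwareCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    V read(K key, Function<K, V> load, Function<K, AutoCloseable> tryLock) throws Exception {
        try (AutoCloseable lock = tryLock.apply(key)) { // null is fine: close() is skipped
            V got = load.apply(key);
            if (lock == null)
                return got;      // writer active: serve the value, don't cache it
            cache.put(key, got); // no writer for this key: safe to publish
            return got;
        }
    }
}
```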
```diff
@@ -125,79 +137,6 @@ public class CachingObjectPersistentStore {
         Log.tracev("Committed: {0} writes, {1} deletes", names.written().size(), names.deleted().size());
     }
 
-    private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
-        private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
-        private final long _curCacheVersion;
-
-        private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate, long cacheVersion) {
-            _delegate = delegate;
-            _curCacheVersion = cacheVersion;
-        }
-
-        @Override
-        public JObjectKey peekNextKey() {
-            return _delegate.peekNextKey();
-        }
-
-        @Override
-        public void skip() {
-            _delegate.skip();
-        }
-
-        @Override
-        public void close() {
-            _delegate.close();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return _delegate.hasNext();
-        }
-
-        @Override
-        public JObjectKey peekPrevKey() {
-            return _delegate.peekPrevKey();
-        }
-
-        private void maybeCache(Pair<JObjectKey, JDataVersionedWrapper> prev) {
-            _lock.writeLock().lock();
-            try {
-                if (_cacheVersion != _curCacheVersion) {
-                    Log.tracev("Not caching: {0}", prev);
-                } else {
-                    Log.tracev("Caching: {0}", prev);
-                    put(prev.getKey(), Optional.of(prev.getValue()));
-                }
-            } finally {
-                _lock.writeLock().unlock();
-            }
-        }
-
-        @Override
-        public Pair<JObjectKey, JDataVersionedWrapper> prev() {
-            var prev = _delegate.prev();
-            maybeCache(prev);
-            return prev;
-        }
-
-        @Override
-        public boolean hasPrev() {
-            return _delegate.hasPrev();
-        }
-
-        @Override
-        public void skipPrev() {
-            _delegate.skipPrev();
-        }
-
-        @Override
-        public Pair<JObjectKey, JDataVersionedWrapper> next() {
-            var next = _delegate.next();
-            maybeCache(next);
-            return next;
-        }
-    }
-
     public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
         TreePMap<JObjectKey, CacheEntry> curSortedCache;
         Snapshot<JObjectKey, JDataVersionedWrapper> backing = null;
@@ -210,6 +149,7 @@ public class CachingObjectPersistentStore {
         try {
             curSortedCache = _sortedCache;
            cacheVersion = _cacheVersion;
+            // TODO: Could this be done without lock?
            backing = delegate.getSnapshot();
         } finally {
             _lock.readLock().unlock();
@@ -219,7 +159,81 @@ public class CachingObjectPersistentStore {
         return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
             private final TreePMap<JObjectKey, CacheEntry> _curSortedCache = curSortedCache;
             private final Snapshot<JObjectKey, JDataVersionedWrapper> _backing = finalBacking;
-            private final long _cacheVersion = cacheVersion;
+            private final long _snapshotCacheVersion = cacheVersion;
+
+            private void maybeCache(JObjectKey key, Optional<JDataVersionedWrapper> obj) {
+                if (_snapshotCacheVersion != _cacheVersion)
+                    return;
+
+                _lock.writeLock().lock();
+                try {
+                    if (_snapshotCacheVersion != _cacheVersion) {
+                        Log.tracev("Not caching: {0}", key);
+                    } else {
+                        Log.tracev("Caching: {0}", key);
+                        put(key, obj);
+                    }
+                } finally {
+                    _lock.writeLock().unlock();
+                }
+            }
+
+            private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
+                private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
+
+                private CachingKvIterator(CloseableKvIterator<JObjectKey, JDataVersionedWrapper> delegate) {
+                    _delegate = delegate;
+                }
+
+                @Override
+                public JObjectKey peekNextKey() {
+                    return _delegate.peekNextKey();
+                }
+
+                @Override
+                public void skip() {
+                    _delegate.skip();
+                }
+
+                @Override
+                public void close() {
+                    _delegate.close();
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return _delegate.hasNext();
+                }
+
+                @Override
+                public JObjectKey peekPrevKey() {
+                    return _delegate.peekPrevKey();
+                }
+
+                @Override
+                public Pair<JObjectKey, JDataVersionedWrapper> prev() {
+                    var prev = _delegate.prev();
+                    maybeCache(prev.getKey(), Optional.of(prev.getValue()));
+                    return prev;
+                }
+
+                @Override
+                public boolean hasPrev() {
+                    return _delegate.hasPrev();
+                }
+
+                @Override
+                public void skipPrev() {
+                    _delegate.skipPrev();
+                }
+
+                @Override
+                public Pair<JObjectKey, JDataVersionedWrapper> next() {
+                    var next = _delegate.next();
+                    maybeCache(next.getKey(), Optional.of(next.getValue()));
+                    return next;
+                }
+            }
 
             @Override
             public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
@@ -232,7 +246,7 @@ public class CachingObjectPersistentStore {
                                     return e.object();
                                 }
                         ),
-                        (mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key), _cacheVersion), Data::new));
+                        (mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
             }
 
             @Nonnull
@@ -248,6 +262,8 @@ public class CachingObjectPersistentStore {
                     default -> throw new IllegalStateException("Unexpected value: " + cached.object());
                 };
             }
+            var read = _backing.readObject(name);
+            maybeCache(name, read);
             return _backing.readObject(name);
         }
```
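Both the removed iterator and its replacement guard `put` with a version check, but the new code pins the version inside the snapshot (`_snapshotCacheVersion`) and re-checks after taking the write lock, since `_cacheVersion` may advance between the cheap early test and lock acquisition. The invariant, reduced to a toy with assumed simplified types:

```java
import java.util.HashMap;
import java.util.Map;

// Toy version guard: a snapshot may only publish entries into the shared
// cache while no commit has bumped the version since the snapshot was taken.
class VersionGuardedCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private long version = 0;

    synchronized long snapshotVersion() {
        return version;
    }

    synchronized void commit(Map<K, V> writes) {
        cache.putAll(writes);
        version++; // older snapshots must stop caching from now on
    }

    synchronized void maybeCache(long snapshotVersion, K key, V value) {
        if (snapshotVersion != version)
            return; // snapshot is older than the cache contents: skip
        cache.put(key, value);
    }
}
```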
File.java:

```diff
@@ -9,26 +9,22 @@ import java.util.Collection;
 import java.util.Set;
 
 public record File(JObjectKey key, long mode, long cTime, long mTime,
-                   boolean symlink, long size
+                   boolean symlink
 ) implements JDataRemote, JMapHolder<JMapLongKey> {
     public File withSymlink(boolean symlink) {
-        return new File(key, mode, cTime, mTime, symlink, size);
-    }
-
-    public File withSize(long size) {
-        return new File(key, mode, cTime, mTime, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink);
     }
 
     public File withMode(long mode) {
-        return new File(key, mode, cTime, mTime, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink);
     }
 
     public File withCTime(long cTime) {
-        return new File(key, mode, cTime, mTime, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink);
     }
 
     public File withMTime(long mTime) {
-        return new File(key, mode, cTime, mTime, symlink, size);
+        return new File(key, mode, cTime, mTime, symlink);
     }
 
     @Override
```
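With `size` dropped from the record, `withSize` disappears and each remaining helper just rebuilds the record with one component swapped; file length is instead derived on demand by the reworked `size()` near the end of this comparison. The copy-on-write idiom, reduced to a stand-alone sketch:

```java
// Stand-alone sketch of the "with" idiom File uses above: records are
// immutable, so each helper returns a new instance with one field replaced.
record Meta(long mode, long mTime) {
    Meta withMode(long mode) { return new Meta(mode, mTime); }
    Meta withMTime(long mTime) { return new Meta(mode, mTime); }
}

class MetaDemo {
    public static void main(String[] args) {
        var m = new Meta(0644, 0).withMTime(System.currentTimeMillis());
        System.out.println(m); // the original instance is untouched
    }
}
```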
DhfsFileService.java:

```diff
@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.files.service;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
-import com.usatiuk.dhfs.files.objects.File;
 import com.usatiuk.dhfs.objects.JObjectKey;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -29,9 +28,7 @@ public interface DhfsFileService {
 
     Iterable<String> readDir(String name);
 
-    void updateFileSize(File file);
-
-    Long size(JObjectKey f);
+    long size(JObjectKey fileUuid);
 
     Optional<ByteString> read(JObjectKey fileUuid, long offset, int length);
```
DhfsFileServiceImpl.java:

```diff
@@ -75,10 +75,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     @Inject
     JMapHelper jMapHelper;
 
-    private JKleppmannTreeManager.JKleppmannTree getTree() {
+    private JKleppmannTreeManager.JKleppmannTree getTreeW() {
         return jKleppmannTreeManager.getTree(new JObjectKey("fs"));
     }
 
+    private JKleppmannTreeManager.JKleppmannTree getTreeR() {
+        return jKleppmannTreeManager.getTree(new JObjectKey("fs"), LockingStrategy.OPTIMISTIC);
+    }
+
     private ChunkData createChunk(ByteString bytes) {
         var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
         remoteTx.putData(newChunk);
```
```diff
@@ -87,18 +91,25 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
     void init(@Observes @Priority(500) StartupEvent event) {
         Log.info("Initializing file service");
-        getTree();
+        getTreeW();
     }
 
-    private JKleppmannTreeNode getDirEntry(String name) {
-        var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
+    private JKleppmannTreeNode getDirEntryW(String name) {
+        var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
         if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
         var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
         return ret;
     }
 
+    private JKleppmannTreeNode getDirEntryR(String name) {
+        var res = getTreeR().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
+        if (res == null) throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
+        var ret = curTx.get(JKleppmannTreeNode.class, res).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription("Tree node exists but not found as jObject: " + name)));
+        return ret;
+    }
+
     private Optional<JKleppmannTreeNode> getDirEntryOpt(String name) {
-        var res = getTree().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
+        var res = getTreeW().traverse(StreamSupport.stream(Path.of(name).spliterator(), false).map(p -> p.toString()).toList());
         if (res == null) return Optional.empty();
         var ret = curTx.get(JKleppmannTreeNode.class, res);
         return ret;
```
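The W/R suffixes make the locking mode explicit at each call site: `open()` below switches to `getDirEntryR`, while everything that mutates the tree keeps a W variant. A hypothetical read-only helper in the same style, assumed to live inside `DhfsFileServiceImpl` (not a hunk from this diff):

```java
// Hypothetical read-only lookup using the new optimistic accessor: no write
// lock is taken, and any conflict surfaces when the transaction commits.
private Optional<JObjectKey> findIno(String name) {
    return jObjectTxManager.executeTx(() -> {
        var node = getDirEntryR(name);
        return Optional.of(node.key());
    });
}
```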
```diff
@@ -130,7 +141,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     public Optional<JObjectKey> open(String name) {
         return jObjectTxManager.executeTx(() -> {
             try {
-                var ret = getDirEntry(name);
+                var ret = getDirEntryR(name);
                 return switch (ret.meta()) {
                     case JKleppmannTreeNodeMetaFile f -> Optional.of(f.getFileIno());
                     case JKleppmannTreeNodeMetaDirectory f -> Optional.of(ret.key());
@@ -154,7 +165,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     public Optional<JObjectKey> create(String name, long mode) {
         return jObjectTxManager.executeTx(() -> {
             Path path = Path.of(name);
-            var parent = getDirEntry(path.getParent().toString());
+            var parent = getDirEntryW(path.getParent().toString());
 
             ensureDir(parent);
 
@@ -162,11 +173,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
             var fuuid = UUID.randomUUID();
             Log.debug("Creating file " + fuuid);
-            File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), false, 0);
+            File f = new File(JObjectKey.of(fuuid.toString()), mode, System.currentTimeMillis(), System.currentTimeMillis(), false);
             remoteTx.putData(f);
 
             try {
-                getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
+                getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
             } catch (Exception e) {
                 // fobj.getMeta().removeRef(newNodeId);
                 throw e;
@@ -179,7 +190,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     @Override
     public Pair<String, JObjectKey> inoToParent(JObjectKey ino) {
         return jObjectTxManager.executeTx(() -> {
-            return getTree().findParent(w -> {
+            return getTreeW().findParent(w -> {
                 if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
                     return f.getFileIno().equals(ino);
                 return false;
@@ -191,14 +202,14 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     public void mkdir(String name, long mode) {
         jObjectTxManager.executeTx(() -> {
             Path path = Path.of(name);
-            var parent = getDirEntry(path.getParent().toString());
+            var parent = getDirEntryW(path.getParent().toString());
             ensureDir(parent);
 
             String dname = path.getFileName().toString();
 
             Log.debug("Creating directory " + name);
 
-            getTree().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTree().getNewNodeId());
+            getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaDirectory(dname), getTreeW().getNewNodeId());
         });
     }
 
@@ -210,21 +221,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 if (!allowRecursiveDelete && !node.children().isEmpty())
                     throw new DirectoryNotEmptyException();
             }
-            getTree().trash(node.meta(), node.key());
+            getTreeW().trash(node.meta(), node.key());
         });
     }
 
     @Override
     public Boolean rename(String from, String to) {
         return jObjectTxManager.executeTx(() -> {
-            var node = getDirEntry(from);
+            var node = getDirEntryW(from);
             JKleppmannTreeNodeMeta meta = node.meta();
 
             var toPath = Path.of(to);
-            var toDentry = getDirEntry(toPath.getParent().toString());
+            var toDentry = getDirEntryW(toPath.getParent().toString());
             ensureDir(toDentry);
 
-            getTree().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
+            getTreeW().move(toDentry.key(), meta.withName(toPath.getFileName().toString()), node.key());
             return true;
         });
     }
@@ -253,7 +264,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     @Override
     public Iterable<String> readDir(String name) {
         return jObjectTxManager.executeTx(() -> {
-            var found = getDirEntry(name);
+            var found = getDirEntryW(name);
 
             if (!(found.meta() instanceof JKleppmannTreeNodeMetaDirectory md))
                 throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
@@ -541,7 +552,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
             remoteTx.putData(file);
             cleanupChunks(file, removedChunks.values());
-            updateFileSize(file);
 
             return (long) data.size();
         });
@@ -571,7 +581,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 // file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
                 remoteTx.putData(file);
                 // cleanupChunks(file, oldChunks.values());
-                updateFileSize(file);
                 return true;
             }
 
@@ -675,7 +684,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
 
             remoteTx.putData(file);
             cleanupChunks(file, removedChunks.values());
-            updateFileSize(file);
             return true;
         });
     }
@@ -699,7 +707,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     public JObjectKey symlink(String oldpath, String newpath) {
         return jObjectTxManager.executeTx(() -> {
             Path path = Path.of(newpath);
-            var parent = getDirEntry(path.getParent().toString());
+            var parent = getDirEntryW(path.getParent().toString());
 
             ensureDir(parent);
 
@@ -709,12 +717,11 @@ public class DhfsFileServiceImpl implements DhfsFileService {
             Log.debug("Creating file " + fuuid);
 
             ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
-            File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), true, 0);
+            File f = new File(JObjectKey.of(fuuid.toString()), 0, System.currentTimeMillis(), System.currentTimeMillis(), true);
             jMapHelper.put(f, JMapLongKey.of(0), newChunkData.key());
-
-            updateFileSize(f);
-
-            getTree().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTree().getNewNodeId());
+            remoteTx.putData(f);
+            getTreeW().move(parent.key(), new JKleppmannTreeNodeMetaFile(fname, f.key()), getTreeW().getNewNodeId());
             return f.key();
         });
     }
@@ -733,12 +740,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
     }
 
     @Override
-    public void updateFileSize(File file) {
-        jObjectTxManager.executeTx(() -> {
+    public long size(JObjectKey fileUuid) {
+        return jObjectTxManager.executeTx(() -> {
             long realSize = 0;
+            var file = remoteTx.getData(File.class, fileUuid)
+                    .orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
 
             Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
             Log.tracev("Getting last");
             try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.max())) {
                 last = it.hasNext() ? it.next() : null;
             }
@@ -747,19 +755,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
                 realSize = last.getKey().key() + getChunkSize(last.getValue().ref());
             }
 
-            if (realSize != file.size()) {
-                remoteTx.putData(file.withSize(realSize));
-            }
-        });
-    }
-
-    @Override
-    public Long size(JObjectKey uuid) {
-        return jObjectTxManager.executeTx(() -> {
-            var read = remoteTx.getData(File.class, uuid)
-                    .orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
-
-            return read.size();
+            return realSize;
         });
     }
 }
```
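`size()` now derives the length on demand: the last chunk entry's offset plus that chunk's length, and `0` for a file with no chunks. A worked stand-alone example of the arithmetic (a `TreeMap` stands in for the JMap of chunk entries):

```java
import java.util.TreeMap;

// Worked example of the size() arithmetic above: offset of the last chunk
// plus its length. Chunks at offsets 0 (4096 bytes) and 4096 (1024 bytes)
// give a file size of 5120.
class SizeDemo {
    static long size(TreeMap<Long, Integer> chunks) { // offset -> chunk length
        if (chunks.isEmpty())
            return 0;                                 // the "last == null" case
        var last = chunks.lastEntry();
        return last.getKey() + last.getValue();       // realSize in the diff
    }

    public static void main(String[] args) {
        var chunks = new TreeMap<Long, Integer>();
        chunks.put(0L, 4096);
        chunks.put(4096L, 1024);
        System.out.println(size(chunks)); // prints 5120
    }
}
```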
JKleppmannTreeManager.java:

```diff
@@ -35,9 +35,9 @@ public class JKleppmannTreeManager {
     @Inject
     PeerInfoService peerInfoService;
 
-    public JKleppmannTree getTree(JObjectKey name) {
+    public JKleppmannTree getTree(JObjectKey name, LockingStrategy lockingStrategy) {
         return txManager.executeTx(() -> {
-            var data = curTx.get(JKleppmannTreePersistentData.class, name, LockingStrategy.WRITE).orElse(null);
+            var data = curTx.get(JKleppmannTreePersistentData.class, name, lockingStrategy).orElse(null);
             if (data == null) {
                 data = new JKleppmannTreePersistentData(
                         name,
@@ -59,6 +59,10 @@ public class JKleppmannTreeManager {
         });
     }
 
+    public JKleppmannTree getTree(JObjectKey name) {
+        return getTree(name, LockingStrategy.WRITE);
+    }
+
     public class JKleppmannTree {
         private final KleppmannTree<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> _tree;
         private final JKleppmannTreeStorageInterface _storageInterface;
```
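Keeping the one-argument `getTree` as a `WRITE`-defaulting wrapper preserves every existing call site while new callers opt in to optimistic locking. Illustrative call sites (assumed injected manager, not lines from the diff):

```java
// Illustrative usage of the two overloads added above.
class TreeUser {
    void example(JKleppmannTreeManager manager) {
        var writeTree = manager.getTree(new JObjectKey("fs")); // defaults to LockingStrategy.WRITE
        var readTree = manager.getTree(new JObjectKey("fs"), LockingStrategy.OPTIMISTIC); // read-mostly path
    }
}
```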
DataLocker.java:

```diff
@@ -1,6 +1,8 @@
 package com.usatiuk.dhfs.utils;
 
 import io.quarkus.logging.Log;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
 
 import java.lang.ref.Cleaner;
 import java.util.concurrent.ConcurrentHashMap;
@@ -10,6 +12,7 @@ public class DataLocker {
     };
     private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
 
+    @Nonnull
     public AutoCloseableNoThrow lock(Object data) {
         while (true) {
             try {
@@ -36,6 +39,30 @@ public class DataLocker {
         }
     }
 
+    @Nullable
+    public AutoCloseableNoThrow tryLock(Object data) {
+        while (true) {
+            var tag = _locks.get(data);
+            if (tag != null) {
+                synchronized (tag) {
+                    if (!tag.released) {
+                        if (tag.owner == Thread.currentThread()) {
+                            return DUMMY_LOCK;
+                        }
+                        return null;
+                    }
+                    continue;
+                }
+            }
+
+            var newTag = new LockTag();
+            var oldTag = _locks.putIfAbsent(data, newTag);
+            if (oldTag == null) {
+                return new Lock(data, newTag);
+            }
+        }
+    }
+
     private static class LockTag {
         final Thread owner = Thread.currentThread();
         // final StackTraceElement[] _creationStack = Thread.currentThread().getStackTrace();
```
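`tryLock` mirrors `lock` without blocking and has three outcomes: `null` when another thread owns the tag, the shared `DUMMY_LOCK` when the current thread already owns it (a reentrant no-op close), and a fresh `Lock` when `putIfAbsent` wins the race; a released tag simply retries the loop. A hypothetical caller:

```java
// Hypothetical caller handling all three tryLock outcomes; both DUMMY_LOCK
// and a real Lock close safely via try-with-resources.
class TryLockUser {
    void tryExclusive(DataLocker locker, Object key) {
        try (var lock = locker.tryLock(key)) {
            if (lock == null)
                return; // contended: back off instead of blocking
            // exclusive (or reentrant) critical section
        }
    }
}
```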