4 Commits

Author SHA1 Message Date
4bd7266c89 don't try to read objects we know don't exist when committing
such as new chunks with random ids
2025-04-13 11:57:22 +02:00
bb65aab166 Server: remember opened files ids 2025-04-13 11:52:50 +02:00
a4810c7ee4 Objects: fix cache reading twice 2025-04-12 20:49:29 +02:00
e42e076b77 Objects: disable dhfs.objects.lru.print-stats by default
quite annoying
2025-04-09 11:02:46 +02:00
10 changed files with 96 additions and 21 deletions

View File

@@ -42,13 +42,16 @@ public class CachingObjectPersistentStore {
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry);
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
@@ -189,7 +192,7 @@ public class CachingObjectPersistentStore {
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
return read;
}
@Override

View File

@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
public <T extends JData> void put(JData obj) {
transactionManager.current().put(obj);
}
@Override
public <T extends JData> void putNew(JData obj) {
transactionManager.current().putNew(obj);
}
}

View File

@@ -1,11 +1,11 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -16,7 +16,6 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -106,10 +105,17 @@ public class JObjectManager {
for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
if (entry.getValue() instanceof TxRecord.TxObjectRecordNewWrite<?> newWrite) {
lastCurHookSeen.put(entry.getKey(), entry.getValue());
hook.onCreate(newWrite.key(), newWrite.data());
continue;
}
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
case TxRecord.TxObjectRecordWriteChecked<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
@@ -221,8 +227,10 @@ public class JObjectManager {
writes.values().stream()
.filter(r -> {
if (!same)
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
if (r instanceof TxRecord.TxObjectRecordWriteChecked<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep == null)
return true;
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;

View File

@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> void put(JData obj);
<T extends JData> void putNew(JData obj);
void delete(JObjectKey key);

View File

@@ -57,6 +57,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final HashSet<JObjectKey> _totallyNew = new HashSet<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
@@ -97,8 +98,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
var got = _readSet.get(key);
if (got == null) {
if (_totallyNew.contains(key)) {
return Optional.empty();
}
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectNoLock<>(read));
// Log.infov("Read object {0} from source, type {1}", key, type);
return read.map(JDataVersionedWrapper::data).map(type::cast);
}
@@ -175,8 +181,15 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void put(JData obj) {
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_writes.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj));
}
@Override
public <T extends JData> void putNew(JData obj) {
_writes.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj));
_totallyNew.add(obj.key());
}
@Override

View File

@@ -8,7 +8,18 @@ public class TxRecord {
JObjectKey key();
}
public record TxObjectRecordWrite<T extends JData>(JData data) implements TxObjectRecord<T> {
public interface TxObjectRecordWrite<T> extends TxObjectRecord<T> {
JData data();
}
public record TxObjectRecordWriteChecked<T extends JData>(JData data) implements TxObjectRecordWrite<T> {
@Override
public JObjectKey key() {
return data.key();
}
}
public record TxObjectRecordNewWrite<T extends JData>(JData data) implements TxObjectRecordWrite<T> {
@Override
public JObjectKey key() {
return data.key();
@@ -17,4 +28,5 @@ public class TxRecord {
public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> {
}
}

View File

@@ -1,7 +1,7 @@
dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.lru.print-stats=true
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false

View File

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.repository.SyncHandler;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
@@ -123,6 +123,11 @@ public class RemoteTransaction {
curTx.put(newData);
}
public <T extends JDataRemote> void putDataNew(T obj) {
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return getMeta(key, LockingStrategy.OPTIMISTIC);
}

View File

@@ -81,7 +81,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putData(newChunk);
remoteTx.putDataNew(newChunk);
return newChunk;
}

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -30,6 +31,8 @@ import ru.serce.jnrfuse.struct.Timespec;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static jnr.posix.FileStat.*;
@@ -50,6 +53,24 @@ public class DhfsFuse extends FuseStubFS {
@Inject
DhfsFileService fileService;
private final ConcurrentHashMap<Long, JObjectKey> _openHandles = new ConcurrentHashMap<>();
private final AtomicLong _fh = new AtomicLong(1);
private long allocateHandle(JObjectKey key) {
while (true) {
var newFh = _fh.getAndIncrement();
if (newFh == 0) continue;
if (_openHandles.putIfAbsent(newFh, key) == null) {
return newFh;
}
}
}
private JObjectKey getFromHandle(long handle) {
assert handle != 0;
return _openHandles.get(handle);
}
void init(@Observes @Priority(100000) StartupEvent event) {
if (!enabled) return;
Paths.get(root).toFile().mkdirs();
@@ -174,7 +195,9 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int open(String path, FuseFileInfo fi) {
try {
if (fileService.open(path).isEmpty()) return -ErrorCodes.ENOENT();
var opened = fileService.open(path);
if (opened.isEmpty()) return -ErrorCodes.ENOENT();
fi.fh.set(allocateHandle(opened.get()));
return 0;
} catch (Throwable e) {
Log.error("When open " + path, e);
@@ -182,15 +205,20 @@ public class DhfsFuse extends FuseStubFS {
}
}
@Override
public int release(String path, FuseFileInfo fi) {
assert fi.fh.get() != 0;
_openHandles.remove(fi.fh.get());
return 0;
}
@Override
public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
if (size < 0) return -ErrorCodes.EINVAL();
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var read = fileService.read(fileOpt.get(), offset, (int) size);
var fileKey = getFromHandle(fi.fh.get());
var read = fileService.read(fileKey, offset, (int) size);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size();
@@ -204,8 +232,7 @@ public class DhfsFuse extends FuseStubFS {
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var fileKey = getFromHandle(fi.fh.get());
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
if (buffer.isDirect()) {
@@ -218,7 +245,7 @@ public class DhfsFuse extends FuseStubFS {
buf.get(0, buffer.array(), 0, (int) size);
}
var written = fileService.write(fileOpt.get(), offset, UnsafeByteOperations.unsafeWrap(buffer));
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Throwable e) {
Log.error("When writing " + path, e);
@@ -231,7 +258,8 @@ public class DhfsFuse extends FuseStubFS {
try {
var ret = fileService.create(path, mode);
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
else return 0;
fi.fh.set(allocateHandle(ret.get()));
return 0;
} catch (Throwable e) {
Log.error("When creating " + path, e);
return -ErrorCodes.EIO();