some cleanup and fixes

This commit is contained in:
2025-01-02 22:07:23 +01:00
parent 57f865dafb
commit a143648c46
11 changed files with 283 additions and 99 deletions

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.transaction.*;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.dhfs.utils.DataLocker;
@@ -24,13 +23,9 @@ import java.util.function.Function;
@ApplicationScoped
public class JObjectManager {
@Inject
ObjectPersistentStore objectStorage;
@Inject
ObjectSerializer<JDataVersionedWrapper> objectSerializer;
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
@Inject
TxWriteback txWriteback;
private final List<PreCommitTxHook> _preCommitTxHooks;
@@ -82,27 +77,7 @@ public class JObjectManager {
try (var readLock = _objLocker.lock(key)) {
if (_objects.containsKey(key)) continue;
var pending = txWriteback.getPendingWrite(key);
JDataVersionedWrapper<?> read;
switch (pending.orElse(null)) {
case TxWriteback.PendingWrite write -> {
read = write.data();
}
case TxWriteback.PendingDelete delete -> {
return null;
}
case null -> {
}
default -> {
throw new IllegalStateException("Unexpected value: " + pending);
}
}
read = objectStorage.readObject(key)
.map(objectSerializer::deserialize)
.orElse(null);
var read = writebackObjectPersistentStore.readObject(key).orElse(null);
if (read == null) return null;
@@ -280,35 +255,27 @@ public class JObjectManager {
Log.tracef("Flushing transaction %d to storage", tx.getId());
var bundle = txWriteback.createBundle();
try {
for (var action : current.entrySet()) {
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + action.getKey());
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
bundle.commit(wrapped);
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey());
bundle.delete(action.getKey());
_objects.remove(action.getKey());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.getValue());
}
for (var action : current.entrySet()) {
switch (action.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + action.getKey());
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey());
_objects.remove(action.getKey());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.getValue());
}
}
} catch (Throwable t) {
txWriteback.dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", tx.getId());
txWriteback.commitBundle(bundle);
writebackObjectPersistentStore.commitTx(current.values(), tx.getId());
} catch (Throwable t) {
Log.error("Error when committing transaction", t);
Log.trace("Error when committing transaction", t);
throw new TxCommitException(t.getMessage(), t);
} finally {
for (var unlock : toUnlock) {

View File

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import java.util.function.Supplier;
@@ -23,8 +24,10 @@ public interface TransactionManager {
commit();
return ret;
} catch (TxCommitException txCommitException) {
if (tries == 0)
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
return runTries(supplier, tries - 1);
} catch (Throwable e) {
rollback();
@@ -43,10 +46,11 @@ public interface TransactionManager {
fn.apply();
commit();
} catch (TxCommitException txCommitException) {
if (tries == 0)
if (tries == 0) {
Log.error("Transaction commit failed", txCommitException);
throw txCommitException;
}
runTries(fn, tries - 1);
return;
} catch (Throwable e) {
rollback();
throw e;

View File

@@ -34,7 +34,7 @@ public class TransactionManagerImpl implements TransactionManager {
try {
jObjectManager.commit(_currentTransaction.get());
} catch (Throwable e) {
Log.warn("Transaction commit failed", e);
Log.trace("Transaction commit failed", e);
throw e;
} finally {
_currentTransaction.remove();

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifest;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
@@ -29,9 +29,7 @@ public class TxWritebackImpl implements TxWriteback {
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
ObjectPersistentStore objectPersistentStore;
@Inject
ObjectSerializer<JDataVersionedWrapper> objectSerializer;
CachingObjectPersistentStore objectPersistentStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private long currentSize = 0;
@@ -118,7 +116,7 @@ public class TxWritebackImpl implements TxWriteback {
case TxBundleImpl.CommittedEntry c -> _commitExecutor.execute(() -> {
try {
Log.trace("Writing new " + c.key());
objectPersistentStore.writeObject(c.key(), objectSerializer.serialize(c.data()));
objectPersistentStore.writeObject(c.key(), c.data());
} catch (Throwable t) {
Log.error("Error writing " + c.key(), t);
errors.add(t);

View File

@@ -0,0 +1,62 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
@ApplicationScoped
public class WritebackObjectPersistentStore {
@Inject
CachingObjectPersistentStore delegate;
@Inject
TxWriteback txWriteback;
@Nonnull
Collection<JObjectKey> findAllObjects() {
return delegate.findAllObjects();
}
@Nonnull
Optional<JDataVersionedWrapper<?>> readObject(JObjectKey name) {
var pending = txWriteback.getPendingWrite(name).orElse(null);
return switch (pending) {
case TxWriteback.PendingWrite write -> Optional.of(write.data());
case TxWriteback.PendingDelete ignored -> Optional.empty();
case null -> delegate.readObject(name);
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
void commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, long id) {
var bundle = txWriteback.createBundle();
try {
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapper<>(write.data(), id));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
} catch (Throwable t) {
txWriteback.dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", id);
txWriteback.commitBundle(bundle);
}
}

View File

@@ -0,0 +1,110 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.utils.DataLocker;
import io.quarkus.logging.Log;
import io.quarkus.runtime.Startup;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
@Inject
SerializingObjectPersistentStore delegate;
private record CacheEntry(Optional<JDataVersionedWrapper<?>> object, long size) {
}
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>();
@ConfigProperty(name = "dhfs.objects.lru.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private long _curSize = 0;
private long _evict = 0;
private ExecutorService _statusExecutor = null;
private final DataLocker _locker = new DataLocker();
@Startup
void init() {
if (printStats) {
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(10000);
if (_curSize > 0)
Log.info("Cache status: size=" + _curSize / 1024 / 1024 + "MB" + " evicted=" + _evict);
_evict = 0;
}
} catch (InterruptedException ignored) {
}
});
}
}
@Nonnull
public Collection<JObjectKey> findAllObjects() {
return delegate.findAllObjects();
}
private void put(JObjectKey key, Optional<JDataVersionedWrapper<?>> obj) {
synchronized (_cache) {
int size = obj.map(o -> o.data().estimateSize()).orElse(0);
_curSize += size;
var old = _cache.putLast(key, new CacheEntry(obj, size));
if (old != null)
_curSize -= old.size();
while (_curSize >= sizeLimit) {
var del = _cache.pollFirstEntry();
_curSize -= del.getValue().size();
_evict++;
}
}
}
@Nonnull
public Optional<JDataVersionedWrapper<?>> readObject(JObjectKey name) {
try (var lock = _locker.lock(name)) {
synchronized (_cache) {
var got = _cache.get(name);
if (got != null) {
return got.object();
}
}
var got = delegate.readObject(name);
put(name, got);
return got;
}
}
public void writeObject(JObjectKey name, JDataVersionedWrapper<?> object) {
delegate.writeObject(name, object);
}
public void commitTx(TxManifest names) {
// During commit, readObject shouldn't be called for these items,
// it should be handled by the upstream store
for (var key : Stream.concat(names.written().stream(), names.deleted().stream()).toList()) {
_cache.remove(key);
}
delegate.commitTx(names);
}
}

View File

@@ -0,0 +1,38 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectSerializer;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
@ApplicationScoped
public class SerializingObjectPersistentStore {
@Inject
ObjectSerializer<JDataVersionedWrapper> serializer;
@Inject
ObjectPersistentStore delegate;
@Nonnull
Collection<JObjectKey> findAllObjects() {
return delegate.findAllObjects();
}
@Nonnull
Optional<JDataVersionedWrapper<?>> readObject(JObjectKey name) {
return delegate.readObject(name).map(serializer::deserialize);
}
void writeObject(JObjectKey name, JDataVersionedWrapper<?> object) {
delegate.writeObject(name, serializer.serialize(object));
}
void commitTx(TxManifest names) {
delegate.commitTx(names);
}
}

View File

@@ -1,7 +1,7 @@
dhfs.objects.persistence=files
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.lru.print-stats=false
dhfs.objects.lru.print-stats=true
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.root=${HOME}/dhfs_default/data/stuff

View File

@@ -106,7 +106,7 @@ public class DhfsFuse extends FuseStubFS {
stbuf.f_favail.set(Integer.MAX_VALUE - 2000); //FIXME:
stbuf.f_namemax.set(2048);
return super.statfs(path, stbuf);
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When statfs " + path, e);
return -ErrorCodes.EIO();
}
@@ -147,9 +147,6 @@ public class DhfsFuse extends FuseStubFS {
stat.st_atim.tv_sec.set(found.get().mtime() / 1000);
stat.st_atim.tv_nsec.set((found.get().mtime() % 1000) * 1000);
stat.st_blksize.set(blksize);
} catch (Exception e) {
Log.error("When getattr " + path, e);
return -ErrorCodes.EIO();
} catch (Throwable e) {
Log.error("When getattr " + path, e);
return -ErrorCodes.EIO();
@@ -168,7 +165,7 @@ public class DhfsFuse extends FuseStubFS {
timespec[1].tv_sec.get() * 1000);
if (!res) return -ErrorCodes.EINVAL();
else return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When utimens " + path, e);
return -ErrorCodes.EIO();
}
@@ -179,7 +176,7 @@ public class DhfsFuse extends FuseStubFS {
try {
if (fileService.open(path).isEmpty()) return -ErrorCodes.ENOENT();
return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When open " + path, e);
return -ErrorCodes.EIO();
}
@@ -197,7 +194,7 @@ public class DhfsFuse extends FuseStubFS {
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size();
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When reading " + path, e);
return -ErrorCodes.EIO();
}
@@ -211,15 +208,19 @@ public class DhfsFuse extends FuseStubFS {
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
buf.address(),
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
size
);
if (buffer.isDirect()) {
jnrPtrByteOutputAccessors.getUnsafe().copyMemory(
buf.address(),
jnrPtrByteOutputAccessors.getNioAccess().getBufferAddress(buffer),
size
);
} else {
buf.get(0, buffer.array(), 0, (int) size);
}
var written = fileService.write(fileOpt.get(), offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When writing " + path, e);
return -ErrorCodes.EIO();
}
@@ -231,7 +232,7 @@ public class DhfsFuse extends FuseStubFS {
var ret = fileService.create(path, mode);
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
else return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When creating " + path, e);
return -ErrorCodes.EIO();
}
@@ -244,7 +245,7 @@ public class DhfsFuse extends FuseStubFS {
return 0;
} catch (AlreadyExistsException aex) {
return -ErrorCodes.EEXIST();
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When creating dir " + path, e);
return -ErrorCodes.EIO();
}
@@ -257,7 +258,7 @@ public class DhfsFuse extends FuseStubFS {
return 0;
} catch (DirectoryNotEmptyException ex) {
return -ErrorCodes.ENOTEMPTY();
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When removing dir " + path, e);
return -ErrorCodes.EIO();
}
@@ -269,7 +270,7 @@ public class DhfsFuse extends FuseStubFS {
var ret = fileService.rename(path, newName);
if (!ret) return -ErrorCodes.ENOENT();
else return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When renaming " + path, e);
return -ErrorCodes.EIO();
}
@@ -281,7 +282,7 @@ public class DhfsFuse extends FuseStubFS {
try {
fileService.unlink(path);
return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When unlinking " + path, e);
return -ErrorCodes.EIO();
}
@@ -299,7 +300,7 @@ public class DhfsFuse extends FuseStubFS {
return 0;
else
return -ErrorCodes.ENOSPC();
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When truncating " + path, e);
return -ErrorCodes.EIO();
}
@@ -313,7 +314,7 @@ public class DhfsFuse extends FuseStubFS {
var ret = fileService.chmod(fileOpt.get(), mode);
if (ret) return 0;
else return -ErrorCodes.EINVAL();
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When chmod " + path, e);
return -ErrorCodes.EIO();
}
@@ -339,7 +340,7 @@ public class DhfsFuse extends FuseStubFS {
}
return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When readdir " + path, e);
return -ErrorCodes.EIO();
}
@@ -357,7 +358,7 @@ public class DhfsFuse extends FuseStubFS {
UnsafeByteOperations.unsafeWriteTo(read, new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
buf.putByte(Math.min(size - 1, read.size()), (byte) 0);
return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When reading " + path, e);
return -ErrorCodes.EIO();
}
@@ -369,7 +370,7 @@ public class DhfsFuse extends FuseStubFS {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When chown " + path, e);
return -ErrorCodes.EIO();
}
@@ -381,7 +382,7 @@ public class DhfsFuse extends FuseStubFS {
var ret = fileService.symlink(oldpath, newpath);
if (ret == null) return -ErrorCodes.EEXIST();
else return 0;
} catch (Exception e) {
} catch (Throwable e) {
Log.error("When creating " + newpath, e);
return -ErrorCodes.EIO();
}

View File

@@ -51,7 +51,7 @@ public class JnrPtrByteOutput extends ByteOutput {
var out = _backing.address() + _pos;
_accessors.getUnsafe().copyMemory(addr, out, rem);
} else {
throw new UnsupportedOperationException();
_backing.put(_pos, value.array(), value.arrayOffset() + value.position(), rem);
}
_pos += rem;

View File

@@ -9,20 +9,24 @@ public class UninitializedByteBuffer {
private static final Logger LOGGER = Logger.getLogger(UninitializedByteBuffer.class.getName());
public static ByteBuffer allocateUninitialized(int size) {
if (size < DhfsSupport.PAGE_SIZE)
return ByteBuffer.allocateDirect(size);
try {
if (size < DhfsSupport.PAGE_SIZE)
return ByteBuffer.allocateDirect(size);
var bb = new ByteBuffer[1];
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
var ret = bb[0];
CLEANER.register(ret, () -> {
try {
DhfsSupport.releaseByteBuffer(token);
} catch (Throwable e) {
LOGGER.severe("Error releasing buffer: " + e);
System.exit(-1);
}
});
return ret;
var bb = new ByteBuffer[1];
long token = DhfsSupport.allocateUninitializedByteBuffer(bb, size);
var ret = bb[0];
CLEANER.register(ret, () -> {
try {
DhfsSupport.releaseByteBuffer(token);
} catch (Throwable e) {
LOGGER.severe("Error releasing buffer: " + e);
System.exit(-1);
}
});
return ret;
} catch (OutOfMemoryError e) {
return ByteBuffer.allocate(size);
}
}
}