This commit is contained in:
2024-12-01 23:29:13 +01:00
parent bbf275855c
commit 094a3e5e76
28 changed files with 987 additions and 252 deletions

View File

@@ -0,0 +1 @@
lombok.accessors.prefix += _

View File

@@ -1,40 +0,0 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
@ApplicationScoped
public class DataLocker {
private final ConcurrentHashMap<JObjectKey, WeakReference<? extends LockWrapper<? extends JData>>> _locks = new ConcurrentHashMap<>();
private final static Cleaner CLEANER = Cleaner.create();
public <T extends JData> LockWrapper<T> get(T data) {
while (true) {
var have = _locks.get(data.getKey());
if (have != null) {
var ret = have.get();
if (ret != null) {
if (ret.sameObject(data)) {
return (LockWrapper<T>) ret;
} else {
Log.warn("Removed stale lock for " + data.getKey());
_locks.remove(data.getKey(), have);
}
}
}
var ret = new LockWrapper<>(data);
var ref = new WeakReference<>(ret);
if (_locks.putIfAbsent(data.getKey(), ref) == null) {
CLEANER.register(ret, () -> _locks.remove(data.getKey(), ref));
return ret;
}
}
}
}

View File

@@ -1,11 +1,8 @@
package com.usatiuk.dhfs.objects;
import java.util.function.Function;
// The base class for JObject data
// Only one instance of this exists per key, the instance in the manager is canonical
// When committing a transaction, the instance is checked against it, if it isn't the same, a race occurred.
public interface JData {
JObjectKey getKey();
JData bindCopy();
Function<JObjectInterface, JObject> binder();
}

View File

@@ -1,11 +0,0 @@
package com.usatiuk.dhfs.objects;
public abstract class JObject {
protected final JObjectInterface _jObjectInterface;
public JObject(JObjectInterface jObjectInterface) {
_jObjectInterface = jObjectInterface;
}
public abstract JData getData();
}

View File

@@ -1,9 +0,0 @@
package com.usatiuk.dhfs.objects;
import java.util.Optional;
public interface JObjectInterface {
Optional<JObject> getObject(JObjectKey key);
<T extends JObject> Optional<T> getObject(JObjectKey key, Class<T> type);
}

View File

@@ -1,90 +1,181 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.objects.transaction.TransactionFactory;
import com.usatiuk.dhfs.objects.transaction.TransactionObjectSource;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.HashMap;
import java.util.Map;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
// Manages all access to JData objects.
// In particular, it serves as a source of truth for what is committed to the backing storage.
// All data goes through it, it is responsible for transaction atomicity
@ApplicationScoped
public class JObjectManager {
@Inject
ObjectPersistentStore objectStorage;
@Inject
DataLocker dataLocker;
ObjectSerializer<JData> objectSerializer;
@Inject
ObjectAllocator objectAllocator;
@Inject
TransactionFactory transactionFactory;
public class Transaction implements JObjectInterface {
private final Map<JObjectKey, JObject> _objects = new HashMap<>();
private final DataLocker _storageReadLocker = new DataLocker();
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
private final AtomicLong _txCounter = new AtomicLong();
private JObject dataToObject(JData data) {
return data.binder().apply(this);
}
private class JDataWrapper<T extends JData> extends WeakReference<T> {
private static final Cleaner CLEANER = Cleaner.create();
@Override
public Optional<JObject> getObject(JObjectKey key) {
if (_objects.containsKey(key)) {
return Optional.of(_objects.get(key));
}
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
long lastWriteTx = 0;
var data = objectStorage.readObject(key).orElse(null);
if (data == null) {
return Optional.empty();
}
var ret = dataToObject(data);
_objects.put(key, ret);
return Optional.of(ret);
}
@Override
public <T extends JObject> Optional<T> getObject(JObjectKey key, Class<T> type) {
if (_objects.containsKey(key)) {
var got = _objects.get(key);
if (type.isInstance(got)) {
return Optional.of(type.cast(got));
} else {
throw new IllegalArgumentException("Object type mismatch");
}
}
var data = objectStorage.readObject(key).orElse(null);
if (data == null) {
return Optional.empty();
}
var got = dataToObject(data);
if (type.isInstance(got)) {
_objects.put(key, got);
return Optional.of(type.cast(got));
} else {
throw new IllegalArgumentException("Object type mismatch");
}
}
public void commit() {
_objects.forEach((key, value) -> {
var data = (TestData) value.getData();
if (!data.isChanged()) {
return;
}
if (_objectStorage.get(key) == null) {
_objectStorage.put(data.copy());
return;
}
if (_objectStorage.get(key).getVersion() <= data.getVersion()) {
_objectStorage.put(data.copy());
} else {
throw new IllegalArgumentException("Version mismatch");
}
public JDataWrapper(T referent) {
super(referent);
var key = referent.getKey();
CLEANER.register(referent, () -> {
_objects.remove(key, this);
});
}
}
public Transaction beginTransaction() {
return new Transaction();
private <T extends JData> Pair<T, JDataWrapper<T>> get(Class<T> type, JObjectKey key) {
while (true) {
{
var got = _objects.get(key);
if (got != null) {
var ref = got.get();
if (type.isInstance(ref)) {
return Pair.of(type.cast(ref), (JDataWrapper<T>) got);
} else if (ref == null) {
_objects.remove(key, got);
} else {
throw new IllegalArgumentException("Object type mismatch");
}
}
}
//noinspection unused
try (var readLock = _storageReadLocker.lock(key)) {
var read = objectStorage.readObject(key).orElse(null);
if (read == null) throw new IllegalArgumentException("Object not found");
var got = objectSerializer.deserialize(read);
if (type.isInstance(got)) {
var wrapper = new JDataWrapper<T>((T) got);
var old = _objects.putIfAbsent(key, wrapper);
if (old != null) continue;
return Pair.of(type.cast(got), wrapper);
} else if (got == null) {
return null;
} else {
throw new IllegalArgumentException("Object type mismatch");
}
}
}
}
}
private final TransactionObjectSource _objSource = new TransactionObjectSource() {
@Override
public <T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key) {
var got = JObjectManager.this.get(type, key);
if (got == null) return Optional.empty();
return Optional.of(new TransactionObject<>() {
@Override
public T get() {
return got.getLeft();
}
@Override
public ReadWriteLock getLock() {
return got.getRight().lock;
}
});
}
};
public TransactionPrivate createTransaction() {
return transactionFactory.createTransaction(_txCounter.getAndIncrement(), _objSource);
}
public void commit(TransactionPrivate tx) {
var toUnlock = new LinkedList<VoidFn>();
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>();
var toLock = new ArrayList<TxRecord.TxObjectRecordCopyNoLock<?>>();
try {
for (var entry : tx.drain()) {
switch (entry) {
case TxRecord.TxObjectRecordRead<?> read -> {
toUnlock.add(read.original().getLock().readLock()::unlock);
}
case TxRecord.TxObjectRecordCopyLock<?> copy -> {
toUnlock.add(copy.original().getLock().writeLock()::unlock);
if (copy.copy().isModified()) {
toFlush.add(copy);
}
}
case TxRecord.TxObjectRecordCopyNoLock<?> copy -> {
if (copy.copy().isModified()) {
toLock.add(copy);
toFlush.add(copy);
}
}
case TxRecord.TxObjectRecordNew<?> created -> {
toFlush.add(created);
}
default -> throw new IllegalStateException("Unexpected value: " + entry);
}
}
for (var record : toLock) {
var found = _objects.get(record.original().getKey());
if (found.get() != record.original()) {
throw new IllegalStateException("Object changed during transaction");
}
found.lock.writeLock().lock();
toUnlock.add(found.lock.writeLock()::unlock);
}
for (var record : toFlush) {
var current = _objects.get(record.copy().wrapped().getKey());
assert current == null && record instanceof TxRecord.TxObjectRecordNew<?> || current == record.copy().wrapped();
if (current.get() != )
}
} catch (Throwable t) {
Log.error("Error when committing transaction", t);
throw t;
} finally {
for (var unlock : toUnlock) {
unlock.apply();
}
}
}
}

View File

@@ -1,60 +0,0 @@
package com.usatiuk.dhfs.objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class LockWrapper<T extends JData> {
private final JData _data;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
public LockWrapper(T data) {
_data = data;
}
public boolean sameObject(JData data) {
return _data == data;
}
interface DataAccessor<T extends JData> extends AutoCloseable {
T getData();
}
public class ReadLocked<B extends JData> implements DataAccessor<B> {
public ReadLocked() {
_lock.readLock().lock();
}
@Override
public void close() {
_lock.readLock().unlock();
}
@Override
public B getData() {
return (B) _data;
}
}
public ReadLocked<T> read() {
return new ReadLocked<>();
}
public class WriteLocked<B extends JData> implements DataAccessor<B> {
public WriteLocked() {
_lock.writeLock().lock();
}
@Override
public void close() {
_lock.writeLock().unlock();
}
@Override
public B getData() {
return (B) _data;
}
}
public WriteLocked<T> write() {
return new WriteLocked<>();
}
}

View File

@@ -0,0 +1,16 @@
package com.usatiuk.dhfs.objects;
public interface ObjectAllocator {
<T extends JData> T create(Class<T> type, JObjectKey key);
interface ChangeTrackingJData<T extends JData> {
T wrapped();
boolean isModified();
}
// A copy of data that can be modified without affecting the original, and that can track changes
<T extends JData> ChangeTrackingJData<T> copy(T obj);
<T extends JData> T unmodifiable(T obj);
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects;
import com.google.protobuf.ByteString;
public interface ObjectSerializer<T extends JData> {
ByteString serialize(T obj);
T deserialize(ByteString data);
}

View File

@@ -0,0 +1,13 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
public interface TransactionManager {
void begin();
void commit();
void rollback();
Transaction current();
}

View File

@@ -0,0 +1,45 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class TransactionManagerImpl implements TransactionManager {
@Inject
JObjectManager objectManager;
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
@Override
public void begin() {
if (_currentTransaction.get() != null) {
throw new IllegalStateException("Transaction already started");
}
var tx = objectManager.createTransaction();
_currentTransaction.set(tx);
}
@Override
public void commit() {
if(_currentTransaction.get() == null) {
throw new IllegalStateException("No transaction started");
}
jobjectManager.commit(_currentTransaction.get());
}
@Override
public void rollback() {
}
@Override
public Transaction current() {
return _currentTransaction.get();
}
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects;
public interface TxBundle {
long getId();
void commit(JData obj);
void delete(JData obj);
}

View File

@@ -0,0 +1,17 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.VoidFn;
public interface TxWriteback {
TxBundle createBundle();
void commitBundle(TxBundle bundle);
void dropBundle(TxBundle bundle);
void fence(long bundleId);
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread
void asyncFence(long bundleId, VoidFn callback);
}

View File

@@ -0,0 +1,415 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import lombok.Getter;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ApplicationScoped
public class TxWritebackImpl implements TxWriteback {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
private final AtomicLong _counter = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
ObjectPersistentStore objectPersistentStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _commitExecutor;
private ExecutorService _statusExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(110) StartupEvent event) {
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("tx-writeback-%d")
.build();
_writebackExecutor = Executors.newSingleThreadExecutor(factory);
_writebackExecutor.submit(this::writeback);
}
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("writeback-commit-%d")
.build();
_commitExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
}
_statusExecutor = Executors.newSingleThreadExecutor();
_statusExecutor.submit(() -> {
try {
while (true) {
Thread.sleep(1000);
if (currentSize > 0)
Log.info("Tx commit status: size="
+ currentSize / 1024 / 1024 + "MB");
}
} catch (InterruptedException ignored) {
}
});
_ready = true;
}
void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
Log.info("Waiting for all transactions to drain");
synchronized (_flushWaitSynchronizer) {
_ready = false;
while (currentSize > 0) {
_flushWaitSynchronizer.wait();
}
}
_writebackExecutor.shutdownNow();
Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
}
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundle bundle = new TxBundle(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
}
var latch = new CountDownLatch(bundle._committed.size() + bundle._meta.size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var c : bundle._committed.values()) {
_commitExecutor.execute(() -> {
try {
Log.trace("Writing new " + c.newMeta.getName());
objectPersistentStore.writeNewObject(c.newMeta.getName(), c.newMeta, c.newData);
} catch (Throwable t) {
Log.error("Error writing " + c.newMeta.getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
for (var c : bundle._meta.values()) {
_commitExecutor.execute(() -> {
try {
Log.trace("Writing (meta) " + c.newMeta.getName());
objectPersistentStore.writeNewObjectMeta(c.newMeta.getName(), c.newMeta);
} catch (Throwable t) {
Log.error("Error writing " + c.newMeta.getName(), t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
if (Log.isDebugEnabled())
for (var d : bundle._deleted.keySet())
Log.debug("Deleting from persistent storage " + d.getMeta().getName()); // FIXME: For tests
latch.await();
if (!errors.isEmpty()) {
throw new RuntimeException("Errors in writeback!");
}
objectPersistentStore.commitTx(
new TxManifest(
Stream.concat(bundle._committed.keySet().stream().map(t -> t.getMeta().getName()),
bundle._meta.keySet().stream().map(t -> t.getMeta().getName())).collect(Collectors.toCollection(ArrayList::new)),
bundle._deleted.keySet().stream().map(t -> t.getMeta().getName()).collect(Collectors.toCollection(ArrayList::new))
));
Log.trace("Bundle " + bundle.getId() + " committed");
List<List<VoidFn>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenTx.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(VoidFn::apply));
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundle) bundle).calculateTotalSize();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
} catch (InterruptedException ignored) {
} catch (Exception e) {
Log.error("Uncaught exception in writeback", e);
} catch (Throwable o) {
Log.error("Uncaught THROWABLE in writeback", o);
}
}
Log.info("Writeback thread exiting");
}
@Override
public com.usatiuk.dhfs.objects.jrepository.TxBundle createBundle() {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
wait = false;
}
}
synchronized (_pendingBundles) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
synchronized (_notFlushedBundles) {
var bundle = new TxBundle(_counter.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
return bundle;
}
}
}
}
@Override
public void commitBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).calculateTotalSize();
}
}
}
@Override
public void dropBundle(com.usatiuk.dhfs.objects.TxBundle bundle) {
verifyReady();
synchronized (_pendingBundles) {
Log.warn("Dropped bundle: " + bundle);
_pendingBundles.remove((TxBundle) bundle);
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundle) bundle).calculateTotalSize();
}
}
}
@Override
public void fence(long bundleId) {
var latch = new CountDownLatch(1);
asyncFence(bundleId, latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void asyncFence(long bundleId, VoidFn fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenTx.get() >= bundleId) {
fn.apply();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
}
}
@Getter
private static class TxManifest implements com.usatiuk.dhfs.objects.repository.persistence.TxManifest {
private final ArrayList<String> _written;
private final ArrayList<String> _deleted;
private TxManifest(ArrayList<String> written, ArrayList<String> deleted) {
_written = written;
_deleted = deleted;
}
}
private class TxBundle implements com.usatiuk.dhfs.objects.jrepository.TxBundle {
private final HashMap<JObject<?>, CommittedEntry> _committed = new HashMap<>();
private final HashMap<JObject<?>, CommittedMeta> _meta = new HashMap<>();
private final HashMap<JObject<?>, Integer> _deleted = new HashMap<>();
private final ArrayList<VoidFn> _callbacks = new ArrayList<>();
private long _txId;
@Getter
private volatile boolean _ready = false;
private long _size = -1;
private boolean _wasCommitted = false;
private TxBundle(long txId) {_txId = txId;}
@Override
public long getId() {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(VoidFn callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<VoidFn> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
}
@Override
public void commit(JObject<?> obj, ObjectMetadataP meta, JObjectDataP data) {
synchronized (_committed) {
_committed.put(obj, new CommittedEntry(meta, data, obj.estimateSize()));
}
}
@Override
public void commitMetaChange(JObject<?> obj, ObjectMetadataP meta) {
synchronized (_meta) {
_meta.put(obj, new CommittedMeta(meta, obj.estimateSize()));
}
}
@Override
public void delete(JObject<?> obj) {
synchronized (_deleted) {
_deleted.put(obj, obj.estimateSize());
}
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
long out = 0;
for (var c : _committed.values())
out += c.size;
for (var c : _meta.values())
out += c.size;
for (var c : _deleted.entrySet())
out += c.getValue();
_size = out;
return _size;
}
public void compress(TxBundle other) {
if (_txId >= other._txId)
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
for (var d : other._deleted.entrySet()) {
_committed.remove(d.getKey());
_meta.remove(d.getKey());
_deleted.put(d.getKey(), d.getValue());
}
for (var c : other._committed.entrySet()) {
_committed.put(c.getKey(), c.getValue());
_meta.remove(c.getKey());
_deleted.remove(c.getKey());
}
for (var m : other._meta.entrySet()) {
var deleted = _deleted.remove(m.getKey());
if (deleted != null) {
_committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, null, m.getKey().estimateSize()));
continue;
}
var committed = _committed.remove(m.getKey());
if (committed != null) {
_committed.put(m.getKey(), new CommittedEntry(m.getValue().newMeta, committed.newData, m.getKey().estimateSize()));
continue;
}
_meta.put(m.getKey(), m.getValue());
}
}
private record CommittedEntry(ObjectMetadataP newMeta, JObjectDataP newData, int size) {}
private record CommittedMeta(ObjectMetadataP newMeta, int size) {}
private record Deleted(JObject<?> handle) {}
}
}

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.dhfs.utils.ByteUtils;
@@ -43,14 +43,14 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
// rest of metadata
@ApplicationScoped
public class SerializingFileObjectPersistentStore implements ObjectPersistentStore {
public class FileObjectPersistentStore implements ObjectPersistentStore {
private final Path _root;
private final Path _txManifest;
private ExecutorService _flushExecutor;
private RandomAccessFile _txFile;
private volatile boolean _ready = false;
public SerializingFileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
this._root = Path.of(root).resolve("objects");
_txManifest = Path.of(root).resolve("cur-tx-manifest");
}
@@ -135,7 +135,7 @@ public class SerializingFileObjectPersistentStore implements ObjectPersistentSto
@Nonnull
@Override
public Optional<JData> readObject(JObjectKey name) {
public Optional<ByteString> readObject(JObjectKey name) {
verifyReady();
var path = getObjPath(name);
try (var rf = new RandomAccessFile(path.toFile(), "r")) {
@@ -146,10 +146,9 @@ public class SerializingFileObjectPersistentStore implements ObjectPersistentSto
var bs = UnsafeByteOperations.unsafeWrap(buf);
// This way, the input will be considered "immutable" which would allow avoiding copies
// when parsing byte arrays
var ch = bs.newCodedInput();
ch.enableAliasing(true);
// return JObjectDataP.parseFrom(ch);
return null;
// var ch = bs.newCodedInput();
// ch.enableAliasing(true);
return Optional.of(bs);
} catch (EOFException | FileNotFoundException | NoSuchFileException fx) {
return Optional.empty();
} catch (IOException e) {
@@ -169,13 +168,9 @@ public class SerializingFileObjectPersistentStore implements ObjectPersistentSto
throw new EOFException();
}
private void writeObjectImpl(Path path, JData data, boolean sync) throws IOException {
private void writeObjectImpl(Path path, ByteString data, boolean sync) throws IOException {
try (var fsb = new FileOutputStream(path.toFile(), false)) {
// int dataSize = data.getSerializedSize();
int dataSize = 0;
// if (fsb.getChannel().write(metaBb.limit(META_BLOCK_SIZE)) != META_BLOCK_SIZE)
// throw new IOException("Could not write to file");
data.writeTo(fsb);
if (sync) {
fsb.flush();
@@ -185,19 +180,7 @@ public class SerializingFileObjectPersistentStore implements ObjectPersistentSto
}
@Override
public void writeObjectDirect(JObjectKey name, JData data) {
verifyReady();
try {
var path = getObjPath(name);
writeObjectImpl(path, data, false);
} catch (IOException e) {
Log.error("Error writing file " + name, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
@Override
public void writeObject(JObjectKey name, JData obj) {
public void writeObject(JObjectKey name, ByteString obj) {
verifyReady();
try {
var tmpPath = getTmpObjPath(name);
@@ -207,7 +190,6 @@ public class SerializingFileObjectPersistentStore implements ObjectPersistentSto
}
}
private TxManifest readTxManifest() {
try {
var channel = _txFile.getChannel();
@@ -324,12 +306,6 @@ public class SerializingFileObjectPersistentStore implements ObjectPersistentSto
}
}
@Override
public void deleteObjectDirect(JObjectKey name) {
verifyReady();
deleteImpl(getObjPath(name));
}
@Override
public long getTotalSpace() {
verifyReady();

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
@@ -7,24 +8,19 @@ import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
// Persistent storage of objects
// All changes are written as sequential transactions
public interface ObjectPersistentStore {
@Nonnull
Collection<JObjectKey> findAllObjects();
@Nonnull
Optional<JData> readObject(JObjectKey name);
void writeObjectDirect(JObjectKey name, JData object);
void writeObject(JObjectKey name, JData object);
Optional<ByteString> readObject(JObjectKey name);
void writeObject(JObjectKey name, ByteString object);
void commitTx(TxManifest names);
// Deletes object metadata and data
void deleteObjectDirect(JObjectKey name);
long getTotalSpace();
long getFreeSpace();

View File

@@ -0,0 +1,7 @@
package com.usatiuk.dhfs.objects.transaction;
public enum LockingStrategy {
READ_ONLY, // Read only, no writes allowed, blocks writers
OPTIMISTIC, // Optimistic write, no blocking other possible writers
WRITE // Write lock, blocks all other writers
}

View File

@@ -0,0 +1,17 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import java.util.Optional;
// The transaction interface actually used by user code to retrieve objects
public interface Transaction {
<T extends JData> Optional<T> getObject(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> void putObject(JData obj);
default <T extends JData> Optional<T> getObject(Class<T> type, JObjectKey key) {
return getObject(type, key, LockingStrategy.READ_ONLY);
}
}

View File

@@ -0,0 +1,5 @@
package com.usatiuk.dhfs.objects.transaction;
public interface TransactionFactory {
TransactionPrivate createTransaction(long id, TransactionObjectSource source);
}

View File

@@ -0,0 +1,96 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectAllocator;
import jakarta.inject.Inject;
import lombok.AccessLevel;
import lombok.Getter;
import java.util.*;
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
ObjectAllocator objectAllocator;
private class TransactionImpl implements TransactionPrivate {
@Getter(AccessLevel.PUBLIC)
private final long _id;
private final TransactionObjectSource _source;
private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>();
private TransactionImpl(long id, TransactionObjectSource source) {
_id = id;
_source = source;
}
@Override
public <T extends JData> Optional<T> getObject(Class<T> type, JObjectKey key, LockingStrategy strategy) {
var got = _objects.get(key);
if (got != null) {
var compatible = got.getIfStrategyCompatible(key, strategy);
if (compatible == null) {
throw new IllegalArgumentException("Locking strategy mismatch");
}
if (!type.isInstance(compatible)) {
throw new IllegalArgumentException("Object type mismatch");
}
return Optional.of(type.cast(compatible));
}
var read = _source.get(type, key).orElse(null);
if (read == null) {
return Optional.empty();
}
switch (strategy) {
case READ_ONLY: {
read.getLock().readLock().lock();
var view = objectAllocator.unmodifiable(read.get());
_objects.put(key, new TxRecord.TxObjectRecordRead<>(read, view));
return Optional.of(view);
}
case WRITE:
case OPTIMISTIC: {
var copy = objectAllocator.copy(read.get());
switch (strategy) {
case WRITE:
read.getLock().writeLock().lock();
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(read, copy));
break;
case OPTIMISTIC:
_objects.put(key, new TxRecord.TxObjectRecordCopyNoLock<>(read.get(), copy));
break;
}
return Optional.of(copy.wrapped());
}
default:
throw new IllegalArgumentException("Unknown locking strategy");
}
}
@Override
public void putObject(JData obj) {
if (_objects.containsKey(obj.getKey())) {
throw new IllegalArgumentException("Object already exists in transaction");
}
_objects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj));
}
@Override
public Collection<TxRecord.TxObjectRecord<?>> drain() {
return Collections.unmodifiableCollection(_objects.values());
}
}
@Override
public TransactionPrivate createTransaction(long id, TransactionObjectSource source) {
return new TransactionImpl(id, source);
}
}

View File

@@ -0,0 +1,17 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;
public interface TransactionObjectSource {
interface TransactionObject<T extends JData> {
T get();
ReadWriteLock getLock();
}
<T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key);
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects.transaction;
import java.util.Collection;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction{
long getId();
Collection<TxRecord.TxObjectRecord<?>> drain();
}

View File

@@ -0,0 +1,73 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectAllocator;
public class TxRecord {
public interface TxObjectRecord<T> {
T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy);
}
public interface TxObjectRecordWrite<T extends JData> extends TxObjectRecord<T> {
ObjectAllocator.ChangeTrackingJData<T> copy();
}
public record TxObjectRecordRead<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
T copy)
implements TxObjectRecord<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.READ_ONLY)
return copy;
return null;
}
}
public record TxObjectRecordNew<T extends JData>(T created)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
return created;
return null;
}
@Override
public ObjectAllocator.ChangeTrackingJData<T> copy() {
return new ObjectAllocator.ChangeTrackingJData<T>() {
@Override
public T wrapped() {
return created;
}
@Override
public boolean isModified() {
return false;
}
};
}
}
public record TxObjectRecordCopyLock<T extends JData>(TransactionObjectSource.TransactionObject<T> original,
ObjectAllocator.ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
return copy.wrapped();
return null;
}
}
public record TxObjectRecordCopyNoLock<T extends JData>(T original,
ObjectAllocator.ChangeTrackingJData<T> copy)
implements TxObjectRecordWrite<T> {
@Override
public T getIfStrategyCompatible(JObjectKey key, LockingStrategy strategy) {
if (strategy == LockingStrategy.WRITE || strategy == LockingStrategy.OPTIMISTIC)
return copy.wrapped();
return null;
}
}
}

View File

@@ -30,13 +30,6 @@ public class FakeObjectStorage implements ObjectPersistentStore {
}
}
@Override
public void writeObjectDirect(JObjectKey name, JData object) {
synchronized (this) {
_objects.put(name, (TestData) object);
}
}
@Override
public void writeObject(JObjectKey name, JData object) {
synchronized (this) {

View File

@@ -2,12 +2,12 @@ package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
import com.usatiuk.dhfs.objects.transaction.Transaction;
public class Kid extends JObject {
public Kid(JObjectInterface jObjectInterface, KidData data) {
super(jObjectInterface, data);
public Kid(Transaction Transaction, KidData data) {
super(Transaction, data);
}
@Override

View File

@@ -2,7 +2,7 @@ package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import java.util.function.Function;
@@ -13,7 +13,7 @@ public interface KidData extends JData {
KidData bindCopy();
default Function<JObjectInterface, JObject> binder() {
default Function<Transaction, JObject> binder(boolean isLocked) {
return jo -> new Kid(jo, bindCopy());
}
}

View File

@@ -2,15 +2,15 @@ package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import lombok.experimental.Delegate;
public class Parent extends JObject {
@Delegate
private final ParentData _data;
public Parent(JObjectInterface jObjectInterface, ParentData data) {
super(jObjectInterface);
public Parent(Transaction Transaction, ParentData data) {
super(Transaction);
_data = data;
}

View File

@@ -0,0 +1,53 @@
package com.usatiuk.dhfs.utils;
import java.util.concurrent.ConcurrentHashMap;
public class DataLocker {
private static class LockTag {
boolean released = false;
}
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
public class Lock implements AutoCloseable {
private final Object _key;
private final LockTag _tag;
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
}
@Override
public void close() {
synchronized (_tag) {
_tag.released = true;
_tag.notifyAll();
_locks.remove(_key, _tag);
}
}
}
public Lock lock(Object data) {
while (true) {
try {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released)
tag.wait();
continue;
}
}
} catch (InterruptedException ignored) {
}
var newTag = new LockTag();
var oldTag = _locks.putIfAbsent(data, newTag);
if (oldTag == null) {
return new Lock(data, newTag);
}
}
}
}