Run code cleanup

This commit is contained in:
2025-03-23 15:29:32 +01:00
parent c5a875c27f
commit c977b5f6c9
48 changed files with 595 additions and 645 deletions

View File

@@ -1,16 +1,17 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.Main" />
<module name="server" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.Main"/>
<module name="server"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
</component>

View File

@@ -1,16 +1,18 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.Main" />
<module name="server" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication"
nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfs.Main"/>
<module name="server"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Dcom.usatiuk.dhfs.supportlib.native-path=$ProjectFileDir$/target/classes/native -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
</component>

View File

@@ -6,8 +6,8 @@ import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import io.quarkus.gizmo.*;
import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.jboss.jandex.Type;
import org.jboss.jandex.*;
import org.jboss.jandex.Type;
import org.objectweb.asm.Opcodes;
import java.util.ArrayList;

View File

@@ -10,6 +10,6 @@ import lombok.Getter;
@Getter
public class SimpleObject extends AbstractObject {
public int numfield = 0;
private String name;
public ByteString someBytes;
private String name;
}

View File

@@ -3,7 +3,6 @@ package com.usatiuk.kleppmanntree;
import org.pcollections.PMap;
import java.io.Serializable;
import java.util.Map;
public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> extends Serializable {
NodeIdT key();

View File

@@ -20,6 +20,14 @@ public record JObjectKey(String name) implements Serializable, Comparable<JObjec
return new JObjectKey("");
}
public static JObjectKey fromBytes(byte[] bytes) {
return new JObjectKey(new String(bytes, StandardCharsets.UTF_8));
}
public static JObjectKey fromByteBuffer(ByteBuffer buff) {
return new JObjectKey(StandardCharsets.UTF_8.decode(buff).toString());
}
@Override
public int compareTo(JObjectKey o) {
return name.compareTo(o.name);
@@ -42,12 +50,4 @@ public record JObjectKey(String name) implements Serializable, Comparable<JObjec
directBb.flip();
return directBb;
}
public static JObjectKey fromBytes(byte[] bytes) {
return new JObjectKey(new String(bytes, StandardCharsets.UTF_8));
}
public static JObjectKey fromByteBuffer(ByteBuffer buff) {
return new JObjectKey(StandardCharsets.UTF_8.decode(buff).toString());
}
}

View File

@@ -9,27 +9,12 @@ import java.util.stream.Collectors;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private Map<CloseableKvIterator<K, V>, Integer> _iterators;
private final IteratorStart _initialStartType;
private final K _initialStartKey;
private interface FirstMatchState<K extends Comparable<K>, V> {
}
private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
private record FirstMatchFound<K extends Comparable<K>, V>(
CloseableKvIterator<K, V> iterator) implements FirstMatchState<K, V> {
}
private record FirstMatchConsumed<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
private final List<IterProdFn<K, V>> _pendingIterators;
private Map<CloseableKvIterator<K, V>, Integer> _iterators;
// Fast path for the first element
private FirstMatchState<K, V> _firstMatchState;
private final List<IterProdFn<K, V>> _pendingIterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
@@ -61,6 +46,11 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
doInitialAdvance();
}
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void doInitialAdvance() {
if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) {
// Starting at a greatest key less than/less or equal than:
@@ -133,11 +123,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
doInitialAdvance();
}
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
if (!iterator.hasNext()) {
return;
@@ -288,7 +273,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
return curVal;
}
@Override
public void close() {
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
@@ -307,4 +291,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
", _iterators=" + _iterators +
'}';
}
private interface FirstMatchState<K extends Comparable<K>, V> {
}
private record FirstMatchNone<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
private record FirstMatchFound<K extends Comparable<K>, V>(
CloseableKvIterator<K, V> iterator) implements FirstMatchState<K, V> {
}
private record FirstMatchConsumed<K extends Comparable<K>, V>() implements FirstMatchState<K, V> {
}
}

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.*;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;

View File

@@ -1,13 +1,15 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.stores.WritebackObjectPersistentStore;
import com.usatiuk.dhfs.objects.transaction.TxRecord;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
@ApplicationScoped

View File

@@ -23,21 +23,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
@ApplicationScoped
public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>();
private TreePMap<JObjectKey, CacheEntry> _sortedCache = TreePMap.empty();
private long _cacheVersion = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@Inject
LockManager lockManager;
@Inject
SerializingObjectPersistentStore delegate;
@ConfigProperty(name = "dhfs.objects.lru.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
boolean printStats;
private TreePMap<JObjectKey, CacheEntry> _sortedCache = TreePMap.empty();
private long _cacheVersion = 0;
private long _curSize = 0;
private long _evict = 0;
@@ -181,6 +177,48 @@ public class CachingObjectPersistentStore {
}
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curSortedCache, mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curSortedCache.get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
}
@Override
public long id() {
return _backing.id();
}
@Override
public void close() {
_backing.close();
}
private class CachingKvIterator implements CloseableKvIterator<JObjectKey, JDataVersionedWrapper> {
private final CloseableKvIterator<JObjectKey, JDataVersionedWrapper> _delegate;
@@ -237,48 +275,6 @@ public class CachingObjectPersistentStore {
return next;
}
}
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("cache", start, key,
(mS, mK)
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curSortedCache, mS, mK),
e -> {
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),
(mS, mK) -> new MappingKvIterator<>(new CachingKvIterator(_backing.getIterator(start, key)), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _curSortedCache.get(name);
if (cached != null) {
return switch (cached.object()) {
case Data<JDataVersionedWrapper> data -> Optional.of(data.value());
case Tombstone<JDataVersionedWrapper> tombstone -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached.object());
};
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
}
@Override
public long id() {
return _backing.id();
}
@Override
public void close() {
_backing.close();
}
};
} catch (Throwable ex) {
if (backing != null) {
@@ -288,10 +284,10 @@ public class CachingObjectPersistentStore {
}
}
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, long size) {
}
public long getLastTxId() {
return delegate.getLastCommitId();
}
private record CacheEntry(MaybeTombstone<JDataVersionedWrapper> object, long size) {
}
}

View File

@@ -40,19 +40,16 @@ import static org.lmdbjava.Env.create;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "lmdb")
public class LmdbObjectPersistentStore implements ObjectPersistentStore {
private static final String DB_NAME = "objects";
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
private final Path _root;
private final AtomicReference<RefcountedCloseable<Txn<ByteBuffer>>> _curReadTxn = new AtomicReference<>();
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private Env<ByteBuffer> _env;
private Dbi<ByteBuffer> _db;
private boolean _ready = false;
private final AtomicReference<RefcountedCloseable<Txn<ByteBuffer>>> _curReadTxn = new AtomicReference<>();
private long _lastTxId = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private static final String DB_NAME = "objects";
private static final byte[] DB_VER_OBJ_NAME = "__DB_VER_OBJ".getBytes(StandardCharsets.UTF_8);
public LmdbObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
_root = Path.of(root).resolve("objects");
}
@@ -121,15 +118,135 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
_lock.readLock().lock();
try {
var txn = new RefcountedCloseable<>(_env.txnRead());
var commitId = getLastCommitId();
return new Snapshot<JObjectKey, ByteString>() {
private final RefcountedCloseable<Txn<ByteBuffer>> _txn = txn;
private final long _id = commitId;
private boolean _closed = false;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteString::copyFrom);
return ret;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.unref();
}
};
} finally {
_lock.readLock().unlock();
}
}
@Override
public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
verifyReady();
try (Txn<ByteBuffer> txn = _env.txnWrite()) {
for (var written : names.written()) {
// TODO:
var bb = UninitializedByteBuffer.allocateUninitialized(written.getValue().size());
bb.put(written.getValue().asReadOnlyByteBuffer());
bb.flip();
_db.put(txn, written.getKey().toByteBuffer(), bb);
}
for (JObjectKey key : names.deleted()) {
_db.delete(txn, key.toByteBuffer());
}
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
commitLocked.accept(() -> {
_lock.writeLock().lock();
try {
var realTxId = txId;
if (realTxId == -1)
realTxId = _lastTxId + 1;
assert realTxId > _lastTxId;
_lastTxId = realTxId;
bbData.putLong(realTxId);
bbData.flip();
_db.put(txn, bb, bbData);
_curReadTxn.set(null);
txn.commit();
} finally {
_lock.writeLock().unlock();
}
});
}
}
@Override
public long getTotalSpace() {
verifyReady();
return _root.toFile().getTotalSpace();
}
@Override
public long getFreeSpace() {
verifyReady();
return _root.toFile().getFreeSpace();
}
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
@Override
public long getLastCommitId() {
_lock.readLock().lock();
try {
return _lastTxId;
} finally {
_lock.readLock().unlock();
}
}
private class LmdbKvIterator extends ReversibleKvIterator<JObjectKey, ByteString> {
private static final Cleaner CLEANER = Cleaner.create();
private final RefcountedCloseable<Txn<ByteBuffer>> _txn;
private final Cursor<ByteBuffer> _cursor;
private boolean _hasNext = false;
private static final Cleaner CLEANER = Cleaner.create();
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
// private final Exception _allocationStacktrace = new Exception();
private final Exception _allocationStacktrace = null;
private boolean _hasNext = false;
LmdbKvIterator(RefcountedCloseable<Txn<ByteBuffer>> txn, IteratorStart start, JObjectKey key) {
_txn = txn;
@@ -286,125 +403,4 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
}
}
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
return new KeyPredicateKvIterator<>(new LmdbKvIterator(start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Override
public Snapshot<JObjectKey, ByteString> getSnapshot() {
_lock.readLock().lock();
try {
var txn = new RefcountedCloseable<>(_env.txnRead());
var commitId = getLastCommitId();
return new Snapshot<JObjectKey, ByteString>() {
private boolean _closed = false;
private final RefcountedCloseable<Txn<ByteBuffer>> _txn = txn;
private final long _id = commitId;
@Override
public CloseableKvIterator<JObjectKey, ByteString> getIterator(IteratorStart start, JObjectKey key) {
assert !_closed;
return new KeyPredicateKvIterator<>(new LmdbKvIterator(_txn.ref(), start, key), start, key, (k) -> !Arrays.equals(k.name().getBytes(StandardCharsets.UTF_8), DB_VER_OBJ_NAME));
}
@Nonnull
@Override
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteString::copyFrom);
return ret;
}
@Override
public long id() {
assert !_closed;
return _id;
}
@Override
public void close() {
assert !_closed;
_closed = true;
_txn.unref();
}
};
} finally {
_lock.readLock().unlock();
}
}
@Override
public void commitTx(TxManifestRaw names, long txId, Consumer<Runnable> commitLocked) {
verifyReady();
try (Txn<ByteBuffer> txn = _env.txnWrite()) {
for (var written : names.written()) {
// TODO:
var bb = UninitializedByteBuffer.allocateUninitialized(written.getValue().size());
bb.put(written.getValue().asReadOnlyByteBuffer());
bb.flip();
_db.put(txn, written.getKey().toByteBuffer(), bb);
}
for (JObjectKey key : names.deleted()) {
_db.delete(txn, key.toByteBuffer());
}
var bb = ByteBuffer.allocateDirect(DB_VER_OBJ_NAME.length);
bb.put(DB_VER_OBJ_NAME);
bb.flip();
var bbData = ByteBuffer.allocateDirect(8);
commitLocked.accept(() -> {
_lock.writeLock().lock();
try {
var realTxId = txId;
if (realTxId == -1)
realTxId = _lastTxId + 1;
assert realTxId > _lastTxId;
_lastTxId = realTxId;
bbData.putLong(realTxId);
bbData.flip();
_db.put(txn, bb, bbData);
_curReadTxn.set(null);
txn.commit();
} finally {
_lock.writeLock().unlock();
}
});
}
}
@Override
public long getTotalSpace() {
verifyReady();
return _root.toFile().getTotalSpace();
}
@Override
public long getFreeSpace() {
verifyReady();
return _root.toFile().getFreeSpace();
}
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
@Override
public long getLastCommitId() {
_lock.readLock().lock();
try {
return _lastTxId;
} finally {
_lock.readLock().unlock();
}
}
}

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.iterators.NavigableMapKvIterator;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
@@ -18,9 +18,9 @@ import java.util.function.Consumer;
@ApplicationScoped
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
public class MemoryObjectPersistentStore implements ObjectPersistentStore {
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private TreePMap<JObjectKey, ByteString> _objects = TreePMap.empty();
private long _lastCommitId = 0;
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
@Nonnull
@Override

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;

View File

@@ -1,7 +1,9 @@
package com.usatiuk.dhfs.objects.stores;
import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.ObjectSerializer;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.iterators.MappingKvIterator;

View File

@@ -1,6 +1,8 @@
package com.usatiuk.dhfs.objects.stores;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JDataVersionedWrapperImpl;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.transaction.TxCommitException;
@@ -304,6 +306,150 @@ public class WritebackObjectPersistentStore {
}
}
public Optional<PendingWriteEntry> getPendingWrite(JObjectKey key) {
synchronized (_pendingBundles) {
return Optional.ofNullable(_pendingWrites.get().get(key));
}
}
@Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var pending = getPendingWrite(name).orElse(null);
return switch (pending) {
case PendingWrite write -> Optional.of(write.data());
case PendingDelete ignored -> Optional.empty();
case null -> cachedStore.readObject(name);
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
@Nonnull
public VerboseReadResult readObjectVerbose(JObjectKey key) {
var pending = getPendingWrite(key).orElse(null);
if (pending != null) {
return new VerboseReadResultPending(pending);
}
return new VerboseReadResultPersisted(cachedStore.readObject(key));
}
/**
* @param commitLocked - a function that will be called with a Consumer of a new transaction id,
* that will commit the transaction the changes in the store will be visible to new transactions
* only after the runnable is called
*/
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, BiConsumer<Long, Runnable> commitLocked) {
var bundle = createBundle();
long bundleId = bundle.getId();
try {
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundleId));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
} catch (Throwable t) {
dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", bundleId);
commitLocked.accept(bundleId, () -> {
commitBundle(bundle);
});
return r -> asyncFence(bundleId, r);
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
PSortedMap<JObjectKey, PendingWriteEntry> pendingWrites;
Snapshot<JObjectKey, JDataVersionedWrapper> cache = null;
long lastTxId;
try {
_pendingWritesVersionLock.readLock().lock();
try {
pendingWrites = _pendingWrites.get();
cache = cachedStore.getSnapshot();
lastTxId = getLastTxId();
} finally {
_pendingWritesVersionLock.readLock().unlock();
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalCache = cache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final PSortedMap<JObjectKey, PendingWriteEntry> _pendingWrites = pendingWrites;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _cache = finalCache;
private final long txId = lastTxId;
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _pendingWrites.get(name);
if (cached != null) {
return switch (cached) {
case PendingWrite c -> Optional.of(c.data());
case PendingDelete d -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
return _cache.readObject(name);
}
@Override
public long id() {
assert lastTxId >= _cache.id();
return lastTxId;
}
@Override
public void close() {
_cache.close();
}
};
} catch (Throwable e) {
if (cache != null)
cache.close();
throw e;
}
}
public long getLastTxId() {
_pendingWritesVersionLock.readLock().lock();
try {
return _lastCommittedTx.get();
} finally {
_pendingWritesVersionLock.readLock().unlock();
}
}
public interface VerboseReadResult {
}
private static class TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
@@ -385,154 +531,9 @@ public class WritebackObjectPersistentStore {
}
}
public Optional<PendingWriteEntry> getPendingWrite(JObjectKey key) {
synchronized (_pendingBundles) {
return Optional.ofNullable(_pendingWrites.get().get(key));
}
}
@Nonnull
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var pending = getPendingWrite(name).orElse(null);
return switch (pending) {
case PendingWrite write -> Optional.of(write.data());
case PendingDelete ignored -> Optional.empty();
case null -> cachedStore.readObject(name);
default -> throw new IllegalStateException("Unexpected value: " + pending);
};
}
public interface VerboseReadResult {
}
public record VerboseReadResultPersisted(Optional<JDataVersionedWrapper> data) implements VerboseReadResult {
}
public record VerboseReadResultPending(PendingWriteEntry pending) implements VerboseReadResult {
}
@Nonnull
public VerboseReadResult readObjectVerbose(JObjectKey key) {
var pending = getPendingWrite(key).orElse(null);
if (pending != null) {
return new VerboseReadResultPending(pending);
}
return new VerboseReadResultPersisted(cachedStore.readObject(key));
}
/**
* @param commitLocked - a function that will be called with a Consumer of a new transaction id,
* that will commit the transaction the changes in the store will be visible to new transactions
* only after the runnable is called
*/
public Consumer<Runnable> commitTx(Collection<TxRecord.TxObjectRecord<?>> writes, BiConsumer<Long, Runnable> commitLocked) {
var bundle = createBundle();
long bundleId = bundle.getId();
try {
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundleId));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
bundle.delete(deleted.key());
}
default -> {
throw new TxCommitException("Unexpected value: " + action.key());
}
}
}
} catch (Throwable t) {
dropBundle(bundle);
throw new TxCommitException(t.getMessage(), t);
}
Log.tracef("Committing transaction %d to storage", bundleId);
commitLocked.accept(bundleId, () -> {
commitBundle(bundle);
});
return r -> asyncFence(bundleId, r);
}
public Snapshot<JObjectKey, JDataVersionedWrapper> getSnapshot() {
PSortedMap<JObjectKey, PendingWriteEntry> pendingWrites;
Snapshot<JObjectKey, JDataVersionedWrapper> cache = null;
long lastTxId;
try {
_pendingWritesVersionLock.readLock().lock();
try {
pendingWrites = _pendingWrites.get();
cache = cachedStore.getSnapshot();
lastTxId = getLastTxId();
} finally {
_pendingWritesVersionLock.readLock().unlock();
}
Snapshot<JObjectKey, JDataVersionedWrapper> finalCache = cache;
return new Snapshot<JObjectKey, JDataVersionedWrapper>() {
private final PSortedMap<JObjectKey, PendingWriteEntry> _pendingWrites = pendingWrites;
private final Snapshot<JObjectKey, JDataVersionedWrapper> _cache = finalCache;
private final long txId = lastTxId;
@Override
public CloseableKvIterator<JObjectKey, JDataVersionedWrapper> getIterator(IteratorStart start, JObjectKey key) {
return new TombstoneMergingKvIterator<>("writeback-ps", start, key,
(tS, tK) -> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_pendingWrites, tS, tK),
e -> switch (e) {
case PendingWrite pw -> new Data<>(pw.data());
case PendingDelete d -> new Tombstone<>();
default -> throw new IllegalStateException("Unexpected value: " + e);
}),
(tS, tK) -> new MappingKvIterator<>(_cache.getIterator(tS, tK), Data::new));
}
@Nonnull
@Override
public Optional<JDataVersionedWrapper> readObject(JObjectKey name) {
var cached = _pendingWrites.get(name);
if (cached != null) {
return switch (cached) {
case PendingWrite c -> Optional.of(c.data());
case PendingDelete d -> {
yield Optional.empty();
}
default -> throw new IllegalStateException("Unexpected value: " + cached);
};
}
return _cache.readObject(name);
}
@Override
public long id() {
assert lastTxId >= _cache.id();
return lastTxId;
}
@Override
public void close() {
_cache.close();
}
};
} catch (Throwable e) {
if (cache != null)
cache.close();
throw e;
}
}
public long getLastTxId() {
_pendingWritesVersionLock.readLock().lock();
try {
return _lastCommittedTx.get();
} finally {
_pendingWritesVersionLock.readLock().unlock();
}
}
}

View File

@@ -26,13 +26,18 @@ import java.util.stream.Stream;
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
private boolean _ready = false;
@Inject
SnapshotManager snapshotManager;
@Inject
TransactionFactory transactionFactory;
@Inject
LockManager lockManager;
private boolean _ready = false;
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Wrong service order!");
@@ -42,11 +47,6 @@ public class JObjectManager {
_ready = true;
}
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = List.copyOf(preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList());
Log.debugv("Pre-commit hooks: {0}", String.join("->", _preCommitTxHooks.stream().map(Objects::toString).toList()));
}
public TransactionPrivate createTransaction() {
verifyReady();
var tx = transactionFactory.createTransaction();

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import java.util.Optional;

View File

@@ -55,81 +55,18 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
private class TransactionImpl implements TransactionPrivate {
private boolean _closed = false;
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
private boolean _closed = false;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl() {
_snapshot = snapshotManager.createSnapshot();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
@Override
public void onCommit(Runnable runnable) {
_onCommit.add(runnable);
@@ -260,5 +197,66 @@ public class TransactionFactoryImpl implements TransactionFactory {
_closed = true;
_snapshot.close();
}
private class ReadTrackingIterator implements CloseableKvIterator<JObjectKey, JData> {
private final CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> _backing;
public ReadTrackingIterator(CloseableKvIterator<JObjectKey, ReadTrackingInternalCrap> backing) {
_backing = backing;
}
@Override
public JObjectKey peekNextKey() {
return _backing.peekNextKey();
}
@Override
public void skip() {
_backing.skip();
}
@Override
public JObjectKey peekPrevKey() {
return _backing.peekPrevKey();
}
@Override
public Pair<JObjectKey, JData> prev() {
var got = _backing.prev();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
@Override
public boolean hasPrev() {
return _backing.hasPrev();
}
@Override
public void skipPrev() {
_backing.skipPrev();
}
@Override
public void close() {
_backing.close();
}
@Override
public boolean hasNext() {
return _backing.hasNext();
}
@Override
public Pair<JObjectKey, JData> next() {
var got = _backing.next();
if (got.getValue() instanceof ReadTrackingInternalCrapSource(JDataVersionedWrapper wrapped)) {
_readSet.putIfAbsent(got.getKey(), new TransactionObjectNoLock<>(Optional.of(wrapped)));
}
return Pair.of(got.getKey(), got.getValue().obj());
}
}
}
}

View File

@@ -4,7 +4,6 @@ import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import java.util.Collection;

View File

@@ -12,80 +12,6 @@ import java.util.NoSuchElementException;
public class MergingKvIteratorTest {
private class SimpleIteratorWrapper<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final Iterator<Pair<K, V>> _iterator;
private Pair<K, V> _next;
public SimpleIteratorWrapper(Iterator<Pair<K, V>> iterator) {
_iterator = iterator;
fillNext();
}
private void fillNext() {
while (_iterator.hasNext() && _next == null) {
_next = _iterator.next();
}
}
@Override
public K peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
}
@Override
public void skip() {
if (_next == null) {
throw new NoSuchElementException();
}
_next = null;
fillNext();
}
@Override
public K peekPrevKey() {
throw new UnsupportedOperationException();
}
@Override
public Pair<K, V> prev() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasPrev() {
throw new UnsupportedOperationException();
}
@Override
public void skipPrev() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<K, V> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
_next = null;
fillNext();
return ret;
}
}
@Test
public void testTestIterator() {
var list = List.of(Pair.of(1, 2), Pair.of(3, 4), Pair.of(5, 6));
@@ -345,4 +271,78 @@ public class MergingKvIteratorTest {
}
Assertions.assertFalse(mergingIterator2.hasNext());
}
private class SimpleIteratorWrapper<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
private final Iterator<Pair<K, V>> _iterator;
private Pair<K, V> _next;
public SimpleIteratorWrapper(Iterator<Pair<K, V>> iterator) {
_iterator = iterator;
fillNext();
}
private void fillNext() {
while (_iterator.hasNext() && _next == null) {
_next = _iterator.next();
}
}
@Override
public K peekNextKey() {
if (_next == null) {
throw new NoSuchElementException();
}
return _next.getKey();
}
@Override
public void skip() {
if (_next == null) {
throw new NoSuchElementException();
}
_next = null;
fillNext();
}
@Override
public K peekPrevKey() {
throw new UnsupportedOperationException();
}
@Override
public Pair<K, V> prev() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasPrev() {
throw new UnsupportedOperationException();
}
@Override
public void skipPrev() {
throw new UnsupportedOperationException();
}
@Override
public void close() {
}
@Override
public boolean hasNext() {
return _next != null;
}
@Override
public Pair<K, V> next() {
if (_next == null) {
throw new NoSuchElementException("No more elements");
}
var ret = _next;
_next = null;
fillNext();
return ret;
}
}
}

View File

@@ -1,10 +1,5 @@
package com.usatiuk.dhfs.objects.snapshot;
import com.usatiuk.dhfs.objects.JObjectKey;
import org.junit.jupiter.api.Test;
import java.util.Map;
public class SnapshotKvIteratorTest {

View File

@@ -6,7 +6,6 @@ import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
import org.apache.commons.lang3.tuple.Pair;
import java.util.List;
import java.util.Map;
public record FileDto(File file, List<Pair<Long, JObjectKey>> chunks) implements JDataRemoteDto {
@Override

View File

@@ -35,11 +35,6 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
false);
}
@Override
public JObjectKey key() {
return ofMetaKey(key);
}
public static JObjectKey ofMetaKey(JObjectKey key) {
return key;
}
@@ -48,6 +43,11 @@ public record RemoteObjectMeta(PCollection<JDataRef> refsFrom, boolean frozen,
return JObjectKey.of("data_" + key.name());
}
@Override
public JObjectKey key() {
return ofMetaKey(key);
}
public JObjectKey dataKey() {
return ofDataKey(key);
}

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaDirectory;
@@ -11,6 +10,7 @@ import com.usatiuk.dhfs.objects.repository.invalidation.Op;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;
import com.usatiuk.kleppmanntree.*;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;

View File

@@ -9,8 +9,6 @@ import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
@Singleton
public class JKleppmannTreePeerInterface implements PeerInterface<PeerId> {

View File

@@ -2,8 +2,6 @@ package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.PeerId;
import java.util.UUID;
public class JKleppmannTreePeriodicPushOp {
private final PeerId _from;
private final long _timestamp;

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JObjectKey;
import java.util.Objects;

View File

@@ -49,7 +49,7 @@ public class JMapHelper {
curTx.delete(makeKey(holder.key(), key));
}
public <K extends JMapKey> void deleteAll(JMapHolder<K> he) {
public <K extends JMapKey> void deleteAll(JMapHolder<K> he) {
ArrayList<K> collectedKeys = new ArrayList<>();
try (var it = getIterator(he)) {
while (it.hasNext()) {

View File

@@ -1,8 +1,8 @@
package com.usatiuk.dhfs.objects.jmap;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.CloseableKvIterator;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;

View File

@@ -8,15 +8,15 @@ public record JMapLongKey(long key) implements JMapKey, Comparable<JMapKey>, Ser
return new JMapLongKey(key);
}
public static JMapLongKey max() {
return new JMapLongKey(Long.MAX_VALUE);
}
@Override
public String toString() {
return String.format("%016d", key);
}
public static JMapLongKey max() {
return new JMapLongKey(Long.MAX_VALUE);
}
@Override
public int compareTo(@Nonnull JMapKey o) {
if (!(o instanceof JMapLongKey lk)) {

View File

@@ -26,19 +26,17 @@ import java.util.stream.Collectors;
@ApplicationScoped
public class RemoteObjectServiceClient {
private final ExecutorService _batchExecutor = Executors.newVirtualThreadPerTaskExecutor();
@Inject
PersistentPeerDataService persistentPeerDataService;
@Inject
RpcClientFactory rpcClientFactory;
@Inject
TransactionManager txm;
@Inject
Transaction curTx;
@Inject
RemoteTransaction remoteTx;
@Inject
SyncHandler syncHandler;
@Inject
@@ -48,8 +46,6 @@ public class RemoteObjectServiceClient {
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
private final ExecutorService _batchExecutor = Executors.newVirtualThreadPerTaskExecutor();
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
var reply = client.getObject(GetObjectRequest.newBuilder().setName(JObjectKeyP.newBuilder().setName(key.toString()).build()).build());

View File

@@ -19,6 +19,9 @@ import java.util.stream.Stream;
@ApplicationScoped
public class SyncHandler {
private final Map<Class<? extends JDataRemote>, ObjSyncHandler> _objToSyncHandler;
private final Map<Class<? extends JDataRemoteDto>, ObjSyncHandler> _dtoToSyncHandler;
private final Map<Class<? extends JData>, InitialSyncProcessor> _initialSyncProcessors;
@Inject
Transaction curTx;
@Inject
@@ -31,10 +34,6 @@ public class SyncHandler {
DefaultObjSyncHandler defaultObjSyncHandler;
@Inject
RemoteTransaction remoteTx;
private final Map<Class<? extends JDataRemote>, ObjSyncHandler> _objToSyncHandler;
private final Map<Class<? extends JDataRemoteDto>, ObjSyncHandler> _dtoToSyncHandler;
private final Map<Class<? extends JData>, InitialSyncProcessor> _initialSyncProcessors;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;

View File

@@ -1,23 +1,9 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.pcollections.PMap;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
@ApplicationScoped
public class SyncHandlerService {
}

View File

@@ -7,13 +7,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SyncHelper {
public enum ChangelogCmpResult {
EQUAL,
NEWER,
OLDER,
CONFLICT
}
public static ChangelogCmpResult compareChangelogs(PMap<PeerId, Long> current, PMap<PeerId, Long> other) {
boolean hasLower = false;
boolean hasHigher = false;
@@ -36,6 +29,13 @@ public class SyncHelper {
return ChangelogCmpResult.EQUAL;
}
public enum ChangelogCmpResult {
EQUAL,
NEWER,
OLDER,
CONFLICT
}
// public static PMap<PeerId,Long> mergeChangelogs(PMap<PeerId, Long> current, PMap<PeerId, Long> other) {
// return current.plusAll(other);
// }

View File

@@ -8,7 +8,6 @@ import com.usatiuk.dhfs.objects.persistence.JObjectKeyP;
import com.usatiuk.dhfs.objects.persistence.PeerIdP;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.pcollections.HashTreePMap;
import org.pcollections.PMap;

View File

@@ -1,13 +1,11 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.HashSetValuedHashMap;
import java.io.Serial;
import java.io.Serializable;
import java.util.UUID;
public class DeferredInvalidationQueueData implements Serializable {
@Serial

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.JDataRemoteDto;
@@ -8,7 +7,6 @@ import org.pcollections.PMap;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
public record IndexUpdateOp(JObjectKey key, PMap<PeerId, Long> changelog, JDataRemoteDto data) implements Op {
@Override

View File

@@ -1,6 +1,5 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
import com.usatiuk.dhfs.objects.JObjectKey;
import java.io.Serializable;

View File

@@ -4,5 +4,6 @@ import com.usatiuk.dhfs.objects.PeerId;
public interface PeerAddress {
PeerId peer();
PeerAddressType type();
}

View File

@@ -15,29 +15,10 @@ import java.util.stream.Collectors;
@ApplicationScoped
public class PeerDiscoveryDirectory {
private final MultiValuedMap<PeerId, PeerEntry> _entries = new HashSetValuedHashMap<>();
@ConfigProperty(name = "dhfs.peerdiscovery.timeout")
long timeout;
private record PeerEntry(PeerAddress addr, long lastSeen) {
public PeerEntry withLastSeen(long lastSeen) {
return new PeerEntry(addr, lastSeen);
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
PeerEntry peerEntry = (PeerEntry) o;
return Objects.equals(addr, peerEntry.addr);
}
@Override
public int hashCode() {
return Objects.hashCode(addr);
}
}
private final MultiValuedMap<PeerId, PeerEntry> _entries = new HashSetValuedHashMap<>();
public void notifyAddr(PeerAddress addr) {
Log.tracev("New address {0}", addr);
synchronized (_entries) {
@@ -73,4 +54,22 @@ public class PeerDiscoveryDirectory {
return partitioned.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toUnmodifiableSet());
}
}
private record PeerEntry(PeerAddress addr, long lastSeen) {
public PeerEntry withLastSeen(long lastSeen) {
return new PeerEntry(addr, lastSeen);
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
PeerEntry peerEntry = (PeerEntry) o;
return Objects.equals(addr, peerEntry.addr);
}
@Override
public int hashCode() {
return Objects.hashCode(addr);
}
}
}

View File

@@ -16,6 +16,8 @@ import java.util.stream.Stream;
@ApplicationScoped
public class StaticPeerDiscovery {
private final List<IpPeerAddress> _peers;
@Inject
PeerDiscoveryDirectory peerDiscoveryDirectory;
public StaticPeerDiscovery(@ConfigProperty(name = "dhfs.peerdiscovery.static-peers") Optional<String> staticPeers) {
var peers = staticPeers.orElse("");
@@ -34,9 +36,6 @@ public class StaticPeerDiscovery {
}).toList();
}
@Inject
PeerDiscoveryDirectory peerDiscoveryDirectory;
@Scheduled(every = "1s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
public void discoverPeers() {
for (var peer : _peers) {

View File

@@ -24,13 +24,10 @@ import java.nio.ByteBuffer;
public class LocalPeerDiscoveryClient {
@Inject
PeerDiscoveryDirectory peerDiscoveryDirectory;
private Thread _clientThread;
private DatagramSocket _socket;
@ConfigProperty(name = "dhfs.objects.peerdiscovery.broadcast")
boolean enabled;
private Thread _clientThread;
private DatagramSocket _socket;
@Startup
void init() throws SocketException, UnknownHostException {

View File

@@ -3,7 +3,6 @@ package com.usatiuk.dhfs.objects.repository.peersync;
import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.PeerInfoP;
import com.usatiuk.dhfs.utils.SerializationHelper;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Singleton;
import java.io.IOException;

View File

@@ -19,6 +19,7 @@ import java.util.Optional;
@ApplicationScoped
public class PeerInfoService {
public static final JObjectKey TREE_KEY = JObjectKey.of("peers");
@Inject
Transaction curTx;
@Inject
@@ -30,8 +31,6 @@ public class PeerInfoService {
@Inject
RemoteTransaction remoteTx;
public static final JObjectKey TREE_KEY = JObjectKey.of("peers");
private JKleppmannTreeManager.JKleppmannTree getTreeW() {
return jKleppmannTreeManager.getTree(TREE_KEY);
}

View File

@@ -15,10 +15,6 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
_peerId = id.toJObjectKey();
}
public JObjectKey getPeerId() {
return _peerId;
}
public static JObjectKey peerIdToNodeId(PeerId id) {
return JObjectKey.of(id.toJObjectKey().name() + "_tree_node");
}
@@ -30,6 +26,10 @@ public class JKleppmannTreeNodeMetaPeer extends JKleppmannTreeNodeMeta {
return PeerId.of(id.name().substring(0, id.name().length() - "_tree_node".length()));
}
public JObjectKey getPeerId() {
return _peerId;
}
@Override
public JKleppmannTreeNodeMeta withName(String name) {
assert name.equals(peerIdToNodeId(PeerId.of(getPeerId().name())).toString());

View File

@@ -19,11 +19,10 @@ import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class PeerTrustManager implements X509TrustManager {
private final AtomicReference<X509TrustManager> trustManager = new AtomicReference<>();
@Inject
PeerInfoService peerInfoService;
private final AtomicReference<X509TrustManager> trustManager = new AtomicReference<>();
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
trustManager.get().checkClientTrusted(chain, authType);

View File

@@ -6,10 +6,10 @@ import org.apache.commons.lang3.mutable.MutableObject;
import java.lang.ref.Cleaner;
public class RefcountedCloseable<T extends AutoCloseable> {
private final T _closeable;
private int _refCount = 1;
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
private static final Cleaner CLEANER = Cleaner.create();
private final T _closeable;
private final MutableObject<Boolean> _closed = new MutableObject<>(false);
private int _refCount = 1;
public RefcountedCloseable(T closeable) {
_closeable = closeable;