diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml
index b4738b5f..efef968f 100644
--- a/.github/workflows/server.yml
+++ b/.github/workflows/server.yml
@@ -54,12 +54,12 @@ jobs:
# - name: Build with Maven
# run: cd dhfs-parent && mvn --batch-mode --update-snapshots package # -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
- - uses: actions/upload-artifact@v3
+ - uses: actions/upload-artifact@v4
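+        # Note: v4 artifacts are immutable, and artifact names must be unique within a run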
with:
name: DHFS Server Package
path: dhfs-parent/server/target/quarkus-app
- - uses: actions/upload-artifact@v3
+ - uses: actions/upload-artifact@v4
if: ${{ always() }}
with:
name: Test logs
@@ -84,7 +84,7 @@ jobs:
- name: NPM Build
run: cd webui && npm run build
- - uses: actions/upload-artifact@v3
+ - uses: actions/upload-artifact@v4
with:
name: Webui
path: webui/dist
@@ -155,7 +155,7 @@ jobs:
CMAKE_ARGS="-DCMAKE_BUILD_TYPE=Release" libdhfs_support/builder/cross-build.sh both build "$(pwd)/result"
- name: Upload build
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: NativeLib-${{ matrix.os }}-${{ env.SANITIZED_DOCKER_PLATFORM }}
path: result
@@ -168,7 +168,7 @@ jobs:
uses: actions/checkout@v4
- name: Download artifacts
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
path: downloaded-libs
@@ -180,7 +180,7 @@ jobs:
test -f "result/Linux-x86_64/libdhfs_support.so" || exit 1
- name: Upload
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: NativeLibs
path: result
@@ -201,19 +201,19 @@ jobs:
uses: actions/checkout@v4
- name: Download server package
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
name: DHFS Server Package
path: dhfs-package-downloaded
- name: Download webui
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
name: Webui
path: webui-dist-downloaded
- name: Download native libs
- uses: actions/download-artifact@v3
+ uses: actions/download-artifact@v4
with:
name: NativeLibs
path: dhfs-native-downloaded
@@ -299,17 +299,17 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
- - uses: actions/download-artifact@v3
+ - uses: actions/download-artifact@v4
with:
name: DHFS Server Package
path: dhfs-package-downloaded
- - uses: actions/download-artifact@v3
+ - uses: actions/download-artifact@v4
with:
name: Webui
path: webui-dist-downloaded
- - uses: actions/download-artifact@v3
+ - uses: actions/download-artifact@v4
with:
name: NativeLibs
path: dhfs-native-downloaded
@@ -339,7 +339,7 @@ jobs:
run: tar -cvf ~/run-wrapper.tar.gz ./run-wrapper-out
- name: Upload
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
with:
name: Run wrapper
path: ~/run-wrapper.tar.gz
diff --git a/dhfs-parent/autoprotomap/deployment/pom.xml b/dhfs-parent/autoprotomap/deployment/pom.xml
index 29c02d7a..13f90a9d 100644
--- a/dhfs-parent/autoprotomap/deployment/pom.xml
+++ b/dhfs-parent/autoprotomap/deployment/pom.xml
@@ -34,11 +34,6 @@
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
        </dependency>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <scope>provided</scope>
-        </dependency>
diff --git a/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/AutoprotomapProcessor.java b/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/AutoprotomapProcessor.java
index 3c3b0809..d3e574eb 100644
--- a/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/AutoprotomapProcessor.java
+++ b/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/AutoprotomapProcessor.java
@@ -68,11 +68,11 @@ class AutoprotomapProcessor {
}
} catch (Throwable e) {
StringBuilder sb = new StringBuilder();
- sb.append(e.toString() + "\n");
+ sb.append(e + "\n");
for (var el : e.getStackTrace()) {
sb.append(el.toString() + "\n");
}
- System.out.println(sb.toString());
+ System.out.println(sb);
}
}
}
diff --git a/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/ProtoSerializerGenerator.java b/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/ProtoSerializerGenerator.java
index 386f79f1..6ed94f3a 100644
--- a/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/ProtoSerializerGenerator.java
+++ b/dhfs-parent/autoprotomap/deployment/src/main/java/com/usatiuk/autoprotomap/deployment/ProtoSerializerGenerator.java
@@ -14,6 +14,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -61,7 +62,7 @@ public class ProtoSerializerGenerator {
visitor.accept(cur);
var next = cur.superClassType().name();
- if (next.equals(DotName.OBJECT_NAME)) break;
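+            // Records extend java.lang.Record rather than Object, so the superclass walk must stop there too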
+ if (next.equals(DotName.OBJECT_NAME) || next.equals(DotName.RECORD_NAME)) break;
cur = index.getClassByName(next);
}
}
@@ -82,6 +83,10 @@ public class ProtoSerializerGenerator {
var objectClass = index.getClassByName(objectType.name().toString());
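+        // Record components are read via accessors named after the component itself (e.g. key()),
+        // while ordinary classes follow the getX() bean convention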
+        Function<String, String> getterGetter = objectClass.isRecord()
+                ? Function.identity()
+                : s -> "get" + capitalize(stripPrefix(s, FIELD_PREFIX));
+
for (var f : findAllFields(index, objectClass)) {
var consideredFieldName = stripPrefix(f.name(), FIELD_PREFIX);
@@ -89,7 +94,7 @@ public class ProtoSerializerGenerator {
if ((f.flags() & Opcodes.ACC_PUBLIC) != 0)
return bytecodeCreator.readInstanceField(f, object);
else {
- var fieldGetter = "get" + capitalize(stripPrefix(f.name(), FIELD_PREFIX));
+ var fieldGetter = getterGetter.apply(f.name());
return bytecodeCreator.invokeVirtualMethod(
MethodDescriptor.ofMethod(objectType.toString(), fieldGetter, f.type().name().toString()), object);
}
diff --git a/dhfs-parent/autoprotomap/integration-tests/pom.xml b/dhfs-parent/autoprotomap/integration-tests/pom.xml
index 88e789ca..1af18935 100644
--- a/dhfs-parent/autoprotomap/integration-tests/pom.xml
+++ b/dhfs-parent/autoprotomap/integration-tests/pom.xml
@@ -22,10 +22,6 @@
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
-        <dependency>
-            <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-resteasy-reactive</artifactId>
-        </dependency>
        <dependency>
            <groupId>com.usatiuk</groupId>
            <artifactId>autoprotomap</artifactId>
@@ -41,11 +37,6 @@
            <artifactId>quarkus-junit5</artifactId>
            <scope>test</scope>
        </dependency>
-        <dependency>
-            <groupId>io.rest-assured</groupId>
-            <artifactId>rest-assured</artifactId>
-            <scope>test</scope>
-        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-grpc</artifactId>
diff --git a/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/AutoprotomapResource.java b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/AutoprotomapResource.java
deleted file mode 100644
index a56a2f81..00000000
--- a/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/AutoprotomapResource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.usatiuk.autoprotomap.it;
-
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.ws.rs.GET;
-import jakarta.ws.rs.Path;
-
-@Path("/autoprotomap")
-@ApplicationScoped
-public class AutoprotomapResource {
- // add some rest methods here
-
- @GET
- public String hello() {
- return "Hello autoprotomap";
- }
-}
diff --git a/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/InterfaceObject.java b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/InterfaceObject.java
new file mode 100644
index 00000000..7b06b316
--- /dev/null
+++ b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/InterfaceObject.java
@@ -0,0 +1,8 @@
+package com.usatiuk.autoprotomap.it;
+
+import com.usatiuk.autoprotomap.runtime.ProtoMirror;
+
+@ProtoMirror(InterfaceObjectProto.class)
+public interface InterfaceObject {
+ String key();
+}
diff --git a/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/RecordObject.java b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/RecordObject.java
new file mode 100644
index 00000000..b314ca9a
--- /dev/null
+++ b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/RecordObject.java
@@ -0,0 +1,7 @@
+package com.usatiuk.autoprotomap.it;
+
+import com.usatiuk.autoprotomap.runtime.ProtoMirror;
+
+@ProtoMirror(RecordObjectProto.class)
+public record RecordObject(String key) implements InterfaceObject {
+}
diff --git a/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/RecordObject2.java b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/RecordObject2.java
new file mode 100644
index 00000000..4c66dfc3
--- /dev/null
+++ b/dhfs-parent/autoprotomap/integration-tests/src/main/java/com/usatiuk/autoprotomap/it/RecordObject2.java
@@ -0,0 +1,7 @@
+package com.usatiuk.autoprotomap.it;
+
+import com.usatiuk.autoprotomap.runtime.ProtoMirror;
+
+@ProtoMirror(RecordObject2Proto.class)
+public record RecordObject2(String key, int value) implements InterfaceObject {
+}
diff --git a/dhfs-parent/autoprotomap/integration-tests/src/main/proto/autoprotomap_test.proto b/dhfs-parent/autoprotomap/integration-tests/src/main/proto/autoprotomap_test.proto
index f606b3b4..c60bcec7 100644
--- a/dhfs-parent/autoprotomap/integration-tests/src/main/proto/autoprotomap_test.proto
+++ b/dhfs-parent/autoprotomap/integration-tests/src/main/proto/autoprotomap_test.proto
@@ -28,4 +28,20 @@ message AbstractProto {
SimpleObjectProto simpleObject = 2;
CustomObjectProto customObject = 3;
}
+}
+
+message RecordObjectProto {
+ string key = 1;
+}
+
+message RecordObject2Proto {
+ string key = 1;
+ int32 value = 2;
+}
+
+message InterfaceObjectProto {
+ oneof obj {
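+    // one case per concrete implementor of InterfaceObject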
+ RecordObjectProto recordObject = 1;
+ RecordObject2Proto recordObject2 = 2;
+ }
}
\ No newline at end of file
diff --git a/dhfs-parent/autoprotomap/integration-tests/src/test/java/com/usatiuk/autoprotomap/it/AutoprotomapResourceTest.java b/dhfs-parent/autoprotomap/integration-tests/src/test/java/com/usatiuk/autoprotomap/it/AutoprotomapResourceTest.java
index 2d02ffd3..36f63bf6 100644
--- a/dhfs-parent/autoprotomap/integration-tests/src/test/java/com/usatiuk/autoprotomap/it/AutoprotomapResourceTest.java
+++ b/dhfs-parent/autoprotomap/integration-tests/src/test/java/com/usatiuk/autoprotomap/it/AutoprotomapResourceTest.java
@@ -16,6 +16,8 @@ public class AutoprotomapResourceTest {
ProtoSerializer nestedProtoSerializer;
@Inject
ProtoSerializer abstractProtoSerializer;
+ @Inject
+    ProtoSerializer<InterfaceObjectProto, InterfaceObject> interfaceProtoSerializer;
@Test
public void testSimple() {
@@ -74,7 +76,7 @@ public class AutoprotomapResourceTest {
}
@Test
- public void tesAbstractNested() {
+ public void testAbstractNested() {
var ret = abstractProtoSerializer.serialize(
new NestedObject(
new SimpleObject(333, "nested so", ByteString.copyFrom(new byte[]{1, 2, 3})),
@@ -93,4 +95,19 @@ public class AutoprotomapResourceTest {
Assertions.assertEquals("nested obj", des.get_nestedName());
Assertions.assertEquals(ByteString.copyFrom(new byte[]{4, 5, 6}), des.get_nestedSomeBytes());
}
+
+ @Test
+ public void testInterface() {
+ var ret = interfaceProtoSerializer.serialize(new RecordObject("record test"));
+ Assertions.assertEquals("record test", ret.getRecordObject().getKey());
+ var des = (RecordObject) interfaceProtoSerializer.deserialize(ret);
+ Assertions.assertEquals("record test", des.key());
+
+ var ret2 = interfaceProtoSerializer.serialize(new RecordObject2("record test 2", 1234));
+ Assertions.assertEquals("record test 2", ret2.getRecordObject2().getKey());
+ Assertions.assertEquals(1234, ret2.getRecordObject2().getValue());
+ var des2 = (RecordObject2) interfaceProtoSerializer.deserialize(ret2);
+ Assertions.assertEquals("record test 2", des2.key());
+ Assertions.assertEquals(1234, des2.value());
+ }
}
diff --git a/dhfs-parent/kleppmanntree/pom.xml b/dhfs-parent/kleppmanntree/pom.xml
index e3d133cd..077abfd1 100644
--- a/dhfs-parent/kleppmanntree/pom.xml
+++ b/dhfs-parent/kleppmanntree/pom.xml
@@ -13,19 +13,22 @@
    <artifactId>kleppmanntree</artifactId>

    <dependencies>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <scope>provided</scope>
-        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
+        <dependency>
+            <groupId>org.pcollections</groupId>
+            <artifactId>pcollections</artifactId>
+        </dependency>
    </dependencies>
\ No newline at end of file
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/AtomicClock.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/AtomicClock.java
index 32e9b89e..f524473a 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/AtomicClock.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/AtomicClock.java
@@ -18,11 +18,6 @@ public class AtomicClock implements Clock<Long>, Serializable {
_max = timestamp;
}
- // FIXME:
- public void ungetTimestamp() {
- --_max;
- }
-
@Override
public Long peekTimestamp() {
return _max;
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java
index 42fb5de1..a84091a8 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/KleppmannTree.java
@@ -8,15 +8,16 @@ import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT, WrapperT extends TreeNodeWrapper<TimestampT, PeerIdT, MetaT, NodeIdT>> {
+public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
private static final Logger LOGGER = Logger.getLogger(KleppmannTree.class.getName());
-    private final StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT, WrapperT> _storage;
+
+    private final StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> _storage;
     private final PeerInterface<PeerIdT> _peers;
     private final Clock<TimestampT> _clock;
     private final OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> _opRecorder;
-    private HashMap<NodeIdT, WrapperT> _undoCtx = null;
+    private HashMap<NodeIdT, TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> _undoCtx = null;

-    public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT, WrapperT> storage,
+    public KleppmannTree(StorageInterface<TimestampT, PeerIdT, MetaT, NodeIdT> storage,
                         PeerInterface<PeerIdT> peers,
                         Clock<TimestampT> clock,
                         OpRecorder<TimestampT, PeerIdT, MetaT, NodeIdT> opRecorder) {
@@ -30,13 +31,8 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (names.isEmpty()) return fromId;
var from = _storage.getById(fromId);
- from.rLock();
NodeIdT childId;
- try {
- childId = from.getNode().getChildren().get(names.getFirst());
- } finally {
- from.rUnlock();
- }
+ childId = from.children().get(names.getFirst());
if (childId == null)
return null;
@@ -45,69 +41,58 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
    }
    public NodeIdT traverse(NodeIdT fromId, List<String> names) {
- _storage.rLock();
- try {
- return traverseImpl(fromId, names.subList(1, names.size()));
- } finally {
- _storage.rUnlock();
- }
+ return traverseImpl(fromId, names.subList(1, names.size()));
}
    public NodeIdT traverse(List<String> names) {
- _storage.rLock();
- try {
- return traverseImpl(_storage.getRootId(), names);
- } finally {
- _storage.rUnlock();
- }
+ return traverseImpl(_storage.getRootId(), names);
}
    private void undoEffect(LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT> effect) {
- _storage.assertRwLock();
if (effect.oldInfo() != null) {
var node = _storage.getById(effect.childId());
- var oldParent = _storage.getById(effect.oldInfo().oldParent());
var curParent = _storage.getById(effect.newParentId());
- curParent.rwLock();
- oldParent.rwLock();
- node.rwLock();
- try {
- curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
- if (!node.getNode().getMeta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
- throw new IllegalArgumentException("Class mismatch for meta for node " + node.getNode().getId());
- node.getNode().setMeta(effect.oldInfo().oldMeta());
- node.getNode().setParent(oldParent.getNode().getId());
- oldParent.getNode().getChildren().put(node.getNode().getMeta().getName(), node.getNode().getId());
- node.notifyRmRef(curParent.getNode().getId());
- node.notifyRef(oldParent.getNode().getId());
- node.getNode().setLastEffectiveOp(effect.oldInfo().oldEffectiveMove());
- } finally {
- node.rwUnlock();
- oldParent.rwUnlock();
- curParent.rwUnlock();
+ {
+ var newCurParentChildren = curParent.children().minus(node.meta().getName());
+ curParent = curParent.withChildren(newCurParentChildren);
+ _storage.putNode(curParent);
}
+
+ if (!node.meta().getClass().equals(effect.oldInfo().oldMeta().getClass()))
+ throw new IllegalArgumentException("Class mismatch for meta for node " + node.key());
+
+ // Needs to be read after changing curParent, as it might be the same node
+ var oldParent = _storage.getById(effect.oldInfo().oldParent());
+ {
+ var newOldParentChildren = oldParent.children().plus(node.meta().getName(), node.key());
+ oldParent = oldParent.withChildren(newOldParentChildren);
+ _storage.putNode(oldParent);
+ }
+ _storage.putNode(
+ node.withMeta(effect.oldInfo().oldMeta())
+ .withParent(effect.oldInfo().oldParent())
+ .withLastEffectiveOp(effect.oldInfo().oldEffectiveMove())
+ );
} else {
var node = _storage.getById(effect.childId());
var curParent = _storage.getById(effect.newParentId());
- curParent.rwLock();
- node.rwLock();
- try {
- curParent.getNode().getChildren().remove(node.getNode().getMeta().getName());
- node.freeze();
- node.getNode().setParent(null);
- node.getNode().setLastEffectiveOp(null);
- node.notifyRmRef(curParent.getNode().getId());
- _undoCtx.put(node.getNode().getId(), node);
- } finally {
- node.rwUnlock();
- curParent.rwUnlock();
+ {
+ var newCurParentChildren = curParent.children().minus(node.meta().getName());
+ curParent = curParent.withChildren(newCurParentChildren);
+ _storage.putNode(curParent);
}
+ _storage.putNode(
+ node.withParent(null)
+ .withLastEffectiveOp(null)
+ );
+ _undoCtx.put(node.key(), node);
}
}
    private void undoOp(LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
- for (var e : op.effects().reversed())
- undoEffect(e);
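+        // effects can be null for no-op log records (e.g. an op that duplicated existing state)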
+ if (op.effects() != null)
+ for (var e : op.effects().reversed())
+ undoEffect(e);
}
    private void redoOp(Map.Entry<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> entry) {
@@ -116,7 +101,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
    }
    private void doAndPut(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
- _storage.assertRwLock();
var res = doOp(op, failCreatingIfExists);
_storage.getLog().put(res.op().timestamp(), res);
}
@@ -160,22 +144,15 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
if (!inTrash.isEmpty()) {
var trash = _storage.getById(_storage.getTrashId());
- trash.rwLock();
- try {
- for (var n : inTrash) {
- var node = _storage.getById(n);
- node.rwLock();
- try {
- if (trash.getNode().getChildren().remove(n.toString()) == null)
- LOGGER.severe("Node " + node.getNode().getId() + " not found in trash but should be there");
- node.notifyRmRef(trash.getNode().getId());
- } finally {
- node.rwUnlock();
- }
- _storage.removeNode(n);
+ for (var n : inTrash) {
+ var node = _storage.getById(n);
+ {
+ if (!trash.children().containsKey(n.toString()))
+ LOGGER.severe("Node " + node.key() + " not found in trash but should be there");
+ trash = trash.withChildren(trash.children().minus(n.toString()));
+ _storage.putNode(trash);
}
- } finally {
- trash.rwUnlock();
+ _storage.removeNode(n);
}
}
} else {
@@ -188,29 +165,18 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
public void move(NodeIdT newParent, MetaT newMeta, NodeIdT child, boolean failCreatingIfExists) {
- _storage.rwLock();
- try {
- var createdMove = createMove(newParent, newMeta, child);
- _opRecorder.recordOp(createdMove);
- applyOp(_peers.getSelfId(), createdMove, failCreatingIfExists);
- } finally {
- _storage.rwUnlock();
- }
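+        // Wrapper/storage locking is gone; isolation is presumably provided by the caller
+        // (e.g. a surrounding transaction), so we just record the op for peers and apply it locally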
+ var createdMove = createMove(newParent, newMeta, child);
+ _opRecorder.recordOp(createdMove);
+ applyOp(_peers.getSelfId(), createdMove, failCreatingIfExists);
}
    public void applyExternalOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
- _storage.rwLock();
- try {
- _clock.updateTimestamp(op.timestamp().timestamp());
- applyOp(from, op, false);
- } finally {
- _storage.rwUnlock();
- }
+ _clock.updateTimestamp(op.timestamp().timestamp());
+ applyOp(from, op, false);
}
// Returns true if the timestamp is newer than what's seen, false otherwise
private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
- _storage.assertRwLock();
TimestampT oldRef = _storage.getPeerTimestampLog().getForPeer(from);
if (oldRef != null && oldRef.compareTo(newTimestamp) > 0) { // FIXME?
LOGGER.warning("Wrong op order: received older than known from " + from.toString());
@@ -221,31 +187,18 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
public boolean updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
- _storage.rLock();
- try {
- // TODO: Ideally no point in this separate locking?
- var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
- var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
- if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
- && (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
- } finally {
- _storage.rUnlock();
- }
- _storage.rwLock();
- try {
- updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
- updateTimestampImpl(from, timestamp);
- tryTrimLog();
- } finally {
- _storage.rwUnlock();
- }
-
+ // TODO: Ideally no point in this separate locking?
+ var gotExt = _storage.getPeerTimestampLog().getForPeer(from);
+ var gotSelf = _storage.getPeerTimestampLog().getForPeer(_peers.getSelfId());
+ if ((gotExt != null && gotExt.compareTo(timestamp) >= 0)
+ && (gotSelf != null && gotSelf.compareTo(_clock.peekTimestamp()) >= 0)) return false;
+ updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
+ updateTimestampImpl(from, timestamp);
+ tryTrimLog();
return true;
}
    private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
- _storage.assertRwLock();
-
if (!updateTimestampImpl(from, op.timestamp().timestamp())) return;
var log = _storage.getLog();
@@ -276,7 +229,6 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
- e.getValue().unfreeze();
_storage.removeNode(e.getKey());
}
}
@@ -292,12 +244,10 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
    }
    private CombinedTimestamp<TimestampT, PeerIdT> getTimestamp() {
- _storage.assertRwLock();
return new CombinedTimestamp<>(_clock.getTimestamp(), _peers.getSelfId());
}
    private <LocalMetaT extends MetaT> OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> createMove(NodeIdT newParent, LocalMetaT newMeta, NodeIdT node) {
- _storage.assertRwLock();
return new OpMove<>(getTimestamp(), newParent, newMeta, node);
}
@@ -317,91 +267,73 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
        return computed;
    }
-    private WrapperT getNewNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> desired) {
-        _storage.assertRwLock();
+    private TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNewNode(NodeIdT key, NodeIdT parent, MetaT meta) {
if (_undoCtx != null) {
- var node = _undoCtx.get(desired.getId());
+ var node = _undoCtx.get(key);
if (node != null) {
- node.rwLock();
try {
- if (!node.getNode().getChildren().isEmpty()) {
- LOGGER.log(Level.WARNING, "Not empty children for undone node " + desired.getId());
+ if (!node.children().isEmpty()) {
+ LOGGER.log(Level.WARNING, "Not empty children for undone node " + key);
}
- node.getNode().setParent(desired.getParent());
- node.notifyRef(desired.getParent());
- node.getNode().setMeta(desired.getMeta());
- node.unfreeze();
+ node = node.withParent(parent).withMeta(meta);
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Error while fixing up node " + desired.getId(), e);
- node.rwUnlock();
+ LOGGER.log(Level.SEVERE, "Error while fixing up node " + key, e);
node = null;
}
}
if (node != null) {
- _undoCtx.remove(desired.getId());
+ _undoCtx.remove(key);
return node;
}
}
- return _storage.createNewNode(desired);
+ return _storage.createNewNode(key, parent, meta);
}
    private void applyEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> sourceOp, List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {
-        _storage.assertRwLock();
        for (var effect : effects) {
-            WrapperT oldParentNode = null;
-            WrapperT newParentNode;
-            WrapperT node;
+            TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> oldParentNode = null;
+            TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParentNode;
+            TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node;
- newParentNode = _storage.getById(effect.newParentId());
- newParentNode.rwLock();
- try {
- if (effect.oldInfo() != null) {
- oldParentNode = _storage.getById(effect.oldInfo().oldParent());
- oldParentNode.rwLock();
- }
- try {
- if (oldParentNode == null) {
- node = getNewNode(new TreeNode<>(effect.childId(), effect.newParentId(), effect.newMeta()));
- } else {
- node = _storage.getById(effect.childId());
- node.rwLock();
- }
- try {
-
- if (oldParentNode != null) {
- oldParentNode.getNode().getChildren().remove(effect.oldInfo().oldMeta().getName());
- node.notifyRmRef(effect.oldInfo().oldParent());
- }
-
- newParentNode.getNode().getChildren().put(effect.newMeta().getName(), effect.childId());
- if (effect.newParentId().equals(_storage.getTrashId()) &&
- !Objects.equals(effect.newMeta().getName(), effect.childId()))
- throw new IllegalArgumentException("Move to trash should have id of node as name");
- node.getNode().setParent(effect.newParentId());
- node.getNode().setMeta(effect.newMeta());
- node.getNode().setLastEffectiveOp(effect.effectiveOp());
- node.notifyRef(effect.newParentId());
-
- } finally {
- node.rwUnlock();
- }
- } finally {
- if (oldParentNode != null)
- oldParentNode.rwUnlock();
- }
- } finally {
- newParentNode.rwUnlock();
+ if (effect.oldInfo() != null) {
+ oldParentNode = _storage.getById(effect.oldInfo().oldParent());
}
+ if (oldParentNode == null) {
+ node = getNewNode(effect.childId(), effect.newParentId(), effect.newMeta());
+ } else {
+ node = _storage.getById(effect.childId());
+ }
+ if (oldParentNode != null) {
+ var newOldParentChildren = oldParentNode.children().minus(effect.oldInfo().oldMeta().getName());
+ oldParentNode = oldParentNode.withChildren(newOldParentChildren);
+ _storage.putNode(oldParentNode);
+ }
+
+ // Needs to be read after changing oldParentNode, as it might be the same node
+ newParentNode = _storage.getById(effect.newParentId());
+
+ {
+ var newNewParentChildren = newParentNode.children().plus(effect.newMeta().getName(), effect.childId());
+ newParentNode = newParentNode.withChildren(newNewParentChildren);
+ _storage.putNode(newParentNode);
+ }
+ if (effect.newParentId().equals(_storage.getTrashId()) &&
+ !Objects.equals(effect.newMeta().getName(), effect.childId().toString()))
+ throw new IllegalArgumentException("Move to trash should have id of node as name");
+ _storage.putNode(
+ node.withParent(effect.newParentId())
+ .withMeta(effect.newMeta())
+ .withLastEffectiveOp(sourceOp)
+ );
}
}
    private LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> computeEffects(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
-        _storage.assertRwLock();
        var node = _storage.getById(op.childId());
-        NodeIdT oldParentId = (node != null && node.getNode().getParent() != null) ? node.getNode().getParent() : null;
+        NodeIdT oldParentId = (node != null && node.parent() != null) ? node.parent() : null;
        NodeIdT newParentId = op.newParentId();
-        WrapperT newParent = _storage.getById(newParentId);
+        TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> newParent = _storage.getById(newParentId);
if (newParent == null) {
LOGGER.log(Level.SEVERE, "New parent not found " + op.newMeta().getName() + " " + op.childId());
@@ -409,34 +341,29 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
if (oldParentId == null) {
- newParent.rLock();
- try {
- var conflictNodeId = newParent.getNode().getChildren().get(op.newMeta().getName());
+ var conflictNodeId = newParent.children().get(op.newMeta().getName());
- if (conflictNodeId != null) {
- if (failCreatingIfExists)
- throw new AlreadyExistsException("Already exists: " + op.newMeta().getName() + ": " + conflictNodeId);
+ if (conflictNodeId != null) {
+ if (failCreatingIfExists)
+ throw new AlreadyExistsException("Already exists: " + op.newMeta().getName() + ": " + conflictNodeId);
- var conflictNode = _storage.getById(conflictNodeId);
- conflictNode.rLock();
- try {
- MetaT conflictNodeMeta = conflictNode.getNode().getMeta();
- String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.getNode().getId();
- String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
- return new LogRecord<>(op, List.of(
- new LogEffect<>(new LogEffectOld<>(conflictNode.getNode().getLastEffectiveOp(), newParentId, conflictNodeMeta), conflictNode.getNode().getLastEffectiveOp(), newParentId, (MetaT) conflictNodeMeta.withName(newConflictNodeName), conflictNodeId),
- new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
- ));
- } finally {
- conflictNode.rUnlock();
- }
- } else {
- return new LogRecord<>(op, List.of(
- new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
- ));
+ var conflictNode = _storage.getById(conflictNodeId);
+ MetaT conflictNodeMeta = conflictNode.meta();
+
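+            // A create that matches an existing node exactly is a duplicate: record a no-op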
+ if (Objects.equals(conflictNodeMeta, op.newMeta())) {
+ return new LogRecord<>(op, null);
}
- } finally {
- newParent.rUnlock();
+
+ String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.key();
+ String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
+ return new LogRecord<>(op, List.of(
+ new LogEffect<>(new LogEffectOld<>(conflictNode.lastEffectiveOp(), newParentId, conflictNodeMeta), conflictNode.lastEffectiveOp(), newParentId, (MetaT) conflictNodeMeta.withName(newConflictNodeName), conflictNodeId),
+ new LogEffect<>(null, op, op.newParentId(), (MetaT) op.newMeta().withName(newOursName), op.childId())
+ ));
+ } else {
+ return new LogRecord<>(op, List.of(
+ new LogEffect<>(null, op, newParentId, op.newMeta(), op.childId())
+ ));
}
}
@@ -444,96 +371,69 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
return new LogRecord<>(op, null);
}
- node.rLock();
- newParent.rLock();
- try {
- MetaT oldMeta = node.getNode().getMeta();
- if (!oldMeta.getClass().equals(op.newMeta().getClass())) {
- LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.getNode().getId());
+ MetaT oldMeta = node.meta();
+ if (!oldMeta.getClass().equals(op.newMeta().getClass())) {
+ LOGGER.log(Level.SEVERE, "Class mismatch for meta for node " + node.key());
+ return new LogRecord<>(op, null);
+ }
+ var replaceNodeId = newParent.children().get(op.newMeta().getName());
+ if (replaceNodeId != null) {
+ var replaceNode = _storage.getById(replaceNodeId);
+ var replaceNodeMeta = replaceNode.meta();
+
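+            // Likewise, moving a node onto an identical one is recorded as a no-op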
+ if (Objects.equals(replaceNodeMeta, op.newMeta())) {
return new LogRecord<>(op, null);
}
- var replaceNodeId = newParent.getNode().getChildren().get(op.newMeta().getName());
- if (replaceNodeId != null) {
- var replaceNode = _storage.getById(replaceNodeId);
- try {
- replaceNode.rLock();
- var replaceNodeMeta = replaceNode.getNode().getMeta();
- return new LogRecord<>(op, List.of(
- new LogEffect<>(new LogEffectOld<>(replaceNode.getNode().getLastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.getNode().getLastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
- new LogEffect<>(new LogEffectOld<>(node.getNode().getLastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
- ));
- } finally {
- replaceNode.rUnlock();
- }
- }
+
return new LogRecord<>(op, List.of(
- new LogEffect<>(new LogEffectOld<>(node.getNode().getLastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
+ new LogEffect<>(new LogEffectOld<>(replaceNode.lastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.lastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
+ new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
));
- } finally {
- newParent.rUnlock();
- node.rUnlock();
}
+ return new LogRecord<>(op, List.of(
+ new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())
+ ));
}
private boolean isAncestor(NodeIdT child, NodeIdT parent) {
var node = _storage.getById(parent);
NodeIdT curParent;
- while ((curParent = node.getNode().getParent()) != null) {
+ while ((curParent = node.parent()) != null) {
if (Objects.equals(child, curParent)) return true;
node = _storage.getById(curParent);
}
return false;
}
-    public void walkTree(Consumer<WrapperT> consumer) {
-        _storage.rLock();
-        try {
-            ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
-            queue.push(_storage.getRootId());
+    public void walkTree(Consumer<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>> consumer) {
+        ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
+        queue.push(_storage.getRootId());
- while (!queue.isEmpty()) {
- var id = queue.pop();
- var node = _storage.getById(id);
- if (node == null) continue;
- node.rLock();
- try {
- queue.addAll(node.getNode().getChildren().values());
- consumer.accept(node);
- } finally {
- node.rUnlock();
- }
- }
- } finally {
- _storage.rUnlock();
+ while (!queue.isEmpty()) {
+ var id = queue.pop();
+ var node = _storage.getById(id);
+ if (node == null) continue;
+ queue.addAll(node.children().values());
+ consumer.accept(node);
}
}
-    public Pair<String, NodeIdT> findParent(Function<WrapperT, Boolean> kidPredicate) {
-        _storage.rLock();
-        try {
-            ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
-            queue.push(_storage.getRootId());
+    public Pair<String, NodeIdT> findParent(Function<TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT>, Boolean> kidPredicate) {
+        ArrayDeque<NodeIdT> queue = new ArrayDeque<>();
+        queue.push(_storage.getRootId());
- while (!queue.isEmpty()) {
- var id = queue.pop();
- var node = _storage.getById(id);
- if (node == null) continue;
- node.rLock();
- try {
- var children = node.getNode().getChildren();
- for (var childEntry : children.entrySet()) {
- var child = _storage.getById(childEntry.getValue());
- if (kidPredicate.apply(child)) {
- return Pair.of(childEntry.getKey(), node.getNode().getId());
- }
- }
- queue.addAll(children.values());
- } finally {
- node.rUnlock();
+ while (!queue.isEmpty()) {
+ var id = queue.pop();
+ var node = _storage.getById(id);
+ if (node == null) continue;
+ var children = node.children();
+ for (var childEntry : children.entrySet()) {
+ var child = _storage.getById(childEntry.getValue());
+ if (kidPredicate.apply(child)) {
+ return Pair.of(childEntry.getKey(), node.key());
}
}
- } finally {
- _storage.rUnlock();
+ queue.addAll(children.values());
}
return null;
}
@@ -541,27 +441,22 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
public void recordBoostrapFor(PeerIdT host) {
        TreeMap<CombinedTimestamp<TimestampT, PeerIdT>, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT>> result = new TreeMap<>();
- _storage.rwLock();
- try {
- walkTree(node -> {
- var op = node.getNode().getLastEffectiveOp();
- if (node.getNode().getLastEffectiveOp() == null) return;
- LOGGER.info("visited bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
- result.put(node.getNode().getLastEffectiveOp().timestamp(), node.getNode().getLastEffectiveOp());
- });
+ walkTree(node -> {
+ var op = node.lastEffectiveOp();
+ if (node.lastEffectiveOp() == null) return;
+ LOGGER.info("visited bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
+ result.put(node.lastEffectiveOp().timestamp(), node.lastEffectiveOp());
+ });
- for (var le : _storage.getLog().getAll()) {
- var op = le.getValue().op();
- LOGGER.info("bootstrap op from log for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
- result.put(le.getKey(), le.getValue().op());
- }
+ for (var le : _storage.getLog().getAll()) {
+ var op = le.getValue().op();
+ LOGGER.info("bootstrap op from log for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
+ result.put(le.getKey(), le.getValue().op());
+ }
- for (var op : result.values()) {
- LOGGER.info("Recording bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
- _opRecorder.recordOpForPeer(host, op);
- }
- } finally {
- _storage.rwUnlock();
+ for (var op : result.values()) {
+ LOGGER.info("Recording bootstrap op for " + host + ": " + op.timestamp().toString() + " " + op.newMeta().getName() + " " + op.childId() + "->" + op.newParentId());
+ _opRecorder.recordOpForPeer(host, op);
}
}
}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffect.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffect.java
index 5cd564b7..0fe9a95f 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffect.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffect.java
@@ -1,9 +1,11 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
+
public record LogEffect<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>(
        LogEffectOld<TimestampT, PeerIdT, MetaT, NodeIdT> oldInfo,
        OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> effectiveOp,
NodeIdT newParentId,
MetaT newMeta,
- NodeIdT childId) {
+ NodeIdT childId) implements Serializable {
}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffectOld.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffectOld.java
index ec3f2662..c1c0a477 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffectOld.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogEffectOld.java
@@ -1,6 +1,9 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
+
public record LogEffectOld<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
        (OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> oldEffectiveMove,
NodeIdT oldParent,
- MetaT oldMeta) {}
+ MetaT oldMeta) implements Serializable {
+}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogRecord.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogRecord.java
index b9a7b9da..2fb036c4 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogRecord.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/LogRecord.java
@@ -1,7 +1,9 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
import java.util.List;
public record LogRecord<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
        (OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op,
-         List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) {}
+         List<LogEffect<TimestampT, PeerIdT, MetaT, NodeIdT>> effects) implements Serializable {
+}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/OpMove.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/OpMove.java
index e9c19562..85b7f383 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/OpMove.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/OpMove.java
@@ -1,5 +1,8 @@
package com.usatiuk.kleppmanntree;
+import java.io.Serializable;
+
public record OpMove<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT>
        (CombinedTimestamp<TimestampT, PeerIdT> timestamp, NodeIdT newParentId, MetaT newMeta,
- NodeIdT childId) {}
+ NodeIdT childId) implements Serializable {
+}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/StorageInterface.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/StorageInterface.java
index 69467386..af55b35b 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/StorageInterface.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/StorageInterface.java
@@ -4,32 +4,23 @@ public interface StorageInterface<
        TimestampT extends Comparable<TimestampT>,
        PeerIdT extends Comparable<PeerIdT>,
        MetaT extends NodeMeta,
-        NodeIdT,
-        WrapperT extends TreeNodeWrapper<TimestampT, PeerIdT, MetaT, NodeIdT>> {
+        NodeIdT> {
NodeIdT getRootId();
NodeIdT getTrashId();
NodeIdT getNewNodeId();
- WrapperT getById(NodeIdT id);
+    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getById(NodeIdT id);
-    // Creates a node, returned wrapper is RW-locked
-    WrapperT createNewNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node);
+    // Creates a new node; it is not stored until putNode is called
+    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> createNewNode(NodeIdT key, NodeIdT parent, MetaT meta);
+
+    void putNode(TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> node);
void removeNode(NodeIdT id);
    LogInterface<TimestampT, PeerIdT, MetaT, NodeIdT> getLog();
    PeerTimestampLogInterface<TimestampT, PeerIdT> getPeerTimestampLog();
-
- void rLock();
-
- void rUnlock();
-
- void rwLock();
-
- void rwUnlock();
-
- void assertRwLock();
}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNode.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNode.java
index a2b1577f..f490bb9e 100644
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNode.java
+++ b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNode.java
@@ -1,31 +1,26 @@
package com.usatiuk.kleppmanntree;
-import lombok.Getter;
-import lombok.Setter;
+import org.pcollections.PMap;
-import java.util.HashMap;
+import java.io.Serializable;
import java.util.Map;
-@Getter
-@Setter
-public class TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
-    private final NodeIdT _id;
-    private NodeIdT _parent = null;
-    private OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> _lastEffectiveOp = null;
-    private MetaT _meta = null;
-    private Map<String, NodeIdT> _children = new HashMap<>();
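+// Immutable view of a tree node: parent/meta/children are read via accessors, and the
+// with* methods return updated copies (children use a persistent map, so copies share structure)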
+public interface TreeNode<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> extends Serializable {
+ NodeIdT key();
- public TreeNode(NodeIdT id, NodeIdT parent, MetaT meta) {
- _id = id;
- _meta = meta;
- _parent = parent;
- }
+ NodeIdT parent();
-    public TreeNode(NodeIdT id, NodeIdT parent, MetaT meta, Map<String, NodeIdT> children) {
- _id = id;
- _meta = meta;
- _parent = parent;
- _children = children;
- }
+    OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp();
+    MetaT meta();
+
+    PMap<String, NodeIdT> children();
+
+    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withParent(NodeIdT parent);
+
+    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withLastEffectiveOp(OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> lastEffectiveOp);
+
+    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withMeta(MetaT meta);
+
+    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> withChildren(PMap<String, NodeIdT> children);
}
diff --git a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNodeWrapper.java b/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNodeWrapper.java
deleted file mode 100644
index 57869231..00000000
--- a/dhfs-parent/kleppmanntree/src/main/java/com/usatiuk/kleppmanntree/TreeNodeWrapper.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.usatiuk.kleppmanntree;
-
-public interface TreeNodeWrapper<TimestampT extends Comparable<TimestampT>, PeerIdT extends Comparable<PeerIdT>, MetaT extends NodeMeta, NodeIdT> {
- void rLock();
-
- void rUnlock();
-
- void rwLock();
-
- void rwUnlock();
-
- void freeze();
-
- void unfreeze();
-
- void notifyRef(NodeIdT id);
-
- void notifyRmRef(NodeIdT id);
-
-    TreeNode<TimestampT, PeerIdT, MetaT, NodeIdT> getNode();
-}
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/KleppmanTreeSimpleTest.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/KleppmanTreeSimpleTest.java
index e95ce17a..dfe99ebd 100644
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/KleppmanTreeSimpleTest.java
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/KleppmanTreeSimpleTest.java
@@ -32,8 +32,8 @@ public class KleppmanTreeSimpleTest {
Assertions.assertEquals(d1id, testNode2._tree.traverse(List.of("Test1")));
Assertions.assertEquals(d2id, testNode2._tree.traverse(List.of("Test2")));
- Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
- Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
+ Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
+ Assertions.assertIterableEquals(List.of("Test1", "Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
var f1id = testNode1._storageInterface.getNewNodeId();
@@ -54,10 +54,10 @@ public class KleppmanTreeSimpleTest {
testNode1._tree.move(d1id, new TestNodeMetaDir("Test2"), d2id);
Assertions.assertEquals(d1id, testNode1._tree.traverse(List.of("Test1")));
Assertions.assertEquals(d2id, testNode1._tree.traverse(List.of("Test1", "Test2")));
- Assertions.assertIterableEquals(List.of("Test1"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
+ Assertions.assertIterableEquals(List.of("Test1"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
testNode2._tree.move(d2id, new TestNodeMetaDir("Test1"), d1id);
- Assertions.assertIterableEquals(List.of("Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
+ Assertions.assertIterableEquals(List.of("Test2"), testNode2._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
Assertions.assertEquals(d2id, testNode2._tree.traverse(List.of("Test2")));
Assertions.assertEquals(d1id, testNode2._tree.traverse(List.of("Test2", "Test1")));
@@ -72,8 +72,8 @@ public class KleppmanTreeSimpleTest {
}
// Second node wins as it has smaller timestamp
- Assertions.assertIterableEquals(List.of("Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).getNode().getChildren().keySet());
- Assertions.assertIterableEquals(List.of("Test1", "TestFile"), testNode1._storageInterface.getById(d2id).getNode().getChildren().keySet());
+ Assertions.assertIterableEquals(List.of("Test2"), testNode1._storageInterface.getById(testNode2._storageInterface.getRootId()).children().keySet());
+ Assertions.assertIterableEquals(List.of("Test1", "TestFile"), testNode1._storageInterface.getById(d2id).children().keySet().stream().sorted().toList());
Assertions.assertEquals(d2id, testNode1._tree.traverse(List.of("Test2")));
Assertions.assertEquals(d1id, testNode1._tree.traverse(List.of("Test2", "Test1")));
Assertions.assertEquals(f1id, testNode1._tree.traverse(List.of("Test2", "TestFile")));
@@ -81,8 +81,8 @@ public class KleppmanTreeSimpleTest {
var f11 = testNode1._storageInterface.getById(f1id);
var f12 = testNode2._storageInterface.getById(f1id);
- Assertions.assertEquals(f11.getNode().getMeta(), f12.getNode().getMeta());
- Assertions.assertInstanceOf(TestNodeMetaFile.class, f11.getNode().getMeta());
+ Assertions.assertEquals(f11.meta(), f12.meta());
+ Assertions.assertInstanceOf(TestNodeMetaFile.class, f11.meta());
// Trim test
Assertions.assertTrue(testNode1._storageInterface.getLog().size() <= 1);
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNode.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNode.java
index 005cf2b0..53f2a7c6 100644
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNode.java
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNode.java
@@ -9,7 +9,7 @@ public class TestNode {
protected final TestClock _clock;
protected final TestPeerInterface _peerInterface;
protected final TestStorageInterface _storageInterface;
-    protected final KleppmannTree<Long, Long, TestNodeMeta, Long, TestNodeWrapper> _tree;
+    protected final KleppmannTree<Long, Long, TestNodeMeta, Long> _tree;
private final TestOpRecorder _recorder;
public TestNode(long id) {
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMeta.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMeta.java
index 2c2e9f79..be276c9c 100644
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMeta.java
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMeta.java
@@ -1,12 +1,16 @@
package com.usatiuk.kleppmanntree;
-import lombok.Getter;
-
public abstract class TestNodeMeta implements NodeMeta {
- @Getter
private final String _name;
- public TestNodeMeta(String name) {_name = name;}
+ public TestNodeMeta(String name) {
+ _name = name;
+ }
+
+ @Override
+ public String getName() {
+ return _name;
+ }
abstract public NodeMeta withName(String name);
}
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMetaFile.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMetaFile.java
index 8a5bc91d..9cb0792f 100644
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMetaFile.java
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeMetaFile.java
@@ -1,9 +1,6 @@
package com.usatiuk.kleppmanntree;
-import lombok.Getter;
-
public class TestNodeMetaFile extends TestNodeMeta {
- @Getter
private final long _inode;
public TestNodeMetaFile(String name, long inode) {
@@ -11,6 +8,10 @@ public class TestNodeMetaFile extends TestNodeMeta {
_inode = inode;
}
+ public long getInode() {
+ return _inode;
+ }
+
@Override
public NodeMeta withName(String name) {
return new TestNodeMetaFile(name, _inode);
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeWrapper.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeWrapper.java
deleted file mode 100644
index 57a4f600..00000000
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestNodeWrapper.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.usatiuk.kleppmanntree;
-
-public class TestNodeWrapper implements TreeNodeWrapper<Long, Long, TestNodeMeta, Long> {
-    private final TreeNode<Long, Long, TestNodeMeta, Long> _backingNode;
-
-    public TestNodeWrapper(TreeNode<Long, Long, TestNodeMeta, Long> backingNode) {_backingNode = backingNode;}
-
- @Override
- public void rLock() {
-
- }
-
- @Override
- public void rUnlock() {
-
- }
-
- @Override
- public void rwLock() {
-
- }
-
- @Override
- public void rwUnlock() {
-
- }
-
- @Override
- public void freeze() {
-
- }
-
- @Override
- public void unfreeze() {
-
- }
-
- @Override
- public void notifyRef(Long id) {
-
- }
-
- @Override
- public void notifyRmRef(Long id) {
-
- }
-
- @Override
-    public TreeNode<Long, Long, TestNodeMeta, Long> getNode() {
- return _backingNode;
- }
-}
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestPeerInterface.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestPeerInterface.java
index 3f793aab..708bb204 100644
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestPeerInterface.java
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestPeerInterface.java
@@ -6,7 +6,9 @@ import java.util.List;
public class TestPeerInterface implements PeerInterface<Long> {
private final long selfId;
- public TestPeerInterface(long selfId) {this.selfId = selfId;}
+ public TestPeerInterface(long selfId) {
+ this.selfId = selfId;
+ }
@Override
public Long getSelfId() {
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestStorageInterface.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestStorageInterface.java
index 0228d9bf..415f146a 100644
--- a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestStorageInterface.java
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestStorageInterface.java
@@ -3,17 +3,17 @@ package com.usatiuk.kleppmanntree;
import java.util.HashMap;
import java.util.Map;
-public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long, TestNodeWrapper> {
+public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long> {
    private final long _peerId;
-    private final Map<Long, TreeNode<Long, Long, TestNodeMeta, Long>> _nodes = new HashMap<>();
+    private final Map<Long, TestTreeNode> _nodes = new HashMap<>();
private final TestLog _log = new TestLog();
private final TestPeerLog _peerLog = new TestPeerLog();
private long _curId = 1;
public TestStorageInterface(long peerId) {
_peerId = peerId;
- _nodes.put(getRootId(), new TreeNode<>(getRootId(), null, null));
- _nodes.put(getTrashId(), new TreeNode<>(getTrashId(), null, null));
+ _nodes.put(getRootId(), new TestTreeNode(getRootId(), null, null));
+ _nodes.put(getTrashId(), new TestTreeNode(getTrashId(), null, null));
}
@Override
@@ -32,18 +32,18 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long, TestNodeWrapper>
    @Override
-    public TestNodeWrapper createNewNode(TreeNode<Long, Long, TestNodeMeta, Long> node) {
-        if (!_nodes.containsKey(node.getId())) {
-            _nodes.put(node.getId(), node);
-            return new TestNodeWrapper(node);
-        }
-        throw new IllegalStateException("Node with id " + node.getId() + " already exists");
+    public TestTreeNode createNewNode(Long key, Long parent, TestNodeMeta meta) {
+        return new TestTreeNode(key, parent, meta);
+    }
+
+    @Override
+    public void putNode(TreeNode<Long, Long, TestNodeMeta, Long> node) {
+        _nodes.put(node.key(), (TestTreeNode) node);
    }
@Override
@@ -53,7 +53,6 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long, TestNodeWrapper>
    @Override
    public LogInterface<Long, Long, TestNodeMeta, Long> getLog() {
return _log;
@@ -64,29 +63,4 @@ public class TestStorageInterface implements StorageInterface<Long, Long, TestNodeMeta, Long, TestNodeWrapper>
    @Override
    public PeerTimestampLogInterface<Long, Long> getPeerTimestampLog() {
return _peerLog;
}
-
- @Override
- public void rLock() {
-
- }
-
- @Override
- public void rUnlock() {
-
- }
-
- @Override
- public void rwLock() {
-
- }
-
- @Override
- public void rwUnlock() {
-
- }
-
- @Override
- public void assertRwLock() {
-
- }
}
diff --git a/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestTreeNode.java b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestTreeNode.java
new file mode 100644
index 00000000..373eb580
--- /dev/null
+++ b/dhfs-parent/kleppmanntree/src/test/java/com/usatiuk/kleppmanntree/TestTreeNode.java
@@ -0,0 +1,33 @@
+package com.usatiuk.kleppmanntree;
+
+import org.pcollections.HashTreePMap;
+import org.pcollections.PMap;
+
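+// Minimal immutable TreeNode implementation for the tests; every with* call copies the record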
+public record TestTreeNode(Long key, Long parent, OpMove<Long, Long, TestNodeMeta, Long> lastEffectiveOp,
+                           TestNodeMeta meta,
+                           PMap<String, Long> children) implements TreeNode<Long, Long, TestNodeMeta, Long> {
+
+ public TestTreeNode(Long id, Long parent, TestNodeMeta meta) {
+ this(id, parent, null, meta, HashTreePMap.empty());
+ }
+
+ @Override
+    public TreeNode<Long, Long, TestNodeMeta, Long> withParent(Long parent) {
+        return new TestTreeNode(key, parent, lastEffectiveOp, meta, children);
+    }
+
+    @Override
+    public TreeNode<Long, Long, TestNodeMeta, Long> withLastEffectiveOp(OpMove<Long, Long, TestNodeMeta, Long> lastEffectiveOp) {
+        return new TestTreeNode(key, parent, lastEffectiveOp, meta, children);
+    }
+
+    @Override
+    public TreeNode<Long, Long, TestNodeMeta, Long> withMeta(TestNodeMeta meta) {
+        return new TestTreeNode(key, parent, lastEffectiveOp, meta, children);
+    }
+
+    @Override
+    public TreeNode<Long, Long, TestNodeMeta, Long> withChildren(PMap<String, Long> children) {
+        return new TestTreeNode(key, parent, lastEffectiveOp, meta, children);
+    }
+}
diff --git a/dhfs-parent/objects/pom.xml b/dhfs-parent/objects/pom.xml
new file mode 100644
index 00000000..b11658fb
--- /dev/null
+++ b/dhfs-parent/objects/pom.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.usatiuk.dhfs</groupId>
+        <artifactId>parent</artifactId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>objects</artifactId>
+
+    <properties>
+        <maven.compiler.source>21</maven.compiler.source>
+        <maven.compiler.target>21</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-arc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-grpc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.openhft</groupId>
+            <artifactId>zero-allocation-hashing</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jboss.slf4j</groupId>
+            <artifactId>slf4j-jboss-logmanager</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.usatiuk.dhfs</groupId>
+            <artifactId>utils</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.usatiuk.dhfs</groupId>
+            <artifactId>supportlib</artifactId>
+            <version>1.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-junit5-mockito</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.lmdbjava</groupId>
+            <artifactId>lmdbjava</artifactId>
+            <version>0.9.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.pcollections</groupId>
+            <artifactId>pcollections</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <forkCount>1C</forkCount>
+                    <reuseForks>false</reuseForks>
+                    <parallel>classes</parallel>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>${quarkus.platform.group-id}</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <version>${quarkus.platform.version}</version>
+                <extensions>true</extensions>
+                <executions>
+                    <execution>
+                        <id>quarkus-plugin</id>
+                        <goals>
+                            <goal>build</goal>
+                            <goal>generate-code</goal>
+                            <goal>generate-code-tests</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java
new file mode 100644
index 00000000..7014f8a2
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CloseableKvIterator.java
@@ -0,0 +1,24 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.Iterator;
+
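+// A closeable iterator over sorted key-value pairs that can also move backwards;
+// the peek* methods inspect the neighbouring key without advancing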
+public interface CloseableKvIterator<K extends Comparable<K>, V> extends Iterator<Pair<K, V>>, AutoCloseableNoThrow {
+ K peekNextKey();
+
+ void skip();
+
+ K peekPrevKey();
+
+    Pair<K, V> prev();
+
+ boolean hasPrev();
+
+ void skipPrev();
+
+    default CloseableKvIterator<K, V> reversed() {
+ return new ReversedKvIterator<>(this);
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java
new file mode 100644
index 00000000..604a10f8
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/CurrentTransaction.java
@@ -0,0 +1,55 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
+import com.usatiuk.dhfs.objects.transaction.Transaction;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.commons.lang3.tuple.Pair;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Optional;
+
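+// A Transaction facade that delegates every call to the calling thread's
+// currently-running transaction, as tracked by the TransactionManager.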
+@ApplicationScoped
+public class CurrentTransaction implements Transaction {
+ @Inject
+ TransactionManager transactionManager;
+
+ @Override
+ public void onCommit(Runnable runnable) {
+ transactionManager.current().onCommit(runnable);
+ }
+
+ @Override
+ public void onFlush(Runnable runnable) {
+ transactionManager.current().onFlush(runnable);
+ }
+
+ @Override
+    public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
+ return transactionManager.current().get(type, key, strategy);
+ }
+
+ @Override
+ public void delete(JObjectKey key) {
+ transactionManager.current().delete(key);
+ }
+
+ @Nonnull
+ @Override
+    public Collection<JObjectKey> findAllObjects() {
+ return transactionManager.current().findAllObjects();
+ }
+
+ @Override
+    public CloseableKvIterator<JObjectKey, JData> getIterator(IteratorStart start, JObjectKey key) {
+ return transactionManager.current().getIterator(start, key);
+ }
+
+ @Override
+ public void put(JData obj) {
+ transactionManager.current().put(obj);
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java
new file mode 100644
index 00000000..b1f7bcb7
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Data.java
@@ -0,0 +1,10 @@
+package com.usatiuk.dhfs.objects;
+
+import java.util.Optional;
+
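+// A present value; the counterpart of Tombstone in a MaybeTombstone.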
+public record Data<V>(V value) implements MaybeTombstone<V> {
+    @Override
+    public Optional<V> opt() {
+ return Optional.of(value);
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java
new file mode 100644
index 00000000..01798da9
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/IterProdFn.java
@@ -0,0 +1,8 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+
+@FunctionalInterface
+public interface IterProdFn<K extends Comparable<K>, V> {
+    CloseableKvIterator<K, V> get(IteratorStart start, K key);
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JData.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JData.java
new file mode 100644
index 00000000..501e3c35
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JData.java
@@ -0,0 +1,16 @@
+package com.usatiuk.dhfs.objects;
+
+import java.io.Serializable;
+
+// TODO: This could maybe be moved to a separate module?
+// The base interface for JObject data.
+// Only one instance of this "exists" per key; the instance in the manager is canonical.
+// When committing a transaction, the instance is checked against the canonical one; if they differ, a race occurred.
+// It is immutable; its version is filled in by the allocator from the AllocVersionProvider.
+public interface JData extends Serializable {
+ JObjectKey key();
+
+ default int estimateSize() {
+ return 100;
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java
new file mode 100644
index 00000000..d1aaddc2
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JDataVersionedWrapper.java
@@ -0,0 +1,8 @@
+package com.usatiuk.dhfs.objects;
+
+import jakarta.annotation.Nonnull;
+
+import java.io.Serializable;
+
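+// An immutable object snapshot paired with the id of the transaction that committed it.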
+public record JDataVersionedWrapper(@Nonnull JData data, long version) implements Serializable {
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java
new file mode 100644
index 00000000..b702069b
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectKey.java
@@ -0,0 +1,44 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public record JObjectKey(String name) implements Serializable, Comparable<JObjectKey> {
+ public static JObjectKey of(String name) {
+ return new JObjectKey(name);
+ }
+
+ @Override
+ public int compareTo(JObjectKey o) {
+ return name.compareTo(o.name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ public byte[] bytes() {
+ return name.getBytes(StandardCharsets.UTF_8);
+ }
+
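+    // Encodes the key as UTF-8 into a direct buffer (copying when the charset
+    // encoder returns a heap buffer), presumably because LMDB (via lmdbjava)
+    // only accepts direct ByteBuffers for keys.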
+ public ByteBuffer toByteBuffer() {
+ var heapBb = StandardCharsets.UTF_8.encode(name);
+ if (heapBb.isDirect()) return heapBb;
+ var directBb = UninitializedByteBuffer.allocateUninitialized(heapBb.remaining());
+ directBb.put(heapBb);
+ directBb.flip();
+ return directBb;
+ }
+
+ public static JObjectKey fromBytes(byte[] bytes) {
+ return new JObjectKey(new String(bytes, StandardCharsets.UTF_8));
+ }
+
+ public static JObjectKey fromByteBuffer(ByteBuffer buff) {
+ return new JObjectKey(StandardCharsets.UTF_8.decode(buff).toString());
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java
new file mode 100644
index 00000000..37e6798d
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JObjectManager.java
@@ -0,0 +1,239 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.snapshot.SnapshotManager;
+import com.usatiuk.dhfs.objects.transaction.*;
+import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
+import io.quarkus.logging.Log;
+import io.quarkus.runtime.StartupEvent;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+
+import java.util.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+// Manages all access to com.usatiuk.dhfs.objects.JData objects.
+// In particular, it serves as a source of truth for what is committed to the backing storage.
+// All data goes through it, it is responsible for transaction atomicity
+// TODO: persistent tx id
+@ApplicationScoped
+public class JObjectManager {
+    private final List<PreCommitTxHook> _preCommitTxHooks;
+ private boolean _ready = false;
+ @Inject
+ SnapshotManager snapshotManager;
+ @Inject
+ TransactionFactory transactionFactory;
+ @Inject
+ LockManager lockManager;
+
+ private void verifyReady() {
+ if (!_ready) throw new IllegalStateException("Wrong service order!");
+ }
+
+ void init(@Observes @Priority(200) StartupEvent event) {
+ _ready = true;
+ }
+
+    JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
+ _preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
+ }
+
+ public TransactionPrivate createTransaction() {
+ verifyReady();
+ var tx = transactionFactory.createTransaction();
+ Log.tracev("Created transaction with snapshotId={0}", tx.snapshot().id());
+ return tx;
+ }
+
+ public TransactionHandle commit(TransactionPrivate tx) {
+ verifyReady();
+        var writes = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
+        var dependenciesLocked = new LinkedHashMap<JObjectKey, Optional<JDataVersionedWrapper>>();
+        Map<JObjectKey, TransactionObject<?>> readSet;
+        var toUnlock = new ArrayList<AutoCloseableNoThrow>();
+
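+        // Locks the object and snapshots its last committed version, so the
+        // conflict checks below see a stable view of every dependency.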
+        Consumer<JObjectKey> addDependency =
+ key -> {
+ dependenciesLocked.computeIfAbsent(key, k -> {
+ var lock = lockManager.lockObject(k);
+ toUnlock.add(lock);
+ return snapshotManager.readObjectDirect(k);
+ });
+ };
+
+ // For existing objects:
+ // Check that their version is not higher than the version of transaction being committed
+ // TODO: check deletions, inserts
+ try {
+ try {
+                Function<JObjectKey, JData> getCurrent =
+                        key -> switch (writes.get(key)) {
+                            case TxRecord.TxObjectRecordWrite<?> write -> write.data();
+                            case TxRecord.TxObjectRecordDeleted deleted -> null;
+                            case null -> tx.readSource().get(JData.class, key).orElse(null);
+                            default -> {
+                                throw new TxCommitException("Unexpected value: " + writes.get(key));
+                            }
+                        };
+
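+                // Run pre-commit hooks until a fixpoint: hooks may themselves issue
+                // new writes, and every hook must observe every write before commit.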
+ boolean somethingChanged;
+ do {
+ somethingChanged = false;
+                    Map<JObjectKey, TxRecord.TxObjectRecord<?>> currentIteration = new HashMap<>();
+ for (var hook : _preCommitTxHooks) {
+ for (var n : tx.drainNewWrites())
+ currentIteration.put(n.key(), n);
+ Log.trace("Commit iteration with " + currentIteration.size() + " records for hook " + hook.getClass());
+
+ for (var entry : currentIteration.entrySet()) {
+ somethingChanged = true;
+ Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
+ var oldObj = getCurrent.apply(entry.getKey());
+ switch (entry.getValue()) {
+                                case TxRecord.TxObjectRecordWrite<?> write -> {
+ if (oldObj == null) {
+ hook.onCreate(write.key(), write.data());
+ } else {
+ hook.onChange(write.key(), oldObj, write.data());
+ }
+ }
+ case TxRecord.TxObjectRecordDeleted deleted -> {
+ hook.onDelete(deleted.key(), oldObj);
+ }
+ default -> throw new TxCommitException("Unexpected value: " + entry);
+ }
+ }
+ }
+ writes.putAll(currentIteration);
+ } while (somethingChanged);
+
+ if (writes.isEmpty()) {
+ Log.trace("Committing transaction - no changes");
+ return new TransactionHandle() {
+ @Override
+ public void onFlush(Runnable runnable) {
+ runnable.run();
+ }
+ };
+ }
+
+ } finally {
+ readSet = tx.reads();
+
+ Stream.concat(readSet.keySet().stream(), writes.keySet().stream())
+ .sorted(Comparator.comparing(JObjectKey::toString))
+ .forEach(addDependency);
+
+ for (var read : readSet.entrySet()) {
+                if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
+ toUnlock.add(locked.lock());
+ }
+ }
+ }
+
+ Log.trace("Committing transaction start");
+ var snapshotId = tx.snapshot().id();
+
+ for (var read : readSet.entrySet()) {
+ var dep = dependenciesLocked.get(read.getKey());
+
+ if (dep.isEmpty() != read.getValue().data().isEmpty()) {
+ Log.trace("Checking read dependency " + read.getKey() + " - not found");
+ throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
+ }
+
+ if (dep.isEmpty()) {
+ // TODO: Every write gets a dependency due to hooks
+ continue;
+// assert false;
+// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
+ }
+
+ if (dep.get().version() > snapshotId) {
+ Log.trace("Checking dependency " + read.getKey() + " - newer than");
+ throw new TxCommitException("Serialization hazard: " + dep.get().version() + " vs " + snapshotId);
+ }
+
+ Log.trace("Checking dependency " + read.getKey() + " - ok with read");
+ }
+
+ var addFlushCallback = snapshotManager.commitTx(
+ writes.values().stream()
+ .filter(r -> {
+                            if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
+ var dep = dependenciesLocked.get(data.key());
+ if (dep.isPresent() && dep.get().version() > snapshotId) {
+ Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
+ return false;
+ }
+ }
+ return true;
+ }).toList());
+
+ for (var callback : tx.getOnCommit()) {
+ callback.run();
+ }
+
+ for (var callback : tx.getOnFlush()) {
+ addFlushCallback.accept(callback);
+ }
+
+ return new TransactionHandle() {
+ @Override
+ public void onFlush(Runnable runnable) {
+ addFlushCallback.accept(runnable);
+ }
+ };
+ } catch (Throwable t) {
+ Log.trace("Error when committing transaction", t);
+ throw new TxCommitException(t.getMessage(), t);
+ } finally {
+ for (var unlock : toUnlock) {
+ unlock.close();
+ }
+ tx.close();
+ }
+ }
+
+ public void rollback(TransactionPrivate tx) {
+ verifyReady();
+ tx.reads().forEach((key, value) -> {
+            if (value instanceof TransactionObjectLocked<?> locked) {
+ locked.lock().close();
+ }
+ });
+ tx.close();
+ }
+
+    // private class TransactionObjectSourceImpl implements TransactionObjectSource {
+//        private final long _txId;
+//
+//        private TransactionObjectSourceImpl(long txId) {
+//            _txId = txId;
+//        }
+//
+//        @Override
+//        public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
+//            var got = getObj(type, key);
+//            if (got.data().isPresent() && got.data().get().version() > _txId) {
+//                throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
+//            }
+//            return got;
+//        }
+//
+//        @Override
+//        public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
+//            var got = getObjLock(type, key);
+//            if (got.data().isPresent() && got.data().get().version() > _txId) {
+//                got.lock().close();
+//                throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
+//            }
+//            return got;
+//        }
+//    }
+}
\ No newline at end of file
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JavaDataSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JavaDataSerializer.java
new file mode 100644
index 00000000..a42ebc07
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/JavaDataSerializer.java
@@ -0,0 +1,21 @@
+package com.usatiuk.dhfs.objects;
+
+
+import com.google.protobuf.ByteString;
+import com.usatiuk.dhfs.utils.SerializationHelper;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import java.io.Serializable;
+
+@ApplicationScoped
+public class JavaDataSerializer implements ObjectSerializer<JDataVersionedWrapper> {
+ @Override
+ public ByteString serialize(JDataVersionedWrapper obj) {
+ return SerializationHelper.serialize((Serializable) obj);
+ }
+
+ @Override
+ public JDataVersionedWrapper deserialize(ByteString data) {
+ return SerializationHelper.deserialize(data.toByteArray());
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java
new file mode 100644
index 00000000..b43308d2
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/KeyPredicateKvIterator.java
@@ -0,0 +1,129 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+
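+// Filters the backing iterator down to keys accepted by the predicate,
+// preserving bidirectional iteration.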
+public class KeyPredicateKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
+    private final CloseableKvIterator<K, V> _backing;
+    private final Function<K, Boolean> _filter;
+    private K _next;
+
+    public KeyPredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<K, Boolean> filter) {
+ _goingForward = true;
+ _backing = backing;
+ _filter = filter;
+ fillNext();
+
+ boolean shouldGoBack = false;
+ if (start == IteratorStart.LE) {
+ if (_next == null || _next.compareTo(startKey) > 0) {
+ shouldGoBack = true;
+ }
+ } else if (start == IteratorStart.LT) {
+ if (_next == null || _next.compareTo(startKey) >= 0) {
+ shouldGoBack = true;
+ }
+ }
+
+ if (shouldGoBack && _backing.hasPrev()) {
+ _goingForward = false;
+ _next = null;
+ fillNext();
+ if (_next != null)
+ _backing.skipPrev();
+ _goingForward = true;
+// _backing.skip();
+ fillNext();
+ }
+
+
+ switch (start) {
+ case LT -> {
+// assert _next == null || _next.getKey().compareTo(startKey) < 0;
+ }
+ case LE -> {
+// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
+ }
+ case GT -> {
+ assert _next == null || _next.compareTo(startKey) > 0;
+ }
+ case GE -> {
+ assert _next == null || _next.compareTo(startKey) >= 0;
+ }
+ }
+ }
+
+ private void fillNext() {
+ while ((_goingForward ? _backing.hasNext() : _backing.hasPrev()) && _next == null) {
+ var next = _goingForward ? _backing.peekNextKey() : _backing.peekPrevKey();
+ if (!_filter.apply(next)) {
+ if (_goingForward)
+ _backing.skip();
+ else
+ _backing.skipPrev();
+ continue;
+ }
+ _next = next;
+ }
+ }
+
+ @Override
+ protected void reverse() {
+ _goingForward = !_goingForward;
+ _next = null;
+
+ fillNext();
+ }
+
+ @Override
+ protected K peekImpl() {
+ if (_next == null)
+ throw new NoSuchElementException();
+ return _next;
+ }
+
+ @Override
+ protected void skipImpl() {
+ if (_next == null)
+ throw new NoSuchElementException();
+ _next = null;
+ if (_goingForward)
+ _backing.skip();
+ else
+ _backing.skipPrev();
+ fillNext();
+ }
+
+ @Override
+ protected boolean hasImpl() {
+ return _next != null;
+ }
+
+ @Override
+    protected Pair<K, V> nextImpl() {
+ if (_next == null)
+ throw new NoSuchElementException("No more elements");
+ var retKey = _next;
+ _next = null;
+ var got = _goingForward ? _backing.next() : _backing.prev();
+ assert got.getKey().equals(retKey);
+ fillNext();
+ return got;
+ }
+
+ @Override
+ public void close() {
+ _backing.close();
+ }
+
+ @Override
+ public String toString() {
+ return "KeyPredicateKvIterator{" +
+ "_backing=" + _backing +
+ ", _next=" + _next +
+ '}';
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java
new file mode 100644
index 00000000..8d7ae3d1
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/LockManager.java
@@ -0,0 +1,14 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
+import com.usatiuk.dhfs.utils.DataLocker;
+import jakarta.enterprise.context.ApplicationScoped;
+
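+// Hands out per-key object locks, backed by DataLocker from the utils module.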
+@ApplicationScoped
+public class LockManager {
+ private final DataLocker _objLocker = new DataLocker();
+
+ public AutoCloseableNoThrow lockObject(JObjectKey key) {
+ return _objLocker.lock(key);
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MappingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MappingKvIterator.java
new file mode 100644
index 00000000..eae8f788
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MappingKvIterator.java
@@ -0,0 +1,69 @@
+package com.usatiuk.dhfs.objects;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.function.Function;
+
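+// Lazily transforms the values of the backing iterator; keys pass through unchanged.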
+public class MappingKvIterator<K extends Comparable<K>, V, V_T> implements CloseableKvIterator<K, V_T> {
+    private final CloseableKvIterator<K, V> _backing;
+    private final Function<V, V_T> _transformer;
+
+    public MappingKvIterator(CloseableKvIterator<K, V> backing, Function<V, V_T> transformer) {
+ _backing = backing;
+ _transformer = transformer;
+ }
+
+ @Override
+ public K peekNextKey() {
+ return _backing.peekNextKey();
+ }
+
+ @Override
+ public void skip() {
+ _backing.skip();
+ }
+
+ @Override
+ public void close() {
+ _backing.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _backing.hasNext();
+ }
+
+ @Override
+ public K peekPrevKey() {
+ return _backing.peekPrevKey();
+ }
+
+ @Override
+    public Pair<K, V_T> prev() {
+ var got = _backing.prev();
+ return Pair.of(got.getKey(), _transformer.apply(got.getValue()));
+ }
+
+ @Override
+ public boolean hasPrev() {
+ return _backing.hasPrev();
+ }
+
+ @Override
+ public void skipPrev() {
+ _backing.skipPrev();
+ }
+
+ @Override
+    public Pair<K, V_T> next() {
+ var got = _backing.next();
+ return Pair.of(got.getKey(), _transformer.apply(got.getValue()));
+ }
+
+ @Override
+ public String toString() {
+ return "MappingKvIterator{" +
+ "_backing=" + _backing +
+ '}';
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java
new file mode 100644
index 00000000..f6d47c71
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MaybeTombstone.java
@@ -0,0 +1,7 @@
+package com.usatiuk.dhfs.objects;
+
+import java.util.Optional;
+
+public interface MaybeTombstone<T> {
+    Optional<T> opt();
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java
new file mode 100644
index 00000000..78c8e482
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/MergingKvIterator.java
@@ -0,0 +1,192 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import io.quarkus.logging.Log;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
+    private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
+    private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
+    private final String _name;
+
+    public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
+ _goingForward = true;
+ _name = name;
+
+ IteratorStart initialStartType = startType;
+ K initialStartKey = startKey;
+ boolean fail = false;
+ if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
+            // Starting at the greatest key less than (LT) or less than or equal to (LE) the start key:
+            // each iterator gives us its own greatest LT/LE key, and we pick the greatest
+            // of those to start with. If no iterator has such a key, we fall back to the
+            // smallest of the keys that lie past the start key.
+ var initialIterators = iterators.stream().map(p -> p.get(initialStartType, initialStartKey)).toList();
+ try {
+ IteratorStart finalStartType = startType;
+ var found = initialIterators.stream()
+ .filter(CloseableKvIterator::hasNext)
+ .map((i) -> {
+ var peeked = i.peekNextKey();
+// Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
+ return peeked;
+ }).distinct().collect(Collectors.partitioningBy(e -> finalStartType == IteratorStart.LE ? e.compareTo(initialStartKey) <= 0 : e.compareTo(initialStartKey) < 0));
+ K initialMaxValue;
+ if (!found.get(true).isEmpty())
+ initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
+ else
+ initialMaxValue = found.get(false).stream().min(Comparator.naturalOrder()).orElse(null);
+ if (initialMaxValue == null) {
+ fail = true;
+ }
+ startKey = initialMaxValue;
+ startType = IteratorStart.GE;
+ } finally {
+ initialIterators.forEach(CloseableKvIterator::close);
+ }
+ }
+
+ if (fail) {
+ _iterators = Map.of();
+ return;
+ }
+
+ int counter = 0;
+        var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
+ for (var iteratorFn : iterators) {
+ var iterator = iteratorFn.get(startType, startKey);
+ iteratorsTmp.put(iterator, counter++);
+ }
+ _iterators = Map.copyOf(iteratorsTmp);
+
+        for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
+ advanceIterator(iterator);
+ }
+
+ Log.tracev("{0} Created: {1}", _name, _sortedIterators);
+ switch (initialStartType) {
+// case LT -> {
+// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
+// }
+// case LE -> {
+// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
+// }
+ case GT -> {
+ assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) > 0;
+ }
+ case GE -> {
+ assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) >= 0;
+ }
+ }
+ }
+
+ @SafeVarargs
+    public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
+ this(name, startType, startKey, List.of(iterators));
+ }
+
+
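+    // Registers the iterator under its next key in _sortedIterators.
+    // If two iterators are positioned on the same key, the one created from an
+    // earlier source (lower priority number) wins and the other skips past the
+    // key, so earlier sources shadow later ones.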
+    private void advanceIterator(CloseableKvIterator<K, V> iterator) {
+ if (!iterator.hasNext()) {
+ return;
+ }
+
+ K key = iterator.peekNextKey();
+ Log.tracev("{0} Advance peeked: {1}-{2}", _name, iterator, key);
+ if (!_sortedIterators.containsKey(key)) {
+ _sortedIterators.put(key, iterator);
+ return;
+ }
+
+ // Expects that reversed iterator returns itself when reversed again
+ var oursPrio = _iterators.get(_goingForward ? iterator : iterator.reversed());
+ var them = _sortedIterators.get(key);
+ var theirsPrio = _iterators.get(_goingForward ? them : them.reversed());
+ if (oursPrio < theirsPrio) {
+ _sortedIterators.put(key, iterator);
+ advanceIterator(them);
+ } else {
+ Log.tracev("{0} Skipped: {1}", _name, iterator.peekNextKey());
+ iterator.skip();
+ advanceIterator(iterator);
+ }
+ }
+
+ @Override
+ protected void reverse() {
+ var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
+ Log.tracev("{0} Reversing from {1}", _name, cur);
+ _goingForward = !_goingForward;
+ _sortedIterators.clear();
+        for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
+ // _goingForward inverted already
+ advanceIterator(!_goingForward ? iterator.reversed() : iterator);
+ }
+ if (_sortedIterators.isEmpty() || cur == null) {
+ return;
+ }
+ // Advance to the expected key, as we might have brought back some iterators
+ // that were at their ends
+ while (!_sortedIterators.isEmpty()
+ && ((_goingForward && peekImpl().compareTo(cur.getKey()) <= 0)
+ || (!_goingForward && peekImpl().compareTo(cur.getKey()) >= 0))) {
+ skipImpl();
+ }
+ Log.tracev("{0} Reversed to {1}", _name, _sortedIterators);
+ }
+
+ @Override
+ protected K peekImpl() {
+ if (_sortedIterators.isEmpty())
+ throw new NoSuchElementException();
+ return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey();
+ }
+
+ @Override
+ protected void skipImpl() {
+ var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
+ if (cur == null) {
+ throw new NoSuchElementException();
+ }
+ cur.getValue().skip();
+ advanceIterator(cur.getValue());
+ Log.tracev("{0} Skip: {1}, next: {2}", _name, cur, _sortedIterators);
+ }
+
+ @Override
+ protected boolean hasImpl() {
+ return !_sortedIterators.isEmpty();
+ }
+
+ @Override
+    protected Pair<K, V> nextImpl() {
+ var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
+ if (cur == null) {
+ throw new NoSuchElementException();
+ }
+ var curVal = cur.getValue().next();
+ advanceIterator(cur.getValue());
+// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, cur.getValue(), curVal, _sortedIterators.keySet());
+ return curVal;
+ }
+
+
+ @Override
+ public void close() {
+        for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
+ iterator.close();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "MergingKvIterator{" +
+ "_name='" + _name + '\'' +
+ ", _sortedIterators=" + _sortedIterators.keySet() +
+ ", _iterators=" + _iterators +
+ '}';
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/NavigableMapKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/NavigableMapKvIterator.java
new file mode 100644
index 00000000..c1f07007
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/NavigableMapKvIterator.java
@@ -0,0 +1,104 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.*;
+
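+// A CloseableKvIterator over a NavigableMap, honouring the requested start
+// bound; close() is a no-op since no external resources are held.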
+public class NavigableMapKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
+    private final NavigableMap<K, V> _map;
+    private Iterator<Map.Entry<K, V>> _iterator;
+    private Map.Entry<K, V> _next;
+
+    public NavigableMapKvIterator(NavigableMap<K, V> map, IteratorStart start, K key) {
+        _map = map;
+        SortedMap<K, V> _view;
+ _goingForward = true;
+ switch (start) {
+ case GE -> _view = map.tailMap(key, true);
+ case GT -> _view = map.tailMap(key, false);
+ case LE -> {
+ var floorKey = map.floorKey(key);
+ if (floorKey == null) _view = _map;
+ else _view = map.tailMap(floorKey, true);
+ }
+ case LT -> {
+ var lowerKey = map.lowerKey(key);
+ if (lowerKey == null) _view = _map;
+ else _view = map.tailMap(lowerKey, true);
+ }
+ default -> throw new IllegalArgumentException("Unknown start type");
+ }
+ _iterator = _view.entrySet().iterator();
+ fillNext();
+ }
+
+ @Override
+ protected void reverse() {
+ var oldNext = _next;
+ _next = null;
+ if (_goingForward) {
+ _iterator
+ = oldNext == null
+ ? _map.descendingMap().entrySet().iterator()
+ : _map.headMap(oldNext.getKey(), false).descendingMap().entrySet().iterator();
+ } else {
+ _iterator
+ = oldNext == null
+ ? _map.entrySet().iterator()
+ : _map.tailMap(oldNext.getKey(), false).entrySet().iterator();
+ }
+ _goingForward = !_goingForward;
+ fillNext();
+ }
+
+ private void fillNext() {
+ while (_iterator.hasNext() && _next == null) {
+ _next = _iterator.next();
+ }
+ }
+
+ @Override
+ protected K peekImpl() {
+ if (_next == null) {
+ throw new NoSuchElementException();
+ }
+ return _next.getKey();
+ }
+
+ @Override
+ protected void skipImpl() {
+ if (_next == null) {
+ throw new NoSuchElementException();
+ }
+ _next = null;
+ fillNext();
+ }
+
+ @Override
+ protected boolean hasImpl() {
+ return _next != null;
+ }
+
+ @Override
+    protected Pair<K, V> nextImpl() {
+ if (_next == null) {
+ throw new NoSuchElementException("No more elements");
+ }
+ var ret = _next;
+ _next = null;
+ fillNext();
+ return Pair.of(ret);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public String toString() {
+ return "NavigableMapKvIterator{" +
+ ", _next=" + _next +
+ '}';
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ObjectSerializer.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ObjectSerializer.java
new file mode 100644
index 00000000..078dd90f
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ObjectSerializer.java
@@ -0,0 +1,9 @@
+package com.usatiuk.dhfs.objects;
+
+import com.google.protobuf.ByteString;
+
+public interface ObjectSerializer<T> {
+ ByteString serialize(T obj);
+
+ T deserialize(ByteString data);
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingDelete.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingDelete.java
new file mode 100644
index 00000000..8ecc85b5
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingDelete.java
@@ -0,0 +1,4 @@
+package com.usatiuk.dhfs.objects;
+
+public record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingWrite.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingWrite.java
new file mode 100644
index 00000000..065224f6
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingWrite.java
@@ -0,0 +1,4 @@
+package com.usatiuk.dhfs.objects;
+
+public record PendingWrite(JDataVersionedWrapper data, long bundleId) implements PendingWriteEntry {
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingWriteEntry.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingWriteEntry.java
new file mode 100644
index 00000000..1476e167
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PendingWriteEntry.java
@@ -0,0 +1,5 @@
+package com.usatiuk.dhfs.objects;
+
+public interface PendingWriteEntry {
+ long bundleId();
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PreCommitTxHook.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PreCommitTxHook.java
new file mode 100644
index 00000000..3b1b50e4
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PreCommitTxHook.java
@@ -0,0 +1,16 @@
+package com.usatiuk.dhfs.objects;
+
+public interface PreCommitTxHook {
+ default void onChange(JObjectKey key, JData old, JData cur) {
+ }
+
+ default void onCreate(JObjectKey key, JData cur) {
+ }
+
+ default void onDelete(JObjectKey key, JData cur) {
+ }
+
+ default int getPriority() {
+ return 0;
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java
new file mode 100644
index 00000000..cfe85ffa
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/PredicateKvIterator.java
@@ -0,0 +1,129 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import io.quarkus.logging.Log;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+
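+// Maps values through a transformer and drops entries the transformer maps to
+// null, keeping iteration bidirectional.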
+public class PredicateKvIterator<K extends Comparable<K>, V, V_T> extends ReversibleKvIterator<K, V_T> {
+    private final CloseableKvIterator<K, V> _backing;
+    private final Function<V, V_T> _transformer;
+    private Pair<K, V_T> _next;
+
+    public PredicateKvIterator(CloseableKvIterator<K, V> backing, IteratorStart start, K startKey, Function<V, V_T> transformer) {
+ _goingForward = true;
+ _backing = backing;
+ _transformer = transformer;
+ fillNext();
+
+ boolean shouldGoBack = false;
+ if (start == IteratorStart.LE) {
+ if (_next == null || _next.getKey().compareTo(startKey) > 0) {
+ shouldGoBack = true;
+ }
+ } else if (start == IteratorStart.LT) {
+ if (_next == null || _next.getKey().compareTo(startKey) >= 0) {
+ shouldGoBack = true;
+ }
+ }
+
+ if (shouldGoBack && _backing.hasPrev()) {
+ _goingForward = false;
+ _next = null;
+ _backing.skipPrev();
+ fillNext();
+ _goingForward = true;
+ _backing.skip();
+ fillNext();
+ }
+
+
+ switch (start) {
+ case LT -> {
+// assert _next == null || _next.getKey().compareTo(startKey) < 0;
+ }
+ case LE -> {
+// assert _next == null || _next.getKey().compareTo(startKey) <= 0;
+ }
+ case GT -> {
+ assert _next == null || _next.getKey().compareTo(startKey) > 0;
+ }
+ case GE -> {
+ assert _next == null || _next.getKey().compareTo(startKey) >= 0;
+ }
+ }
+ }
+
+ private void fillNext() {
+ while ((_goingForward ? _backing.hasNext() : _backing.hasPrev()) && _next == null) {
+ var next = _goingForward ? _backing.next() : _backing.prev();
+ var transformed = _transformer.apply(next.getValue());
+ if (transformed == null)
+ continue;
+ _next = Pair.of(next.getKey(), transformed);
+ }
+ }
+
+ @Override
+ protected void reverse() {
+ _goingForward = !_goingForward;
+ boolean wasAtEnd = _next == null;
+
+ if (_goingForward && !wasAtEnd)
+ _backing.skip();
+ else if (!_goingForward && !wasAtEnd)
+ _backing.skipPrev();
+
+ if (!wasAtEnd)
+ Log.tracev("Skipped in reverse: {0}", _next);
+
+ _next = null;
+
+ fillNext();
+ }
+
+ @Override
+ protected K peekImpl() {
+ if (_next == null)
+ throw new NoSuchElementException();
+ return _next.getKey();
+ }
+
+ @Override
+ protected void skipImpl() {
+ if (_next == null)
+ throw new NoSuchElementException();
+ _next = null;
+ fillNext();
+ }
+
+ @Override
+ protected boolean hasImpl() {
+ return _next != null;
+ }
+
+ @Override
+    protected Pair<K, V_T> nextImpl() {
+ if (_next == null)
+ throw new NoSuchElementException("No more elements");
+ var ret = _next;
+ _next = null;
+ fillNext();
+ return ret;
+ }
+
+ @Override
+ public void close() {
+ _backing.close();
+ }
+
+ @Override
+ public String toString() {
+ return "PredicateKvIterator{" +
+ "_backing=" + _backing +
+ ", _next=" + _next +
+ '}';
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ReversedKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ReversedKvIterator.java
new file mode 100644
index 00000000..88b23f30
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ReversedKvIterator.java
@@ -0,0 +1,61 @@
+package com.usatiuk.dhfs.objects;
+
+import org.apache.commons.lang3.tuple.Pair;
+
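+// A view of a bidirectional iterator with next and prev swapped;
+// reversing it again returns the original backing iterator.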
+public class ReversedKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
+    private final CloseableKvIterator<K, V> _backing;
+
+    public ReversedKvIterator(CloseableKvIterator<K, V> backing) {
+ _backing = backing;
+ }
+
+ @Override
+ public void close() {
+ _backing.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _backing.hasPrev();
+ }
+
+ @Override
+    public Pair<K, V> next() {
+ return _backing.prev();
+ }
+
+ @Override
+ public K peekNextKey() {
+ return _backing.peekPrevKey();
+ }
+
+ @Override
+ public void skip() {
+ _backing.skipPrev();
+ }
+
+ @Override
+ public K peekPrevKey() {
+ return _backing.peekNextKey();
+ }
+
+ @Override
+    public Pair<K, V> prev() {
+ return _backing.next();
+ }
+
+ @Override
+ public boolean hasPrev() {
+ return _backing.hasNext();
+ }
+
+ @Override
+ public void skipPrev() {
+ _backing.skip();
+ }
+
+ @Override
+    public CloseableKvIterator<K, V> reversed() {
+ return _backing;
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ReversibleKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ReversibleKvIterator.java
new file mode 100644
index 00000000..a13a063d
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/ReversibleKvIterator.java
@@ -0,0 +1,79 @@
+package com.usatiuk.dhfs.objects;
+
+import org.apache.commons.lang3.tuple.Pair;
+
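+// Skeleton for bidirectional iterators: subclasses implement the *Impl methods
+// for the current direction plus reverse() to flip it, and the public methods
+// below lazily switch direction before delegating.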
+public abstract class ReversibleKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
+ protected boolean _goingForward;
+
+ protected abstract void reverse();
+
+ private void ensureForward() {
+ if (!_goingForward) {
+ reverse();
+ }
+ }
+
+ private void ensureBackward() {
+ if (_goingForward) {
+ reverse();
+ }
+ }
+
+ abstract protected K peekImpl();
+
+ abstract protected void skipImpl();
+
+ abstract protected boolean hasImpl();
+
+    abstract protected Pair<K, V> nextImpl();
+
+ @Override
+ public K peekNextKey() {
+ ensureForward();
+ return peekImpl();
+ }
+
+ @Override
+ public void skip() {
+ ensureForward();
+ skipImpl();
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ ensureForward();
+ return hasImpl();
+ }
+
+ @Override
+    public Pair<K, V> next() {
+ ensureForward();
+ return nextImpl();
+ }
+
+ @Override
+ public K peekPrevKey() {
+ ensureBackward();
+ return peekImpl();
+ }
+
+ @Override
+    public Pair<K, V> prev() {
+ ensureBackward();
+ return nextImpl();
+ }
+
+ @Override
+ public boolean hasPrev() {
+ ensureBackward();
+ return hasImpl();
+ }
+
+ @Override
+ public void skipPrev() {
+ ensureBackward();
+ skipImpl();
+ }
+
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java
new file mode 100644
index 00000000..62a7ca1c
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/Tombstone.java
@@ -0,0 +1,10 @@
+package com.usatiuk.dhfs.objects;
+
+import java.util.Optional;
+
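+// A deletion marker: hides a key from tombstone-aware merging iterators.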
+public record Tombstone<V>() implements MaybeTombstone<V> {
+    @Override
+    public Optional<V> opt() {
+ return Optional.empty();
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java
new file mode 100644
index 00000000..e8e01e27
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TombstoneMergingKvIterator.java
@@ -0,0 +1,84 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import io.quarkus.logging.Log;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.List;
+
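+// Merges several MaybeTombstone sources (e.g. uncommitted writes layered over
+// committed state) and exposes only live values: the first source to claim a
+// key wins, and if its entry is a Tombstone the key is dropped entirely.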
+public class TombstoneMergingKvIterator<K extends Comparable<K>, V> implements CloseableKvIterator<K, V> {
+    private final CloseableKvIterator<K, V> _backing;
+    private final String _name;
+
+    public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, MaybeTombstone<V>>> iterators) {
+        _name = name;
+        _backing = new PredicateKvIterator<>(
+                new MergingKvIterator<>(name + "-merging", startType, startKey, iterators),
+                startType, startKey,
+                pair -> {
+                    Log.tracev("{0} - Processing pair {1}", _name, pair);
+                    if (pair instanceof Tombstone<V>) {
+                        return null;
+                    }
+                    return ((Data<V>) pair).value();
+                });
+ }
+
+ @SafeVarargs
+    public TombstoneMergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, MaybeTombstone<V>>... iterators) {
+ this(name, startType, startKey, List.of(iterators));
+ }
+
+ @Override
+ public K peekNextKey() {
+ return _backing.peekNextKey();
+ }
+
+ @Override
+ public void skip() {
+ _backing.skip();
+ }
+
+ @Override
+ public K peekPrevKey() {
+ return _backing.peekPrevKey();
+ }
+
+ @Override
+    public Pair<K, V> prev() {
+ return _backing.prev();
+ }
+
+ @Override
+ public boolean hasPrev() {
+ return _backing.hasPrev();
+ }
+
+ @Override
+ public void skipPrev() {
+ _backing.skipPrev();
+ }
+
+ @Override
+ public void close() {
+ _backing.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _backing.hasNext();
+ }
+
+ @Override
+    public Pair<K, V> next() {
+ return _backing.next();
+ }
+
+ @Override
+ public String toString() {
+ return "TombstoneMergingKvIterator{" +
+ "_backing=" + _backing +
+ ", _name='" + _name + '\'' +
+ '}';
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java
new file mode 100644
index 00000000..2fe54390
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManager.java
@@ -0,0 +1,102 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.transaction.Transaction;
+import com.usatiuk.dhfs.objects.transaction.TransactionHandle;
+import com.usatiuk.dhfs.utils.VoidFn;
+import io.quarkus.logging.Log;
+
+import java.util.function.Supplier;
+
+public interface TransactionManager {
+ void begin();
+
+ TransactionHandle commit();
+
+ void rollback();
+
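+    // Runs the supplier in a transaction, reusing the caller's transaction if
+    // one is already active, and retrying on TxCommitException up to `tries` times.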
+    default <T> T runTries(Supplier<T> supplier, int tries) {
+ if (current() != null) {
+ return supplier.get();
+ }
+
+ begin();
+ T ret;
+ try {
+ ret = supplier.get();
+ } catch (TxCommitException txCommitException) {
+ rollback();
+ if (tries == 0) {
+ Log.error("Transaction commit failed", txCommitException);
+ throw txCommitException;
+ }
+ return runTries(supplier, tries - 1);
+ } catch (Throwable e) {
+ rollback();
+ throw e;
+ }
+ try {
+ commit();
+ return ret;
+ } catch (TxCommitException txCommitException) {
+ if (tries == 0) {
+ Log.error("Transaction commit failed", txCommitException);
+ throw txCommitException;
+ }
+ return runTries(supplier, tries - 1);
+ }
+ }
+
+ default TransactionHandle runTries(VoidFn fn, int tries) {
+ if (current() != null) {
+ fn.apply();
+ return new TransactionHandle() {
+ @Override
+ public void onFlush(Runnable runnable) {
+ current().onCommit(runnable);
+ }
+ };
+ }
+
+ begin();
+ try {
+ fn.apply();
+ } catch (TxCommitException txCommitException) {
+ rollback();
+ if (tries == 0) {
+ Log.error("Transaction commit failed", txCommitException);
+ throw txCommitException;
+ }
+ return runTries(fn, tries - 1);
+ } catch (Throwable e) {
+ rollback();
+ throw e;
+ }
+ try {
+ return commit();
+ } catch (TxCommitException txCommitException) {
+ if (tries == 0) {
+ Log.error("Transaction commit failed", txCommitException);
+ throw txCommitException;
+ }
+ return runTries(fn, tries - 1);
+ }
+ }
+
+ default TransactionHandle run(VoidFn fn) {
+ return runTries(fn, 10);
+ }
+
+    default <T> T run(Supplier<T> supplier) {
+ return runTries(supplier, 10);
+ }
+
+ default void executeTx(VoidFn fn) {
+ run(fn);
+ }
+
+    default <T> T executeTx(Supplier<T> supplier) {
+ return run(supplier);
+ }
+
+ Transaction current();
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java
new file mode 100644
index 00000000..bf617e19
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionManagerImpl.java
@@ -0,0 +1,67 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.transaction.Transaction;
+import com.usatiuk.dhfs.objects.transaction.TransactionHandle;
+import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
+import io.quarkus.logging.Log;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+@ApplicationScoped
+public class TransactionManagerImpl implements TransactionManager {
+    private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
+ @Inject
+ JObjectManager jObjectManager;
+
+ @Override
+ public void begin() {
+ if (_currentTransaction.get() != null) {
+ throw new IllegalStateException("Transaction already started");
+ }
+
+ Log.trace("Starting transaction");
+ var tx = jObjectManager.createTransaction();
+ _currentTransaction.set(tx);
+ }
+
+ @Override
+ public TransactionHandle commit() {
+ if (_currentTransaction.get() == null) {
+ throw new IllegalStateException("No transaction started");
+ }
+
+ Log.trace("Committing transaction");
+ try {
+ return jObjectManager.commit(_currentTransaction.get());
+ } catch (Throwable e) {
+ Log.trace("Transaction commit failed", e);
+ throw e;
+ } finally {
+ _currentTransaction.get().close();
+ _currentTransaction.remove();
+ }
+ }
+
+ @Override
+ public void rollback() {
+ if (_currentTransaction.get() == null) {
+ throw new IllegalStateException("No transaction started");
+ }
+
+ try {
+ jObjectManager.rollback(_currentTransaction.get());
+ } catch (Throwable e) {
+ Log.error("Transaction rollback failed", e);
+ throw e;
+ } finally {
+ _currentTransaction.get().close();
+ _currentTransaction.remove();
+ }
+ }
+
+ @Override
+ public Transaction current() {
+ return _currentTransaction.get();
+ }
+}
+
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionObjectLocked.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionObjectLocked.java
new file mode 100644
index 00000000..ac3a856c
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionObjectLocked.java
@@ -0,0 +1,11 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.transaction.TransactionObject;
+import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
+
+import java.util.Optional;
+
+public record TransactionObjectLocked<T extends JData>
+        (Optional<JDataVersionedWrapper> data, AutoCloseableNoThrow lock)
+        implements TransactionObject<T> {
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionObjectNoLock.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionObjectNoLock.java
new file mode 100644
index 00000000..7672d09a
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TransactionObjectNoLock.java
@@ -0,0 +1,10 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.transaction.TransactionObject;
+
+import java.util.Optional;
+
+public record TransactionObjectNoLock<T extends JData>
+        (Optional<JDataVersionedWrapper> data)
+        implements TransactionObject<T> {
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxCommitException.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxCommitException.java
new file mode 100644
index 00000000..73e488d6
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/TxCommitException.java
@@ -0,0 +1,11 @@
+package com.usatiuk.dhfs.objects;
+
+public class TxCommitException extends RuntimeException {
+ public TxCommitException(String message) {
+ super(message);
+ }
+
+ public TxCommitException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java
new file mode 100644
index 00000000..2fb14558
--- /dev/null
+++ b/dhfs-parent/objects/src/main/java/com/usatiuk/dhfs/objects/WritebackObjectPersistentStore.java
@@ -0,0 +1,490 @@
+package com.usatiuk.dhfs.objects;
+
+import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
+import com.usatiuk.dhfs.objects.persistence.IteratorStart;
+import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
+import com.usatiuk.dhfs.objects.transaction.TxRecord;
+import io.quarkus.logging.Log;
+import io.quarkus.runtime.ShutdownEvent;
+import io.quarkus.runtime.StartupEvent;
+import jakarta.annotation.Priority;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.Pair;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.pcollections.PSortedMap;
+import org.pcollections.TreePMap;
+
+import javax.annotation.Nonnull;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+@ApplicationScoped
+public class WritebackObjectPersistentStore {
+    private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
+
+    private final AtomicReference<PSortedMap<JObjectKey, PendingWriteEntry>> _pendingWrites = new AtomicReference<>(TreePMap.empty());
+    private final ReentrantReadWriteLock _pendingWritesVersionLock = new ReentrantReadWriteLock();
+    private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
+
+ private final Object _flushWaitSynchronizer = new Object();
+ private final AtomicLong _lastWrittenTx = new AtomicLong(-1);
+ private final AtomicLong _counter = new AtomicLong();
+ private final AtomicLong _lastCommittedTx = new AtomicLong(-1);
+ private final AtomicLong _waitedTotal = new AtomicLong(0);
+ @Inject
+ CachingObjectPersistentStore cachedStore;
+ @ConfigProperty(name = "dhfs.objects.writeback.limit")
+ long sizeLimit;
+ private long currentSize = 0;
+ private ExecutorService _writebackExecutor;
+ private ExecutorService _statusExecutor;
+ private volatile boolean _ready = false;
+
+ void init(@Observes @Priority(110) StartupEvent event) {
+ {
+ BasicThreadFactory factory = new BasicThreadFactory.Builder()
+ .namingPattern("tx-writeback-%d")
+ .build();
+
+ _writebackExecutor = Executors.newSingleThreadExecutor(factory);
+ _writebackExecutor.submit(this::writeback);
+ }
+
+ _statusExecutor = Executors.newSingleThreadExecutor();
+ _statusExecutor.submit(() -> {
+ try {
+ while (true) {
+ Thread.sleep(1000);
+ if (currentSize > 0)
+ Log.info("Tx commit status: size=" + currentSize / 1024 / 1024 + "MB");
+ }
+ } catch (InterruptedException ignored) {
+ }
+ });
+ _counter.set(cachedStore.getLastTxId());
+ _lastCommittedTx.set(cachedStore.getLastTxId());
+ _ready = true;
+ }
+
+ void shutdown(@Observes @Priority(890) ShutdownEvent event) throws InterruptedException {
+ Log.info("Waiting for all transactions to drain");
+
+ synchronized (_flushWaitSynchronizer) {
+ _ready = false;
+ while (currentSize > 0) {
+ _flushWaitSynchronizer.wait();
+ }
+ }
+
+ _writebackExecutor.shutdownNow();
+ Log.info("Total tx bundle wait time: " + _waitedTotal.get() + "ms");
+ }
+
+ private void verifyReady() {
+ if (!_ready) throw new IllegalStateException("Not doing transactions while shutting down!");
+ }
+
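+    // Writeback loop: blocks until the oldest pending bundle is ready, coalesces
+    // all ready bundles into one batch, and writes that batch to the backing store.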
+ private void writeback() {
+ while (!Thread.interrupted()) {
+ try {
+ TxBundle bundle = new TxBundle(0);
+ synchronized (_pendingBundles) {
+ while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
+ _pendingBundles.wait();
+
+ long diff = 0;
+ while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
+ var toCompress = _pendingBundles.poll();
+ diff -= toCompress.calculateTotalSize();
+ bundle.compress(toCompress);
+ }
+ diff += bundle.calculateTotalSize();
+ synchronized (_flushWaitSynchronizer) {
+ currentSize += diff;
+ }
+ }
+
+                var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
+ var toDelete = new ArrayList