Server: a bunch of fixes

This commit is contained in:
2025-03-22 21:31:20 +01:00
parent dc0e73b1aa
commit 8a07f37566
15 changed files with 166 additions and 47 deletions

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.stores;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.iterators.*;
import com.usatiuk.dhfs.objects.snapshot.Snapshot;
import com.usatiuk.dhfs.objects.transaction.LockManager;
@@ -145,7 +146,7 @@ public class CachingObjectPersistentStore {
long cacheVersion;
try {
Log.tracev("Getting cache snapshot");
// Log.tracev("Getting cache snapshot");
// Decrease the lock time as much as possible
_lock.readLock().lock();
try {
@@ -170,9 +171,9 @@ public class CachingObjectPersistentStore {
_lock.writeLock().lock();
try {
if (_snapshotCacheVersion != _cacheVersion) {
Log.tracev("Not caching: {0}", key);
// Log.tracev("Not caching: {0}", key);
} else {
Log.tracev("Caching: {0}", key);
// Log.tracev("Caching: {0}", key);
put(key, obj);
}
} finally {
@@ -244,7 +245,7 @@ public class CachingObjectPersistentStore {
-> new MappingKvIterator<>(
new NavigableMapKvIterator<>(_curSortedCache, mS, mK),
e -> {
Log.tracev("Taken from cache: {0}", e);
// Log.tracev("Taken from cache: {0}", e);
return e.object();
}
),

View File

@@ -69,6 +69,8 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
var theirsFile = receivedData.file();
var oursChunks = fileHelper.getChunks(oursCurFile);
File first;
File second;
List<Pair<Long, JObjectKey>> firstChunks;
@@ -77,18 +79,21 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
if (oursCurFile.mTime() >= theirsFile.mTime()) {
first = oursCurFile;
firstChunks = fileHelper.getChunks(oursCurFile);
firstChunks = oursChunks;
second = theirsFile;
secondChunks = receivedData.chunks();
otherHostname = from;
} else {
second = oursCurFile;
secondChunks = fileHelper.getChunks(oursCurFile);
secondChunks = oursChunks;
first = theirsFile;
firstChunks = receivedData.chunks();
otherHostname = persistentPeerDataService.getSelfUuid();
}
Log.tracev("Conflict resolution: ours: {0}, theirs: {1}, chunks: {2}, {3}", oursCurFile, theirsFile, oursChunks, receivedData.chunks());
Log.tracev("Conflict resolution: first: {0}, second: {1}, chunks: {2}, {3}", first, second, firstChunks, secondChunks);
HashPMap<PeerId, Long> newChangelog = HashTreePMap.from(oursCurMeta.changelog());
for (var entry : receivedChangelog.entrySet()) {
@@ -97,25 +102,27 @@ public class FileSyncHandler implements ObjSyncHandler<File, FileDto> {
);
}
oursCurMeta = oursCurMeta.withChangelog(newChangelog);
curTx.put(oursCurMeta);
boolean chunksDiff = !Objects.equals(firstChunks, secondChunks);
boolean wasChanged = first.mTime() != second.mTime()
|| first.cTime() != second.cTime()
|| first.mode() != second.mode()
|| first.symlink() != second.symlink()
|| chunksDiff;
if (wasChanged) {
oursCurMeta = oursCurMeta.withChangelog(
newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.get(persistentPeerDataService.getSelfUuid()) + 1)
newChangelog.plus(persistentPeerDataService.getSelfUuid(), newChangelog.getOrDefault(persistentPeerDataService.getSelfUuid(), 0L) + 1)
);
curTx.put(oursCurMeta);
remoteTx.putDataRaw(oursCurFile.withCTime(first.cTime()).withMTime(first.mTime()).withMode(first.mode()).withSymlink(first.symlink()));
fileHelper.replaceChunks(oursCurFile, firstChunks);
var newFile = new File(
JObjectKey.random(), second.mode(), second.cTime(), second.mTime(), second.symlink()
);
var newFile = new File(JObjectKey.random(), second.mode(), second.cTime(), second.mTime(), second.symlink());
remoteTx.putData(newFile);
fileHelper.replaceChunks(newFile, secondChunks);

View File

@@ -5,6 +5,7 @@ import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
@@ -13,7 +14,6 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFil
import com.usatiuk.dhfs.objects.jmap.JMapEntry;
import com.usatiuk.dhfs.objects.jmap.JMapHelper;
import com.usatiuk.dhfs.objects.jmap.JMapLongKey;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.transaction.LockingStrategy;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.dhfs.objects.transaction.TransactionManager;

View File

@@ -9,6 +9,7 @@ import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.lang3.mutable.MutableObject;
import org.pcollections.HashTreePSet;
import java.util.Optional;
@@ -83,6 +84,10 @@ public class RemoteTransaction {
if (!curMeta.knownType().isAssignableFrom(obj.getClass()))
throw new IllegalStateException("Object type mismatch: " + curMeta.knownType() + " vs " + obj.getClass());
var newMeta = curMeta;
newMeta = newMeta.withConfirmedDeletes(HashTreePSet.empty());
curTx.put(newMeta);
var newData = curTx.get(RemoteObjectDataWrapper.class, RemoteObjectMeta.ofDataKey(obj.key()))
.map(w -> w.withData(obj)).orElse(new RemoteObjectDataWrapper<>(obj));
curTx.put(newData);
@@ -107,6 +112,8 @@ public class RemoteTransaction {
if (!curMeta.knownType().equals(obj.getClass()))
newMeta = newMeta.withKnownType(obj.getClass());
newMeta = newMeta.withConfirmedDeletes(HashTreePSet.empty());
newMeta = newMeta.withChangelog(newMeta.changelog().plus(persistentPeerDataService.getSelfUuid(),
newMeta.changelog().get(persistentPeerDataService.getSelfUuid()) + 1));
curTx.put(newMeta);

View File

@@ -0,0 +1,20 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.InitialSyncProcessor;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@ApplicationScoped
public class JKleppmannTreeInitialSyncProcessor implements InitialSyncProcessor<JKleppmannTreePersistentData> {
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
@Override
public void prepareForInitialSync(PeerId from, JObjectKey key) {
var tree = jKleppmannTreeManager.getTree(key);
tree.recordBootstrap(from);
}
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
public interface InitialSyncProcessor<T extends JData> {
void prepareForInitialSync(PeerId from, JObjectKey key);
}

View File

@@ -104,10 +104,12 @@ public class PeerManager {
private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
boolean wasReachable = isReachable(host);
boolean shouldSync = persistentPeerDataService.markInitialSyncDone(host.id());
boolean shouldSync = !persistentPeerDataService.isInitialSyncDone(host.id());
if (shouldSync)
if (shouldSync) {
syncHandler.doInitialSync(host.id());
persistentPeerDataService.markInitialSyncDone(host.id());
}
_states.put(host.id(), address);

View File

@@ -146,4 +146,11 @@ public class PersistentPeerDataService {
});
}
public boolean isInitialSyncDone(PeerId peerId) {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
return data.initialSyncDone().contains(peerId);
});
}
}

View File

@@ -67,7 +67,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
if (refMeta != null && !refMeta.seen())
curTx.put(refMeta.withSeen(true));
}
return Pair.of(meta, dtoMapperService.toDto(obj, obj.dtoClass()));
return Pair.of(meta, obj == null ? null : dtoMapperService.toDto(obj, obj.dtoClass()));
});
if ((got.getValue() != null) && (got.getKey() == null)) {

View File

@@ -1,9 +1,6 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JDataRemote;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
@@ -12,14 +9,12 @@ import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import org.pcollections.HashTreePSet;
import org.pcollections.PMap;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Stream;
@ApplicationScoped
@@ -39,12 +34,14 @@ public class SyncHandler {
private final Map<Class<? extends JDataRemote>, ObjSyncHandler> _objToSyncHandler;
private final Map<Class<? extends JDataRemoteDto>, ObjSyncHandler> _dtoToSyncHandler;
private final Map<Class<? extends JData>, InitialSyncProcessor> _initialSyncProcessors;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers) {
public SyncHandler(Instance<ObjSyncHandler<?, ?>> syncHandlers, Instance<InitialSyncProcessor<?>> initialSyncProcessors) {
HashMap<Class<? extends JDataRemote>, ObjSyncHandler> objToHandlerMap = new HashMap<>();
HashMap<Class<? extends JDataRemoteDto>, ObjSyncHandler> dtoToHandlerMap = new HashMap<>();
HashMap<Class<? extends JData>, InitialSyncProcessor> initialSyncProcessorHashMap = new HashMap<>();
for (var syncHandler : syncHandlers.handles()) {
for (var type : Arrays.stream(syncHandler.getBean().getBeanClass().getGenericInterfaces()).flatMap(
@@ -65,15 +62,37 @@ public class SyncHandler {
_objToSyncHandler = Map.copyOf(objToHandlerMap);
_dtoToSyncHandler = Map.copyOf(dtoToHandlerMap);
for (var initialSyncProcessor : initialSyncProcessors.handles()) {
for (var type : Arrays.stream(initialSyncProcessor.getBean().getBeanClass().getGenericInterfaces()).flatMap(
t -> {
if (!(t instanceof ParameterizedType pm)) return Stream.empty();
if (pm.getRawType().equals(InitialSyncProcessor.class)) return Stream.of(pm);
return Stream.empty();
}
).toList()) {
var orig = type.getActualTypeArguments()[0];
assert JData.class.isAssignableFrom((Class<?>) orig);
initialSyncProcessorHashMap.put((Class<? extends JData>) orig, initialSyncProcessor.get());
}
}
_initialSyncProcessors = Map.copyOf(initialSyncProcessorHashMap);
}
public <D extends JDataRemoteDto> void handleRemoteUpdate(PeerId from, JObjectKey key,
PMap<PeerId, Long> receivedChangelog,
@Nullable D receivedData) {
var current = remoteTx.getMeta(key).orElse(null);
if (current != null) {
current = current.withConfirmedDeletes(HashTreePSet.empty());
curTx.put(current);
}
if (receivedData == null) {
var current = remoteTx.getMeta(key);
if (current.isPresent()) {
var cmp = SyncHelper.compareChangelogs(current.get().changelog(), receivedChangelog);
if (current != null) {
var cmp = SyncHelper.compareChangelogs(current.changelog(), receivedChangelog);
if (cmp.equals(SyncHelper.ChangelogCmpResult.CONFLICT)) {
var got = remoteObjectServiceClient.getSpecificObject(key, from);
handleRemoteUpdate(from, key, got.getRight().changelog(), got.getRight().data());
@@ -92,15 +111,26 @@ public class SyncHandler {
}
public void doInitialSync(PeerId peer) {
ArrayList<JObjectKey> objs = new ArrayList<>();
txm.run(() -> {
Log.tracev("Will do initial sync for {0}", peer);
try (var it = curTx.getIterator(IteratorStart.GE, JObjectKey.first())) {
while (it.hasNext()) {
var key = it.peekNextKey();
invalidationQueueService.pushInvalidationToOne(peer, key, true);
objs.add(key);
// TODO: Nested transactions
it.skip();
}
}
});
for (var obj : objs) {
txm.run(() -> {
var proc = curTx.get(JData.class, obj).flatMap(o -> Optional.ofNullable(_initialSyncProcessors.get(o.getClass()))).orElse(null);
if (proc != null) {
proc.prepareForInitialSync(peer, obj);
}
});
}
}
}

View File

@@ -5,5 +5,5 @@ import com.usatiuk.dhfs.objects.PeerId;
import java.io.Serializable;
public record InvalidationQueueEntry(PeerId peer, JObjectKey key, boolean forced) implements Serializable {
public record InvalidationQueueEntry(PeerId peer, JObjectKey key) implements Serializable {
}

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.peersync.PeerInfoService;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
@@ -35,6 +36,9 @@ public class InvalidationQueueService {
OpPusher opPusher;
@ConfigProperty(name = "dhfs.objects.invalidation.threads")
int threads;
@Inject
PersistentPeerDataService persistentPeerDataService;
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -87,9 +91,9 @@ public class InvalidationQueueService {
var hostInfo = remoteHostManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(new InvalidationQueueEntry(h, o, false));
_queue.add(new InvalidationQueueEntry(h, o));
for (var u : hostInfo.unavailable())
deferredInvalidationQueueService.defer(new InvalidationQueueEntry(u, o, false));
deferredInvalidationQueueService.defer(new InvalidationQueueEntry(u, o));
}
}
}
@@ -108,6 +112,11 @@ public class InvalidationQueueService {
continue;
}
if (!persistentPeerDataService.isInitialSyncDone(e.peer())) {
pushInvalidationToOne(e);
continue;
}
try {
opPusher.doPush(e);
success++;
@@ -156,13 +165,9 @@ public class InvalidationQueueService {
deferredInvalidationQueueService.defer(entry);
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj, boolean forced) {
var entry = new InvalidationQueueEntry(host, obj, forced);
pushInvalidationToOne(entry);
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
pushInvalidationToOne(host, obj, false);
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOne(entry);
}
void pushDeferredInvalidations(InvalidationQueueEntry entry) {

View File

@@ -52,8 +52,6 @@ public class OpPusher {
}
case JKleppmannTreePersistentData pd -> {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (entry.forced())
tree.recordBootstrap(entry.peer());
if (!tree.hasPendingOpsForHost(entry.peer()))
return null;

View File

@@ -84,7 +84,11 @@ public class PeerInfoService {
return curTx.get(JKleppmannTreeNode.class, gotKey).map(
node -> node.children().keySet().stream()
.map(JObjectKey::of).map(this::getPeerInfoImpl)
.map(Optional::get).filter(
.filter(o -> {
if (o.isEmpty())
Log.warnv("Could not get peer info for peer {0}", o);
return o.isPresent();
}).map(Optional::get).filter(
peerInfo -> !peerInfo.id().equals(persistentPeerDataService.getSelfUuid())).toList())
.orElseThrow();
});

View File

@@ -270,12 +270,29 @@ public class DhfsFusex3IT {
waitingConsumer3.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
Log.warn("Connected");
// TODO: There's some issue with cache, so avoid file reads
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing consistency 1");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
Log.info(ls1);
Log.info(ls2);
Log.info(ls3);
return (ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && ls3.getExitCode() == 0)
&& (ls1.getStdout().equals(ls2.getStdout()) && ls2.getStdout().equals(ls3.getStdout()));
});
await().atMost(45, TimeUnit.SECONDS).until(() -> {
Log.info("Listing");
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
if (!(cat.getExitCode() == 0 && ls.getExitCode() == 0))
return false;
if (!(cat.getStdout().contains("test1") && cat.getStdout().contains("test2") && cat.getStdout().contains("test3")))
return false;
}
@@ -283,12 +300,24 @@ public class DhfsFusex3IT {
});
await().atMost(45, TimeUnit.SECONDS).until(() -> {
return container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout()) &&
container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout()) &&
container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout().equals(
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Log.info("Listing consistency");
var ls1 = container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat1 = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls2 = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat2 = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
var ls3 = container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat3 = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls1);
Log.info(cat1);
Log.info(ls2);
Log.info(cat2);
Log.info(ls3);
Log.info(cat3);
return (ls1.getExitCode() == 0 && ls2.getExitCode() == 0 && ls3.getExitCode() == 0)
&& (cat1.getExitCode() == 0 && cat2.getExitCode() == 0 && cat3.getExitCode() == 0)
&& (cat1.getStdout().equals(cat2.getStdout()) && cat2.getStdout().equals(cat3.getStdout()))
&& (ls1.getStdout().equals(ls2.getStdout()) && ls2.getStdout().equals(ls3.getStdout()));
});
}