somewhat... working... initial sync

This commit is contained in:
2025-02-21 17:25:42 +01:00
parent 12d7f3a427
commit 891b15a75a
14 changed files with 148 additions and 238 deletions

View File

@@ -90,8 +90,9 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
private void undoOp(LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT> op) {
for (var e : op.effects().reversed())
undoEffect(e);
if (op.effects() != null)
for (var e : op.effects().reversed())
undoEffect(e);
}
private void redoOp(Map.Entry<CombinedTimestamp<TimestampT, PeerIdT>, LogRecord<TimestampT, PeerIdT, MetaT, NodeIdT>> entry) {
@@ -343,11 +344,16 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
var conflictNodeId = newParent.children().get(op.newMeta().getName());
if (conflictNodeId != null) {
var conflictNode = _storage.getById(conflictNodeId);
MetaT conflictNodeMeta = conflictNode.meta();
if (Objects.equals(conflictNodeMeta, op.newMeta())) {
return new LogRecord<>(op, null);
}
if (failCreatingIfExists)
throw new AlreadyExistsException("Already exists: " + op.newMeta().getName() + ": " + conflictNodeId);
var conflictNode = _storage.getById(conflictNodeId);
MetaT conflictNodeMeta = conflictNode.meta();
String newConflictNodeName = conflictNodeMeta.getName() + ".conflict." + conflictNode.key();
String newOursName = op.newMeta().getName() + ".conflict." + op.childId();
return new LogRecord<>(op, List.of(
@@ -374,6 +380,11 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
if (replaceNodeId != null) {
var replaceNode = _storage.getById(replaceNodeId);
var replaceNodeMeta = replaceNode.meta();
if (Objects.equals(replaceNodeMeta, op.newMeta())) {
return new LogRecord<>(op, null);
}
return new LogRecord<>(op, List.of(
new LogEffect<>(new LogEffectOld<>(replaceNode.lastEffectiveOp(), newParentId, replaceNodeMeta), replaceNode.lastEffectiveOp(), _storage.getTrashId(), (MetaT) replaceNodeMeta.withName(replaceNodeId.toString()), replaceNodeId),
new LogEffect<>(new LogEffectOld<>(node.lastEffectiveOp(), oldParentId, oldMeta), op, op.newParentId(), op.newMeta(), op.childId())

View File

@@ -5,6 +5,9 @@ import com.usatiuk.dhfs.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@ApplicationScoped
@@ -37,6 +40,12 @@ public class CurrentTransaction implements Transaction {
transactionManager.current().delete(key);
}
@Nonnull
@Override
public Collection<JObjectKey> findAllObjects() {
return transactionManager.current().findAllObjects();
}
@Override
public <T extends JData> void put(JData obj) {
transactionManager.current().put(obj);

View File

@@ -1,7 +1,6 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.utils.VoidFn;
import java.util.Collection;
import java.util.Optional;
public interface TxWriteback {
@@ -14,6 +13,7 @@ public interface TxWriteback {
void fence(long bundleId);
Optional<PendingWriteEntry> getPendingWrite(JObjectKey key);
Collection<PendingWriteEntry> getPendingWrites();
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread

View File

@@ -2,7 +2,6 @@ package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
@@ -277,6 +276,13 @@ public class TxWritebackImpl implements TxWriteback {
}
}
@Override
public Collection<PendingWriteEntry> getPendingWrites() {
synchronized (_pendingBundles) {
return Collections.unmodifiableCollection(_pendingWrites.values());
}
}
@Override
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();

View File

@@ -8,6 +8,7 @@ import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.function.Consumer;
@@ -19,8 +20,17 @@ public class WritebackObjectPersistentStore {
TxWriteback txWriteback;
@Nonnull
Collection<JObjectKey> findAllObjects() {
return delegate.findAllObjects();
public Collection<JObjectKey> findAllObjects() {
var pending = txWriteback.getPendingWrites();
var found = new HashSet<>(delegate.findAllObjects());
for (var p : pending) {
switch (p) {
case TxWriteback.PendingWrite write -> found.add(write.data().data().key());
case TxWriteback.PendingDelete deleted -> found.remove(deleted.key());
default -> throw new IllegalStateException("Unexpected value: " + p);
}
}
return found;
}
@Nonnull

View File

@@ -3,6 +3,8 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
// The transaction interface actually used by user code to retrieve objects
@@ -17,6 +19,9 @@ public interface Transaction extends TransactionHandle {
void delete(JObjectKey key);
@Nonnull
Collection<JObjectKey> findAllObjects(); // FIXME: This is crap
default <T extends JData> Optional<T> get(Class<T> type, JObjectKey key) {
return get(type, key, LockingStrategy.OPTIMISTIC);
}

View File

@@ -3,12 +3,18 @@ package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.WritebackObjectPersistentStore;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import javax.annotation.Nonnull;
import java.util.*;
@ApplicationScoped
public class TransactionFactoryImpl implements TransactionFactory {
@Inject
WritebackObjectPersistentStore store; // FIXME:
@Override
public TransactionPrivate createTransaction(long id, TransactionObjectSource source) {
return new TransactionImpl(id, source);
@@ -98,6 +104,12 @@ public class TransactionFactoryImpl implements TransactionFactory {
_newWrites.put(key, new TxRecord.TxObjectRecordDeleted(key));
}
@Nonnull
@Override
public Collection<JObjectKey> findAllObjects() {
return store.findAllObjects();
}
@Override
public void put(JData obj) {
// get(JData.class, obj.getKey(), LockingStrategy.OPTIMISTIC);

View File

@@ -115,12 +115,12 @@ public class JKleppmannTreeManager {
throw new IllegalArgumentException("Committed op push was not the oldest");
_data = _data.withQueues(_data.queues().plus(host, _data.queues().get(host).minus(_data.queues().get(host).firstKey())));
curTx.put(_data);
}
// @Override
// public void pushBootstrap(PeerId host) {
// _tree.recordBoostrapFor(host);
// }
public void recordBootstrap(PeerId host) {
_tree.recordBoostrapFor(host);
}
public Pair<String, JObjectKey> findParent(Function<TreeNode<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey>, Boolean> predicate) {
return _tree.findParent(predicate);
@@ -207,42 +207,12 @@ public class JKleppmannTreeManager {
for (var p : peerInfoService.getPeersNoSelf()) {
recordOpForPeer(p.id(), op);
}
// _persistentData.get().assertRwLock();
// _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// var hostUuds = persistentPeerDataService.getHostUuids().stream().toList();
// _persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
// @Override
// public boolean mutate(JKleppmannTreePersistentData object) {
// object.recordOp(hostUuds, op);
// return true;
// }
//
// @Override
// public void revert(JKleppmannTreePersistentData object) {
// object.removeOp(hostUuds, op);
// }
// });
// opSender.push(JKleppmannTree.this);
}
@Override
public void recordOpForPeer(PeerId peer, OpMove<Long, PeerId, JKleppmannTreeNodeMeta, JObjectKey> op) {
_data = _data.withQueues(_data.queues().plus(peer, _data.queues().getOrDefault(peer, TreePMap.empty()).plus(op.timestamp(), op)));
// _persistentData.get().assertRwLock();
// _persistentData.get().tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
// _persistentData.get().mutate(new JMutator<JKleppmannTreePersistentData>() {
// @Override
// public boolean mutate(JKleppmannTreePersistentData object) {
// object.recordOp(peer, op);
// return true;
// }
//
// @Override
// public void revert(JKleppmannTreePersistentData object) {
// object.removeOp(peer, op);
// }
// });
// opSender.push(JKleppmannTree.this);
curTx.put(_data);
}
}

View File

@@ -103,6 +103,11 @@ public class PeerManager {
private void handleConnectionSuccess(PeerInfo host, PeerAddress address) {
boolean wasReachable = isReachable(host);
boolean shouldSync = persistentPeerDataService.markInitialSyncDone(host.id());
if (shouldSync)
syncHandler.doInitialSync(host.id());
_states.put(host.id(), address);
if (wasReachable) return;

View File

@@ -13,6 +13,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.pcollections.HashTreePSet;
import java.io.IOException;
import java.security.KeyPair;
@@ -63,7 +64,7 @@ public class PersistentPeerDataService {
_selfKeyPair = CertificateTools.generateKeyPair();
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
curTx.put(new PersistentRemoteHostsData(_selfUuid, _selfCertificate, _selfKeyPair));
curTx.put(new PersistentRemoteHostsData(_selfUuid, _selfCertificate, _selfKeyPair, HashTreePSet.empty()));
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
} catch (CertificateEncodingException e) {
throw new RuntimeException(e);
@@ -116,5 +117,28 @@ public class PersistentPeerDataService {
return _selfCertificate;
}
// Returns true if host's initial sync wasn't done before, and marks it as done
public boolean markInitialSyncDone(PeerId peerId) {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
boolean exists = data.initialSyncDone().contains(peerId);
if (exists) return false;
curTx.put(data.withInitialSyncDone(data.initialSyncDone().plus(peerId)));
return true;
});
}
// Returns true if it was marked as done before, and resets it
public boolean resetInitialSyncDone(PeerId peerId) {
return txm.run(() -> {
var data = curTx.get(PersistentRemoteHostsData.class, PersistentRemoteHostsData.KEY).orElse(null);
if (data == null) throw new IllegalStateException("Self data not found");
boolean exists = data.initialSyncDone().contains(peerId);
if (!exists) return false;
curTx.put(data.withInitialSyncDone(data.initialSyncDone().minus(peerId)));
return true;
});
}
}

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
import org.pcollections.PSet;
import java.io.Serializable;
import java.security.KeyPair;
@@ -10,11 +11,17 @@ import java.security.cert.X509Certificate;
public record PersistentRemoteHostsData(PeerId selfUuid,
X509Certificate selfCertificate,
KeyPair selfKeyPair) implements JData, Serializable {
KeyPair selfKeyPair,
PSet<PeerId> initialSyncDone) implements JData, Serializable {
public static final JObjectKey KEY = JObjectKey.of("self_peer_data");
@Override
public JObjectKey key() {
return KEY;
}
public PersistentRemoteHostsData withInitialSyncDone(PSet<PeerId> initialSyncDone) {
return new PersistentRemoteHostsData(selfUuid, selfCertificate, selfKeyPair, initialSyncDone);
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.objects.*;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
@@ -10,191 +11,17 @@ import org.pcollections.PMap;
import javax.annotation.Nullable;
//
//import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
//import com.usatiuk.dhfs.objects.jrepository.JObject;
//import com.usatiuk.dhfs.objects.jrepository.JObjectData;
//import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
//import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
//import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
//import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
//import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
//import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
//import io.grpc.Status;
//import io.quarkus.logging.Log;
//import jakarta.enterprise.context.ApplicationScoped;
//import jakarta.enterprise.inject.Instance;
//import jakarta.inject.Inject;
//
//import java.util.HashMap;
//import java.util.Objects;
//import java.util.Optional;
//import java.util.UUID;
//import java.util.concurrent.atomic.AtomicReference;
//import java.util.stream.Collectors;
//import java.util.stream.Stream;
//
@ApplicationScoped
public class SyncHandler {
@Inject
Transaction curTx;
@Inject
PersistentPeerDataService persistentPeerDataService;
// @Inject
// JObjectManager jObjectManager;
// @Inject
// PeerManager peerManager;
// @Inject
// RemoteObjectServiceClient remoteObjectServiceClient;
// @Inject
// InvalidationQueueService invalidationQueueService;
// @Inject
// Instance<ConflictResolver> conflictResolvers;
// @Inject
// PersistentPeerDataService persistentPeerDataService;
// @Inject
// ProtoSerializer<JObjectDataP, JObjectData> dataProtoSerializer;
// @Inject
// OpObjectRegistry opObjectRegistry;
// @Inject
// JObjectTxManager jObjectTxManager;
//
// public void pushInitialResyncObj(UUID host) {
// Log.info("Doing initial object push for " + host);
//
// var objs = jObjectManager.findAll();
//
// for (var obj : objs) {
// Log.trace("IS: " + obj + " to " + host);
// invalidationQueueService.pushInvalidationToOne(host, obj);
// }
// }
//
// public void pushInitialResyncOp(UUID host) {
// Log.info("Doing initial op push for " + host);
//
// jObjectTxManager.executeTxAndFlush(
// () -> {
// opObjectRegistry.pushBootstrapData(host);
// }
// );
// }
//
@Inject
TransactionManager txm;
@Inject
InvalidationQueueService invalidationQueueService;
// public <T extends JDataRemote> RemoteObjectMeta<T> handleOneUpdate(PeerId from, RemoteObjectMeta<T> current, PMap<PeerId, Long> rcvChangelog) {
//// if (!rcv.key().equals(current.key())) {
//// Log.error("Received update for different object: " + rcv.key() + " from " + from);
//// throw new IllegalArgumentException("Received update for different object: " + rcv.key() + " from " + from);
//// }
//
// var receivedTotalVer = rcvChangelog.values().stream().mapToLong(Long::longValue).sum();
//
// if (current.meta().knownRemoteVersions().getOrDefault(from, 0L) > receivedTotalVer) {
// Log.error("Received older index update than was known for host: " + from + " " + current.key());
// throw new IllegalStateException(); // FIXME: OutdatedUpdateException
// }
//
// Log.trace("Handling update: " + current.key() + " from " + from + "\n" + "ours: " + current + " \n" + "received: " + rcvChangelog);
//
// boolean conflict = false;
// boolean updatedRemoteVersion = false;
//
// var newObj = current;
// var curKnownRemoteVersion = current.meta().knownRemoteVersions().get(from);
//
// if (curKnownRemoteVersion == null || !curKnownRemoteVersion.equals(receivedTotalVer))
// updatedRemoteVersion = true;
//
// if (updatedRemoteVersion)
// newObj = current.withMeta(current.meta().withKnownRemoteVersions(
// current.meta().knownRemoteVersions().plus(from, receivedTotalVer)
// ));
//
//
// boolean hasLower = false;
// boolean hasHigher = false;
// for (var e : Stream.concat(current.meta().changelog().keySet().stream(), rcvChangelog.keySet().stream()).collect(Collectors.toUnmodifiableSet())) {
// if (rcvChangelog.getOrDefault(e, 0L) < current.meta().changelog().getOrDefault(e, 0L))
// hasLower = true;
// if (rcvChangelog.getOrDefault(e, 0L) > current.meta().changelog().getOrDefault(e, 0L))
// hasHigher = true;
// }
//
// if (hasLower && hasHigher) {
// Log.info("Conflict on update (inconsistent version): " + current.key() + " from " + from);
//// Log.
////
//// info("Trying conflict resolution: " + header.getName() + " from " + from);
//// var found = foundExt.get();
////
//// JObjectData theirsData;
//// ObjectHeader theirsHeader;
//// if (header. hasPushedData()) {
//// theirsHeader = header;
//// theirsData = dataProtoSerializer.
////
//// deserialize(header.getPushedData());
//// } else {
//// var got = remoteObjectServiceClient.getSpecificObject(from, header.getName());
//// theirsData = dataProtoSerializer.
////
//// deserialize(got.getRight());
//// theirsHeader = got.
////
//// getLeft();
//// }
////
//// jObjectTxManager.
////
//// executeTx(() -> {
//// var resolverClass = found.runReadLocked(JObjectManager.ResolutionStrategy.LOCAL_ONLY, (m, d) -> {
//// if (d == null)
//// throw new StatusRuntimeExceptionNoStacktrace(Status.UNAVAILABLE.withDescription("No local data when conflict " + header.getName()));
//// return d.getConflictResolver();
//// });
//// var resolver = conflictResolvers.select(resolverClass);
//// resolver.
////
//// get().
////
//// resolve(from, theirsHeader, theirsData, found);
//// });
//// Log. info("Resolved conflict for " + from + " " + header.getName());
//// throw new NotImplementedException();
// } else if (hasLower) {
// Log.info("Received older index update than known: " + from + " " + current.key());
//// throw new OutdatedUpdateException();
//// throw new NotImplementedException();
// } else if (hasHigher) {
// var newChangelog = rcvChangelog.containsKey(persistentPeerDataService.getSelfUuid()) ?
// rcvChangelog : rcvChangelog.plus(persistentPeerDataService.getSelfUuid(), 0L);
//
// newObj = newObj.withData(null).withMeta(newObj.meta().withChangelog(newChangelog));
//// if (header.hasPushedData())
//// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
// }
//// else if (data == null && header.hasPushedData()) {
//// found.tryResolve(JObjectManager.ResolutionStrategy.LOCAL_ONLY);
//// if (found.getData() == null)
//// found.externalResolution(dataProtoSerializer.deserialize(header.getPushedData()));
//// }
//
/// / assert Objects.equals(receivedTotalVer, md.getOurVersion());
//
// if (!updatedRemoteVersion)
// Log.debug("No action on update: " + current.meta().key() + " from " + from);
//
// return newObj;
// }
public <T extends JDataRemote> void handleRemoteUpdate(PeerId from, JObjectKey key, PMap<PeerId, Long> receivedChangelog, @Nullable JDataRemote receivedData) {
var current = curTx.get(RemoteObjectMeta.class, key).orElse(null);
if (current == null) {
@@ -250,6 +77,8 @@ public class SyncHandler {
}
public void doInitialSync(PeerId peer) {
//TODO:
txm.run(() -> {
for (var cur : curTx.findAllObjects()) invalidationQueueService.pushInvalidationToOne(peer, cur, true);
});
}
}

View File

@@ -3,5 +3,7 @@ package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.PeerId;
public record InvalidationQueueEntry(PeerId peer, JObjectKey key, boolean forced) {
import java.io.Serializable;
public record InvalidationQueueEntry(PeerId peer, JObjectKey key, boolean forced) implements Serializable {
}

View File

@@ -4,7 +4,7 @@ import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.RemoteObjectMeta;
import com.usatiuk.dhfs.objects.RemoteTransaction;
import com.usatiuk.dhfs.objects.TransactionManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreePersistentData;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.transaction.Transaction;
@@ -25,25 +25,30 @@ public class OpPusher {
RemoteObjectServiceClient remoteObjectServiceClient;
@Inject
InvalidationQueueService invalidationQueueService;
@Inject
JKleppmannTreeManager jKleppmannTreeManager;
public void doPush(InvalidationQueueEntry entry) {
Op info = txm.run(() -> {
List<Op> info = txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case RemoteObjectMeta remote -> {
return new IndexUpdateOp(entry.key(), remote.changelog());
return List.of(new IndexUpdateOp(entry.key(), remote.changelog()));
}
case JKleppmannTreePersistentData pd -> {
var maybeQueue = pd.queues().get(entry.peer());
if (maybeQueue == null || maybeQueue.isEmpty()) {
var tree = jKleppmannTreeManager.getTree(pd.key());
if (entry.forced())
tree.recordBootstrap(entry.peer());
if (!tree.hasPendingOpsForHost(entry.peer()))
return null;
}
var ret = new JKleppmannTreeOpWrapper(entry.key(), pd.queues().get(entry.peer()).firstEntry().getValue());
var newPd = pd.withQueues(pd.queues().plus(entry.peer(), pd.queues().get(entry.peer()).minus(ret.op().timestamp())));
curTx.put(newPd);
if (!newPd.queues().get(entry.peer()).isEmpty())
var ops = tree.getPendingOpsForHost(entry.peer(), 1);
if (tree.hasPendingOpsForHost(entry.peer()))
invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key());
return ret;
return ops;
}
case null,
default -> {
@@ -54,6 +59,21 @@ public class OpPusher {
if (info == null) {
return;
}
remoteObjectServiceClient.pushOps(entry.peer(), List.of(info));
remoteObjectServiceClient.pushOps(entry.peer(), info);
txm.run(() -> {
var obj = curTx.get(JData.class, entry.key()).orElse(null);
switch (obj) {
case JKleppmannTreePersistentData pd: {
var tree = jKleppmannTreeManager.getTree(pd.key());
for (var op : info) {
tree.commitOpForHost(entry.peer(), op);
}
break;
}
case null:
default:
}
});
}
}