motivate the log to be trimmed periodically

This commit is contained in:
2024-08-10 20:00:06 +02:00
parent 0a90a2c462
commit e7d2126d8f
15 changed files with 197 additions and 84 deletions

View File

@@ -11,6 +11,11 @@ public class AtomicClock implements Clock<Long>, Serializable {
return _max.incrementAndGet();
}
@Override
public Long peekTimestamp() {
return _max.get();
}
@Override
public void updateTimestamp(Long receivedTimestamp) {
long exp = _max.get();

View File

@@ -3,5 +3,7 @@ package com.usatiuk.kleppmanntree;
public interface Clock<TimestampT extends Comparable<TimestampT>> {
TimestampT getTimestamp();
TimestampT peekTimestamp();
void updateTimestamp(TimestampT receivedTimestamp);
}

View File

@@ -211,64 +211,79 @@ public class KleppmannTree<TimestampT extends Comparable<TimestampT>, PeerIdT ex
}
}
// Returns true if the timestamp is newer than what's seen, false otherwise
private boolean updateTimestampImpl(PeerIdT from, TimestampT newTimestamp) {
assertRwLock();
var ref = _storage.getPeerTimestampLog().computeIfAbsent(from, f -> new AtomicReference<>());
TimestampT oldRef;
TimestampT newRef;
do {
oldRef = ref.get();
if (oldRef != null && oldRef.compareTo(newTimestamp) > 0) { // FIXME?
LOGGER.warning("Wrong op order: received older than known from " + from.toString());
return false;
}
newRef = newTimestamp;
} while (!ref.compareAndSet(oldRef, newRef));
return true;
}
public void updateExternalTimestamp(PeerIdT from, TimestampT timestamp) {
_lock.writeLock().lock();
try {
updateTimestampImpl(_peers.getSelfId(), _clock.peekTimestamp()); // FIXME:? Kind of a hack?
updateTimestampImpl(from, timestamp);
tryTrimLog();
} finally {
_lock.writeLock().unlock();
}
}
private void applyOp(PeerIdT from, OpMove<TimestampT, PeerIdT, MetaT, NodeIdT> op, boolean failCreatingIfExists) {
assertRwLock();
synchronized (this) {
var ref = _storage.getPeerTimestampLog().computeIfAbsent(from, f -> new AtomicReference<>());
// TODO: I guess it's not actually needed since one peer can't handle concurrent updates?
TimestampT oldRef;
TimestampT newRef;
do {
oldRef = ref.get();
if (oldRef != null && oldRef.compareTo(op.timestamp().timestamp()) > 0) { // FIXME?
LOGGER.warning("Wrong op order: received older than known from " + from.toString());
return;
if (!updateTimestampImpl(from, op.timestamp().timestamp())) return;
var log = _storage.getLog();
// FIXME: hack?
int cmp = log.isEmpty() ? 1 : op.timestamp().compareTo(log.lastEntry().getKey());
if (log.containsKey(op.timestamp())) {
tryTrimLog();
return;
}
assert cmp != 0;
if (cmp < 0) {
try {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.tailMap(op.timestamp(), false);
_undoCtx = new HashMap<>();
for (var entry : toUndo.reversed().entrySet()) {
undoOp(entry.getValue());
}
newRef = op.timestamp().timestamp();
} while (!ref.compareAndSet(oldRef, newRef));
var log = _storage.getLog();
// FIXME: hack?
int cmp = log.isEmpty() ? 1 : op.timestamp().compareTo(log.lastEntry().getKey());
if (log.containsKey(op.timestamp())) {
tryTrimLog();
return;
}
assert cmp != 0;
if (cmp < 0) {
try {
if (log.containsKey(op.timestamp())) return;
var toUndo = log.tailMap(op.timestamp(), false);
_undoCtx = new HashMap<>();
for (var entry : toUndo.reversed().entrySet()) {
undoOp(entry.getValue());
}
try {
doAndPut(op, failCreatingIfExists);
} finally {
for (var entry : toUndo.entrySet()) {
redoOp(entry);
}
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
e.getValue().unlock();
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
}
doAndPut(op, failCreatingIfExists);
} finally {
tryTrimLog();
for (var entry : toUndo.entrySet()) {
redoOp(entry);
}
if (!_undoCtx.isEmpty()) {
for (var e : _undoCtx.entrySet()) {
LOGGER.log(Level.FINE, "Dropping node " + e.getKey());
e.getValue().unlock();
_storage.removeNode(e.getKey());
}
}
_undoCtx = null;
}
} else {
doAndPut(op, failCreatingIfExists);
} finally {
tryTrimLog();
}
} else {
doAndPut(op, failCreatingIfExists);
tryTrimLog();
}
}

View File

@@ -8,6 +8,11 @@ public class TestClock implements Clock<Long> {
return max++;
}
@Override
public Long peekTimestamp() {
return max;
}
@Override
public void updateTimestamp(Long receivedTimestamp) {
max = Math.max(max, receivedTimestamp) + 1;

View File

@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import com.usatiuk.dhfs.objects.repository.opsupport.OpObject;
import com.usatiuk.dhfs.objects.repository.opsupport.OpSender;
import com.usatiuk.kleppmanntree.KleppmannTree;
@@ -18,7 +19,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
public class JKleppmannTree implements OpObject<JKleppmannTreeOpWrapper> {
public class JKleppmannTree implements OpObject {
private final KleppmannTree<Long, UUID, JKleppmannTreeNodeMeta, String, JKleppmannTreeNodeWrapper> _tree;
private final JKleppmannTreePersistentData _persistentData;
@@ -77,7 +78,7 @@ public class JKleppmannTree implements OpObject<JKleppmannTreeOpWrapper> {
}
@Override
public JKleppmannTreeOpWrapper getPendingOpForHost(UUID host) {
public Op getPendingOpForHost(UUID host) {
if (_persistentData.getQueues().containsKey(host)) {
var peeked = _persistentData.getQueues().get(host).firstEntry();
return peeked != null ? new JKleppmannTreeOpWrapper(_persistentData.getQueues().get(host).firstEntry().getValue()) : null;
@@ -91,9 +92,12 @@ public class JKleppmannTree implements OpObject<JKleppmannTreeOpWrapper> {
}
@Override
public void commitOpForHost(UUID host, JKleppmannTreeOpWrapper op) {
public void commitOpForHost(UUID host, Op op) {
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
var got = _persistentData.getQueues().get(host).pollFirstEntry().getValue();
if (op.getOp() != got) {
if (jop.getOp() != got) {
throw new IllegalArgumentException("Committed op push was not the oldest");
}
}
@@ -108,7 +112,12 @@ public class JKleppmannTree implements OpObject<JKleppmannTreeOpWrapper> {
}
@Override
public void acceptExternalOp(UUID from, JKleppmannTreeOpWrapper op) {
public void acceptExternalOp(UUID from, Op op) {
if (op instanceof JKleppmannTreePeriodicPushOp pushOp) {
_tree.updateExternalTimestamp(pushOp.getFrom(), pushOp.getTimestamp());
return;
}
if (!(op instanceof JKleppmannTreeOpWrapper jop))
throw new IllegalArgumentException("Invalid incoming op type for JKleppmannTree: " + op.getClass() + " " + getId());
@@ -122,5 +131,10 @@ public class JKleppmannTree implements OpObject<JKleppmannTreeOpWrapper> {
_tree.applyExternalOp(from, jop.getOp());
}
@Override
public Op getPeriodicPushOp() {
return new JKleppmannTreePeriodicPushOp(_persistentPeerDataService.getSelfUuid(), _persistentData.getClock().peekTimestamp());
}
}

View File

@@ -0,0 +1,20 @@
package com.usatiuk.dhfs.objects.jkleppmanntree;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import com.usatiuk.kleppmanntree.OpMove;
import lombok.Getter;
import java.util.UUID;
public class JKleppmannTreePeriodicPushOp implements Op {
@Getter
private final UUID _from;
@Getter
private final long _timestamp;
public JKleppmannTreePeriodicPushOp(UUID from, long timestamp) {
_from = from;
_timestamp = timestamp;
}
}

View File

@@ -0,0 +1,23 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.serializers;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreePeriodicPushOp;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializer;
import com.usatiuk.dhfs.objects.repository.JKleppmannTreePeriodicPushOpP;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.UUID;
@ApplicationScoped
public class JKleppmannTreePeriodicPushOpProtoSerializer implements ProtoDeserializer<JKleppmannTreePeriodicPushOpP, JKleppmannTreePeriodicPushOp>, ProtoSerializer<JKleppmannTreePeriodicPushOpP, JKleppmannTreePeriodicPushOp> {
@Override
public JKleppmannTreePeriodicPushOp deserialize(JKleppmannTreePeriodicPushOpP message) {
return new JKleppmannTreePeriodicPushOp(UUID.fromString(message.getFromUuid()), message.getTimestamp());
}
@Override
public JKleppmannTreePeriodicPushOpP serialize(JKleppmannTreePeriodicPushOp object) {
return JKleppmannTreePeriodicPushOpP.newBuilder().setTimestamp(object.getTimestamp()).setFromUuid(object.getFrom().toString()).build();
}
}

View File

@@ -3,6 +3,7 @@ package com.usatiuk.dhfs.objects.protoserializer;
import com.google.protobuf.Message;
import com.usatiuk.dhfs.objects.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
import com.usatiuk.dhfs.objects.persistence.*;
import com.usatiuk.dhfs.objects.repository.JKleppmannTreePeriodicPushOpP;
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import io.quarkus.arc.ClientProxy;
@@ -43,11 +44,11 @@ public class ProtoSerializerService {
void init() {
for (var s : _protoSerializers) {
var args = ((ParameterizedType) Arrays.stream(ClientProxy.unwrap(s).getClass().getGenericInterfaces())
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoSerializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoSerializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.getActualTypeArguments(); //FIXME:
Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
Class<?> objClass = (Class<?>) args[1];
@@ -60,11 +61,11 @@ public class ProtoSerializerService {
for (var s : _protoDeserializers) {
var args = ((ParameterizedType) Arrays.stream(ClientProxy.unwrap(s).getClass().getGenericInterfaces())
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoDeserializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.filter(t -> {
if (t instanceof ParameterizedType)
return ((ParameterizedType) t).getRawType().equals(ProtoDeserializer.class);
return false;
}).findFirst().orElseThrow(() -> new IllegalArgumentException("ProtoSerializer interface not found on ProtoSerializer?")))
.getActualTypeArguments(); //FIXME:
Class<? extends Message> messageClass = (Class<? extends Message>) args[0];
Class<?> objClass = (Class<?>) args[1];
@@ -130,6 +131,8 @@ public class ProtoSerializerService {
var ser = serialize(object);
if (ser instanceof JKleppmannTreeOpP) {
return OpPushPayload.newBuilder().setJKleppmannTreeOp((JKleppmannTreeOpP) ser).build();
} else if (ser instanceof JKleppmannTreePeriodicPushOpP) {
return OpPushPayload.newBuilder().setJKleppmannTreePeriodicPushOp((JKleppmannTreePeriodicPushOpP) ser).build();
} else {
throw new IllegalArgumentException("Unexpected object type on input to serializeToTreeNodeMetaP: " + object.getClass());
}

View File

@@ -143,6 +143,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
}
@Override
@Blocking
public Uni<OpPushReply> opPush(OpPushMsg request) {
if (request.getSelfUuid().isBlank()) throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
if (!persistentPeerDataService.existsHost(UUID.fromString(request.getSelfUuid())))

View File

@@ -1,16 +1,17 @@
package com.usatiuk.dhfs.objects.repository.opsupport;
import java.util.List;
import java.util.UUID;
public interface OpObject<OpT extends Op> {
public interface OpObject {
String getId();
OpT getPendingOpForHost(UUID host);
Op getPendingOpForHost(UUID host);
void commitOpForHost(UUID host, OpT op);
void commitOpForHost(UUID host, Op op);
void pushBootstrap(UUID host);
void acceptExternalOp(UUID from, OpT op);
void acceptExternalOp(UUID from, Op op);
Op getPeriodicPushOp();
}

View File

@@ -1,8 +1,12 @@
package com.usatiuk.dhfs.objects.repository.opsupport;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -15,10 +19,12 @@ public class OpObjectRegistry {
OpSender opSender;
@Inject
PeerManager remoteHostManager;
@Inject
RemoteObjectServiceClient remoteObjectServiceClient;
private final ConcurrentHashMap<String, OpObject<?>> _objects = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, OpObject> _objects = new ConcurrentHashMap<>();
public void registerObject(OpObject<?> obj) {
public void registerObject(OpObject obj) {
_objects.put(obj.getId(), obj);
remoteHostManager.registerConnectEventListener(host -> {
opSender.push(obj);
@@ -29,11 +35,7 @@ public class OpObjectRegistry {
var got = _objects.get(objId);
if (got == null)
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("Queue with id " + objId + " not registered"));
dispatchOp(got, from, op);
}
private <OpLocalT extends Op> void dispatchOp(OpObject<OpLocalT> obj, UUID from, Op op) {
obj.acceptExternalOp(from, (OpLocalT) op);
got.acceptExternalOp(from, op);
}
public void pushBootstrapData(UUID host) {
@@ -41,4 +43,21 @@ public class OpObjectRegistry {
o.pushBootstrap(host);
}
}
@Scheduled(every = "10s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP)
@Blocking
void periodicPush() {
for (var obj : _objects.values()) {
var periodicPushOp = obj.getPeriodicPushOp();
if (periodicPushOp == null) continue;
for (var h : remoteHostManager.getAvailableHosts()) {
try {
remoteObjectServiceClient.pushOp(periodicPushOp, obj.getId(), h);
} catch (Exception e) {
Log.warn("Error pushing periodic op for " + h + " of " + obj.getId(), e);
}
}
}
}
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository.opsupport;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreeOpWrapper;
import com.usatiuk.dhfs.objects.jkleppmanntree.JKleppmannTreePeriodicPushOp;
import com.usatiuk.dhfs.objects.protoserializer.ProtoDeserializer;
import com.usatiuk.dhfs.objects.protoserializer.ProtoSerializerService;
import com.usatiuk.dhfs.objects.repository.OpPushPayload;
@@ -18,6 +19,8 @@ public class OpPushPayloadProtoDeserializer implements ProtoDeserializer<OpPushP
return switch (message.getPayloadCase()) {
case JKLEPPMANNTREEOP ->
(JKleppmannTreeOpWrapper) protoSerializerService.deserialize(message.getJKleppmannTreeOp());
case JKLEPPMANNTREEPERIODICPUSHOP ->
(JKleppmannTreePeriodicPushOp) protoSerializerService.deserialize(message.getJKleppmannTreePeriodicPushOp());
case PAYLOAD_NOT_SET -> throw new IllegalArgumentException("OpPushPayload is null");
};
}

View File

@@ -26,7 +26,7 @@ public class OpSender {
private ExecutorService _executor;
private volatile boolean _shutdown = false;
private final HashSetDelayedBlockingQueue<OpObject<?>> _queue = new HashSetDelayedBlockingQueue<>(0); // FIXME:
private final HashSetDelayedBlockingQueue<OpObject> _queue = new HashSetDelayedBlockingQueue<>(0); // FIXME:
@Startup
void init() {
@@ -61,8 +61,8 @@ public class OpSender {
}
}
private <OpLocal extends Op> void sendForHost(OpObject<OpLocal> queue, UUID host) {
OpLocal op;
void sendForHost(OpObject queue, UUID host) {
Op op;
while ((op = queue.getPendingOpForHost(host)) != null) {
try {
remoteObjectServiceClient.pushOp(op, queue.getId(), host);
@@ -74,7 +74,7 @@ public class OpSender {
}
}
public void push(OpObject<?> queue) {
public void push(OpObject queue) {
_queue.readd(queue);
}
}

View File

@@ -86,9 +86,15 @@ message IndexUpdatePush {
message IndexUpdateReply {}
message JKleppmannTreePeriodicPushOpP {
string fromUuid = 1;
int64 timestamp = 2;
}
message OpPushPayload {
oneof payload {
dhfs.objects.persistence.JKleppmannTreeOpP jKleppmannTreeOp = 1;
JKleppmannTreePeriodicPushOpP jKleppmannTreePeriodicPushOp = 2;
}
}

View File

@@ -217,10 +217,6 @@ public class DhfsFuseIT {
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Thread.sleep(500);
// Motivate the log a little
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty2 > /root/dhfs_default/fuse/testf2").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo tesempty3 > /root/dhfs_default/fuse/testf3").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);