9 Commits

9 changed files with 94 additions and 21 deletions

View File

@@ -734,13 +734,22 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Override
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
return jObjectTxManager.executeTx(() -> {
var file = remoteTx.getData(File.class, fileUuid).orElseThrow(
() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
"File not found for setTimes: " + fileUuid))
);
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
remoteTx.putData(file.withCTime(atimeMs).withMTime(mtimeMs));
return true;
// FIXME:
if (dent instanceof JKleppmannTreeNode) {
return true;
} else if (dent instanceof RemoteObjectMeta) {
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);
if (remote instanceof File f) {
remoteTx.putData(f.withCTime(atimeMs).withMTime(mtimeMs));
return true;
} else {
throw new IllegalArgumentException(fileUuid + " is not a file");
}
} else {
throw new IllegalArgumentException(fileUuid + " is not a file");
}
});
}

View File

@@ -105,7 +105,14 @@ public class AutosyncProcessor {
try {
JObjectKey finalName = name;
boolean ok = txm.run(() -> {
var obj = remoteTx.getMeta(finalName).orElse(null);
RemoteObjectMeta obj;
try {
obj = remoteTx.getMeta(finalName).orElse(null);
} catch (ClassCastException cex) {
Log.debugv("Not downloading object {0}, not remote object", finalName);
return true;
}
if (obj == null) {
Log.debugv("Not downloading object {0}, not found", finalName);
return true;

View File

@@ -46,6 +46,8 @@ public class RemoteObjectServiceClient {
ProtoSerializer<OpP, Op> opProtoSerializer;
@Inject
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
@Inject
PeerManager peerManager;
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
@@ -63,7 +65,9 @@ public class RemoteObjectServiceClient {
}
var targetVersion = objMeta.versionSum();
var targets = objMeta.knownRemoteVersions().entrySet().stream()
var targets = objMeta.knownRemoteVersions().isEmpty()
? peerManager.getAvailableHosts()
: objMeta.knownRemoteVersions().entrySet().stream()
.filter(entry -> entry.getValue().equals(targetVersion))
.map(Map.Entry::getKey).toList();
@@ -92,10 +96,13 @@ public class RemoteObjectServiceClient {
curTx.get(RemoteObjectMeta.class, ref).map(m -> m.withSeen(true)).ifPresent(curTx::put);
}
});
var serialized = opProtoSerializer.serialize(op);
var built = OpPushRequest.newBuilder().addMsg(serialized).build();
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
}
var builder = OpPushRequest.newBuilder();
for (Op op : ops) {
builder.addMsg(opProtoSerializer.serialize(op));
}
var built = builder.build();
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
return OpPushReply.getDefaultInstance();
}

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.repository.invalidation;
import com.usatiuk.dhfs.utils.DataLocker;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.PeerId;
import com.usatiuk.dhfs.repository.PeerManager;
@@ -39,6 +40,7 @@ public class InvalidationQueueService {
@Inject
PersistentPeerDataService persistentPeerDataService;
private final DataLocker _locker = new DataLocker();
private ExecutorService _executor;
private volatile boolean _shutdown = false;
@@ -120,7 +122,11 @@ public class InvalidationQueueService {
continue;
}
try {
try (var lock = _locker.tryLock(e)) {
if (lock == null) {
pushInvalidationToOne(e);
continue;
}
opPusher.doPush(e);
success++;
} catch (Exception ex) {
@@ -168,11 +174,23 @@ public class InvalidationQueueService {
deferredInvalidationQueueService.defer(entry);
}
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
if (remoteHostManager.isReachable(entry.peer()))
_queue.addNoDelay(entry);
else
deferredInvalidationQueueService.defer(entry);
}
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOne(entry);
}
public void pushInvalidationToOneNoDelay(PeerId host, JObjectKey obj) {
var entry = new InvalidationQueueEntry(host, obj);
pushInvalidationToOneNoDelay(entry);
}
void pushDeferredInvalidations(InvalidationQueueEntry entry) {
_queue.add(entry);
}

View File

@@ -65,7 +65,7 @@ public class OpPusher {
if (tree.hasPendingOpsForHost(entry.peer())) {
doAgain.set(true);
invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key());
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
}
return ops;
}

View File

@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
dhfs.objects.peerdiscovery.broadcast=true
dhfs.objects.sync.timeout=30
dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=1
dhfs.objects.invalidation.threads=4
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.objects.write_log=false

View File

@@ -19,16 +19,17 @@ public class DataLocker {
var tag = _locks.get(data);
if (tag != null) {
synchronized (tag) {
if (!tag.released) {
while (!tag.released) {
if (tag.owner == Thread.currentThread()) {
return DUMMY_LOCK;
}
tag.wait(4000L);
if (!tag.released) {
System.out.println("Timeout waiting for lock: " + data);
System.exit(1);
throw new InterruptedException();
}
tag.wait();
// tag.wait(4000L);
// if (!tag.released) {
// System.out.println("Timeout waiting for lock: " + data);
// System.exit(1);
// throw new InterruptedException();
// }
}
continue;
}

View File

@@ -44,6 +44,24 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
// Adds the object to the queue, if it exists re-adds it
// With no delay
// Returns the old object, or null
public T addNoDelay(T el) {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
SetElement<T> old = _set.putFirst(el, new SetElement<>(el, 0));
this.notify();
if (old != null)
return old.el();
else
return null;
}
}
// Adds the object to the queue, if it exists re-adds it with a new delay
// Returns the old object, or null
public T readd(T el) {

View File

@@ -24,6 +24,19 @@ public class HashSetDelayedBlockingQueueTest {
Assertions.assertTrue((gotTime - curTime) >= 1000);
}
@Test
void addNoDelay() throws InterruptedException {
var queue = new HashSetDelayedBlockingQueue<>(1000);
var curTime = System.currentTimeMillis();
queue.addNoDelay("hello!");
var thing = queue.get();
var gotTime = System.currentTimeMillis();
Assertions.assertEquals("hello!", thing);
Assertions.assertTrue((gotTime - curTime) < 500);
}
@Test
void GetImmediate() throws InterruptedException {
var queue = new HashSetDelayedBlockingQueue<>(0);