mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
9 Commits
b998871e7f
...
ffef8959df
| Author | SHA1 | Date | |
|---|---|---|---|
| ffef8959df | |||
| cb909478dc | |||
| 06335b4b99 | |||
| 8351bec59a | |||
| 29663f575d | |||
| 0f8002dc2c | |||
| 5c50d572d0 | |||
| edebb6d8f0 | |||
| 5d620c64c5 |
@@ -734,13 +734,22 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@Override
|
||||
public Boolean setTimes(JObjectKey fileUuid, long atimeMs, long mtimeMs) {
|
||||
return jObjectTxManager.executeTx(() -> {
|
||||
var file = remoteTx.getData(File.class, fileUuid).orElseThrow(
|
||||
() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(
|
||||
"File not found for setTimes: " + fileUuid))
|
||||
);
|
||||
var dent = curTx.get(JData.class, fileUuid).orElseThrow(() -> new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND));
|
||||
|
||||
remoteTx.putData(file.withCTime(atimeMs).withMTime(mtimeMs));
|
||||
return true;
|
||||
// FIXME:
|
||||
if (dent instanceof JKleppmannTreeNode) {
|
||||
return true;
|
||||
} else if (dent instanceof RemoteObjectMeta) {
|
||||
var remote = remoteTx.getData(JDataRemote.class, fileUuid).orElse(null);
|
||||
if (remote instanceof File f) {
|
||||
remoteTx.putData(f.withCTime(atimeMs).withMTime(mtimeMs));
|
||||
return true;
|
||||
} else {
|
||||
throw new IllegalArgumentException(fileUuid + " is not a file");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(fileUuid + " is not a file");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -105,7 +105,14 @@ public class AutosyncProcessor {
|
||||
try {
|
||||
JObjectKey finalName = name;
|
||||
boolean ok = txm.run(() -> {
|
||||
var obj = remoteTx.getMeta(finalName).orElse(null);
|
||||
RemoteObjectMeta obj;
|
||||
try {
|
||||
obj = remoteTx.getMeta(finalName).orElse(null);
|
||||
} catch (ClassCastException cex) {
|
||||
Log.debugv("Not downloading object {0}, not remote object", finalName);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (obj == null) {
|
||||
Log.debugv("Not downloading object {0}, not found", finalName);
|
||||
return true;
|
||||
|
||||
@@ -46,6 +46,8 @@ public class RemoteObjectServiceClient {
|
||||
ProtoSerializer<OpP, Op> opProtoSerializer;
|
||||
@Inject
|
||||
ProtoSerializer<GetObjectReply, ReceivedObject> receivedObjectProtoSerializer;
|
||||
@Inject
|
||||
PeerManager peerManager;
|
||||
|
||||
public Pair<PeerId, ReceivedObject> getSpecificObject(JObjectKey key, PeerId peerId) {
|
||||
return rpcClientFactory.withObjSyncClient(peerId, (peer, client) -> {
|
||||
@@ -63,7 +65,9 @@ public class RemoteObjectServiceClient {
|
||||
}
|
||||
|
||||
var targetVersion = objMeta.versionSum();
|
||||
var targets = objMeta.knownRemoteVersions().entrySet().stream()
|
||||
var targets = objMeta.knownRemoteVersions().isEmpty()
|
||||
? peerManager.getAvailableHosts()
|
||||
: objMeta.knownRemoteVersions().entrySet().stream()
|
||||
.filter(entry -> entry.getValue().equals(targetVersion))
|
||||
.map(Map.Entry::getKey).toList();
|
||||
|
||||
@@ -92,10 +96,13 @@ public class RemoteObjectServiceClient {
|
||||
curTx.get(RemoteObjectMeta.class, ref).map(m -> m.withSeen(true)).ifPresent(curTx::put);
|
||||
}
|
||||
});
|
||||
var serialized = opProtoSerializer.serialize(op);
|
||||
var built = OpPushRequest.newBuilder().addMsg(serialized).build();
|
||||
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
|
||||
}
|
||||
var builder = OpPushRequest.newBuilder();
|
||||
for (Op op : ops) {
|
||||
builder.addMsg(opProtoSerializer.serialize(op));
|
||||
}
|
||||
var built = builder.build();
|
||||
rpcClientFactory.withObjSyncClient(target, (tgt, client) -> client.opPush(built));
|
||||
return OpPushReply.getDefaultInstance();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.usatiuk.dhfs.repository.invalidation;
|
||||
|
||||
import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import com.usatiuk.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.PeerId;
|
||||
import com.usatiuk.dhfs.repository.PeerManager;
|
||||
@@ -39,6 +40,7 @@ public class InvalidationQueueService {
|
||||
@Inject
|
||||
PersistentPeerDataService persistentPeerDataService;
|
||||
|
||||
private final DataLocker _locker = new DataLocker();
|
||||
private ExecutorService _executor;
|
||||
private volatile boolean _shutdown = false;
|
||||
|
||||
@@ -120,7 +122,11 @@ public class InvalidationQueueService {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
try (var lock = _locker.tryLock(e)) {
|
||||
if (lock == null) {
|
||||
pushInvalidationToOne(e);
|
||||
continue;
|
||||
}
|
||||
opPusher.doPush(e);
|
||||
success++;
|
||||
} catch (Exception ex) {
|
||||
@@ -168,11 +174,23 @@ public class InvalidationQueueService {
|
||||
deferredInvalidationQueueService.defer(entry);
|
||||
}
|
||||
|
||||
void pushInvalidationToOneNoDelay(InvalidationQueueEntry entry) {
|
||||
if (remoteHostManager.isReachable(entry.peer()))
|
||||
_queue.addNoDelay(entry);
|
||||
else
|
||||
deferredInvalidationQueueService.defer(entry);
|
||||
}
|
||||
|
||||
public void pushInvalidationToOne(PeerId host, JObjectKey obj) {
|
||||
var entry = new InvalidationQueueEntry(host, obj);
|
||||
pushInvalidationToOne(entry);
|
||||
}
|
||||
|
||||
public void pushInvalidationToOneNoDelay(PeerId host, JObjectKey obj) {
|
||||
var entry = new InvalidationQueueEntry(host, obj);
|
||||
pushInvalidationToOneNoDelay(entry);
|
||||
}
|
||||
|
||||
void pushDeferredInvalidations(InvalidationQueueEntry entry) {
|
||||
_queue.add(entry);
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ public class OpPusher {
|
||||
|
||||
if (tree.hasPendingOpsForHost(entry.peer())) {
|
||||
doAgain.set(true);
|
||||
invalidationQueueService.pushInvalidationToOne(entry.peer(), pd.key());
|
||||
invalidationQueueService.pushInvalidationToOneNoDelay(entry.peer(), pd.key());
|
||||
}
|
||||
return ops;
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ dhfs.objects.peerdiscovery.interval=4s
|
||||
dhfs.objects.peerdiscovery.broadcast=true
|
||||
dhfs.objects.sync.timeout=30
|
||||
dhfs.objects.sync.ping.timeout=5
|
||||
dhfs.objects.invalidation.threads=1
|
||||
dhfs.objects.invalidation.threads=4
|
||||
dhfs.objects.invalidation.delay=1000
|
||||
dhfs.objects.reconnect_interval=5s
|
||||
dhfs.objects.write_log=false
|
||||
|
||||
@@ -19,16 +19,17 @@ public class DataLocker {
|
||||
var tag = _locks.get(data);
|
||||
if (tag != null) {
|
||||
synchronized (tag) {
|
||||
if (!tag.released) {
|
||||
while (!tag.released) {
|
||||
if (tag.owner == Thread.currentThread()) {
|
||||
return DUMMY_LOCK;
|
||||
}
|
||||
tag.wait(4000L);
|
||||
if (!tag.released) {
|
||||
System.out.println("Timeout waiting for lock: " + data);
|
||||
System.exit(1);
|
||||
throw new InterruptedException();
|
||||
}
|
||||
tag.wait();
|
||||
// tag.wait(4000L);
|
||||
// if (!tag.released) {
|
||||
// System.out.println("Timeout waiting for lock: " + data);
|
||||
// System.exit(1);
|
||||
// throw new InterruptedException();
|
||||
// }
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -44,6 +44,24 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Adds the object to the queue, if it exists re-adds it
|
||||
// With no delay
|
||||
// Returns the old object, or null
|
||||
public T addNoDelay(T el) {
|
||||
synchronized (this) {
|
||||
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
|
||||
|
||||
SetElement<T> old = _set.putFirst(el, new SetElement<>(el, 0));
|
||||
this.notify();
|
||||
|
||||
if (old != null)
|
||||
return old.el();
|
||||
else
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Adds the object to the queue, if it exists re-adds it with a new delay
|
||||
// Returns the old object, or null
|
||||
public T readd(T el) {
|
||||
|
||||
@@ -24,6 +24,19 @@ public class HashSetDelayedBlockingQueueTest {
|
||||
Assertions.assertTrue((gotTime - curTime) >= 1000);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void addNoDelay() throws InterruptedException {
|
||||
var queue = new HashSetDelayedBlockingQueue<>(1000);
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
queue.addNoDelay("hello!");
|
||||
var thing = queue.get();
|
||||
var gotTime = System.currentTimeMillis();
|
||||
Assertions.assertEquals("hello!", thing);
|
||||
Assertions.assertTrue((gotTime - curTime) < 500);
|
||||
}
|
||||
|
||||
@Test
|
||||
void GetImmediate() throws InterruptedException {
|
||||
var queue = new HashSetDelayedBlockingQueue<>(0);
|
||||
|
||||
Reference in New Issue
Block a user