5 Commits

Author SHA1 Message Date
8e6ff4ca99 decrease num of directories in persistent store
TODO: configurable
2024-07-26 22:42:23 +02:00
46c2b12571 fix findAll not returning really all
e.g. if it's in writeback
2024-07-26 22:05:13 +02:00
50a1269b09 trace log getIndex/initial resync 2024-07-26 20:53:33 +02:00
d6c0120cf5 fix readdTest 2024-07-26 19:11:28 +02:00
d3ef6d6b43 handle outdated updates with more care
just in case
2024-07-26 19:10:11 +02:00
6 changed files with 40 additions and 18 deletions

View File

@@ -16,10 +16,12 @@ import lombok.Getter;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
@@ -110,8 +112,14 @@ public class JObjectManagerImpl implements JObjectManager {
@Override
public Collection<JObject<?>> findAll() {
Stream<JObject<?>> x = objectPersistentStore.findAllObjects().stream().map(f -> get(f).orElse(null));
return x.filter(Objects::nonNull).toList(); // Somehow this is needed otherwise typing breaks
var out = _map.values().stream().map(SoftReference::get)
.filter(Objects::nonNull)
.collect(Collectors.toCollection((Supplier<LinkedHashSet<JObject<?>>>) LinkedHashSet::new));
objectPersistentStore.findAllObjects().stream()
.map(f -> get(f).orElse(null))
.filter(Objects::nonNull)
.forEach(out::add);
return out;
}
@Override

View File

@@ -122,8 +122,10 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
var reqUuid = UUID.fromString(request.getSelfUuid());
for (var obj : objs)
for (var obj : objs) {
Log.trace("GI: " + obj.getName() + " to " + reqUuid);
invalidationQueueService.pushInvalidationToOne(reqUuid, obj.getName());
}
return Uni.createFrom().item(GetIndexReply.getDefaultInstance());
}

View File

@@ -43,8 +43,10 @@ public class SyncHandler {
// they didn't thing we were disconnected
var objs = jObjectManager.findAll();
for (var obj : objs)
for (var obj : objs) {
Log.trace("IS: " + obj.getName() + " to " + host);
invalidationQueueService.pushInvalidationToOne(host, obj.getName());
}
}
public void handleOneUpdate(UUID from, ObjectHeader header) {
@@ -153,7 +155,8 @@ public class SyncHandler {
try {
handleOneUpdate(UUID.fromString(request.getSelfUuid()), request.getHeader());
} catch (OutdatedUpdateException ignored) {
Log.info("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid());
Log.warn("Outdated update of " + request.getHeader().getName() + " from " + request.getSelfUuid());
invalidationQueueService.pushInvalidationToOne(UUID.fromString(request.getSelfUuid()), request.getHeader().getName());
} catch (Exception ex) {
Log.info("Error when handling update from " + request.getSelfUuid() + " of " + request.getHeader().getName(), ex);
throw ex;

View File

@@ -44,7 +44,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
metaPath.toFile().mkdirs();
dataPath.toFile().mkdirs();
for (int i = 0; i < 256; i++) {
for (int j = 0; j < 256; j++) {
for (int j = 0; j < 16; j++) {
metaPath.resolve(String.valueOf(i)).resolve(String.valueOf(j)).toFile().mkdirs();
dataPath.resolve(String.valueOf(i)).resolve(String.valueOf(j)).toFile().mkdirs();
}
@@ -59,7 +59,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
private Pair<String, String> getDirPathComponents(@Nonnull String obj) {
int h = Objects.hash(obj);
int p1 = h & 0b00000000_00000000_11111111_00000000;
int p2 = h & 0b00000000_00000000_00000000_11111111;
int p2 = h & 0b00000000_00000000_00000000_00001111;
return Pair.ofNonNull(String.valueOf(p1 >> 8), String.valueOf(p2));
}

View File

@@ -71,13 +71,22 @@ public class DhfsFileServiceSimpleTestImpl {
// FIXME: dhfs_files
jObjectManager.put(c1, Optional.of(c1i.getName()));
jObjectManager.put(c2, Optional.of(c2i.getName()));
jObjectManager.put(c3, Optional.of(c3i.getName()));
jObjectManager.put(c1i, Optional.of(f.getName()));
jObjectManager.put(c2i, Optional.of(f.getName()));
jObjectManager.put(c3i, Optional.of(f.getName()));
jObjectManager.put(f, Optional.empty());
var c1o = jObjectManager.put(c1, Optional.of(c1i.getName()));
var c2o = jObjectManager.put(c2, Optional.of(c2i.getName()));
var c3o = jObjectManager.put(c3, Optional.of(c3i.getName()));
var c1io = jObjectManager.put(c1i, Optional.of(f.getName()));
var c2io = jObjectManager.put(c2i, Optional.of(f.getName()));
var c3io = jObjectManager.put(c3i, Optional.of(f.getName()));
var fo = jObjectManager.put(f, Optional.empty());
var all = jObjectManager.findAll();
Assertions.assertTrue(all.contains(c1o));
Assertions.assertTrue(all.contains(c2o));
Assertions.assertTrue(all.contains(c3o));
Assertions.assertTrue(all.contains(c1io));
Assertions.assertTrue(all.contains(c2io));
Assertions.assertTrue(all.contains(c3io));
Assertions.assertTrue(all.contains(fo));
}
String all = "1234567891011";
@@ -207,8 +216,8 @@ public class DhfsFileServiceSimpleTestImpl {
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
fileService.write(uuid, 20, new byte[]{10, 11, 12, 13, 14, 15, 16, 17, 18, 19});
Assertions.assertArrayEquals(new byte[]{
0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19
}, fileService.read(uuid, 0, 30).get().toByteArray());
}

View File

@@ -144,7 +144,7 @@ public class HashSetDelayedBlockingQueueTest {
}
});
var thing = queue.getAllWait(); // Theoretically you can get one...
if (thing.size() == 1) thing.add(queue.getAllWait());
if (thing.size() == 1) thing.add(queue.getAllWait().stream().findFirst());
var gotTime = System.currentTimeMillis();
Assertions.assertIterableEquals(List.of("hello1", "hello2"), thing);
Assertions.assertTrue((gotTime - curTime) >= 1810);