27 Commits

Author SHA1 Message Date
d27c51583d update run-wrapper defaults 2024-07-27 17:54:44 +02:00
1ae12402d0 run-wrapper fixie 2024-07-27 17:53:15 +02:00
6143c9afbd disable fileConflictTest2
there's a race condition in echo >>
2024-07-27 17:39:27 +02:00
8929e10dbf fix update 2024-07-27 17:38:37 +02:00
384069260f tar run wrapper 2024-07-27 17:37:43 +02:00
379f3cc0f5 write logging fixie 2024-07-27 17:00:10 +02:00
0f482ff446 write logging 2024-07-27 16:56:39 +02:00
aea38724ad fixup readd 2024-07-27 16:32:28 +02:00
6ee952163b do not possibly lose stuff in timed getAllWait 2024-07-27 16:22:41 +02:00
08e59ff9a3 remove chunks from files after they've been added to second file when resolving conflicts 2024-07-27 16:09:42 +02:00
e91f1040e8 better fileConflictTest2 logs 2024-07-27 15:59:18 +02:00
ab43676927 setHaveLocalCopy(false) when deleting 2024-07-27 15:58:59 +02:00
f71a2bef7b do not override hashCode and equals for JObject
as we rely on the JObjectManager to make them all equal
2024-07-27 15:37:43 +02:00
f7bce3b0e5 use weak references
we have our own cache now, and soft refs stress the GC a lot
2024-07-27 15:29:40 +02:00
316f9a798c a little less garbage, again 2024-07-27 15:23:44 +02:00
c2b8890a1a some more garbage containment 2024-07-27 15:08:26 +02:00
9f38d778e8 more writing tweaks 2024-07-27 14:57:02 +02:00
2d8744b426 some logging tweaks 2024-07-27 14:48:04 +02:00
8384cc9efc don't use streams in deletion
they make a lot of garbage!
2024-07-27 14:37:09 +02:00
c12e92c52f lock-free pushInvalidationToAll 2024-07-27 14:20:31 +02:00
37722d1604 timed getAllWait 2024-07-27 13:51:05 +02:00
6596f5a6b8 fixie 2024-07-27 13:26:14 +02:00
ff529f7f86 fix paths in tests 2024-07-27 12:52:19 +02:00
d85918d8ff small tweaks 2024-07-27 12:49:12 +02:00
2ededf5bb2 better default paths 2024-07-27 12:07:45 +02:00
aebaf6d8fd a little JObject locks refactor 2024-07-27 11:59:15 +02:00
9cad2c1e28 findAll returns strings
the consumer could get an object if they want to
2024-07-27 11:13:15 +02:00
27 changed files with 560 additions and 406 deletions

View File

@@ -35,9 +35,12 @@ jobs:
distribution: "zulu"
cache: maven
- name: Build and test with Maven
- name: Test with Maven
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package verify
- name: Build with Maven
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
- uses: actions/upload-artifact@v3
with:
name: DHFS Server Package
@@ -207,7 +210,10 @@ jobs:
- name: Add version to run wrapper
run: echo $GITHUB_RUN_ID > "run-wrapper-out/dhfs/app/"version
- name: Tar run wrapper
run: tar -cvf ~/run-wrapper.tar.gz ./run-wrapper-out
- uses: actions/upload-artifact@v3
with:
name: Run wrapper
path: run-wrapper-out
path: ~/run-wrapper.tar.gz

View File

@@ -115,16 +115,6 @@ public class FileConflictResolver implements ConflictResolver {
}
HashSet<String> oursNew = new HashSet<>(oursFile.getChunks().values());
for (var cuuid : oursBackup) {
if (!oursNew.contains(cuuid))
jObjectManager
.get(ChunkInfo.getNameFromHash(cuuid))
.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
mc.removeRef(oursFile.getName());
return null;
}));
}
oursFile.setMtime(first.getMtime());
oursFile.setCtime(first.getCtime());
@@ -157,6 +147,16 @@ public class FileConflictResolver implements ConflictResolver {
} while (true);
bumpDir.apply();
for (var cuuid : oursBackup) {
if (!oursNew.contains(cuuid))
jObjectManager
.get(ChunkInfo.getNameFromHash(cuuid))
.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
mc.removeRef(oursFile.getName());
return null;
}));
}
}
m.setChangelog(newChangelog);

View File

@@ -51,6 +51,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@ConfigProperty(name = "dhfs.objects.ref_verification")
boolean refVerification;
@ConfigProperty(name = "dhfs.objects.write_log")
boolean writeLogging;
@Inject
PersistentRemoteHostsService persistentRemoteHostsService;
@@ -149,7 +152,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
String fname = Path.of(name).getFileName().toString();
var fuuid = UUID.randomUUID();
Log.trace("Creating file " + fuuid);
Log.debug("Creating file " + fuuid);
File f = new File(fuuid, mode, UUID.fromString(parent.getName()), false);
if (!parent.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
@@ -181,7 +184,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
String dname = Path.of(name).getFileName().toString();
var duuid = UUID.randomUUID();
Log.trace("Creating dir " + duuid);
Log.debug("Creating dir " + duuid);
Directory ndir = new Directory(duuid, mode); //FIXME:
if (!found.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
@@ -325,14 +328,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
dentToD.setMtime(System.currentTimeMillis());
dentFrom.bumpVer();
dentFrom.notifyWrite();
dentTo.bumpVer();
dentTo.notifyWrite();
} finally {
dentFrom.rwUnlock();
dentTo.rwUnlock();
theFile.notifyWrite();
theFile.rwUnlock();
}
@@ -517,6 +516,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (!(fDataU instanceof File fData))
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
if (writeLogging) {
StringBuffer sb = new StringBuffer();
sb.append("Writing to file: ");
sb.append(meta.getName());
sb.append(" size=");
sb.append(size(fileUuid));
sb.append(" ");
sb.append(offset);
sb.append(" ");
sb.append(data.length);
sb.append(" ");
sb.append(Arrays.toString(data));
Log.info(sb.toString());
}
if (size(fileUuid) < offset)
truncate(fileUuid, offset);
@@ -803,7 +817,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
String fname = Path.of(newpath).getFileName().toString();
var fuuid = UUID.randomUUID();
Log.trace("Creating file " + fuuid);
Log.debug("Creating file " + fuuid);
File f = new File(fuuid, 0, UUID.fromString(parent.getName()), true);
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));

View File

@@ -15,21 +15,71 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class JObject<T extends JObjectData> implements Serializable, Comparable<JObject<?>> {
public class JObject<T extends JObjectData> {
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
private final ObjectMetadata _metaPart;
private final JObjectResolver _resolver;
private final AtomicReference<T> _dataPart = new AtomicReference<>();
private static final int lockTimeoutSecs = 15;
private class TransactionState {
final int dataHash;
final int metaHash;
final int externalHash;
final boolean data;
final HashSet<String> oldRefs;
TransactionState() {
this.dataHash = _metaPart.dataHash();
this.metaHash = _metaPart.metaHash();
this.externalHash = _metaPart.externalHash();
this.data = _dataPart.get() != null || hasLocalCopy();
if (_resolver.refVerification) {
tryLocalResolve();
if (_dataPart.get() != null)
oldRefs = new HashSet<>(_dataPart.get().extractRefs());
else
oldRefs = null;
} else {
oldRefs = null;
}
}
void commit(boolean forceInvalidate) {
_resolver.updateDeletionState(JObject.this);
var newDataHash = _metaPart.dataHash();
var newMetaHash = _metaPart.metaHash();
var newExternalHash = _metaPart.externalHash();
var newData = _dataPart.get() != null || hasLocalCopy();
if (_dataPart.get() != null)
_metaPart.narrowClass(_dataPart.get().getClass());
notifyWrite(
newMetaHash != metaHash || forceInvalidate,
newExternalHash != externalHash || forceInvalidate,
newDataHash != dataHash
|| newData != data
|| forceInvalidate
);
verifyRefs(oldRefs);
}
}
private TransactionState _transactionState = null;
// Create a new object
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
_resolver = resolver;
_metaPart = new ObjectMetadata(name, false, obj.getClass());
_metaPart.getHaveLocalCopy().set(true);
_metaPart.setHaveLocalCopy(true);
_dataPart.set(obj);
_metaPart.bumpVersion(selfUuid);
Log.trace("new JObject: " + getName());
_metaPart.getChangelog().put(selfUuid, 1L);
if (Log.isTraceEnabled())
Log.trace("new JObject: " + getName());
}
// Create an object from existing metadata
@@ -43,29 +93,11 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
objects.stream().sorted(Comparator.comparingInt(System::identityHashCode)).forEach(JObject::rwLock);
}
@Override
public int compareTo(JObject<?> o) {
return getName().compareTo(o.getName());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JObject<?> jObject = (JObject<?>) o;
return Objects.equals(_metaPart.getName(), jObject._metaPart.getName());
}
@Override
public int hashCode() {
return Objects.hashCode(_metaPart.getName());
}
public Class<? extends JObjectData> getKnownClass() {
return _metaPart.getKnownClass();
}
public void narrowClass(Class<? extends JObjectData> klass) {
protected void narrowClass(Class<? extends JObjectData> klass) {
_metaPart.narrowClass(klass);
}
@@ -83,8 +115,8 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
return _metaPart;
}
public boolean hasLocalCopyMd() {
return _metaPart.getHaveLocalCopy().get();
protected boolean hasLocalCopyMd() {
return _metaPart.isHaveLocalCopy();
}
public Class<? extends ConflictResolver> getConflictResolver() {
@@ -126,12 +158,12 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
var res = _resolver.resolveDataRemote(this);
_metaPart.narrowClass(res.getClass());
_dataPart.set(res);
_metaPart.getHaveLocalCopy().set(true);
_metaPart.setHaveLocalCopy(true);
hydrateRefs();
verifyRefs();
} // _dataPart.get() == null
} finally {
_lock.writeLock().unlock();
rwUnlock();
} // try
} // _dataPart.get() == null
}
@@ -151,7 +183,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_dataPart.compareAndSet(null, res.get());
} // _dataPart.get() == null
} finally {
_lock.readLock().unlock();
rUnlock();
} // try
} // _dataPart.get() == null
}
@@ -169,7 +201,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
throw new IllegalStateException("Expected external resolution only for classes with pushResolution " + getName());
_metaPart.narrowClass(data.getClass());
_dataPart.set(data);
_metaPart.getHaveLocalCopy().set(true);
_metaPart.setHaveLocalCopy(true);
if (!_metaPart.isLocked())
_metaPart.lock();
hydrateRefs();
@@ -195,6 +227,9 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
public void rwLock() {
if (!tryRWLock())
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire write lock for " + getName()));
if (_lock.writeLock().getHoldCount() == 1) {
_transactionState = new TransactionState();
}
}
public void rLock() {
@@ -202,6 +237,32 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire read lock for " + getName()));
}
public void rUnlock() {
_lock.readLock().unlock();
}
public void rwUnlock() {
rwUnlock(false);
}
public void rwUnlock(boolean forceInvalidate) {
try {
if (_lock.writeLock().getHoldCount() == 1) {
_transactionState.commit(forceInvalidate);
_transactionState = null;
}
} catch (Exception ex) {
Log.error("When committing changes to " + getName(), ex);
} finally {
_lock.writeLock().unlock();
}
}
public void assertRWLock() {
if (!_lock.isWriteLockedByCurrentThread())
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
}
public <R> R runReadLocked(ResolutionStrategy resolutionStrategy, ObjectFnRead<T, R> fn) {
tryResolve(resolutionStrategy);
@@ -211,7 +272,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
throw new DeletedObjectAccessException();
return fn.apply(_metaPart, _dataPart.get());
} finally {
_lock.readLock().unlock();
rUnlock();
}
}
@@ -231,45 +292,15 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
rwLock();
try {
tryResolve(resolutionStrategy);
var dataHash = _metaPart.dataHash();
var metaHash = Objects.hash(_metaPart.metaHash(), dataHash);
var prevData = _dataPart.get() != null || hasLocalCopy();
HashSet<String> oldRefs = null;
if (_resolver.refVerification) {
tryLocalResolve();
if (_dataPart.get() != null)
oldRefs = new HashSet<>(_dataPart.get().extractRefs());
}
VoidFn invalidateFn = () -> {
tryLocalResolve();
_resolver.backupRefs(this);
_dataPart.set(null);
_resolver.removeLocal(this, _metaPart.getName());
};
var ret = fn.apply(_metaPart, _dataPart.get(), this::bumpVer, invalidateFn);
_resolver.updateDeletionState(this);
var newDataHash = _metaPart.dataHash();
var newMetaHash = Objects.hash(_metaPart.metaHash(), newDataHash);
var newData = _dataPart.get() != null || hasLocalCopy();
if (_dataPart.get() != null)
_metaPart.narrowClass(_dataPart.get().getClass());
if (!Objects.equals(newMetaHash, metaHash)
|| newData != prevData)
notifyWriteMeta();
if (!Objects.equals(newDataHash, dataHash)
|| newData != prevData)
notifyWriteData();
verifyRefs(oldRefs);
return ret;
return fn.apply(_metaPart, _dataPart.get(), this::bumpVer, invalidateFn);
} finally {
_lock.writeLock().unlock();
rwUnlock();
}
}
@@ -280,20 +311,9 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
return _dataPart.get() != null;
}
public void notifyWriteMeta() {
private void notifyWrite(boolean metaChanged, boolean externalChanged, boolean hasDataChanged) {
assertRWLock();
_resolver.notifyWriteMeta(this);
}
public void notifyWriteData() {
assertRWLock();
_resolver.notifyWriteData(this);
}
public void notifyWrite() {
_resolver.updateDeletionState(this);
notifyWriteMeta();
notifyWriteData();
_resolver.notifyWrite(this, metaChanged, externalChanged, hasDataChanged);
}
public void bumpVer() {
@@ -301,23 +321,16 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
_resolver.bumpVersionSelf(this);
}
public void rwUnlock() {
_lock.writeLock().unlock();
}
public void discardData() {
assertRWLock();
if (!isDeleted())
throw new IllegalStateException("Expected to be deleted when discarding data");
_dataPart.set(null);
_metaPart.setHaveLocalCopy(false);
_metaPart.setSavedRefs(Collections.emptySet());
}
public void assertRWLock() {
if (!_lock.isWriteLockedByCurrentThread())
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
}
public enum ResolutionStrategy {
NO_RESOLUTION,
LOCAL_ONLY,

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects.jrepository;
public record JObjectKey(short type) {
}

View File

@@ -7,7 +7,7 @@ public interface JObjectManager {
Optional<JObject<?>> get(String name);
Collection<JObject<?>> findAll();
Collection<String> findAll();
// Put a new object
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);

View File

@@ -14,7 +14,7 @@ import jakarta.inject.Inject;
import lombok.Getter;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Objects;
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
@ApplicationScoped
public class JObjectManagerImpl implements JObjectManager {
private final ConcurrentHashMap<String, NamedSoftReference> _map = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, NamedWeakReference> _map = new ConcurrentHashMap<>();
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
@Inject
ObjectPersistentStore objectPersistentStore;
@@ -56,7 +56,7 @@ public class JObjectManagerImpl implements JObjectManager {
private void refCleanupThread() {
try {
while (!Thread.interrupted()) {
NamedSoftReference cur = (NamedSoftReference) _refQueue.remove();
NamedWeakReference cur = (NamedWeakReference) _refQueue.remove();
_map.remove(cur._key, cur);
}
} catch (InterruptedException ignored) {
@@ -102,7 +102,7 @@ public class JObjectManagerImpl implements JObjectManager {
JObject<?> ret = null;
var newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(newObj, _refQueue));
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(newObj, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
@@ -111,14 +111,12 @@ public class JObjectManagerImpl implements JObjectManager {
}
@Override
public Collection<JObject<?>> findAll() {
var out = _map.values().stream().map(SoftReference::get)
public Collection<String> findAll() {
var out = _map.values().stream().map(WeakReference::get)
.filter(Objects::nonNull)
.collect(Collectors.toCollection((Supplier<LinkedHashSet<JObject<?>>>) LinkedHashSet::new));
objectPersistentStore.findAllObjects().stream()
.map(f -> get(f).orElse(null))
.filter(Objects::nonNull)
.forEach(out::add);
.map(JObject::getName)
.collect(Collectors.toCollection((Supplier<LinkedHashSet<String>>) LinkedHashSet::new));
out.addAll(objectPersistentStore.findAllObjects());
return out;
}
@@ -126,7 +124,6 @@ public class JObjectManagerImpl implements JObjectManager {
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
while (true) {
JObject<?> ret;
boolean created = false;
JObject<?> newObj = null;
try {
ret = getFromMap(object.getName());
@@ -138,15 +135,13 @@ public class JObjectManagerImpl implements JObjectManager {
newObj.rwLock();
while (ret == null) {
JObject<?> finalNewObj = newObj;
var ref = _map.computeIfAbsent(object.getName(), k -> new NamedSoftReference(finalNewObj, _refQueue));
var ref = _map.computeIfAbsent(object.getName(), k -> new NamedWeakReference(finalNewObj, _refQueue));
if (ref.get() == null) _map.remove(object.getName(), ref);
else ret = ref.get();
}
if (ret != newObj) continue;
created = true;
}
JObject<D> finalRet = (JObject<D>) ret;
boolean finalCreated = created;
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
if (object.getClass().isAnnotationPresent(PushResolution.class)
&& object.getClass().isAnnotationPresent(AssumedUnique.class)
@@ -162,13 +157,13 @@ public class JObjectManagerImpl implements JObjectManager {
m.lock();
}
if (finalCreated) finalRet.notifyWrite();// Kind of a hack?
return null;
});
} finally {
if (newObj != null) newObj.rwUnlock();
if (newObj != null)
newObj.rwUnlock(true);
}
if (!created)
if (newObj == null)
jObjectLRU.notifyAccess(ret);
return (JObject<D>) ret;
}
@@ -197,7 +192,7 @@ public class JObjectManagerImpl implements JObjectManager {
created.rwLock();
try {
while (ret == null) {
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(created, _refQueue));
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
if (ref.get() == null) _map.remove(name, ref);
else ret = ref.get();
}
@@ -215,11 +210,11 @@ public class JObjectManagerImpl implements JObjectManager {
}
}
private static class NamedSoftReference extends SoftReference<JObject<?>> {
private static class NamedWeakReference extends WeakReference<JObject<?>> {
@Getter
final String _key;
public NamedSoftReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
public NamedWeakReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
super(target, q);
this._key = target.getName();
}

View File

@@ -12,10 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
@@ -99,7 +96,7 @@ public class JObjectRefProcessor {
}
if (ok != missing.size()) {
Log.trace("Delaying deletion check of " + obj.getName());
Log.debug("Delaying deletion check of " + obj.getName());
delay = true;
}
@@ -138,6 +135,13 @@ public class JObjectRefProcessor {
return false;
}
private void deleteRef(JObject<?> self, String name) {
jObjectManager.get(name).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
mc.removeRef(self.getName());
return null;
}));
}
private void refProcessorThread() {
try {
while (!Thread.interrupted()) {
@@ -149,38 +153,32 @@ public class JObjectRefProcessor {
next = _candidates.get(canDeleteRetryDelay);
}
var got = jObjectManager.get(next);
if (got.isEmpty()) continue;
var got = jObjectManager.get(next).orElse(null);
if (got == null) continue;
try {
got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, v, i) -> {
got.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, v, i) -> {
if (m.isLocked()) return null;
if (m.isDeleted()) return null;
if (!m.isDeletionCandidate()) return null;
if (m.isSeen() && m.getKnownClass().isAnnotationPresent(Movable.class)) {
if (!processMovable(got.get()))
if (!processMovable(got))
return null;
}
got.get().tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
got.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
Log.trace("Deleting " + m.getName());
Log.debug("Deleting " + m.getName());
m.markDeleted();
Stream<String> refs = Stream.empty();
Collection<String> extracted = null;
if (got.getData() != null) extracted = got.getData().extractRefs();
if (m.getSavedRefs() != null)
refs = m.getSavedRefs().stream();
if (got.get().getData() != null)
refs = Streams.concat(refs, got.get().getData().extractRefs().stream());
got.discardData();
got.get().discardData();
refs.forEach(c -> {
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
mc.removeRef(m.getName());
return null;
}));
});
if (got.getMeta().getSavedRefs() != null)
for (var r : got.getMeta().getSavedRefs()) deleteRef(got, r);
if (extracted != null)
for (var r : extracted) deleteRef(got, r);
return null;
});

View File

@@ -16,6 +16,7 @@ import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Optional;
@@ -101,7 +102,6 @@ public class JObjectResolver {
}
}
self.getMeta().setSavedRefs(null);
notifyWriteMeta(self);
}
}
@@ -132,28 +132,32 @@ public class JObjectResolver {
}
}
private void quickDeleteRef(JObject<?> self, String name) {
jObjectManager.get(name)
.ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
mc.removeRef(self.getName());
return null;
}));
}
private void tryQuickDelete(JObject<?> self) {
self.assertRWLock();
self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
Log.trace("Quick delete of: " + self.getName());
if (Log.isTraceEnabled())
Log.trace("Quick delete of: " + self.getName());
self.getMeta().markDeleted();
Stream<String> refs = Stream.empty();
if (self.getMeta().getSavedRefs() != null)
refs = self.getMeta().getSavedRefs().stream();
if (self.getData() != null)
refs = Streams.concat(refs, self.getData().extractRefs().stream());
Collection<String> extracted = null;
if (self.getData() != null) extracted = self.getData().extractRefs();
self.discardData();
refs.forEach(c -> {
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
mc.removeRef(self.getName());
return null;
}));
});
if (self.getMeta().getSavedRefs() != null)
for (var r : self.getMeta().getSavedRefs()) quickDeleteRef(self, r);
if (extracted != null)
for (var r : extracted) quickDeleteRef(self, r);
}
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
@@ -174,8 +178,8 @@ public class JObjectResolver {
public void removeLocal(JObject<?> jObject, String name) {
jObject.assertRWLock();
try {
Log.trace("Invalidating " + name);
jObject.getMeta().getHaveLocalCopy().set(false);
Log.debug("Invalidating " + name);
jObject.getMeta().setHaveLocalCopy(false);
jObjectWriteback.remove(jObject);
objectPersistentStore.deleteObjectData(name);
} catch (StatusRuntimeException sx) {
@@ -186,26 +190,25 @@ public class JObjectResolver {
}
}
public <T extends JObjectData> void notifyWriteMeta(JObject<T> self) {
public <T extends JObjectData> void notifyWrite(JObject<T> self, boolean metaChanged,
boolean externalChanged, boolean hasDataChanged) {
self.assertRWLock();
jObjectWriteback.markDirty(self);
for (var t : _metaWriteListeners.keySet()) { // FIXME:?
if (t.isAssignableFrom(self.getKnownClass()))
for (var cb : _metaWriteListeners.get(t))
cb.apply((JObject) self);
}
}
public <T extends JObjectData> void notifyWriteData(JObject<T> self) {
self.assertRWLock();
jObjectWriteback.markDirty(self);
if (self.isResolved()) {
invalidationQueueService.pushInvalidationToAll(self.getName());
if (metaChanged || hasDataChanged)
jObjectWriteback.markDirty(self);
if (metaChanged)
for (var t : _metaWriteListeners.keySet()) { // FIXME:?
if (t.isAssignableFrom(self.getKnownClass()))
for (var cb : _metaWriteListeners.get(t))
cb.apply((JObject) self);
}
if (hasDataChanged)
for (var t : _writeListeners.keySet()) { // FIXME:?
if (t.isAssignableFrom(self.getKnownClass()))
for (var cb : _writeListeners.get(t))
cb.apply((JObject) self);
}
if (externalChanged) {
invalidationQueueService.pushInvalidationToAll(self.getName());
}
}

View File

@@ -161,7 +161,7 @@ public class JObjectWriteback {
if (!m.isDeletionCandidate())
throw new IllegalStateException("Object deleted but not deletable! " + m.getName());
// FIXME: assert Rw lock here?
Log.trace("Deleting from persistent storage " + m.getName());
Log.debug("Deleting from persistent storage " + m.getName());
objectPersistentStore.deleteObject(m.getName());
return;
}

View File

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.repository.ObjectHeader;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
import io.vertx.core.impl.ConcurrentHashSet;
import lombok.Getter;
import lombok.Setter;
@@ -27,26 +28,30 @@ public class ObjectMetadata implements Serializable {
@Getter
private final Map<UUID, Long> _remoteCopies = new LinkedHashMap<>();
private final AtomicReference<Class<? extends JObjectData>> _knownClass = new AtomicReference<>();
private final AtomicBoolean _seen = new AtomicBoolean(false);
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@Getter
private final HashSet<UUID> _confirmedDeletes = new HashSet<>();
private final Set<String> _referrers = new ConcurrentSkipListSet<>();
private volatile boolean _seen = false;
@Getter
private volatile boolean _deleted = false;
@Getter
private final HashSet<UUID> _confirmedDeletes = new LinkedHashSet<>();
private final Set<String> _referrers = new LinkedHashSet<>();
@Getter
@Setter
private Map<UUID, Long> _changelog = new LinkedHashMap<>();
private Map<UUID, Long> _changelog = new LinkedHashMap<>(4);
@Getter
@Setter
private Set<String> _savedRefs = Collections.emptySet();
@Getter
private boolean _locked = false;
@Getter
private AtomicBoolean _haveLocalCopy = new AtomicBoolean(false);
private transient AtomicBoolean _written = new AtomicBoolean(true);
@Setter
private volatile boolean _haveLocalCopy = false;
@Getter
private transient volatile boolean _written = true;
public ObjectMetadata(String name, boolean written, Class<? extends JObjectData> knownClass) {
_name = name;
_written.set(written);
_written = written;
_knownClass.set(knownClass);
}
@@ -68,37 +73,25 @@ public class ObjectMetadata implements Serializable {
@Serial
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
_written = new AtomicBoolean(true);
}
public boolean isSeen() {
return _seen.get();
}
public boolean isDeleted() {
return _deleted.get();
_written = true;
}
public void markSeen() {
Log.trace("Marking seen: " + getName());
_seen.set(true);
_seen = true;
}
public void markDeleted() {
_deleted.set(true);
_deleted = true;
}
public void undelete() {
_confirmedDeletes.clear();
_deleted.set(false);
}
public boolean isWritten() {
return _written.get();
_deleted = false;
}
public void markWritten() {
_written.set(true);
_written = true;
}
public boolean isReferred() {
@@ -125,7 +118,8 @@ public class ObjectMetadata implements Serializable {
public void addRef(String from) {
_confirmedDeletes.clear();
_referrers.add(from);
Log.trace("Adding ref " + from + " to " + getName());
if (Log.isTraceEnabled())
Log.trace("Adding ref " + from + " to " + getName());
}
public void removeRef(String from) {
@@ -133,7 +127,8 @@ public class ObjectMetadata implements Serializable {
unlock();
Log.error("Object " + getName() + " is locked, but we removed a reference to it, unlocking!");
}
Log.trace("Removing ref " + from + " from " + getName());
if (Log.isTraceEnabled())
Log.trace("Removing ref " + from + " from " + getName());
_referrers.remove(from);
}
@@ -190,7 +185,24 @@ public class ObjectMetadata implements Serializable {
}
public int metaHash() {
return Objects.hash(isSeen(), getKnownClass(), isDeleted(), _referrers, _locked, _remoteCopies, _savedRefs, _haveLocalCopy);
int res = Objects.hashCode(_name);
res = 31 * res + Objects.hashCode(isSeen());
res = 31 * res + Objects.hashCode(getKnownClass());
res = 31 * res + Objects.hashCode(isDeleted());
res = 31 * res + Objects.hashCode(_confirmedDeletes);
res = 31 * res + Objects.hashCode(_referrers);
res = 31 * res + Objects.hashCode(_changelog);
res = 31 * res + Objects.hashCode(_locked);
res = 31 * res + Objects.hashCode(_remoteCopies);
res = 31 * res + Objects.hashCode(_savedRefs);
res = 31 * res + Objects.hashCode(_haveLocalCopy);
return res;
}
public int externalHash() {
int res = Objects.hashCode(_changelog);
res = 31 * res + Objects.hashCode(_haveLocalCopy);
return res;
}
// Not really a hash

View File

@@ -28,7 +28,7 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
.putAllChangelog(object.getChangelog().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)))
.addAllSavedRefs(object.getSavedRefs() != null ? object.getSavedRefs() : Collections.emptyList())
.setLocked(object.isLocked())
.setHaveLocalCopy(object.getHaveLocalCopy().get())
.setHaveLocalCopy(object.isHaveLocalCopy())
.build();
}
@@ -51,7 +51,7 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
if (message.getLocked())
obj.lock();
if (message.getHaveLocalCopy())
obj.getHaveLocalCopy().set(true);
obj.setHaveLocalCopy(true);
return obj;
} catch (ClassNotFoundException cx) {

View File

@@ -6,6 +6,7 @@ import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import io.vertx.core.impl.ConcurrentHashSet;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@@ -13,10 +14,14 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import java.lang.ref.Reference;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ApplicationScoped
public class InvalidationQueueService {
@@ -39,7 +44,9 @@ public class InvalidationQueueService {
int threads;
private final HashSetDelayedBlockingQueue<Pair<UUID, String>> _queue;
private final AtomicReference<ConcurrentHashSet<String>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
private ExecutorService _executor;
private boolean _shutdown = false;
public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) {
_queue = new HashSetDelayedBlockingQueue<>(delay);
@@ -55,6 +62,7 @@ public class InvalidationQueueService {
}
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
_shutdown = true;
_executor.shutdownNow();
if (!_executor.awaitTermination(30, TimeUnit.SECONDS)) {
Log.error("Failed to shut down invalidation sender thread");
@@ -62,10 +70,35 @@ public class InvalidationQueueService {
}
private void sender() {
try {
while (!Thread.interrupted()) {
while (!_shutdown) {
try {
try {
var data = _queue.getAllWait(100); // TODO: config?
if (!_queue.hasImmediate()) {
ConcurrentHashSet<String> toAllQueue;
while (true) {
toAllQueue = _toAllQueue.get();
if (toAllQueue != null) {
if (_toAllQueue.compareAndSet(toAllQueue, null))
break;
} else {
break;
}
}
if (toAllQueue != null) {
var hostInfo = remoteHostManager.getHostStateSnapshot();
for (var o : toAllQueue) {
for (var h : hostInfo.available())
_queue.add(Pair.of(h, o));
for (var u : hostInfo.unavailable())
deferredInvalidationQueueService.defer(u, o);
}
}
}
var data = _queue.getAllWait(100, _queue.getDelay()); // TODO: config?
if (data.isEmpty()) continue;
String stats = "Sent invalidation: ";
long success = 0;
@@ -87,7 +120,7 @@ public class InvalidationQueueService {
Log.info("Failed to send invalidation to " + e.getLeft() + ", will retry", ex);
pushInvalidationToOne(e.getLeft(), e.getRight());
}
if (Thread.interrupted()) {
if (_shutdown) {
Log.info("Invalidation sender exiting");
break;
}
@@ -100,8 +133,8 @@ public class InvalidationQueueService {
} catch (Exception e) {
Log.error("Exception in invalidation sender thread: ", e);
}
} catch (InterruptedException ignored) {
}
} catch (InterruptedException ignored) {
}
Log.info("Invalidation sender exiting");
var data = _queue.close();
@@ -111,12 +144,18 @@ public class InvalidationQueueService {
}
public void pushInvalidationToAll(String name) {
var hosts = remoteHostManager.getAvailableHosts();
for (var h : hosts)
_queue.add(Pair.of(h, name));
var unavailable = remoteHostManager.getUnavailableHosts();
for (var u : unavailable)
deferredInvalidationQueueService.defer(u, name);
while (true) {
var queue = _toAllQueue.get();
if (queue == null) {
var nq = new ConcurrentHashSet<String>();
if (!_toAllQueue.compareAndSet(null, nq)) continue;
queue = nq;
}
queue.add(name);
if (_toAllQueue.get() == queue) break;
}
}
public void pushInvalidationToOne(UUID host, String name) {

View File

@@ -63,7 +63,7 @@ public class RemoteHostManager {
.<Callable<Void>>map(host -> () -> {
try {
if (isReachable(host))
Log.trace("Heartbeat: " + host);
Log.debug("Heartbeat: " + host);
else
Log.info("Trying to connect to " + host);
if (pingCheck(host))
@@ -173,6 +173,25 @@ public class RemoteHostManager {
.map(Map.Entry::getKey).toList());
}
public record HostStateSnapshot(List<UUID> available, List<UUID> unavailable) {
}
// Takes a consistent snapshot of peer reachability under the transient-state
// read lock, partitioning all known hosts into available / unavailable lists.
// Holding the read lock for the whole scan ensures no host is seen in both
// (or neither) list.
public HostStateSnapshot getHostStateSnapshot() {
    var reachable = new ArrayList<UUID>();
    var unreachable = new ArrayList<UUID>();
    _transientPeersState.runReadLocked(d -> {
        d.getStates().forEach((host, state) -> {
            if (state.isReachable())
                reachable.add(host);
            else
                unreachable.add(host);
        });
        return null;
    });
    return new HostStateSnapshot(reachable, unreachable);
}
public void notifyAddr(UUID host, String addr, Integer port, Integer securePort) {
if (host.equals(persistentRemoteHostsService.getSelfUuid())) {
return;
@@ -187,7 +206,7 @@ public class RemoteHostManager {
var prev = _seenHostsButNotAdded.put(host, state);
// Needed for tests
if (prev == null)
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
Log.debug("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
return;
} else {
_seenHostsButNotAdded.remove(host);

View File

@@ -123,8 +123,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
var reqUuid = UUID.fromString(request.getSelfUuid());
for (var obj : objs) {
Log.trace("GI: " + obj.getName() + " to " + reqUuid);
invalidationQueueService.pushInvalidationToOne(reqUuid, obj.getName());
Log.trace("GI: " + obj + " to " + reqUuid);
invalidationQueueService.pushInvalidationToOne(reqUuid, obj);
}
return Uni.createFrom().item(GetIndexReply.getDefaultInstance());

View File

@@ -37,7 +37,7 @@ public class RpcClientFactory {
return withObjSyncClient(target, fn);
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode()))
Log.trace("Host " + target + " is unreachable: " + e.getMessage());
Log.debug("Host " + target + " is unreachable: " + e.getMessage());
else
Log.warn("When calling " + target + " " + e.getMessage());
} catch (Exception e) {

View File

@@ -44,8 +44,8 @@ public class SyncHandler {
var objs = jObjectManager.findAll();
for (var obj : objs) {
Log.trace("IS: " + obj.getName() + " to " + host);
invalidationQueueService.pushInvalidationToOne(host, obj.getName());
Log.trace("IS: " + obj + " to " + host);
invalidationQueueService.pushInvalidationToOne(host, obj);
}
}

View File

@@ -49,8 +49,9 @@ public class AutoSyncProcessor {
if (downloadAll)
executorService.submit(() -> {
for (var obj : jObjectManager.findAll()) {
if (!obj.hasLocalCopy())
add(obj.getName());
var got = jObjectManager.get(obj);
if (got.isEmpty() || !got.get().hasLocalCopy())
add(obj);
}
});
}

View File

@@ -1,6 +1,7 @@
package com.usatiuk.dhfs.objects.repository.persistence;
import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
@@ -107,9 +108,9 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
}
private <T extends Message> T readObjectImpl(T defaultInstance, Path path) {
try (var fsb = new FileInputStream(path.toFile());
var fs = new BufferedInputStream(fsb, 1048576)) {
return (T) defaultInstance.getParserForType().parseFrom(fs);
try (var fsb = new FileInputStream(path.toFile())) {
var file = fsb.readAllBytes();
return (T) defaultInstance.getParserForType().parseFrom(UnsafeByteOperations.unsafeWrap(file));
} catch (FileNotFoundException | NoSuchFileException fx) {
throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
} catch (IOException e) {
@@ -133,8 +134,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
private void writeObjectImpl(Path path, Message data) {
try {
try (var fsb = new FileOutputStream(path.toFile(), false);
var fs = new BufferedOutputStream(fsb, 1048576)) {
data.writeTo(fs);
var buf = new BufferedOutputStream(fsb, Math.min(65536, data.getSerializedSize()))) {
data.writeTo(buf);
}
} catch (IOException e) {
Log.error("Error writing file " + path, e);

View File

@@ -1,6 +1,7 @@
package com.usatiuk.utils;
import jakarta.annotation.Nullable;
import lombok.Getter;
import java.util.*;
@@ -30,6 +31,7 @@ public class HashSetDelayedBlockingQueue<T> {
private final LinkedHashMap<SetElement, SetElement> _set = new LinkedHashMap<>();
private final LinkedHashSet<Thread> _waiting = new LinkedHashSet<>();
@Getter
private final long _delay;
private boolean _closed = false;
@@ -57,8 +59,7 @@ public class HashSetDelayedBlockingQueue<T> {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
var sel = new SetElement(el, System.currentTimeMillis());
SetElement contains = _set.remove(sel);
_set.put(sel, sel);
SetElement contains = _set.putLast(sel, sel);
this.notify();
if (contains != null)
return contains._el;
@@ -122,6 +123,17 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
// Reports whether the oldest queued element has already aged past the delay,
// i.e. whether a getAll* call would return data right now without sleeping.
// Synchronized on the queue monitor, consistent with the other accessors.
public boolean hasImmediate() {
    synchronized (this) {
        var head = _set.firstEntry();
        if (head == null)
            return false;
        return head.getValue()._time + _delay <= System.currentTimeMillis();
    }
}
@Nullable
public T tryGet() throws InterruptedException {
synchronized (this) {
@@ -161,19 +173,34 @@ public class HashSetDelayedBlockingQueue<T> {
}
public Collection<T> getAllWait() throws InterruptedException {
return getAllWait(Integer.MAX_VALUE);
return getAllWait(Integer.MAX_VALUE, -1);
}
public Collection<T> getAllWait(int max) throws InterruptedException {
return getAllWait(max, -1);
}
public Collection<T> getAllWait(int max, long timeout) throws InterruptedException {
ArrayList<T> out = new ArrayList<>();
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
try {
synchronized (this) {
_waiting.add(Thread.currentThread());
}
while (!Thread.interrupted()) {
if (timeout > 0)
if (System.currentTimeMillis() > (startedWaiting + timeout)) return out;
long sleep = 0;
synchronized (this) {
while (_set.isEmpty()) this.wait();
while (_set.isEmpty()) {
if (timeout > 0) {
this.wait(timeout);
if (System.currentTimeMillis() > (startedWaiting + timeout))
return out;
} else {
this.wait();
}
}
var curTime = System.currentTimeMillis();
@@ -187,6 +214,13 @@ public class HashSetDelayedBlockingQueue<T> {
}
}
}
if (timeout > 0) {
var cur = System.currentTimeMillis();
if (cur > (startedWaiting + timeout)) return out;
sleep = Math.min(sleep, (startedWaiting + timeout) - cur);
}
if (sleep > 0)
Thread.sleep(sleep);
else
@@ -198,7 +232,7 @@ public class HashSetDelayedBlockingQueue<T> {
_waiting.remove(Thread.currentThread());
}
}
throw new InterruptedException();
return out;
}
public void interrupt() {
@@ -206,5 +240,4 @@ public class HashSetDelayedBlockingQueue<T> {
for (var t : _waiting) t.interrupt();
}
}
}

View File

@@ -1,6 +1,6 @@
quarkus.grpc.server.use-separate-server=false
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.root=${HOME}/dhfs_default/data/stuff
dhfs.objects.peerdiscovery.port=42069
dhfs.objects.peerdiscovery.interval=5000
dhfs.objects.sync.timeout=30
@@ -8,7 +8,8 @@ dhfs.objects.sync.ping.timeout=5
dhfs.objects.invalidation.threads=4
dhfs.objects.invalidation.delay=1000
dhfs.objects.reconnect_interval=5s
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
dhfs.objects.write_log=false
dhfs.fuse.root=${HOME}/dhfs_default/fuse
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.target_chunk_size=524288

View File

@@ -80,13 +80,13 @@ public class DhfsFileServiceSimpleTestImpl {
var fo = jObjectManager.put(f, Optional.empty());
var all = jObjectManager.findAll();
Assertions.assertTrue(all.contains(c1o));
Assertions.assertTrue(all.contains(c2o));
Assertions.assertTrue(all.contains(c3o));
Assertions.assertTrue(all.contains(c1io));
Assertions.assertTrue(all.contains(c2io));
Assertions.assertTrue(all.contains(c3io));
Assertions.assertTrue(all.contains(fo));
Assertions.assertTrue(all.contains(c1o.getName()));
Assertions.assertTrue(all.contains(c2o.getName()));
Assertions.assertTrue(all.contains(c3o.getName()));
Assertions.assertTrue(all.contains(c1io.getName()));
Assertions.assertTrue(all.contains(c2io.getName()));
Assertions.assertTrue(all.contains(c3io.getName()));
Assertions.assertTrue(all.contains(fo.getName()));
}
String all = "1234567891011";
@@ -224,7 +224,7 @@ public class DhfsFileServiceSimpleTestImpl {
@Test
void moveTest2() throws InterruptedException {
var ret = fileService.create("/moveTest", 777);
var ret = fileService.create("/moveTest2", 777);
Assertions.assertTrue(ret.isPresent());
var uuid = ret.get();
@@ -240,14 +240,14 @@ public class DhfsFileServiceSimpleTestImpl {
return null;
});
Assertions.assertTrue(fileService.rename("/moveTest", "/movedTest"));
Assertions.assertFalse(fileService.open("/moveTest").isPresent());
Assertions.assertTrue(fileService.open("/movedTest").isPresent());
Assertions.assertTrue(fileService.rename("/moveTest2", "/movedTest2"));
Assertions.assertFalse(fileService.open("/moveTest2").isPresent());
Assertions.assertTrue(fileService.open("/movedTest2").isPresent());
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray());
fileService.read(fileService.open("/movedTest2").get(), 0, 10).get().toByteArray());
var newfile = fileService.open("/movedTest").get();
var newfile = fileService.open("/movedTest2").get();
Thread.sleep(1000);

View File

@@ -49,6 +49,7 @@ public class DhfsFuseIT {
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=20",
"-Ddhfs.objects.sync.ping.timeout=20",
"-Ddhfs.objects.reconnect_interval=1s",
@@ -74,8 +75,8 @@ public class DhfsFuseIT {
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
@@ -108,80 +109,80 @@ public class DhfsFuseIT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
}
@Test
void readWriteRewriteFileTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
}
@Test
void createDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_data/dhfs_fuse_root/testf2").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_default/fuse/testf2").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
Thread.sleep(2000);
Assertions.assertEquals("newfile\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
Assertions.assertEquals("newfile\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
Assertions.assertEquals("newfile\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout());
Assertions.assertEquals("newfile\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout());
}
@Test
void writeRewriteDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
client.unpauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
Thread.sleep(2000);
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
}
@Test
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var client = DockerClientFactory.instance().client();
client.pauseContainerCmd(container2.getContainerId()).exec();
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
client.unpauseContainerCmd(container2.getContainerId()).exec();
@@ -189,86 +190,86 @@ public class DhfsFuseIT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
Thread.sleep(1000);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Thread.sleep(1000);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
Thread.sleep(1000);
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
}
@Test
void deleteTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
Thread.sleep(500);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Thread.sleep(500);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 2);
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
Thread.sleep(1000);
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
}
@Test
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating");
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Thread.sleep(1000);
Log.info("Listing");
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
Thread.sleep(1000);
Log.info("Moving");
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_data/dhfs_fuse_root/testf1 /root/dhfs_data/dhfs_fuse_root/testf2").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
Thread.sleep(1000);
Log.info("Listing");
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
Thread.sleep(1000);
Log.info("Reading");
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout());
}
@Test
void moveDirTest() throws IOException, InterruptedException, TimeoutException {
Log.info("Creating");
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_data/dhfs_fuse_root/testdir").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testdir/testf1").getExitCode());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testdir/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testdir/testf1").getExitCode());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir/testf1").getStdout());
Thread.sleep(1000);
Log.info("Listing");
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
Thread.sleep(1000);
Log.info("Moving");
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_data/dhfs_fuse_root/testdir2").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_data/dhfs_fuse_root/testdir /root/dhfs_data/dhfs_fuse_root/testdir2/testdirm").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir2").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testdir /root/dhfs_default/fuse/testdir2/testdirm").getExitCode());
Thread.sleep(1000);
Log.info("Listing");
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
Thread.sleep(1000);
Log.info("Reading");
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testdir2/testdirm/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir2/testdirm/testf1").getStdout());
}
@Test
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var c2curl = container2.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
@@ -276,9 +277,9 @@ public class DhfsFuseIT {
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
@@ -291,17 +292,17 @@ public class DhfsFuseIT {
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
Thread.sleep(2000);
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
}
@Test
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Thread.sleep(1000);
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -310,8 +311,8 @@ public class DhfsFuseIT {
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
@@ -321,10 +322,10 @@ public class DhfsFuseIT {
@Test
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -333,8 +334,8 @@ public class DhfsFuseIT {
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
@@ -343,8 +344,8 @@ public class DhfsFuseIT {
@Test
void dirConflictTest3() throws IOException, InterruptedException, TimeoutException {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -353,8 +354,8 @@ public class DhfsFuseIT {
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
@@ -363,12 +364,12 @@ public class DhfsFuseIT {
@Test
void dirConflictTest4() throws IOException, InterruptedException, TimeoutException {
boolean createdOk = (container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf").getExitCode() == 0)
&& (container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf").getExitCode() == 0);
boolean createdOk = (container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode() == 0)
&& (container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode() == 0);
Assumptions.assumeTrue(createdOk, "Failed creating one or more files");
Thread.sleep(1000);
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));

View File

@@ -53,6 +53,7 @@ public class DhfsFusex3IT {
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.deletion.delay=0",
"-Ddhfs.objects.ref_verification=true",
"-Ddhfs.objects.write_log=true",
"-Ddhfs.objects.sync.timeout=20",
"-Ddhfs.objects.sync.ping.timeout=20",
"-Ddhfs.objects.reconnect_interval=1s",
@@ -76,9 +77,9 @@ public class DhfsFusex3IT {
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
waitingConsumer1 = new WaitingConsumer();
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
@@ -129,7 +130,7 @@ public class DhfsFusex3IT {
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
Thread.sleep(1000);
Thread.sleep(2000);
}
@AfterEach
@@ -139,26 +140,26 @@ public class DhfsFusex3IT {
@Test
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(2000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
}
@Test
void gccHelloWorldTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /root/dhfs_data/dhfs_fuse_root/hello.c").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_data/dhfs_fuse_root && gcc hello.c").getExitCode());
var helloOut1 = container1.execInContainer("/bin/sh", "-c", "/root/dhfs_data/dhfs_fuse_root/a.out");
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /root/dhfs_default/fuse/hello.c").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && gcc hello.c").getExitCode());
var helloOut1 = container1.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
Log.info(helloOut1);
Assertions.assertEquals(0, helloOut1.getExitCode());
Assertions.assertEquals("hello world", helloOut1.getStdout());
Thread.sleep(1000);
var helloOut2 = container2.execInContainer("/bin/sh", "-c", "/root/dhfs_data/dhfs_fuse_root/a.out");
Thread.sleep(2000);
var helloOut2 = container2.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
Log.info(helloOut2);
Assertions.assertEquals(0, helloOut2.getExitCode());
Assertions.assertEquals("hello world", helloOut2.getStdout());
var helloOut3 = container3.execInContainer("/bin/sh", "-c", "/root/dhfs_data/dhfs_fuse_root/a.out");
var helloOut3 = container3.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
Log.info(helloOut3);
Assertions.assertEquals(0, helloOut3.getExitCode());
Assertions.assertEquals("hello world", helloOut3.getStdout());
@@ -166,10 +167,10 @@ public class DhfsFusex3IT {
@Test
void removeHostTest() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(2000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
var c3curl = container3.execInContainer("/bin/sh", "-c",
"curl --header \"Content-Type: application/json\" " +
@@ -177,19 +178,19 @@ public class DhfsFusex3IT {
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
" http://localhost:8080/objects-manage/known-peers");
Thread.sleep(1000);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Thread.sleep(2000);
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(2000);
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
}
@Test
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container3, "echo test3 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -197,36 +198,36 @@ public class DhfsFusex3IT {
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
Thread.sleep(2000);
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
Assertions.assertTrue(cat.getStdout().contains("test3"));
}
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
}
@Test
void dirConflictTest3() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
Assertions.assertEquals(0, container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
boolean createFail = Stream.of(
Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf"),
Pair.of(container3, "echo test3 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
try {
Log.info("Creating");
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
@@ -235,36 +236,37 @@ public class DhfsFusex3IT {
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
Thread.sleep(1000);
Thread.sleep(2000);
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
Assertions.assertTrue(cat.getStdout().contains("test3"));
}
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
}
@Test
@Disabled
void fileConflictTest2() throws IOException, InterruptedException, TimeoutException {
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
Thread.sleep(1000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
Thread.sleep(2000);
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
boolean writeFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf1"),
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf1"),
Pair.of(container3, "echo test3 >> /root/dhfs_data/dhfs_fuse_root/testf1")).parallel().map(p -> {
boolean writeFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf1"),
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf1"),
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf1")).parallel().map(p -> {
try {
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
} catch (Exception e) {
@@ -272,24 +274,30 @@ public class DhfsFusex3IT {
}
}).anyMatch(r -> r != 0);
Assumptions.assumeTrue(!writeFail, "Failed creating one or more files");
Thread.sleep(1000);
Thread.sleep(2000);
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info("fileConflictTest2: " + ls);
Log.info("fileConflictTest2: " + cat);
}
for (var c : List.of(container1, container2, container3)) {
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
Log.info(ls);
Log.info(cat);
Assertions.assertTrue(cat.getStdout().contains("test1"));
Assertions.assertTrue(cat.getStdout().contains("test2"));
Assertions.assertTrue(cat.getStdout().contains("test3"));
}
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
}
}

View File

@@ -144,7 +144,7 @@ public class HashSetDelayedBlockingQueueTest {
}
});
var thing = queue.getAllWait(); // Theoretically you can get one...
if (thing.size() == 1) thing.add(queue.getAllWait().stream().findFirst());
if (thing.size() == 1) thing.add(queue.getAllWait().stream().findFirst().get());
var gotTime = System.currentTimeMillis();
Assertions.assertIterableEquals(List.of("hello1", "hello2"), thing);
Assertions.assertTrue((gotTime - curTime) >= 1810);

View File

@@ -24,12 +24,15 @@ EXTRAOPTS_PARSED="$(tr '\n\r' ' ' <"$EXTRAOPTS")"
echo "Extra options: $EXTRAOPTS_PARSED"
java \
-Xmx256M -Ddhfs.objects.writeback.limit=134217728 \
-Xmx384M \
-Ddhfs.objects.writeback.limit=134217728 \
-Ddhfs.objects.lru.limit=134217728 \
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
-Ddhfs.objects.persistence.files.root="$SCRIPT_DIR"/../data/objects \
-Ddhfs.objects.root="$SCRIPT_DIR"/../data/configs \
-Ddhfs.fuse.root="$SCRIPT_DIR"/../fuse \
-Dquarkus.http.host=0.0.0.0 \
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &

View File

@@ -6,7 +6,7 @@ set -o pipefail
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
# 💀
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -En "s/.*\[{\"id\":([0-9]*).*/\1/p")
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -en "s/.*\[\{\"id\":([0-9]*).*/\1/p")
echo Latest: $LATEST
@@ -24,11 +24,14 @@ echo Downloading...
cd "$SCRIPT_DIR"
rm "Run wrapper.zip" || true
rm "Run wrapper.tar.gz" || true
rm -rf "dhfs" || true
wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
unzip "Run wrapper.zip"
rm "Run wrapper.zip"
tar xvf "Run wrapper.tar.gz"
rm -rf "DHFS Package"
rm -rf "Webui"