mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
Compare commits
27 Commits
8e6ff4ca99
...
d27c51583d
| Author | SHA1 | Date | |
|---|---|---|---|
| d27c51583d | |||
| 1ae12402d0 | |||
| 6143c9afbd | |||
| 8929e10dbf | |||
| 384069260f | |||
| 379f3cc0f5 | |||
| 0f482ff446 | |||
| aea38724ad | |||
| 6ee952163b | |||
| 08e59ff9a3 | |||
| e91f1040e8 | |||
| ab43676927 | |||
| f71a2bef7b | |||
| f7bce3b0e5 | |||
| 316f9a798c | |||
| c2b8890a1a | |||
| 9f38d778e8 | |||
| 2d8744b426 | |||
| 8384cc9efc | |||
| c12e92c52f | |||
| 37722d1604 | |||
| 6596f5a6b8 | |||
| ff529f7f86 | |||
| d85918d8ff | |||
| 2ededf5bb2 | |||
| aebaf6d8fd | |||
| 9cad2c1e28 |
10
.github/workflows/server.yml
vendored
10
.github/workflows/server.yml
vendored
@@ -35,9 +35,12 @@ jobs:
|
||||
distribution: "zulu"
|
||||
cache: maven
|
||||
|
||||
- name: Build and test with Maven
|
||||
- name: Test with Maven
|
||||
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package verify
|
||||
|
||||
- name: Build with Maven
|
||||
run: cd dhfs-parent && mvn --batch-mode --update-snapshots package -Dquarkus.log.category.\"com.usatiuk.dhfs\".min-level=DEBUG
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: DHFS Server Package
|
||||
@@ -207,7 +210,10 @@ jobs:
|
||||
- name: Add version to run wrapper
|
||||
run: echo $GITHUB_RUN_ID > "run-wrapper-out/dhfs/app/"version
|
||||
|
||||
- name: Tar run wrapper
|
||||
run: tar -cvf ~/run-wrapper.tar.gz ./run-wrapper-out
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: Run wrapper
|
||||
path: run-wrapper-out
|
||||
path: ~/run-wrapper.tar.gz
|
||||
|
||||
@@ -115,16 +115,6 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
}
|
||||
HashSet<String> oursNew = new HashSet<>(oursFile.getChunks().values());
|
||||
|
||||
for (var cuuid : oursBackup) {
|
||||
if (!oursNew.contains(cuuid))
|
||||
jObjectManager
|
||||
.get(ChunkInfo.getNameFromHash(cuuid))
|
||||
.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
|
||||
mc.removeRef(oursFile.getName());
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
|
||||
oursFile.setMtime(first.getMtime());
|
||||
oursFile.setCtime(first.getCtime());
|
||||
|
||||
@@ -157,6 +147,16 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
} while (true);
|
||||
|
||||
bumpDir.apply();
|
||||
|
||||
for (var cuuid : oursBackup) {
|
||||
if (!oursNew.contains(cuuid))
|
||||
jObjectManager
|
||||
.get(ChunkInfo.getNameFromHash(cuuid))
|
||||
.ifPresent(jObject -> jObject.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, d, b, v) -> {
|
||||
mc.removeRef(oursFile.getName());
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
m.setChangelog(newChangelog);
|
||||
|
||||
@@ -51,6 +51,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
@ConfigProperty(name = "dhfs.objects.ref_verification")
|
||||
boolean refVerification;
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.write_log")
|
||||
boolean writeLogging;
|
||||
|
||||
@Inject
|
||||
PersistentRemoteHostsService persistentRemoteHostsService;
|
||||
|
||||
@@ -149,7 +152,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
String fname = Path.of(name).getFileName().toString();
|
||||
|
||||
var fuuid = UUID.randomUUID();
|
||||
Log.trace("Creating file " + fuuid);
|
||||
Log.debug("Creating file " + fuuid);
|
||||
File f = new File(fuuid, mode, UUID.fromString(parent.getName()), false);
|
||||
|
||||
if (!parent.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
|
||||
@@ -181,7 +184,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
String dname = Path.of(name).getFileName().toString();
|
||||
|
||||
var duuid = UUID.randomUUID();
|
||||
Log.trace("Creating dir " + duuid);
|
||||
Log.debug("Creating dir " + duuid);
|
||||
Directory ndir = new Directory(duuid, mode); //FIXME:
|
||||
|
||||
if (!found.runWriteLocked(JObject.ResolutionStrategy.REMOTE, (m, d, bump, invalidate) -> {
|
||||
@@ -325,14 +328,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
dentToD.setMtime(System.currentTimeMillis());
|
||||
|
||||
dentFrom.bumpVer();
|
||||
dentFrom.notifyWrite();
|
||||
|
||||
dentTo.bumpVer();
|
||||
dentTo.notifyWrite();
|
||||
} finally {
|
||||
dentFrom.rwUnlock();
|
||||
dentTo.rwUnlock();
|
||||
theFile.notifyWrite();
|
||||
theFile.rwUnlock();
|
||||
}
|
||||
|
||||
@@ -517,6 +516,21 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
if (!(fDataU instanceof File fData))
|
||||
throw new StatusRuntimeException(Status.INVALID_ARGUMENT);
|
||||
|
||||
if (writeLogging) {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("Writing to file: ");
|
||||
sb.append(meta.getName());
|
||||
sb.append(" size=");
|
||||
sb.append(size(fileUuid));
|
||||
sb.append(" ");
|
||||
sb.append(offset);
|
||||
sb.append(" ");
|
||||
sb.append(data.length);
|
||||
sb.append(" ");
|
||||
sb.append(Arrays.toString(data));
|
||||
Log.info(sb.toString());
|
||||
}
|
||||
|
||||
if (size(fileUuid) < offset)
|
||||
truncate(fileUuid, offset);
|
||||
|
||||
@@ -803,7 +817,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
String fname = Path.of(newpath).getFileName().toString();
|
||||
|
||||
var fuuid = UUID.randomUUID();
|
||||
Log.trace("Creating file " + fuuid);
|
||||
Log.debug("Creating file " + fuuid);
|
||||
File f = new File(fuuid, 0, UUID.fromString(parent.getName()), true);
|
||||
|
||||
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
|
||||
|
||||
@@ -15,21 +15,71 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
|
||||
public class JObject<T extends JObjectData> implements Serializable, Comparable<JObject<?>> {
|
||||
public class JObject<T extends JObjectData> {
|
||||
private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||
private final ObjectMetadata _metaPart;
|
||||
private final JObjectResolver _resolver;
|
||||
private final AtomicReference<T> _dataPart = new AtomicReference<>();
|
||||
private static final int lockTimeoutSecs = 15;
|
||||
|
||||
private class TransactionState {
|
||||
final int dataHash;
|
||||
final int metaHash;
|
||||
final int externalHash;
|
||||
final boolean data;
|
||||
final HashSet<String> oldRefs;
|
||||
|
||||
TransactionState() {
|
||||
this.dataHash = _metaPart.dataHash();
|
||||
this.metaHash = _metaPart.metaHash();
|
||||
this.externalHash = _metaPart.externalHash();
|
||||
this.data = _dataPart.get() != null || hasLocalCopy();
|
||||
|
||||
if (_resolver.refVerification) {
|
||||
tryLocalResolve();
|
||||
if (_dataPart.get() != null)
|
||||
oldRefs = new HashSet<>(_dataPart.get().extractRefs());
|
||||
else
|
||||
oldRefs = null;
|
||||
} else {
|
||||
oldRefs = null;
|
||||
}
|
||||
}
|
||||
|
||||
void commit(boolean forceInvalidate) {
|
||||
_resolver.updateDeletionState(JObject.this);
|
||||
|
||||
var newDataHash = _metaPart.dataHash();
|
||||
var newMetaHash = _metaPart.metaHash();
|
||||
var newExternalHash = _metaPart.externalHash();
|
||||
var newData = _dataPart.get() != null || hasLocalCopy();
|
||||
|
||||
if (_dataPart.get() != null)
|
||||
_metaPart.narrowClass(_dataPart.get().getClass());
|
||||
|
||||
notifyWrite(
|
||||
newMetaHash != metaHash || forceInvalidate,
|
||||
newExternalHash != externalHash || forceInvalidate,
|
||||
newDataHash != dataHash
|
||||
|| newData != data
|
||||
|| forceInvalidate
|
||||
);
|
||||
|
||||
verifyRefs(oldRefs);
|
||||
}
|
||||
}
|
||||
|
||||
private TransactionState _transactionState = null;
|
||||
|
||||
// Create a new object
|
||||
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
|
||||
_resolver = resolver;
|
||||
_metaPart = new ObjectMetadata(name, false, obj.getClass());
|
||||
_metaPart.getHaveLocalCopy().set(true);
|
||||
_metaPart.setHaveLocalCopy(true);
|
||||
_dataPart.set(obj);
|
||||
_metaPart.bumpVersion(selfUuid);
|
||||
Log.trace("new JObject: " + getName());
|
||||
_metaPart.getChangelog().put(selfUuid, 1L);
|
||||
if (Log.isTraceEnabled())
|
||||
Log.trace("new JObject: " + getName());
|
||||
}
|
||||
|
||||
// Create an object from existing metadata
|
||||
@@ -43,29 +93,11 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
objects.stream().sorted(Comparator.comparingInt(System::identityHashCode)).forEach(JObject::rwLock);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(JObject<?> o) {
|
||||
return getName().compareTo(o.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
JObject<?> jObject = (JObject<?>) o;
|
||||
return Objects.equals(_metaPart.getName(), jObject._metaPart.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(_metaPart.getName());
|
||||
}
|
||||
|
||||
public Class<? extends JObjectData> getKnownClass() {
|
||||
return _metaPart.getKnownClass();
|
||||
}
|
||||
|
||||
public void narrowClass(Class<? extends JObjectData> klass) {
|
||||
protected void narrowClass(Class<? extends JObjectData> klass) {
|
||||
_metaPart.narrowClass(klass);
|
||||
}
|
||||
|
||||
@@ -83,8 +115,8 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
return _metaPart;
|
||||
}
|
||||
|
||||
public boolean hasLocalCopyMd() {
|
||||
return _metaPart.getHaveLocalCopy().get();
|
||||
protected boolean hasLocalCopyMd() {
|
||||
return _metaPart.isHaveLocalCopy();
|
||||
}
|
||||
|
||||
public Class<? extends ConflictResolver> getConflictResolver() {
|
||||
@@ -126,12 +158,12 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
var res = _resolver.resolveDataRemote(this);
|
||||
_metaPart.narrowClass(res.getClass());
|
||||
_dataPart.set(res);
|
||||
_metaPart.getHaveLocalCopy().set(true);
|
||||
_metaPart.setHaveLocalCopy(true);
|
||||
hydrateRefs();
|
||||
verifyRefs();
|
||||
} // _dataPart.get() == null
|
||||
} finally {
|
||||
_lock.writeLock().unlock();
|
||||
rwUnlock();
|
||||
} // try
|
||||
} // _dataPart.get() == null
|
||||
}
|
||||
@@ -151,7 +183,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
_dataPart.compareAndSet(null, res.get());
|
||||
} // _dataPart.get() == null
|
||||
} finally {
|
||||
_lock.readLock().unlock();
|
||||
rUnlock();
|
||||
} // try
|
||||
} // _dataPart.get() == null
|
||||
}
|
||||
@@ -169,7 +201,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
throw new IllegalStateException("Expected external resolution only for classes with pushResolution " + getName());
|
||||
_metaPart.narrowClass(data.getClass());
|
||||
_dataPart.set(data);
|
||||
_metaPart.getHaveLocalCopy().set(true);
|
||||
_metaPart.setHaveLocalCopy(true);
|
||||
if (!_metaPart.isLocked())
|
||||
_metaPart.lock();
|
||||
hydrateRefs();
|
||||
@@ -195,6 +227,9 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
public void rwLock() {
|
||||
if (!tryRWLock())
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire write lock for " + getName()));
|
||||
if (_lock.writeLock().getHoldCount() == 1) {
|
||||
_transactionState = new TransactionState();
|
||||
}
|
||||
}
|
||||
|
||||
public void rLock() {
|
||||
@@ -202,6 +237,32 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
throw new StatusRuntimeException(Status.UNAVAILABLE.withDescription("Failed to acquire read lock for " + getName()));
|
||||
}
|
||||
|
||||
public void rUnlock() {
|
||||
_lock.readLock().unlock();
|
||||
}
|
||||
|
||||
public void rwUnlock() {
|
||||
rwUnlock(false);
|
||||
}
|
||||
|
||||
public void rwUnlock(boolean forceInvalidate) {
|
||||
try {
|
||||
if (_lock.writeLock().getHoldCount() == 1) {
|
||||
_transactionState.commit(forceInvalidate);
|
||||
_transactionState = null;
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
Log.error("When committing changes to " + getName(), ex);
|
||||
} finally {
|
||||
_lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void assertRWLock() {
|
||||
if (!_lock.isWriteLockedByCurrentThread())
|
||||
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
|
||||
}
|
||||
|
||||
public <R> R runReadLocked(ResolutionStrategy resolutionStrategy, ObjectFnRead<T, R> fn) {
|
||||
tryResolve(resolutionStrategy);
|
||||
|
||||
@@ -211,7 +272,7 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
throw new DeletedObjectAccessException();
|
||||
return fn.apply(_metaPart, _dataPart.get());
|
||||
} finally {
|
||||
_lock.readLock().unlock();
|
||||
rUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -231,45 +292,15 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
rwLock();
|
||||
try {
|
||||
tryResolve(resolutionStrategy);
|
||||
|
||||
var dataHash = _metaPart.dataHash();
|
||||
var metaHash = Objects.hash(_metaPart.metaHash(), dataHash);
|
||||
var prevData = _dataPart.get() != null || hasLocalCopy();
|
||||
|
||||
HashSet<String> oldRefs = null;
|
||||
|
||||
if (_resolver.refVerification) {
|
||||
tryLocalResolve();
|
||||
if (_dataPart.get() != null)
|
||||
oldRefs = new HashSet<>(_dataPart.get().extractRefs());
|
||||
}
|
||||
|
||||
VoidFn invalidateFn = () -> {
|
||||
tryLocalResolve();
|
||||
_resolver.backupRefs(this);
|
||||
_dataPart.set(null);
|
||||
_resolver.removeLocal(this, _metaPart.getName());
|
||||
};
|
||||
var ret = fn.apply(_metaPart, _dataPart.get(), this::bumpVer, invalidateFn);
|
||||
_resolver.updateDeletionState(this);
|
||||
|
||||
var newDataHash = _metaPart.dataHash();
|
||||
var newMetaHash = Objects.hash(_metaPart.metaHash(), newDataHash);
|
||||
var newData = _dataPart.get() != null || hasLocalCopy();
|
||||
|
||||
if (_dataPart.get() != null)
|
||||
_metaPart.narrowClass(_dataPart.get().getClass());
|
||||
|
||||
if (!Objects.equals(newMetaHash, metaHash)
|
||||
|| newData != prevData)
|
||||
notifyWriteMeta();
|
||||
if (!Objects.equals(newDataHash, dataHash)
|
||||
|| newData != prevData)
|
||||
notifyWriteData();
|
||||
verifyRefs(oldRefs);
|
||||
return ret;
|
||||
return fn.apply(_metaPart, _dataPart.get(), this::bumpVer, invalidateFn);
|
||||
} finally {
|
||||
_lock.writeLock().unlock();
|
||||
rwUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,20 +311,9 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
return _dataPart.get() != null;
|
||||
}
|
||||
|
||||
public void notifyWriteMeta() {
|
||||
private void notifyWrite(boolean metaChanged, boolean externalChanged, boolean hasDataChanged) {
|
||||
assertRWLock();
|
||||
_resolver.notifyWriteMeta(this);
|
||||
}
|
||||
|
||||
public void notifyWriteData() {
|
||||
assertRWLock();
|
||||
_resolver.notifyWriteData(this);
|
||||
}
|
||||
|
||||
public void notifyWrite() {
|
||||
_resolver.updateDeletionState(this);
|
||||
notifyWriteMeta();
|
||||
notifyWriteData();
|
||||
_resolver.notifyWrite(this, metaChanged, externalChanged, hasDataChanged);
|
||||
}
|
||||
|
||||
public void bumpVer() {
|
||||
@@ -301,23 +321,16 @@ public class JObject<T extends JObjectData> implements Serializable, Comparable<
|
||||
_resolver.bumpVersionSelf(this);
|
||||
}
|
||||
|
||||
public void rwUnlock() {
|
||||
_lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
public void discardData() {
|
||||
assertRWLock();
|
||||
if (!isDeleted())
|
||||
throw new IllegalStateException("Expected to be deleted when discarding data");
|
||||
_dataPart.set(null);
|
||||
_metaPart.setHaveLocalCopy(false);
|
||||
_metaPart.setSavedRefs(Collections.emptySet());
|
||||
}
|
||||
|
||||
public void assertRWLock() {
|
||||
if (!_lock.isWriteLockedByCurrentThread())
|
||||
throw new IllegalStateException("Expected to be write-locked there: " + getName() + " " + Thread.currentThread().getName());
|
||||
}
|
||||
|
||||
public enum ResolutionStrategy {
|
||||
NO_RESOLUTION,
|
||||
LOCAL_ONLY,
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
package com.usatiuk.dhfs.objects.jrepository;
|
||||
|
||||
public record JObjectKey(short type) {
|
||||
}
|
||||
@@ -7,7 +7,7 @@ public interface JObjectManager {
|
||||
|
||||
Optional<JObject<?>> get(String name);
|
||||
|
||||
Collection<JObject<?>> findAll();
|
||||
Collection<String> findAll();
|
||||
|
||||
// Put a new object
|
||||
<T extends JObjectData> JObject<T> put(T object, Optional<String> parent);
|
||||
|
||||
@@ -14,7 +14,7 @@ import jakarta.inject.Inject;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.SoftReference;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Objects;
|
||||
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
|
||||
|
||||
@ApplicationScoped
|
||||
public class JObjectManagerImpl implements JObjectManager {
|
||||
private final ConcurrentHashMap<String, NamedSoftReference> _map = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, NamedWeakReference> _map = new ConcurrentHashMap<>();
|
||||
private final ReferenceQueue<JObject<?>> _refQueue = new ReferenceQueue<>();
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
@@ -56,7 +56,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
private void refCleanupThread() {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
NamedSoftReference cur = (NamedSoftReference) _refQueue.remove();
|
||||
NamedWeakReference cur = (NamedWeakReference) _refQueue.remove();
|
||||
_map.remove(cur._key, cur);
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
@@ -102,7 +102,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
JObject<?> ret = null;
|
||||
var newObj = new JObject<>(jObjectResolver, (ObjectMetadata) meta);
|
||||
while (ret == null) {
|
||||
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(newObj, _refQueue));
|
||||
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(newObj, _refQueue));
|
||||
if (ref.get() == null) _map.remove(name, ref);
|
||||
else ret = ref.get();
|
||||
}
|
||||
@@ -111,14 +111,12 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<JObject<?>> findAll() {
|
||||
var out = _map.values().stream().map(SoftReference::get)
|
||||
public Collection<String> findAll() {
|
||||
var out = _map.values().stream().map(WeakReference::get)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toCollection((Supplier<LinkedHashSet<JObject<?>>>) LinkedHashSet::new));
|
||||
objectPersistentStore.findAllObjects().stream()
|
||||
.map(f -> get(f).orElse(null))
|
||||
.filter(Objects::nonNull)
|
||||
.forEach(out::add);
|
||||
.map(JObject::getName)
|
||||
.collect(Collectors.toCollection((Supplier<LinkedHashSet<String>>) LinkedHashSet::new));
|
||||
out.addAll(objectPersistentStore.findAllObjects());
|
||||
return out;
|
||||
}
|
||||
|
||||
@@ -126,7 +124,6 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
public <D extends JObjectData> JObject<D> put(D object, Optional<String> parent) {
|
||||
while (true) {
|
||||
JObject<?> ret;
|
||||
boolean created = false;
|
||||
JObject<?> newObj = null;
|
||||
try {
|
||||
ret = getFromMap(object.getName());
|
||||
@@ -138,15 +135,13 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
newObj.rwLock();
|
||||
while (ret == null) {
|
||||
JObject<?> finalNewObj = newObj;
|
||||
var ref = _map.computeIfAbsent(object.getName(), k -> new NamedSoftReference(finalNewObj, _refQueue));
|
||||
var ref = _map.computeIfAbsent(object.getName(), k -> new NamedWeakReference(finalNewObj, _refQueue));
|
||||
if (ref.get() == null) _map.remove(object.getName(), ref);
|
||||
else ret = ref.get();
|
||||
}
|
||||
if (ret != newObj) continue;
|
||||
created = true;
|
||||
}
|
||||
JObject<D> finalRet = (JObject<D>) ret;
|
||||
boolean finalCreated = created;
|
||||
ret.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
if (object.getClass().isAnnotationPresent(PushResolution.class)
|
||||
&& object.getClass().isAnnotationPresent(AssumedUnique.class)
|
||||
@@ -162,13 +157,13 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
m.lock();
|
||||
}
|
||||
|
||||
if (finalCreated) finalRet.notifyWrite();// Kind of a hack?
|
||||
return null;
|
||||
});
|
||||
} finally {
|
||||
if (newObj != null) newObj.rwUnlock();
|
||||
if (newObj != null)
|
||||
newObj.rwUnlock(true);
|
||||
}
|
||||
if (!created)
|
||||
if (newObj == null)
|
||||
jObjectLRU.notifyAccess(ret);
|
||||
return (JObject<D>) ret;
|
||||
}
|
||||
@@ -197,7 +192,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
created.rwLock();
|
||||
try {
|
||||
while (ret == null) {
|
||||
var ref = _map.computeIfAbsent(name, k -> new NamedSoftReference(created, _refQueue));
|
||||
var ref = _map.computeIfAbsent(name, k -> new NamedWeakReference(created, _refQueue));
|
||||
if (ref.get() == null) _map.remove(name, ref);
|
||||
else ret = ref.get();
|
||||
}
|
||||
@@ -215,11 +210,11 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
}
|
||||
}
|
||||
|
||||
private static class NamedSoftReference extends SoftReference<JObject<?>> {
|
||||
private static class NamedWeakReference extends WeakReference<JObject<?>> {
|
||||
@Getter
|
||||
final String _key;
|
||||
|
||||
public NamedSoftReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
|
||||
public NamedWeakReference(JObject<?> target, ReferenceQueue<JObject<?>> q) {
|
||||
super(target, q);
|
||||
this._key = target.getName();
|
||||
}
|
||||
|
||||
@@ -12,10 +12,7 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Stream;
|
||||
@@ -99,7 +96,7 @@ public class JObjectRefProcessor {
|
||||
}
|
||||
|
||||
if (ok != missing.size()) {
|
||||
Log.trace("Delaying deletion check of " + obj.getName());
|
||||
Log.debug("Delaying deletion check of " + obj.getName());
|
||||
delay = true;
|
||||
}
|
||||
|
||||
@@ -138,6 +135,13 @@ public class JObjectRefProcessor {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void deleteRef(JObject<?> self, String name) {
|
||||
jObjectManager.get(name).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
|
||||
mc.removeRef(self.getName());
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
|
||||
private void refProcessorThread() {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
@@ -149,38 +153,32 @@ public class JObjectRefProcessor {
|
||||
next = _candidates.get(canDeleteRetryDelay);
|
||||
}
|
||||
|
||||
var got = jObjectManager.get(next);
|
||||
if (got.isEmpty()) continue;
|
||||
var got = jObjectManager.get(next).orElse(null);
|
||||
if (got == null) continue;
|
||||
try {
|
||||
got.get().runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, v, i) -> {
|
||||
got.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, v, i) -> {
|
||||
if (m.isLocked()) return null;
|
||||
if (m.isDeleted()) return null;
|
||||
if (!m.isDeletionCandidate()) return null;
|
||||
if (m.isSeen() && m.getKnownClass().isAnnotationPresent(Movable.class)) {
|
||||
if (!processMovable(got.get()))
|
||||
if (!processMovable(got))
|
||||
return null;
|
||||
}
|
||||
|
||||
got.get().tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
got.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
|
||||
Log.trace("Deleting " + m.getName());
|
||||
Log.debug("Deleting " + m.getName());
|
||||
m.markDeleted();
|
||||
|
||||
Stream<String> refs = Stream.empty();
|
||||
Collection<String> extracted = null;
|
||||
if (got.getData() != null) extracted = got.getData().extractRefs();
|
||||
|
||||
if (m.getSavedRefs() != null)
|
||||
refs = m.getSavedRefs().stream();
|
||||
if (got.get().getData() != null)
|
||||
refs = Streams.concat(refs, got.get().getData().extractRefs().stream());
|
||||
got.discardData();
|
||||
|
||||
got.get().discardData();
|
||||
|
||||
refs.forEach(c -> {
|
||||
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
|
||||
mc.removeRef(m.getName());
|
||||
return null;
|
||||
}));
|
||||
});
|
||||
if (got.getMeta().getSavedRefs() != null)
|
||||
for (var r : got.getMeta().getSavedRefs()) deleteRef(got, r);
|
||||
if (extracted != null)
|
||||
for (var r : extracted) deleteRef(got, r);
|
||||
|
||||
return null;
|
||||
});
|
||||
|
||||
@@ -16,6 +16,7 @@ import org.apache.commons.collections4.MultiValuedMap;
|
||||
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Optional;
|
||||
@@ -101,7 +102,6 @@ public class JObjectResolver {
|
||||
}
|
||||
}
|
||||
self.getMeta().setSavedRefs(null);
|
||||
notifyWriteMeta(self);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,28 +132,32 @@ public class JObjectResolver {
|
||||
}
|
||||
}
|
||||
|
||||
private void quickDeleteRef(JObject<?> self, String name) {
|
||||
jObjectManager.get(name)
|
||||
.ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
|
||||
mc.removeRef(self.getName());
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
|
||||
private void tryQuickDelete(JObject<?> self) {
|
||||
self.assertRWLock();
|
||||
self.tryResolve(JObject.ResolutionStrategy.LOCAL_ONLY);
|
||||
|
||||
Log.trace("Quick delete of: " + self.getName());
|
||||
if (Log.isTraceEnabled())
|
||||
Log.trace("Quick delete of: " + self.getName());
|
||||
|
||||
self.getMeta().markDeleted();
|
||||
|
||||
Stream<String> refs = Stream.empty();
|
||||
|
||||
if (self.getMeta().getSavedRefs() != null)
|
||||
refs = self.getMeta().getSavedRefs().stream();
|
||||
if (self.getData() != null)
|
||||
refs = Streams.concat(refs, self.getData().extractRefs().stream());
|
||||
Collection<String> extracted = null;
|
||||
if (self.getData() != null) extracted = self.getData().extractRefs();
|
||||
|
||||
self.discardData();
|
||||
|
||||
refs.forEach(c -> {
|
||||
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
|
||||
mc.removeRef(self.getName());
|
||||
return null;
|
||||
}));
|
||||
});
|
||||
if (self.getMeta().getSavedRefs() != null)
|
||||
for (var r : self.getMeta().getSavedRefs()) quickDeleteRef(self, r);
|
||||
if (extracted != null)
|
||||
for (var r : extracted) quickDeleteRef(self, r);
|
||||
}
|
||||
|
||||
public <T extends JObjectData> Optional<T> resolveDataLocal(JObject<T> jObject) {
|
||||
@@ -174,8 +178,8 @@ public class JObjectResolver {
|
||||
public void removeLocal(JObject<?> jObject, String name) {
|
||||
jObject.assertRWLock();
|
||||
try {
|
||||
Log.trace("Invalidating " + name);
|
||||
jObject.getMeta().getHaveLocalCopy().set(false);
|
||||
Log.debug("Invalidating " + name);
|
||||
jObject.getMeta().setHaveLocalCopy(false);
|
||||
jObjectWriteback.remove(jObject);
|
||||
objectPersistentStore.deleteObjectData(name);
|
||||
} catch (StatusRuntimeException sx) {
|
||||
@@ -186,26 +190,25 @@ public class JObjectResolver {
|
||||
}
|
||||
}
|
||||
|
||||
public <T extends JObjectData> void notifyWriteMeta(JObject<T> self) {
|
||||
public <T extends JObjectData> void notifyWrite(JObject<T> self, boolean metaChanged,
|
||||
boolean externalChanged, boolean hasDataChanged) {
|
||||
self.assertRWLock();
|
||||
jObjectWriteback.markDirty(self);
|
||||
for (var t : _metaWriteListeners.keySet()) { // FIXME:?
|
||||
if (t.isAssignableFrom(self.getKnownClass()))
|
||||
for (var cb : _metaWriteListeners.get(t))
|
||||
cb.apply((JObject) self);
|
||||
}
|
||||
}
|
||||
|
||||
public <T extends JObjectData> void notifyWriteData(JObject<T> self) {
|
||||
self.assertRWLock();
|
||||
jObjectWriteback.markDirty(self);
|
||||
if (self.isResolved()) {
|
||||
invalidationQueueService.pushInvalidationToAll(self.getName());
|
||||
if (metaChanged || hasDataChanged)
|
||||
jObjectWriteback.markDirty(self);
|
||||
if (metaChanged)
|
||||
for (var t : _metaWriteListeners.keySet()) { // FIXME:?
|
||||
if (t.isAssignableFrom(self.getKnownClass()))
|
||||
for (var cb : _metaWriteListeners.get(t))
|
||||
cb.apply((JObject) self);
|
||||
}
|
||||
if (hasDataChanged)
|
||||
for (var t : _writeListeners.keySet()) { // FIXME:?
|
||||
if (t.isAssignableFrom(self.getKnownClass()))
|
||||
for (var cb : _writeListeners.get(t))
|
||||
cb.apply((JObject) self);
|
||||
}
|
||||
if (externalChanged) {
|
||||
invalidationQueueService.pushInvalidationToAll(self.getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -161,7 +161,7 @@ public class JObjectWriteback {
|
||||
if (!m.isDeletionCandidate())
|
||||
throw new IllegalStateException("Object deleted but not deletable! " + m.getName());
|
||||
// FIXME: assert Rw lock here?
|
||||
Log.trace("Deleting from persistent storage " + m.getName());
|
||||
Log.debug("Deleting from persistent storage " + m.getName());
|
||||
objectPersistentStore.deleteObject(m.getName());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.objects.repository.ObjectHeader;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.vertx.core.impl.ConcurrentHashSet;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
@@ -27,26 +28,30 @@ public class ObjectMetadata implements Serializable {
|
||||
@Getter
|
||||
private final Map<UUID, Long> _remoteCopies = new LinkedHashMap<>();
|
||||
private final AtomicReference<Class<? extends JObjectData>> _knownClass = new AtomicReference<>();
|
||||
private final AtomicBoolean _seen = new AtomicBoolean(false);
|
||||
private final AtomicBoolean _deleted = new AtomicBoolean(false);
|
||||
@Getter
|
||||
private final HashSet<UUID> _confirmedDeletes = new HashSet<>();
|
||||
private final Set<String> _referrers = new ConcurrentSkipListSet<>();
|
||||
private volatile boolean _seen = false;
|
||||
@Getter
|
||||
private volatile boolean _deleted = false;
|
||||
@Getter
|
||||
private final HashSet<UUID> _confirmedDeletes = new LinkedHashSet<>();
|
||||
private final Set<String> _referrers = new LinkedHashSet<>();
|
||||
@Getter
|
||||
@Setter
|
||||
private Map<UUID, Long> _changelog = new LinkedHashMap<>();
|
||||
private Map<UUID, Long> _changelog = new LinkedHashMap<>(4);
|
||||
@Getter
|
||||
@Setter
|
||||
private Set<String> _savedRefs = Collections.emptySet();
|
||||
@Getter
|
||||
private boolean _locked = false;
|
||||
@Getter
|
||||
private AtomicBoolean _haveLocalCopy = new AtomicBoolean(false);
|
||||
private transient AtomicBoolean _written = new AtomicBoolean(true);
|
||||
@Setter
|
||||
private volatile boolean _haveLocalCopy = false;
|
||||
@Getter
|
||||
private transient volatile boolean _written = true;
|
||||
|
||||
public ObjectMetadata(String name, boolean written, Class<? extends JObjectData> knownClass) {
|
||||
_name = name;
|
||||
_written.set(written);
|
||||
_written = written;
|
||||
_knownClass.set(knownClass);
|
||||
}
|
||||
|
||||
@@ -68,37 +73,25 @@ public class ObjectMetadata implements Serializable {
|
||||
@Serial
|
||||
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
|
||||
in.defaultReadObject();
|
||||
_written = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
public boolean isSeen() {
|
||||
return _seen.get();
|
||||
}
|
||||
|
||||
public boolean isDeleted() {
|
||||
return _deleted.get();
|
||||
_written = true;
|
||||
}
|
||||
|
||||
public void markSeen() {
|
||||
Log.trace("Marking seen: " + getName());
|
||||
_seen.set(true);
|
||||
_seen = true;
|
||||
}
|
||||
|
||||
public void markDeleted() {
|
||||
_deleted.set(true);
|
||||
_deleted = true;
|
||||
}
|
||||
|
||||
public void undelete() {
|
||||
_confirmedDeletes.clear();
|
||||
_deleted.set(false);
|
||||
}
|
||||
|
||||
public boolean isWritten() {
|
||||
return _written.get();
|
||||
_deleted = false;
|
||||
}
|
||||
|
||||
public void markWritten() {
|
||||
_written.set(true);
|
||||
_written = true;
|
||||
}
|
||||
|
||||
public boolean isReferred() {
|
||||
@@ -125,7 +118,8 @@ public class ObjectMetadata implements Serializable {
|
||||
public void addRef(String from) {
|
||||
_confirmedDeletes.clear();
|
||||
_referrers.add(from);
|
||||
Log.trace("Adding ref " + from + " to " + getName());
|
||||
if (Log.isTraceEnabled())
|
||||
Log.trace("Adding ref " + from + " to " + getName());
|
||||
}
|
||||
|
||||
public void removeRef(String from) {
|
||||
@@ -133,7 +127,8 @@ public class ObjectMetadata implements Serializable {
|
||||
unlock();
|
||||
Log.error("Object " + getName() + " is locked, but we removed a reference to it, unlocking!");
|
||||
}
|
||||
Log.trace("Removing ref " + from + " from " + getName());
|
||||
if (Log.isTraceEnabled())
|
||||
Log.trace("Removing ref " + from + " from " + getName());
|
||||
_referrers.remove(from);
|
||||
}
|
||||
|
||||
@@ -190,7 +185,24 @@ public class ObjectMetadata implements Serializable {
|
||||
}
|
||||
|
||||
public int metaHash() {
|
||||
return Objects.hash(isSeen(), getKnownClass(), isDeleted(), _referrers, _locked, _remoteCopies, _savedRefs, _haveLocalCopy);
|
||||
int res = Objects.hashCode(_name);
|
||||
res = 31 * res + Objects.hashCode(isSeen());
|
||||
res = 31 * res + Objects.hashCode(getKnownClass());
|
||||
res = 31 * res + Objects.hashCode(isDeleted());
|
||||
res = 31 * res + Objects.hashCode(_confirmedDeletes);
|
||||
res = 31 * res + Objects.hashCode(_referrers);
|
||||
res = 31 * res + Objects.hashCode(_changelog);
|
||||
res = 31 * res + Objects.hashCode(_locked);
|
||||
res = 31 * res + Objects.hashCode(_remoteCopies);
|
||||
res = 31 * res + Objects.hashCode(_savedRefs);
|
||||
res = 31 * res + Objects.hashCode(_haveLocalCopy);
|
||||
return res;
|
||||
}
|
||||
|
||||
public int externalHash() {
|
||||
int res = Objects.hashCode(_changelog);
|
||||
res = 31 * res + Objects.hashCode(_haveLocalCopy);
|
||||
return res;
|
||||
}
|
||||
|
||||
// Not really a hash
|
||||
|
||||
@@ -28,7 +28,7 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
|
||||
.putAllChangelog(object.getChangelog().entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue)))
|
||||
.addAllSavedRefs(object.getSavedRefs() != null ? object.getSavedRefs() : Collections.emptyList())
|
||||
.setLocked(object.isLocked())
|
||||
.setHaveLocalCopy(object.getHaveLocalCopy().get())
|
||||
.setHaveLocalCopy(object.isHaveLocalCopy())
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ public class ObjectMetadataSerializer implements ProtoSerializer<ObjectMetadataP
|
||||
if (message.getLocked())
|
||||
obj.lock();
|
||||
if (message.getHaveLocalCopy())
|
||||
obj.getHaveLocalCopy().set(true);
|
||||
obj.setHaveLocalCopy(true);
|
||||
|
||||
return obj;
|
||||
} catch (ClassNotFoundException cx) {
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.usatiuk.utils.HashSetDelayedBlockingQueue;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import io.vertx.core.impl.ConcurrentHashSet;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
@@ -13,10 +14,14 @@ import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.lang.ref.Reference;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ApplicationScoped
|
||||
public class InvalidationQueueService {
|
||||
@@ -39,7 +44,9 @@ public class InvalidationQueueService {
|
||||
int threads;
|
||||
|
||||
private final HashSetDelayedBlockingQueue<Pair<UUID, String>> _queue;
|
||||
private final AtomicReference<ConcurrentHashSet<String>> _toAllQueue = new AtomicReference<>(new ConcurrentHashSet<>());
|
||||
private ExecutorService _executor;
|
||||
private boolean _shutdown = false;
|
||||
|
||||
public InvalidationQueueService(@ConfigProperty(name = "dhfs.objects.invalidation.delay") int delay) {
|
||||
_queue = new HashSetDelayedBlockingQueue<>(delay);
|
||||
@@ -55,6 +62,7 @@ public class InvalidationQueueService {
|
||||
}
|
||||
|
||||
void shutdown(@Observes @Priority(10) ShutdownEvent event) throws InterruptedException {
|
||||
_shutdown = true;
|
||||
_executor.shutdownNow();
|
||||
if (!_executor.awaitTermination(30, TimeUnit.SECONDS)) {
|
||||
Log.error("Failed to shut down invalidation sender thread");
|
||||
@@ -62,10 +70,35 @@ public class InvalidationQueueService {
|
||||
}
|
||||
|
||||
private void sender() {
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
while (!_shutdown) {
|
||||
try {
|
||||
try {
|
||||
var data = _queue.getAllWait(100); // TODO: config?
|
||||
if (!_queue.hasImmediate()) {
|
||||
ConcurrentHashSet<String> toAllQueue;
|
||||
|
||||
while (true) {
|
||||
toAllQueue = _toAllQueue.get();
|
||||
if (toAllQueue != null) {
|
||||
if (_toAllQueue.compareAndSet(toAllQueue, null))
|
||||
break;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (toAllQueue != null) {
|
||||
var hostInfo = remoteHostManager.getHostStateSnapshot();
|
||||
for (var o : toAllQueue) {
|
||||
for (var h : hostInfo.available())
|
||||
_queue.add(Pair.of(h, o));
|
||||
for (var u : hostInfo.unavailable())
|
||||
deferredInvalidationQueueService.defer(u, o);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var data = _queue.getAllWait(100, _queue.getDelay()); // TODO: config?
|
||||
if (data.isEmpty()) continue;
|
||||
String stats = "Sent invalidation: ";
|
||||
long success = 0;
|
||||
|
||||
@@ -87,7 +120,7 @@ public class InvalidationQueueService {
|
||||
Log.info("Failed to send invalidation to " + e.getLeft() + ", will retry", ex);
|
||||
pushInvalidationToOne(e.getLeft(), e.getRight());
|
||||
}
|
||||
if (Thread.interrupted()) {
|
||||
if (_shutdown) {
|
||||
Log.info("Invalidation sender exiting");
|
||||
break;
|
||||
}
|
||||
@@ -100,8 +133,8 @@ public class InvalidationQueueService {
|
||||
} catch (Exception e) {
|
||||
Log.error("Exception in invalidation sender thread: ", e);
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
Log.info("Invalidation sender exiting");
|
||||
var data = _queue.close();
|
||||
@@ -111,12 +144,18 @@ public class InvalidationQueueService {
|
||||
}
|
||||
|
||||
public void pushInvalidationToAll(String name) {
|
||||
var hosts = remoteHostManager.getAvailableHosts();
|
||||
for (var h : hosts)
|
||||
_queue.add(Pair.of(h, name));
|
||||
var unavailable = remoteHostManager.getUnavailableHosts();
|
||||
for (var u : unavailable)
|
||||
deferredInvalidationQueueService.defer(u, name);
|
||||
while (true) {
|
||||
var queue = _toAllQueue.get();
|
||||
if (queue == null) {
|
||||
var nq = new ConcurrentHashSet<String>();
|
||||
if (!_toAllQueue.compareAndSet(null, nq)) continue;
|
||||
queue = nq;
|
||||
}
|
||||
|
||||
queue.add(name);
|
||||
|
||||
if (_toAllQueue.get() == queue) break;
|
||||
}
|
||||
}
|
||||
|
||||
public void pushInvalidationToOne(UUID host, String name) {
|
||||
|
||||
@@ -63,7 +63,7 @@ public class RemoteHostManager {
|
||||
.<Callable<Void>>map(host -> () -> {
|
||||
try {
|
||||
if (isReachable(host))
|
||||
Log.trace("Heartbeat: " + host);
|
||||
Log.debug("Heartbeat: " + host);
|
||||
else
|
||||
Log.info("Trying to connect to " + host);
|
||||
if (pingCheck(host))
|
||||
@@ -173,6 +173,25 @@ public class RemoteHostManager {
|
||||
.map(Map.Entry::getKey).toList());
|
||||
}
|
||||
|
||||
public record HostStateSnapshot(List<UUID> available, List<UUID> unavailable) {
|
||||
}
|
||||
|
||||
public HostStateSnapshot getHostStateSnapshot() {
|
||||
ArrayList<UUID> available = new ArrayList<>();
|
||||
ArrayList<UUID> unavailable = new ArrayList<>();
|
||||
_transientPeersState.runReadLocked(d -> {
|
||||
for (var v : d.getStates().entrySet()) {
|
||||
if (v.getValue().isReachable())
|
||||
available.add(v.getKey());
|
||||
else
|
||||
unavailable.add(v.getKey());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
);
|
||||
return new HostStateSnapshot(available, unavailable);
|
||||
}
|
||||
|
||||
public void notifyAddr(UUID host, String addr, Integer port, Integer securePort) {
|
||||
if (host.equals(persistentRemoteHostsService.getSelfUuid())) {
|
||||
return;
|
||||
@@ -187,7 +206,7 @@ public class RemoteHostManager {
|
||||
var prev = _seenHostsButNotAdded.put(host, state);
|
||||
// Needed for tests
|
||||
if (prev == null)
|
||||
Log.trace("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
|
||||
Log.debug("Ignoring new address from unknown host " + ": addr=" + addr + " port=" + port);
|
||||
return;
|
||||
} else {
|
||||
_seenHostsButNotAdded.remove(host);
|
||||
|
||||
@@ -123,8 +123,8 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
var reqUuid = UUID.fromString(request.getSelfUuid());
|
||||
|
||||
for (var obj : objs) {
|
||||
Log.trace("GI: " + obj.getName() + " to " + reqUuid);
|
||||
invalidationQueueService.pushInvalidationToOne(reqUuid, obj.getName());
|
||||
Log.trace("GI: " + obj + " to " + reqUuid);
|
||||
invalidationQueueService.pushInvalidationToOne(reqUuid, obj);
|
||||
}
|
||||
|
||||
return Uni.createFrom().item(GetIndexReply.getDefaultInstance());
|
||||
|
||||
@@ -37,7 +37,7 @@ public class RpcClientFactory {
|
||||
return withObjSyncClient(target, fn);
|
||||
} catch (StatusRuntimeException e) {
|
||||
if (e.getStatus().getCode().equals(Status.UNAVAILABLE.getCode()))
|
||||
Log.trace("Host " + target + " is unreachable: " + e.getMessage());
|
||||
Log.debug("Host " + target + " is unreachable: " + e.getMessage());
|
||||
else
|
||||
Log.warn("When calling " + target + " " + e.getMessage());
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -44,8 +44,8 @@ public class SyncHandler {
|
||||
var objs = jObjectManager.findAll();
|
||||
|
||||
for (var obj : objs) {
|
||||
Log.trace("IS: " + obj.getName() + " to " + host);
|
||||
invalidationQueueService.pushInvalidationToOne(host, obj.getName());
|
||||
Log.trace("IS: " + obj + " to " + host);
|
||||
invalidationQueueService.pushInvalidationToOne(host, obj);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -49,8 +49,9 @@ public class AutoSyncProcessor {
|
||||
if (downloadAll)
|
||||
executorService.submit(() -> {
|
||||
for (var obj : jObjectManager.findAll()) {
|
||||
if (!obj.hasLocalCopy())
|
||||
add(obj.getName());
|
||||
var got = jObjectManager.get(obj);
|
||||
if (got.isEmpty() || !got.get().hasLocalCopy())
|
||||
add(obj);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.dhfs.objects.repository.persistence;
|
||||
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.UnsafeByteOperations;
|
||||
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
|
||||
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
|
||||
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
|
||||
@@ -107,9 +108,9 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
private <T extends Message> T readObjectImpl(T defaultInstance, Path path) {
|
||||
try (var fsb = new FileInputStream(path.toFile());
|
||||
var fs = new BufferedInputStream(fsb, 1048576)) {
|
||||
return (T) defaultInstance.getParserForType().parseFrom(fs);
|
||||
try (var fsb = new FileInputStream(path.toFile())) {
|
||||
var file = fsb.readAllBytes();
|
||||
return (T) defaultInstance.getParserForType().parseFrom(UnsafeByteOperations.unsafeWrap(file));
|
||||
} catch (FileNotFoundException | NoSuchFileException fx) {
|
||||
throw new StatusRuntimeExceptionNoStacktrace(Status.NOT_FOUND);
|
||||
} catch (IOException e) {
|
||||
@@ -133,8 +134,8 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
private void writeObjectImpl(Path path, Message data) {
|
||||
try {
|
||||
try (var fsb = new FileOutputStream(path.toFile(), false);
|
||||
var fs = new BufferedOutputStream(fsb, 1048576)) {
|
||||
data.writeTo(fs);
|
||||
var buf = new BufferedOutputStream(fsb, Math.min(65536, data.getSerializedSize()))) {
|
||||
data.writeTo(buf);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
Log.error("Error writing file " + path, e);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.usatiuk.utils;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -30,6 +31,7 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
|
||||
private final LinkedHashMap<SetElement, SetElement> _set = new LinkedHashMap<>();
|
||||
private final LinkedHashSet<Thread> _waiting = new LinkedHashSet<>();
|
||||
@Getter
|
||||
private final long _delay;
|
||||
private boolean _closed = false;
|
||||
|
||||
@@ -57,8 +59,7 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
synchronized (this) {
|
||||
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
|
||||
var sel = new SetElement(el, System.currentTimeMillis());
|
||||
SetElement contains = _set.remove(sel);
|
||||
_set.put(sel, sel);
|
||||
SetElement contains = _set.putLast(sel, sel);
|
||||
this.notify();
|
||||
if (contains != null)
|
||||
return contains._el;
|
||||
@@ -122,6 +123,17 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasImmediate() {
|
||||
synchronized (this) {
|
||||
if (_set.isEmpty()) return false;
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
var first = _set.firstEntry().getValue()._time;
|
||||
return first + _delay <= curTime;
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public T tryGet() throws InterruptedException {
|
||||
synchronized (this) {
|
||||
@@ -161,19 +173,34 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
}
|
||||
|
||||
public Collection<T> getAllWait() throws InterruptedException {
|
||||
return getAllWait(Integer.MAX_VALUE);
|
||||
return getAllWait(Integer.MAX_VALUE, -1);
|
||||
}
|
||||
|
||||
public Collection<T> getAllWait(int max) throws InterruptedException {
|
||||
return getAllWait(max, -1);
|
||||
}
|
||||
|
||||
public Collection<T> getAllWait(int max, long timeout) throws InterruptedException {
|
||||
ArrayList<T> out = new ArrayList<>();
|
||||
long startedWaiting = timeout > 0 ? System.currentTimeMillis() : -1;
|
||||
try {
|
||||
synchronized (this) {
|
||||
_waiting.add(Thread.currentThread());
|
||||
}
|
||||
while (!Thread.interrupted()) {
|
||||
if (timeout > 0)
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout)) return out;
|
||||
long sleep = 0;
|
||||
synchronized (this) {
|
||||
while (_set.isEmpty()) this.wait();
|
||||
while (_set.isEmpty()) {
|
||||
if (timeout > 0) {
|
||||
this.wait(timeout);
|
||||
if (System.currentTimeMillis() > (startedWaiting + timeout))
|
||||
return out;
|
||||
} else {
|
||||
this.wait();
|
||||
}
|
||||
}
|
||||
|
||||
var curTime = System.currentTimeMillis();
|
||||
|
||||
@@ -187,6 +214,13 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (timeout > 0) {
|
||||
var cur = System.currentTimeMillis();
|
||||
if (cur > (startedWaiting + timeout)) return out;
|
||||
sleep = Math.min(sleep, (startedWaiting + timeout) - cur);
|
||||
}
|
||||
|
||||
if (sleep > 0)
|
||||
Thread.sleep(sleep);
|
||||
else
|
||||
@@ -198,7 +232,7 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
_waiting.remove(Thread.currentThread());
|
||||
}
|
||||
}
|
||||
throw new InterruptedException();
|
||||
return out;
|
||||
}
|
||||
|
||||
public void interrupt() {
|
||||
@@ -206,5 +240,4 @@ public class HashSetDelayedBlockingQueue<T> {
|
||||
for (var t : _waiting) t.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
quarkus.grpc.server.use-separate-server=false
|
||||
dhfs.objects.persistence.files.root=${HOME}/dhfs_data/dhfs_root
|
||||
dhfs.objects.root=${HOME}/dhfs_data/dhfs_root_d
|
||||
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
|
||||
dhfs.objects.root=${HOME}/dhfs_default/data/stuff
|
||||
dhfs.objects.peerdiscovery.port=42069
|
||||
dhfs.objects.peerdiscovery.interval=5000
|
||||
dhfs.objects.sync.timeout=30
|
||||
@@ -8,7 +8,8 @@ dhfs.objects.sync.ping.timeout=5
|
||||
dhfs.objects.invalidation.threads=4
|
||||
dhfs.objects.invalidation.delay=1000
|
||||
dhfs.objects.reconnect_interval=5s
|
||||
dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
|
||||
dhfs.objects.write_log=false
|
||||
dhfs.fuse.root=${HOME}/dhfs_default/fuse
|
||||
dhfs.fuse.debug=false
|
||||
dhfs.fuse.enabled=true
|
||||
dhfs.files.target_chunk_size=524288
|
||||
|
||||
@@ -80,13 +80,13 @@ public class DhfsFileServiceSimpleTestImpl {
|
||||
var fo = jObjectManager.put(f, Optional.empty());
|
||||
|
||||
var all = jObjectManager.findAll();
|
||||
Assertions.assertTrue(all.contains(c1o));
|
||||
Assertions.assertTrue(all.contains(c2o));
|
||||
Assertions.assertTrue(all.contains(c3o));
|
||||
Assertions.assertTrue(all.contains(c1io));
|
||||
Assertions.assertTrue(all.contains(c2io));
|
||||
Assertions.assertTrue(all.contains(c3io));
|
||||
Assertions.assertTrue(all.contains(fo));
|
||||
Assertions.assertTrue(all.contains(c1o.getName()));
|
||||
Assertions.assertTrue(all.contains(c2o.getName()));
|
||||
Assertions.assertTrue(all.contains(c3o.getName()));
|
||||
Assertions.assertTrue(all.contains(c1io.getName()));
|
||||
Assertions.assertTrue(all.contains(c2io.getName()));
|
||||
Assertions.assertTrue(all.contains(c3io.getName()));
|
||||
Assertions.assertTrue(all.contains(fo.getName()));
|
||||
}
|
||||
|
||||
String all = "1234567891011";
|
||||
@@ -224,7 +224,7 @@ public class DhfsFileServiceSimpleTestImpl {
|
||||
|
||||
@Test
|
||||
void moveTest2() throws InterruptedException {
|
||||
var ret = fileService.create("/moveTest", 777);
|
||||
var ret = fileService.create("/moveTest2", 777);
|
||||
Assertions.assertTrue(ret.isPresent());
|
||||
var uuid = ret.get();
|
||||
|
||||
@@ -240,14 +240,14 @@ public class DhfsFileServiceSimpleTestImpl {
|
||||
return null;
|
||||
});
|
||||
|
||||
Assertions.assertTrue(fileService.rename("/moveTest", "/movedTest"));
|
||||
Assertions.assertFalse(fileService.open("/moveTest").isPresent());
|
||||
Assertions.assertTrue(fileService.open("/movedTest").isPresent());
|
||||
Assertions.assertTrue(fileService.rename("/moveTest2", "/movedTest2"));
|
||||
Assertions.assertFalse(fileService.open("/moveTest2").isPresent());
|
||||
Assertions.assertTrue(fileService.open("/movedTest2").isPresent());
|
||||
|
||||
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
fileService.read(fileService.open("/movedTest").get(), 0, 10).get().toByteArray());
|
||||
fileService.read(fileService.open("/movedTest2").get(), 0, 10).get().toByteArray());
|
||||
|
||||
var newfile = fileService.open("/movedTest").get();
|
||||
var newfile = fileService.open("/movedTest2").get();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ public class DhfsFuseIT {
|
||||
"-Ddhfs.objects.invalidation.delay=100",
|
||||
"-Ddhfs.objects.ref_verification=true",
|
||||
"-Ddhfs.objects.deletion.delay=0",
|
||||
"-Ddhfs.objects.write_log=true",
|
||||
"-Ddhfs.objects.sync.timeout=20",
|
||||
"-Ddhfs.objects.sync.ping.timeout=20",
|
||||
"-Ddhfs.objects.reconnect_interval=1s",
|
||||
@@ -74,8 +75,8 @@ public class DhfsFuseIT {
|
||||
var loggingConsumer2 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFuseIT.class)).withPrefix("2-" + testInfo.getDisplayName());
|
||||
container2.followOutput(loggingConsumer2.andThen(waitingConsumer2));
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c1uuid));
|
||||
Assertions.assertDoesNotThrow(() -> UUID.fromString(c2uuid));
|
||||
@@ -108,80 +109,80 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWriteRewriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void createDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_data/dhfs_fuse_root/testf2").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo newfile > /root/dhfs_default/fuse/testf2").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("newfile\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
|
||||
Assertions.assertEquals("newfile\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
|
||||
Assertions.assertEquals("newfile\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout());
|
||||
Assertions.assertEquals("newfile\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeRewriteDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteDelayedTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
var client = DockerClientFactory.instance().client();
|
||||
client.pauseContainerCmd(container2.getContainerId()).exec();
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Delaying deletion check"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
client.unpauseContainerCmd(container2.getContainerId()).exec();
|
||||
@@ -189,86 +190,86 @@ public class DhfsFuseIT {
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void deleteTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "rm /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 1);
|
||||
|
||||
Thread.sleep(500);
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Thread.sleep(500);
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Deleting from persistent"), 60, TimeUnit.SECONDS, 3);
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(1, container2.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Assertions.assertEquals(1, container1.execInContainer("/bin/sh", "-c", "test -f /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Listing");
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Moving");
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_data/dhfs_fuse_root/testf1 /root/dhfs_data/dhfs_fuse_root/testf2").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testf1 /root/dhfs_default/fuse/testf2").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Listing");
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Reading");
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf2").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf2").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void moveDirTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Log.info("Creating");
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_data/dhfs_fuse_root/testdir").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testdir/testf1").getExitCode());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testdir/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testdir/testf1").getExitCode());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir/testf1").getStdout());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Listing");
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Moving");
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_data/dhfs_fuse_root/testdir2").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_data/dhfs_fuse_root/testdir /root/dhfs_data/dhfs_fuse_root/testdir2/testdirm").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mkdir /root/dhfs_default/fuse/testdir2").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "mv /root/dhfs_default/fuse/testdir /root/dhfs_default/fuse/testdir2/testdirm").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Listing");
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root/").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse/").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Log.info("Reading");
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testdir2/testdirm/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testdir2/testdirm/testf1").getStdout());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void removeAddHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
var c2curl = container2.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
@@ -276,9 +277,9 @@ public class DhfsFuseIT {
|
||||
" --data '{\"uuid\":\"" + c1uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Lost connection to"), 60, TimeUnit.SECONDS);
|
||||
|
||||
@@ -291,17 +292,17 @@ public class DhfsFuseIT {
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS);
|
||||
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
@@ -310,8 +311,8 @@ public class DhfsFuseIT {
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
@@ -321,10 +322,10 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void dirConflictTest2() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
@@ -333,8 +334,8 @@ public class DhfsFuseIT {
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
@@ -343,8 +344,8 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void dirConflictTest3() throws IOException, InterruptedException, TimeoutException {
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
@@ -353,8 +354,8 @@ public class DhfsFuseIT {
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
@@ -363,12 +364,12 @@ public class DhfsFuseIT {
|
||||
|
||||
@Test
|
||||
void dirConflictTest4() throws IOException, InterruptedException, TimeoutException {
|
||||
boolean createdOk = (container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf").getExitCode() == 0)
|
||||
&& (container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf").getExitCode() == 0);
|
||||
boolean createdOk = (container1.execInContainer("/bin/sh", "-c", "echo test1 >> /root/dhfs_default/fuse/testf").getExitCode() == 0)
|
||||
&& (container2.execInContainer("/bin/sh", "-c", "echo test2 >> /root/dhfs_default/fuse/testf").getExitCode() == 0);
|
||||
Assumptions.assumeTrue(createdOk, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
|
||||
@@ -53,6 +53,7 @@ public class DhfsFusex3IT {
|
||||
"-Ddhfs.objects.invalidation.delay=100",
|
||||
"-Ddhfs.objects.deletion.delay=0",
|
||||
"-Ddhfs.objects.ref_verification=true",
|
||||
"-Ddhfs.objects.write_log=true",
|
||||
"-Ddhfs.objects.sync.timeout=20",
|
||||
"-Ddhfs.objects.sync.ping.timeout=20",
|
||||
"-Ddhfs.objects.reconnect_interval=1s",
|
||||
@@ -76,9 +77,9 @@ public class DhfsFusex3IT {
|
||||
|
||||
Stream.of(container1, container2, container3).parallel().forEach(GenericContainer::start);
|
||||
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_root_d/self_uuid").getStdout();
|
||||
c1uuid = container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c2uuid = container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
c3uuid = container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/data/stuff/self_uuid").getStdout();
|
||||
|
||||
waitingConsumer1 = new WaitingConsumer();
|
||||
var loggingConsumer1 = new Slf4jLogConsumer(LoggerFactory.getLogger(DhfsFusex3IT.class))
|
||||
@@ -129,7 +130,7 @@ public class DhfsFusex3IT {
|
||||
waitingConsumer2.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
waitingConsumer1.waitUntil(frame -> frame.getUtf8String().contains("Connected"), 60, TimeUnit.SECONDS, 2);
|
||||
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
@@ -139,26 +140,26 @@ public class DhfsFusex3IT {
|
||||
|
||||
@Test
|
||||
void readWriteFileTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void gccHelloWorldTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /root/dhfs_data/dhfs_fuse_root/hello.c").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_data/dhfs_fuse_root && gcc hello.c").getExitCode());
|
||||
var helloOut1 = container1.execInContainer("/bin/sh", "-c", "/root/dhfs_data/dhfs_fuse_root/a.out");
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo '#include<stdio.h>\nint main(){printf(\"hello world\"); return 0;}' > /root/dhfs_default/fuse/hello.c").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "cd /root/dhfs_default/fuse && gcc hello.c").getExitCode());
|
||||
var helloOut1 = container1.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
|
||||
Log.info(helloOut1);
|
||||
Assertions.assertEquals(0, helloOut1.getExitCode());
|
||||
Assertions.assertEquals("hello world", helloOut1.getStdout());
|
||||
Thread.sleep(1000);
|
||||
var helloOut2 = container2.execInContainer("/bin/sh", "-c", "/root/dhfs_data/dhfs_fuse_root/a.out");
|
||||
Thread.sleep(2000);
|
||||
var helloOut2 = container2.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
|
||||
Log.info(helloOut2);
|
||||
Assertions.assertEquals(0, helloOut2.getExitCode());
|
||||
Assertions.assertEquals("hello world", helloOut2.getStdout());
|
||||
var helloOut3 = container3.execInContainer("/bin/sh", "-c", "/root/dhfs_data/dhfs_fuse_root/a.out");
|
||||
var helloOut3 = container3.execInContainer("/bin/sh", "-c", "/root/dhfs_default/fuse/a.out");
|
||||
Log.info(helloOut3);
|
||||
Assertions.assertEquals(0, helloOut3.getExitCode());
|
||||
Assertions.assertEquals("hello world", helloOut3.getStdout());
|
||||
@@ -166,10 +167,10 @@ public class DhfsFusex3IT {
|
||||
|
||||
@Test
|
||||
void removeHostTest() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
var c3curl = container3.execInContainer("/bin/sh", "-c",
|
||||
"curl --header \"Content-Type: application/json\" " +
|
||||
@@ -177,19 +178,19 @@ public class DhfsFusex3IT {
|
||||
" --data '{\"uuid\":\"" + c2uuid + "\"}' " +
|
||||
" http://localhost:8080/objects-manage/known-peers");
|
||||
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "echo rewritten > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("rewritten\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
void dirConflictTest() throws IOException, InterruptedException, TimeoutException {
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container3, "echo test3 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
|
||||
boolean createFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
@@ -197,36 +198,36 @@ public class DhfsFusex3IT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(2000);
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
Assertions.assertTrue(cat.getStdout().contains("test2"));
|
||||
Assertions.assertTrue(cat.getStdout().contains("test3"));
|
||||
}
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void dirConflictTest3() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getExitCode());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Assertions.assertEquals(0, container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
Assertions.assertEquals(0, container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getExitCode());
|
||||
boolean createFail = Stream.of(
|
||||
Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf"),
|
||||
Pair.of(container3, "echo test3 >> /root/dhfs_data/dhfs_fuse_root/testf")).parallel().map(p -> {
|
||||
Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf"),
|
||||
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf")).parallel().map(p -> {
|
||||
try {
|
||||
Log.info("Creating");
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
@@ -235,36 +236,37 @@ public class DhfsFusex3IT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!createFail, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(2000);
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
Assertions.assertTrue(cat.getStdout().contains("test2"));
|
||||
Assertions.assertTrue(cat.getStdout().contains("test3"));
|
||||
}
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
void fileConflictTest2() throws IOException, InterruptedException, TimeoutException {
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_data/dhfs_fuse_root/testf1").getExitCode());
|
||||
Thread.sleep(1000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/testf1").getStdout());
|
||||
Assertions.assertEquals(0, container1.execInContainer("/bin/sh", "-c", "echo tesempty > /root/dhfs_default/fuse/testf1").getExitCode());
|
||||
Thread.sleep(2000);
|
||||
Assertions.assertEquals("tesempty\n", container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
Assertions.assertEquals("tesempty\n", container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/testf1").getStdout());
|
||||
|
||||
boolean writeFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_data/dhfs_fuse_root/testf1"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_data/dhfs_fuse_root/testf1"),
|
||||
Pair.of(container3, "echo test3 >> /root/dhfs_data/dhfs_fuse_root/testf1")).parallel().map(p -> {
|
||||
boolean writeFail = Stream.of(Pair.of(container1, "echo test1 >> /root/dhfs_default/fuse/testf1"),
|
||||
Pair.of(container2, "echo test2 >> /root/dhfs_default/fuse/testf1"),
|
||||
Pair.of(container3, "echo test3 >> /root/dhfs_default/fuse/testf1")).parallel().map(p -> {
|
||||
try {
|
||||
return p.getLeft().execInContainer("/bin/sh", "-c", p.getRight()).getExitCode();
|
||||
} catch (Exception e) {
|
||||
@@ -272,24 +274,30 @@ public class DhfsFusex3IT {
|
||||
}
|
||||
}).anyMatch(r -> r != 0);
|
||||
Assumptions.assumeTrue(!writeFail, "Failed creating one or more files");
|
||||
Thread.sleep(1000);
|
||||
Thread.sleep(2000);
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*");
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info("fileConflictTest2: " + ls);
|
||||
Log.info("fileConflictTest2: " + cat);
|
||||
}
|
||||
for (var c : List.of(container1, container2, container3)) {
|
||||
var ls = c.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse");
|
||||
var cat = c.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*");
|
||||
Log.info(ls);
|
||||
Log.info(cat);
|
||||
Assertions.assertTrue(cat.getStdout().contains("test1"));
|
||||
Assertions.assertTrue(cat.getStdout().contains("test2"));
|
||||
Assertions.assertTrue(cat.getStdout().contains("test3"));
|
||||
}
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_data/dhfs_fuse_root").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_data/dhfs_fuse_root/*").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "ls /root/dhfs_default/fuse").getStdout());
|
||||
Assertions.assertEquals(container1.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
Assertions.assertEquals(container3.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout(),
|
||||
container2.execInContainer("/bin/sh", "-c", "cat /root/dhfs_default/fuse/*").getStdout());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -144,7 +144,7 @@ public class HashSetDelayedBlockingQueueTest {
|
||||
}
|
||||
});
|
||||
var thing = queue.getAllWait(); // Theoretically you can get one...
|
||||
if (thing.size() == 1) thing.add(queue.getAllWait().stream().findFirst());
|
||||
if (thing.size() == 1) thing.add(queue.getAllWait().stream().findFirst().get());
|
||||
var gotTime = System.currentTimeMillis();
|
||||
Assertions.assertIterableEquals(List.of("hello1", "hello2"), thing);
|
||||
Assertions.assertTrue((gotTime - curTime) >= 1810);
|
||||
|
||||
@@ -24,12 +24,15 @@ EXTRAOPTS_PARSED="$(tr '\n\r' ' ' <"$EXTRAOPTS")"
|
||||
echo "Extra options: $EXTRAOPTS_PARSED"
|
||||
|
||||
java \
|
||||
-Xmx256M -Ddhfs.objects.writeback.limit=134217728 \
|
||||
-Xmx384M \
|
||||
-Ddhfs.objects.writeback.limit=134217728 \
|
||||
-Ddhfs.objects.lru.limit=134217728 \
|
||||
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \
|
||||
-Ddhfs.objects.persistence.files.root="$SCRIPT_DIR"/../data/objects \
|
||||
-Ddhfs.objects.root="$SCRIPT_DIR"/../data/configs \
|
||||
-Ddhfs.fuse.root="$SCRIPT_DIR"/../fuse \
|
||||
-Dquarkus.http.host=0.0.0.0 \
|
||||
-Dquarkus.log.category.\"com.usatiuk.dhfs\".level=INFO \
|
||||
-Ddhfs.webui.root="$SCRIPT_DIR"/Webui $EXTRAOPTS_PARSED \
|
||||
-jar "$SCRIPT_DIR"/"DHFS Package"/quarkus-run.jar >quarkus.log 2>&1 &
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ set -o pipefail
|
||||
SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
|
||||
|
||||
# 💀
|
||||
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -En "s/.*\[{\"id\":([0-9]*).*/\1/p")
|
||||
LATEST=$(curl "https://api.github.com/repos/usatiuk/dhfs/actions/runs?branch=main&status=completed&per_page=1" | tr -d "[:space:]" | sed -en "s/.*\[\{\"id\":([0-9]*).*/\1/p")
|
||||
|
||||
echo Latest: $LATEST
|
||||
|
||||
@@ -24,11 +24,14 @@ echo Downloading...
|
||||
cd "$SCRIPT_DIR"
|
||||
|
||||
rm "Run wrapper.zip" || true
|
||||
rm "Run wrapper.tar.gz" || true
|
||||
rm -rf "dhfs" || true
|
||||
|
||||
wget https://nightly.link/usatiuk/dhfs/actions/runs/$LATEST/Run%20wrapper.zip
|
||||
|
||||
unzip "Run wrapper.zip"
|
||||
rm "Run wrapper.zip"
|
||||
tar xvf "Run wrapper.tar.gz"
|
||||
|
||||
rm -rf "DHFS Package"
|
||||
rm -rf "Webui"
|
||||
|
||||
Reference in New Issue
Block a user