mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-29 04:57:48 +01:00
some writeback tweaks
This commit is contained in:
@@ -82,7 +82,7 @@ public class DirectoryConflictResolver implements ConflictResolver {
|
||||
}
|
||||
}
|
||||
|
||||
newMetadata = new ObjectMetadata(ours.getName());
|
||||
newMetadata = new ObjectMetadata(ours.getName(), true);
|
||||
|
||||
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
||||
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
||||
|
||||
@@ -85,7 +85,7 @@ public class FileConflictResolver implements ConflictResolver {
|
||||
otherHostname = persistentRemoteHostsService.getSelfUuid();
|
||||
}
|
||||
|
||||
newMetadata = new ObjectMetadata(ours.getName());
|
||||
newMetadata = new ObjectMetadata(ours.getName(), true);
|
||||
|
||||
for (var entry : oursHeader.getChangelog().getEntriesList()) {
|
||||
newMetadata.getChangelog().put(UUID.fromString(entry.getHost()), entry.getVersion());
|
||||
|
||||
@@ -21,7 +21,7 @@ public class JObject<T extends JObjectData> implements Serializable {
|
||||
// Create a new object
|
||||
protected JObject(JObjectResolver resolver, String name, UUID selfUuid, T obj) {
|
||||
_resolver = resolver;
|
||||
_metaPart = new ObjectMetadata(name);
|
||||
_metaPart = new ObjectMetadata(name, false);
|
||||
_dataPart.set(obj);
|
||||
// FIXME:?
|
||||
if (!obj.assumeUnique())
|
||||
|
||||
@@ -187,7 +187,7 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
if (objectPersistentStore.existsObject("meta_" + name))
|
||||
continue;
|
||||
|
||||
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name));
|
||||
var created = new JObject<>(jObjectResolver, new ObjectMetadata(name, false));
|
||||
_map.put(name, new NamedSoftReference(created, _refQueue));
|
||||
created.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (m, d, b, i) -> {
|
||||
parent.ifPresent(m::addRef);
|
||||
@@ -222,10 +222,11 @@ public class JObjectManagerImpl implements JObjectManager {
|
||||
refs = Streams.concat(refs, object.getData().extractRefs().stream());
|
||||
|
||||
object.discardData();
|
||||
jObjectWriteback.hintDeletion(m);
|
||||
|
||||
refs.forEach(c -> get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
|
||||
mc.removeRef(object.getName());
|
||||
// tryQuickDelete(ref);
|
||||
tryQuickDelete(ref);
|
||||
return null;
|
||||
})));
|
||||
|
||||
|
||||
@@ -23,6 +23,9 @@ public class JObjectRefProcessor {
|
||||
@Inject
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
@Inject
|
||||
JObjectWriteback jObjectWriteback;
|
||||
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
|
||||
@@ -43,8 +46,8 @@ public class JObjectRefProcessor {
|
||||
|
||||
public void putDeletionCandidate(String name) {
|
||||
synchronized (this) {
|
||||
_candidates.putIfAbsent(name, System.currentTimeMillis());
|
||||
this.notify();
|
||||
if (_candidates.putIfAbsent(name, System.currentTimeMillis()) == null)
|
||||
this.notify();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,6 +91,7 @@ public class JObjectRefProcessor {
|
||||
refs = Streams.concat(refs, got.get().getData().extractRefs().stream());
|
||||
|
||||
got.get().discardData();
|
||||
jObjectWriteback.hintDeletion(m);
|
||||
|
||||
refs.forEach(c -> {
|
||||
jObjectManager.get(c).ifPresent(ref -> ref.runWriteLocked(JObject.ResolutionStrategy.NO_RESOLUTION, (mc, dc, bc, ic) -> {
|
||||
|
||||
@@ -104,7 +104,7 @@ public class JObjectResolver {
|
||||
if (local.isPresent()) return local.get();
|
||||
|
||||
var obj = remoteObjectServiceClient.getObject(jObject);
|
||||
objectPersistentStore.writeObject(jObject.getName(), obj);
|
||||
jObjectWriteback.markDirty(jObject.getName(), jObject);
|
||||
invalidationQueueService.pushInvalidationToAll(jObject.getName(), !jObject.getMeta().isSeen());
|
||||
return SerializationHelper.deserialize(obj);
|
||||
}
|
||||
@@ -112,7 +112,7 @@ public class JObjectResolver {
|
||||
public void removeLocal(JObject<?> jObject, String name) {
|
||||
jObject.assertRWLock();
|
||||
try {
|
||||
Log.trace("Deleting " + name);
|
||||
Log.trace("Invalidating " + name);
|
||||
jObjectWriteback.remove(name);
|
||||
objectPersistentStore.deleteObject(name);
|
||||
} catch (StatusRuntimeException sx) {
|
||||
|
||||
@@ -126,6 +126,7 @@ public class JObjectWriteback {
|
||||
objectPersistentStore.deleteObject(m.getName());
|
||||
return;
|
||||
}
|
||||
m.markWritten();
|
||||
objectPersistentStore.writeObject("meta_" + m.getName(), SerializationHelper.serialize(m));
|
||||
if (data != null)
|
||||
objectPersistentStore.writeObject(m.getName(), SerializationHelper.serialize(data));
|
||||
@@ -138,37 +139,54 @@ public class JObjectWriteback {
|
||||
}
|
||||
|
||||
private void tryClean() {
|
||||
JObject<?> found = null;
|
||||
synchronized (_objects) {
|
||||
if (_objects.size() >= limit) {
|
||||
for (var obj : _objects.entrySet()) {
|
||||
var jobj = obj.getValue().getValue();
|
||||
if (jobj.isDeleted()) {
|
||||
if (jobj.tryRwLock()) {
|
||||
try {
|
||||
if (!jobj.isDeleted())
|
||||
continue;
|
||||
flushOneImmediate(jobj.getMeta(), null);
|
||||
_objects.remove(jobj.getName());
|
||||
break;
|
||||
} finally {
|
||||
obj.getValue().getRight().rwUnlock();
|
||||
if (!jobj.isDeleted()) {
|
||||
jobj.rwUnlock();
|
||||
continue;
|
||||
}
|
||||
found = jobj;
|
||||
_objects.remove(found.getName());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (found != null)
|
||||
try {
|
||||
flushOneImmediate(found.getMeta(), null);
|
||||
} finally {
|
||||
found.rwUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void hintDeletion(ObjectMetadata meta) {
|
||||
synchronized (_objects) {
|
||||
if (!meta.isWritten()) {
|
||||
_objects.remove(meta.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void markDirty(String name, JObject<?> object) {
|
||||
object.assertRWLock();
|
||||
if (object.isDeleted() && !object.getMeta().isWritten())
|
||||
return;
|
||||
|
||||
synchronized (_objects) {
|
||||
if (_objects.containsKey(name)) {
|
||||
if (_objects.containsKey(name))
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
tryClean();
|
||||
tryClean();
|
||||
|
||||
synchronized (_objects) {
|
||||
// FIXME: better logic
|
||||
if (_objects.size() < limit) {
|
||||
if (overload) {
|
||||
|
||||
@@ -6,13 +6,17 @@ import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||
import jakarta.enterprise.inject.spi.CDI;
|
||||
import lombok.Getter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.Serial;
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class ObjectMetadata implements Serializable {
|
||||
public ObjectMetadata(String name) {
|
||||
public ObjectMetadata(String name, boolean written) {
|
||||
_name = name;
|
||||
_written.set(written);
|
||||
}
|
||||
|
||||
@Getter
|
||||
@@ -33,10 +37,18 @@ public class ObjectMetadata implements Serializable {
|
||||
@Getter
|
||||
private boolean _locked = false;
|
||||
|
||||
private transient AtomicBoolean _written = new AtomicBoolean(true);
|
||||
|
||||
private final AtomicBoolean _seen = new AtomicBoolean(false);
|
||||
|
||||
private final AtomicBoolean _deleted = new AtomicBoolean(false);
|
||||
|
||||
@Serial
|
||||
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
|
||||
in.defaultReadObject();
|
||||
_written = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
public boolean isSeen() {
|
||||
return _seen.get();
|
||||
}
|
||||
@@ -57,6 +69,14 @@ public class ObjectMetadata implements Serializable {
|
||||
_deleted.set(false);
|
||||
}
|
||||
|
||||
public boolean isWritten() {
|
||||
return _written.get();
|
||||
}
|
||||
|
||||
public void markWritten() {
|
||||
_written.set(true);
|
||||
}
|
||||
|
||||
private final Set<String> _referrers = new HashSet<>();
|
||||
|
||||
public long lock() {
|
||||
|
||||
@@ -13,8 +13,8 @@ dhfs.fuse.root=${HOME}/dhfs_data/dhfs_fuse_root
|
||||
dhfs.fuse.debug=false
|
||||
dhfs.fuse.enabled=true
|
||||
dhfs.storage.files.target_chunk_size=1048576
|
||||
dhfs.objects.writeback.delay=100
|
||||
dhfs.objects.writeback.limit=5000
|
||||
dhfs.objects.writeback.delay=200
|
||||
dhfs.objects.writeback.limit=3000
|
||||
dhfs.objects.deletion.delay=0
|
||||
dhfs.objects.ref_verification=false
|
||||
dhfs.files.use_hash_for_chunks=false
|
||||
Reference in New Issue
Block a user