a bit more cleanup with metadata

This commit is contained in:
2024-06-16 20:57:22 +02:00
parent 4b3f29f6f6
commit 23cd3e4345
5 changed files with 61 additions and 45 deletions

View File

@@ -84,7 +84,7 @@ public class DistributedObjectRepository implements ObjectRepository {
var info = infoOpt.get(); var info = infoOpt.get();
Optional<byte[]> read = info.runReadLocked(() -> { Optional<byte[]> read = info.runReadLocked((data) -> {
if (objectPersistentStore.existsObject(name)) if (objectPersistentStore.existsObject(name))
return Optional.of(objectPersistentStore.readObject(name)); return Optional.of(objectPersistentStore.readObject(name));
return Optional.empty(); return Optional.empty();
@@ -92,7 +92,7 @@ public class DistributedObjectRepository implements ObjectRepository {
if (read.isPresent()) return read.get(); if (read.isPresent()) return read.get();
// Race? // Race?
return info.runWriteLocked(() -> { return info.runWriteLocked((data) -> {
var obj = remoteObjectServiceClient.getObject(name); var obj = remoteObjectServiceClient.getObject(name);
objectPersistentStore.writeObject(name, obj); objectPersistentStore.writeObject(name, obj);
return obj; return obj;
@@ -104,7 +104,7 @@ public class DistributedObjectRepository implements ObjectRepository {
public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) { public void writeObject(String name, byte[] data, Boolean canIgnoreConflict) {
var info = objectIndexService.getOrCreateMeta(name, canIgnoreConflict); var info = objectIndexService.getOrCreateMeta(name, canIgnoreConflict);
info.runWriteLocked(() -> { info.runWriteLocked((metaData) -> {
objectPersistentStore.writeObject(name, data); objectPersistentStore.writeObject(name, data);
var prevMtime = info.getMtime(); var prevMtime = info.getMtime();
info.setMtime(System.currentTimeMillis()); info.setMtime(System.currentTimeMillis());

View File

@@ -1,61 +1,50 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed; package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ObjectMeta implements Serializable { public class ObjectMeta implements Serializable {
public ObjectMeta(String name, Boolean assumeUnique) { public ObjectMeta(String name, Boolean assumeUnique) {
this._name = name; _data = new ObjectMetaData(name, assumeUnique);
this._assumeUnique = assumeUnique;
} }
private final ObjectMetaData _data;
private final ReadWriteLock _lock = new ReentrantReadWriteLock(); private final ReadWriteLock _lock = new ReentrantReadWriteLock();
@Getter
final String _name;
long _mtime;
@Getter
final Boolean _assumeUnique;
//FIXME:
final List<String> _remoteCopies = new ArrayList<>();
public void setMtime(long mtime) { public void setMtime(long mtime) {
runWriteLocked(() -> { runWriteLocked((data) -> {
_mtime = mtime; data.setMtime(mtime);
return null; return null;
}); });
} }
public long getMtime() { public long getMtime() {
return runReadLocked(() -> _mtime); return runReadLocked((data) -> data.getMtime());
} }
public <R> R runReadLocked(Callable<R> fn) { public boolean getAssumeUnique() {
return runReadLocked((data) -> data.getAssumeUnique());
}
@FunctionalInterface
public interface ObjectMetaFn<R> {
R apply(ObjectMetaData indexData);
}
public <R> R runReadLocked(ObjectMetaFn<R> fn) {
_lock.readLock().lock(); _lock.readLock().lock();
try { try {
return fn.call(); return fn.apply(_data);
} catch (Exception e) {
throw new RuntimeException(e);
} finally { } finally {
_lock.readLock().unlock(); _lock.readLock().unlock();
} }
} }
public <R> R runWriteLocked(Callable<R> fn) { public <R> R runWriteLocked(ObjectMetaFn<R> fn) {
_lock.writeLock().lock(); _lock.writeLock().lock();
try { try {
return fn.call(); return fn.apply(_data);
} catch (Exception e) {
throw new RuntimeException(e);
} finally { } finally {
_lock.writeLock().unlock(); _lock.writeLock().unlock();
} }

View File

@@ -0,0 +1,28 @@
package com.usatiuk.dhfs.storage.objects.repository.distributed;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class ObjectMetaData implements Serializable {
public ObjectMetaData(String name, Boolean assumeUnique) {
_name = name;
_assumeUnique = assumeUnique;
}
@Getter
final String _name;
@Getter
@Setter
long _mtime;
@Getter
final Boolean _assumeUnique;
@Getter
final List<String> _remoteCopies = new ArrayList<>();
}

View File

@@ -33,7 +33,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
var metaOpt = objectIndexService.getMeta(request.getName()); var metaOpt = objectIndexService.getMeta(request.getName());
if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND); if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
var meta = metaOpt.get(); var meta = metaOpt.get();
Optional<Pair<Long, byte[]>> read = meta.runReadLocked(() -> { Optional<Pair<Long, byte[]>> read = meta.runReadLocked((data) -> {
if (objectPersistentStore.existsObject(request.getName())) if (objectPersistentStore.existsObject(request.getName()))
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName()))); return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName())));
return Optional.empty(); return Optional.empty();

View File

@@ -5,7 +5,6 @@ import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager; import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore; import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
import io.quarkus.logging.Log; import io.quarkus.logging.Log;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject; import jakarta.inject.Inject;
import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.NotImplementedException;
@@ -22,32 +21,32 @@ public class SyncHandler {
JObjectManager jObjectManager; JObjectManager jObjectManager;
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) { public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
var metaOpt = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique()); var meta = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique());
metaOpt.runWriteLocked(() -> { meta.runWriteLocked((data) -> {
if (metaOpt.getMtime() == request.getMtime()) { if (meta.getMtime() == request.getMtime()) {
metaOpt._remoteCopies.add(request.getSelfname()); data.getRemoteCopies().add(request.getSelfname());
return null; return null;
} }
if (metaOpt.getMtime() != request.getPrevMtime()) { if (meta.getMtime() != request.getPrevMtime()) {
if (!metaOpt.getAssumeUnique() if (!meta.getAssumeUnique()
|| (metaOpt.getAssumeUnique() != request.getAssumeUnique())) { || (meta.getAssumeUnique() != request.getAssumeUnique())) {
Log.error("Conflict!"); Log.error("Conflict!");
throw new NotImplementedException(); throw new NotImplementedException();
} }
} }
metaOpt.setMtime(request.getMtime()); meta.setMtime(request.getMtime());
metaOpt._remoteCopies.clear(); data.getRemoteCopies().clear();
metaOpt._remoteCopies.add(request.getSelfname()); data.getRemoteCopies().add(request.getSelfname());
try { try {
objectPersistentStore.deleteObject(request.getName()); objectPersistentStore.deleteObject(request.getName());
} catch (Exception ignored) { } catch (Exception ignored) {
} }
jObjectManager.invalidateJObject(metaOpt.getName()); jObjectManager.invalidateJObject(data.getName());
return null; return null;
}); });