mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 12:37:48 +01:00
a little less crazy versioning
This commit is contained in:
@@ -34,7 +34,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
|
||||
|
||||
void init(@Observes @Priority(500) StartupEvent event) {
|
||||
Log.info("Initializing file service");
|
||||
if (!objectRepository.existsObject(namespace + new UUID(0, 0))) {
|
||||
if (!objectRepository.existsObject(new UUID(0, 0).toString())) {
|
||||
jObjectManager.put(new Directory(new UUID(0, 0), 0755));
|
||||
}
|
||||
getRoot();
|
||||
|
||||
@@ -23,6 +23,7 @@ import java.util.Optional;
|
||||
public class DistributedObjectRepository implements ObjectRepository {
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
String selfname;
|
||||
|
||||
@Inject
|
||||
Vertx vertx;
|
||||
|
||||
@@ -42,14 +43,9 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
try {
|
||||
Log.info("Starting sync");
|
||||
var got = remoteObjectServiceClient.getIndex();
|
||||
for (var h : got) {
|
||||
var prevMtime = objectIndexService.exists(h.getName())
|
||||
? objectIndexService.getMeta(h.getName()).get().getMtime()
|
||||
: 0;
|
||||
syncHandler.handleRemoteUpdate(
|
||||
IndexUpdatePush.newBuilder().setSelfname(selfname).setName(h.getName())
|
||||
.setAssumeUnique(h.getAssumeUnique())
|
||||
.setMtime(h.getMtime()).setPrevMtime(prevMtime).build());
|
||||
for (var h : got.getObjectsList()) {
|
||||
syncHandler.handleRemoteUpdate(IndexUpdatePush.newBuilder()
|
||||
.setSelfname(got.getSelfname()).setHeader(h).build());
|
||||
}
|
||||
Log.info("Sync complete");
|
||||
} catch (Exception e) {
|
||||
@@ -76,11 +72,9 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
@Nonnull
|
||||
@Override
|
||||
public byte[] readObject(String name) {
|
||||
if (!objectIndexService.exists(name))
|
||||
throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
||||
|
||||
var infoOpt = objectIndexService.getMeta(name);
|
||||
if (infoOpt.isEmpty()) throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
||||
if (infoOpt.isEmpty())
|
||||
throw new IllegalArgumentException("Object " + name + " doesn't exist");
|
||||
|
||||
var info = infoOpt.get();
|
||||
|
||||
@@ -90,8 +84,8 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
return Optional.empty();
|
||||
});
|
||||
if (read.isPresent()) return read.get();
|
||||
// Race?
|
||||
|
||||
// Possible race if it got deleted?
|
||||
return info.runWriteLocked((data) -> {
|
||||
var obj = remoteObjectServiceClient.getObject(name);
|
||||
objectPersistentStore.writeObject(name, obj);
|
||||
@@ -106,17 +100,17 @@ public class DistributedObjectRepository implements ObjectRepository {
|
||||
|
||||
info.runWriteLocked((metaData) -> {
|
||||
objectPersistentStore.writeObject(name, data);
|
||||
var prevMtime = info.getMtime();
|
||||
info.setMtime(System.currentTimeMillis());
|
||||
try {
|
||||
remoteObjectServiceClient.notifyUpdate(name, prevMtime);
|
||||
} catch (Exception e) {
|
||||
Log.error("Error when notifying remote update:");
|
||||
Log.error(e);
|
||||
Log.error(e.getCause());
|
||||
}
|
||||
metaData.getChangelog().merge(selfname, 1L, Long::sum);
|
||||
return null;
|
||||
});
|
||||
// FIXME: Race?
|
||||
try {
|
||||
remoteObjectServiceClient.notifyUpdate(name);
|
||||
} catch (Exception e) {
|
||||
Log.error("Error when notifying remote update:");
|
||||
Log.error(e);
|
||||
Log.error(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
|
||||
@@ -17,6 +17,9 @@ import java.util.Optional;
|
||||
|
||||
@ApplicationScoped
|
||||
public class ObjectIndexService {
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
String selfname;
|
||||
|
||||
ObjectIndex _index = new ObjectIndex();
|
||||
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.root")
|
||||
@@ -47,7 +50,12 @@ public class ObjectIndexService {
|
||||
}
|
||||
|
||||
public ObjectMeta getOrCreateMeta(String name, boolean assumeUnique) {
|
||||
return _index.getOrCreate(name, assumeUnique);
|
||||
var ret = _index.getOrCreate(name, assumeUnique);
|
||||
ret.runWriteLocked(md -> {
|
||||
md.getChangelog().putIfAbsent(selfname, 0L);
|
||||
return null;
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
|
||||
@@ -12,17 +12,6 @@ public class ObjectMeta implements Serializable {
|
||||
private final ObjectMetaData _data;
|
||||
private final ReadWriteLock _lock = new ReentrantReadWriteLock();
|
||||
|
||||
public void setMtime(long mtime) {
|
||||
runWriteLocked((data) -> {
|
||||
data.setMtime(mtime);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public long getMtime() {
|
||||
return runReadLocked((data) -> data.getMtime());
|
||||
}
|
||||
|
||||
public boolean getAssumeUnique() {
|
||||
return runReadLocked((data) -> data.getAssumeUnique());
|
||||
}
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelog;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
|
||||
public class ObjectMetaData implements Serializable {
|
||||
@@ -14,15 +18,33 @@ public class ObjectMetaData implements Serializable {
|
||||
}
|
||||
|
||||
@Getter
|
||||
final String _name;
|
||||
private final String _name;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
long _mtime;
|
||||
private final Boolean _assumeUnique;
|
||||
|
||||
@Getter
|
||||
final Boolean _assumeUnique;
|
||||
private final List<String> _remoteCopies = new ArrayList<>();
|
||||
|
||||
@Getter
|
||||
final List<String> _remoteCopies = new ArrayList<>();
|
||||
private final HashMap<String, Long> _changelog = new LinkedHashMap<>();
|
||||
|
||||
Long getTotalVersion() {
|
||||
return _changelog.values().stream().reduce(0L, Long::sum);
|
||||
}
|
||||
|
||||
ObjectChangelog toRpcChangelog() {
|
||||
var changelogBuilder = ObjectChangelog.newBuilder();
|
||||
for (var m : getChangelog().entrySet()) {
|
||||
changelogBuilder.addEntries(ObjectChangelogEntry.newBuilder().setHost(m.getKey()).setVersion(m.getValue()).build());
|
||||
}
|
||||
return changelogBuilder.build();
|
||||
}
|
||||
|
||||
ObjectHeader toRpcHeader() {
|
||||
var headerBuilder = ObjectHeader.newBuilder().setName(getName());
|
||||
headerBuilder.setAssumeUnique(getAssumeUnique());
|
||||
headerBuilder.setChangelog(toRpcChangelog());
|
||||
return headerBuilder.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,18 +1,12 @@
|
||||
package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.GetIndexRequest;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.GetObjectRequest;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectHeader;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.*;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.smallrye.mutiny.Uni;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ApplicationScoped
|
||||
public class RemoteObjectServiceClient {
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
@@ -26,39 +20,57 @@ public class RemoteObjectServiceClient {
|
||||
|
||||
public byte[] getObject(String name) {
|
||||
return remoteHostManager.withClient(client -> {
|
||||
var req = GetObjectRequest.newBuilder().setName(name).build();
|
||||
var reply = client.getObject(req);
|
||||
var metaOpt = objectIndexService.getMeta(name);
|
||||
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
||||
var meta = metaOpt.get();
|
||||
if (meta.getMtime() != reply.getObject().getHeader().getMtime()) {
|
||||
if (!meta.getAssumeUnique() && (meta.getAssumeUnique() != reply.getObject().getHeader().getAssumeUnique())) {
|
||||
var reply = client.getObject(GetObjectRequest.newBuilder().setName(name).build());
|
||||
|
||||
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
|
||||
Log.error("Race when trying to fetch");
|
||||
return new NotImplementedException();
|
||||
});
|
||||
|
||||
var receivedSelfVer = reply.getObject().getHeader().getChangelog()
|
||||
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
|
||||
.findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L);
|
||||
|
||||
var receivedTotalVer = reply.getObject().getHeader().getChangelog().getEntriesList()
|
||||
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
|
||||
|
||||
return meta.runReadLocked(md -> {
|
||||
var outdated =
|
||||
(
|
||||
(md.getTotalVersion() > receivedTotalVer)
|
||||
|| (md.getChangelog().get(selfname) > receivedSelfVer)
|
||||
)
|
||||
&& !md.getAssumeUnique();
|
||||
|
||||
if (outdated) {
|
||||
Log.error("Race when trying to fetch");
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
return reply.getObject().getContent().toByteArray();
|
||||
return reply.getObject().getContent().toByteArray();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public List<ObjectHeader> getIndex() {
|
||||
public GetIndexReply getIndex() {
|
||||
return remoteHostManager.withClient(client -> {
|
||||
var req = GetIndexRequest.newBuilder().build();
|
||||
var reply = client.getIndex(req);
|
||||
return reply.getObjectsList();
|
||||
return reply;
|
||||
});
|
||||
}
|
||||
|
||||
public Boolean notifyUpdate(String name, long prevMtime) {
|
||||
public Boolean notifyUpdate(String name) {
|
||||
return remoteHostManager.withClient(client -> {
|
||||
var metaOpt = objectIndexService.getMeta(name);
|
||||
if (metaOpt.isEmpty()) throw new RuntimeException("Oops!");
|
||||
var meta = metaOpt.get();
|
||||
var meta = objectIndexService.getMeta(name).orElseThrow(() -> {
|
||||
Log.error("Race when trying to notify update");
|
||||
return new NotImplementedException();
|
||||
});
|
||||
|
||||
var req = IndexUpdatePush.newBuilder().setSelfname(selfname).setName(name)
|
||||
.setAssumeUnique(meta.getAssumeUnique())
|
||||
.setMtime(meta.getMtime()).setPrevMtime(prevMtime).build();
|
||||
client.indexUpdate(req);
|
||||
var builder = IndexUpdatePush.newBuilder().setSelfname(selfname);
|
||||
|
||||
client.indexUpdate(builder.setHeader(
|
||||
meta.runReadLocked(ObjectMetaData::toRpcHeader)
|
||||
).build());
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -30,18 +30,20 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Blocking
|
||||
public Uni<GetObjectReply> getObject(GetObjectRequest request) {
|
||||
Log.info("<-- getObject: " + request.getName());
|
||||
var metaOpt = objectIndexService.getMeta(request.getName());
|
||||
if (metaOpt.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
var meta = metaOpt.get();
|
||||
Optional<Pair<Long, byte[]>> read = meta.runReadLocked((data) -> {
|
||||
if (objectPersistentStore.existsObject(request.getName()))
|
||||
return Optional.of(Pair.of(meta.getMtime(), objectPersistentStore.readObject(request.getName())));
|
||||
return Optional.empty();
|
||||
|
||||
var meta = objectIndexService.getMeta(request.getName()).orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
||||
|
||||
Optional<Pair<ObjectHeader, byte[]>> readOpt = meta.runReadLocked((data) -> {
|
||||
if (objectPersistentStore.existsObject(request.getName())) {
|
||||
ObjectHeader header = data.toRpcHeader();
|
||||
byte[] bytes = objectPersistentStore.readObject(request.getName());
|
||||
return Optional.of(Pair.of(header, bytes));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
});
|
||||
if (read.isEmpty()) throw new StatusRuntimeException(Status.NOT_FOUND);
|
||||
var obj = read.get().getRight();
|
||||
var header = ObjectHeader.newBuilder().setName(request.getName()).setMtime(read.get().getLeft()).setAssumeUnique(meta.getAssumeUnique()).build();
|
||||
var replyObj = ApiObject.newBuilder().setHeader(header).setContent(ByteString.copyFrom(obj)).build();
|
||||
var read = readOpt.orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND));
|
||||
var replyObj = ApiObject.newBuilder().setHeader(read.getLeft()).setContent(ByteString.copyFrom(read.getRight())).build();
|
||||
return Uni.createFrom().item(GetObjectReply.newBuilder().setObject(replyObj).build());
|
||||
}
|
||||
|
||||
@@ -51,8 +53,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
Log.info("<-- getIndex: ");
|
||||
var builder = GetIndexReply.newBuilder();
|
||||
objectIndexService.forAllRead((name, meta) -> {
|
||||
var entry = ObjectHeader.newBuilder().setName(name).setMtime(meta.getMtime()).setAssumeUnique(meta.getAssumeUnique()).build();
|
||||
builder.addObjects(entry);
|
||||
builder.addObjects(meta.runReadLocked(ObjectMetaData::toRpcHeader));
|
||||
});
|
||||
return Uni.createFrom().item(builder.build());
|
||||
}
|
||||
@@ -60,7 +61,7 @@ public class RemoteObjectServiceServer implements DhfsObjectSyncGrpc {
|
||||
@Override
|
||||
@Blocking
|
||||
public Uni<IndexUpdateReply> indexUpdate(IndexUpdatePush request) {
|
||||
Log.info("<-- indexUpdate: " + request.getName() + " from: " + request.getPrevMtime() + " to: " + request.getMtime());
|
||||
Log.info("<-- indexUpdate: " + request.getHeader().getName());
|
||||
return Uni.createFrom().item(syncHandler.handleRemoteUpdate(request));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,15 +2,22 @@ package com.usatiuk.dhfs.storage.objects.repository.distributed;
|
||||
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdatePush;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.IndexUpdateReply;
|
||||
import com.usatiuk.dhfs.objects.repository.distributed.ObjectChangelogEntry;
|
||||
import com.usatiuk.dhfs.storage.objects.jrepository.JObjectManager;
|
||||
import com.usatiuk.dhfs.storage.objects.repository.persistence.ObjectPersistentStore;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
@ApplicationScoped
|
||||
public class SyncHandler {
|
||||
@ConfigProperty(name = "dhfs.objects.distributed.selfname")
|
||||
String selfname;
|
||||
|
||||
@Inject
|
||||
ObjectPersistentStore objectPersistentStore;
|
||||
|
||||
@@ -21,28 +28,48 @@ public class SyncHandler {
|
||||
JObjectManager jObjectManager;
|
||||
|
||||
public IndexUpdateReply handleRemoteUpdate(IndexUpdatePush request) {
|
||||
var meta = objectIndexService.getOrCreateMeta(request.getName(), request.getAssumeUnique());
|
||||
var meta = objectIndexService.getOrCreateMeta(request.getHeader().getName(), request.getHeader().getAssumeUnique());
|
||||
|
||||
var receivedSelfVer = request.getHeader().getChangelog()
|
||||
.getEntriesList().stream().filter(p -> p.getHost().equals(selfname))
|
||||
.findFirst().map(ObjectChangelogEntry::getVersion).orElse(0L);
|
||||
|
||||
var receivedTotalVer = request.getHeader().getChangelog().getEntriesList()
|
||||
.stream().map(ObjectChangelogEntry::getVersion).reduce(0L, Long::sum);
|
||||
|
||||
meta.runWriteLocked((data) -> {
|
||||
if (meta.getMtime() == request.getMtime()) {
|
||||
var conflict = (data.getChangelog().get(selfname) > receivedSelfVer) && !data.getAssumeUnique();
|
||||
|
||||
if (conflict) {
|
||||
Log.error("Conflict when updating: " + request.getHeader().getName());
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
if (receivedTotalVer.equals(data.getTotalVersion())) {
|
||||
data.getRemoteCopies().add(request.getSelfname());
|
||||
return null;
|
||||
}
|
||||
|
||||
if (meta.getMtime() != request.getPrevMtime()) {
|
||||
if (!meta.getAssumeUnique()
|
||||
|| (meta.getAssumeUnique() != request.getAssumeUnique())) {
|
||||
Log.error("Conflict!");
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
if (receivedTotalVer < data.getTotalVersion()) {
|
||||
// FIXME?:
|
||||
data.getRemoteCopies().remove(request.getSelfname());
|
||||
return null;
|
||||
}
|
||||
|
||||
meta.setMtime(request.getMtime());
|
||||
data.getChangelog().clear();
|
||||
for (var entry : request.getHeader().getChangelog().getEntriesList()) {
|
||||
data.getChangelog().put(entry.getHost(), entry.getVersion());
|
||||
}
|
||||
data.getChangelog().putIfAbsent(selfname, 0L);
|
||||
|
||||
data.getRemoteCopies().clear();
|
||||
data.getRemoteCopies().add(request.getSelfname());
|
||||
|
||||
try {
|
||||
objectPersistentStore.deleteObject(request.getName());
|
||||
objectPersistentStore.deleteObject(request.getHeader().getName());
|
||||
} catch (StatusRuntimeException sx) {
|
||||
if (sx.getStatus() != Status.NOT_FOUND)
|
||||
Log.info("Couldn't delete object from persistent store: ", sx);
|
||||
} catch (Exception e) {
|
||||
Log.info("Couldn't delete object from persistent store: ", e);
|
||||
}
|
||||
|
||||
@@ -12,10 +12,19 @@ service DhfsObjectSyncGrpc {
|
||||
rpc IndexUpdate (IndexUpdatePush) returns (IndexUpdateReply) {}
|
||||
}
|
||||
|
||||
message ObjectChangelogEntry {
|
||||
string host = 1;
|
||||
uint64 version = 2;
|
||||
}
|
||||
|
||||
message ObjectChangelog {
|
||||
repeated ObjectChangelogEntry entries = 1;
|
||||
}
|
||||
|
||||
message ObjectHeader {
|
||||
string name = 2;
|
||||
bool assumeUnique = 3;
|
||||
uint64 mtime = 4;
|
||||
ObjectChangelog changelog = 4;
|
||||
}
|
||||
|
||||
message ApiObject {
|
||||
@@ -28,6 +37,8 @@ message GetObjectRequest {
|
||||
}
|
||||
|
||||
message GetObjectReply {
|
||||
string selfname = 10;
|
||||
|
||||
ApiObject object = 1;
|
||||
}
|
||||
|
||||
@@ -36,16 +47,15 @@ message GetIndexRequest {
|
||||
}
|
||||
|
||||
message GetIndexReply {
|
||||
string selfname = 10;
|
||||
|
||||
repeated ObjectHeader objects = 1;
|
||||
}
|
||||
|
||||
message IndexUpdatePush {
|
||||
string selfname = 10;
|
||||
|
||||
string name = 2;
|
||||
bool assumeUnique = 3;
|
||||
uint64 mtime = 4;
|
||||
uint64 prevMtime = 5;
|
||||
ObjectHeader header = 1;
|
||||
}
|
||||
|
||||
message IndexUpdateReply {
|
||||
|
||||
Reference in New Issue
Block a user