slightly hacky versioning

This commit is contained in:
2024-12-31 18:05:19 +01:00
parent a0cad2a5f6
commit 2a8fbc72de
6 changed files with 40 additions and 29 deletions

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.objects.common.runtime.JData;
import jakarta.annotation.Nonnull;
import lombok.Builder;
import java.io.Serializable;
@Builder
public record JDataVersionedWrapper<T extends JData>(@Nonnull T data, long version) implements Serializable {
}

View File

@@ -31,7 +31,7 @@ public class JObjectManager {
@Inject @Inject
ObjectPersistentStore objectStorage; ObjectPersistentStore objectStorage;
@Inject @Inject
ObjectSerializer<JData> objectSerializer; ObjectSerializer<JDataVersionedWrapper> objectSerializer;
@Inject @Inject
ObjectAllocator objectAllocator; ObjectAllocator objectAllocator;
@Inject @Inject
@@ -47,12 +47,12 @@ public class JObjectManager {
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>(); private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
private final AtomicLong _txCounter = new AtomicLong(); private final AtomicLong _txCounter = new AtomicLong();
private class JDataWrapper<T extends JData> extends WeakReference<T> { private class JDataWrapper<T extends JData> extends WeakReference<JDataVersionedWrapper<T>> {
private static final Cleaner CLEANER = Cleaner.create(); private static final Cleaner CLEANER = Cleaner.create();
public JDataWrapper(T referent) { public JDataWrapper(JDataVersionedWrapper<T> referent) {
super(referent); super(referent);
var key = referent.getKey(); var key = referent.data().getKey();
CLEANER.register(referent, () -> { CLEANER.register(referent, () -> {
_objects.remove(key, this); _objects.remove(key, this);
}); });
@@ -66,17 +66,17 @@ public class JObjectManager {
} }
} }
private <T extends JData> T get(Class<T> type, JObjectKey key) { private <T extends JData> JDataVersionedWrapper<T> get(Class<T> type, JObjectKey key) {
while (true) { while (true) {
{ {
var got = _objects.get(key); var got = _objects.get(key);
if (got != null) { if (got != null) {
var ref = got.get(); var ref = got.get();
if (type.isInstance(ref)) { if (ref == null) {
return type.cast(ref);
} else if (ref == null) {
_objects.remove(key, got); _objects.remove(key, got);
} else if (type.isInstance(ref.data())) {
return (JDataVersionedWrapper<T>) ref;
} else { } else {
throw new IllegalArgumentException("Object type mismatch: " + ref.getClass() + " vs " + type); throw new IllegalArgumentException("Object type mismatch: " + ref.getClass() + " vs " + type);
} }
@@ -93,11 +93,11 @@ public class JObjectManager {
if (read == null) return null; if (read == null) return null;
if (type.isInstance(read)) { if (type.isInstance(read.data())) {
var wrapper = new JDataWrapper<>(type.cast(read)); var wrapper = new JDataWrapper<>((JDataVersionedWrapper<T>) read);
var old = _objects.put(key, wrapper); var old = _objects.put(key, wrapper);
assert old == null; assert old == null;
return type.cast(read); return read;
} else { } else {
throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type); throw new IllegalArgumentException("Object type mismatch: " + read.getClass() + " vs " + type);
} }
@@ -106,12 +106,12 @@ public class JObjectManager {
} }
private record TransactionObjectNoLock<T extends JData> private record TransactionObjectNoLock<T extends JData>
(Optional<T> data) (Optional<JDataVersionedWrapper<T>> data)
implements TransactionObject<T> { implements TransactionObject<T> {
} }
private record TransactionObjectLocked<T extends JData> private record TransactionObjectLocked<T extends JData>
(Optional<T> data, AutoCloseableNoThrow lock) (Optional<JDataVersionedWrapper<T>> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> { implements TransactionObject<T> {
} }
@@ -210,7 +210,7 @@ public class JObjectManager {
if (dep == null) { if (dep == null) {
throw new IllegalStateException("No dependency for " + key); throw new IllegalStateException("No dependency for " + key);
} }
yield dep.data.orElse(null); yield dep.data.map(JDataVersionedWrapper::data).orElse(null);
} }
default -> { default -> {
throw new IllegalStateException("Unexpected value: " + current.get(key)); throw new IllegalStateException("Unexpected value: " + current.get(key));
@@ -254,7 +254,6 @@ public class JObjectManager {
} }
} }
// FIXME: lock leak
for (var read : tx.reads().entrySet()) { for (var read : tx.reads().entrySet()) {
addDependency.accept(read.getKey()); addDependency.accept(read.getKey());
if (read.getValue() instanceof TransactionObjectLocked<?> locked) { if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
@@ -265,10 +264,10 @@ public class JObjectManager {
for (var dep : dependenciesLocked.entrySet()) { for (var dep : dependenciesLocked.entrySet()) {
Log.trace("Checking dependency " + dep.getKey()); Log.trace("Checking dependency " + dep.getKey());
if (dep.getValue().data.isEmpty()) continue; if (dep.getValue().data().isEmpty()) continue;
if (dep.getValue().data.get().getVersion() >= tx.getId()) { if (dep.getValue().data().get().version() >= tx.getId()) {
throw new IllegalStateException("Serialization hazard: " + dep.getValue().data.get().getVersion() + " vs " + tx.getId()); throw new IllegalStateException("Serialization hazard: " + dep.getValue().data().get().version() + " vs " + tx.getId());
} }
} }
@@ -282,9 +281,10 @@ public class JObjectManager {
case TxRecord.TxObjectRecordWrite<?> write -> { case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + action.getKey()); Log.trace("Flushing object " + action.getKey());
toWrite.add(action.getKey()); toWrite.add(action.getKey());
var data = objectSerializer.serialize(write.data()); var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
var data = objectSerializer.serialize(wrapped);
objectStorage.writeObject(action.getKey(), data); objectStorage.writeObject(action.getKey(), data);
_objects.put(action.getKey(), new JDataWrapper<>(write.data())); _objects.put(action.getKey(), new JDataWrapper<>(wrapped));
} }
case TxRecord.TxObjectRecordDeleted deleted -> { case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + action.getKey()); Log.trace("Deleting object " + action.getKey());

View File

@@ -3,20 +3,19 @@ package com.usatiuk.dhfs.objects;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.dhfs.utils.SerializationHelper; import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.objects.common.runtime.JData;
import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.ApplicationScoped;
import java.io.Serializable; import java.io.Serializable;
@ApplicationScoped @ApplicationScoped
public class JavaDataSerializer implements ObjectSerializer<JData> { public class JavaDataSerializer implements ObjectSerializer<JDataVersionedWrapper> {
@Override @Override
public ByteString serialize(JData obj) { public ByteString serialize(JDataVersionedWrapper obj) {
return SerializationHelper.serialize((Serializable) obj); return SerializationHelper.serialize((Serializable) obj);
} }
@Override @Override
public JData deserialize(ByteString data) { public JDataVersionedWrapper deserialize(ByteString data) {
return SerializationHelper.deserialize(data.toByteArray()); return SerializationHelper.deserialize(data.toByteArray());
} }
} }

View File

@@ -1,9 +1,8 @@
package com.usatiuk.dhfs.objects; package com.usatiuk.dhfs.objects;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.usatiuk.objects.common.runtime.JData;
public interface ObjectSerializer<T extends JData> { public interface ObjectSerializer<T> {
ByteString serialize(T obj); ByteString serialize(T obj);
T deserialize(ByteString data); T deserialize(ByteString data);

View File

@@ -1,5 +1,6 @@
package com.usatiuk.dhfs.objects.transaction; package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator; import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey; import com.usatiuk.objects.common.runtime.JObjectKey;
@@ -34,8 +35,8 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override @Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) { public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
return switch (strategy) { return switch (strategy) {
case OPTIMISTIC -> _source.get(type, key).data(); case OPTIMISTIC -> _source.get(type, key).data().map(JDataVersionedWrapper::data);
case WRITE -> _source.getWriteLocked(type, key).data(); case WRITE -> _source.getWriteLocked(type, key).data().map(JDataVersionedWrapper::data);
}; };
} }

View File

@@ -1,9 +1,10 @@
package com.usatiuk.dhfs.objects.transaction; package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.objects.common.runtime.JData; import com.usatiuk.objects.common.runtime.JData;
import java.util.Optional; import java.util.Optional;
public interface TransactionObject<T extends JData> { public interface TransactionObject<T extends JData> {
Optional<T> data(); Optional<JDataVersionedWrapper<T>> data();
} }