possibly working refcounting

2024-12-29 21:27:08 +01:00
parent 62fbaa206a
commit 5d159ffde1
13 changed files with 137 additions and 21 deletions

JObjectManager.java

@@ -210,7 +210,7 @@ public class JObjectManager {
try {
Collection<TxRecord.TxObjectRecord<?>> drained;
- while (!(drained = tx.drainWrites()).isEmpty()) {
+ while (!(drained = tx.drainNewWrites()).isEmpty()) {
Log.trace("Commit iteration with " + drained.size() + " records");
var toLock = new ArrayList<JObjectKey>();
@@ -236,7 +236,7 @@ public class JObjectManager {
}
}
- for (var entry : tx.drainReads().entrySet()) {
+ for (var entry : tx.reads().entrySet()) {
Log.trace("Processing read " + entry.toString());
switch (entry.getValue()) {
case ReadTrackingObjectSource.TxReadObjectNone<?> none -> {
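Context for this change, as a hedged sketch: pre-commit hooks (like the RefcounterTxHook added in this commit) may themselves put objects into the transaction while the commit is being processed. Draining only the writes recorded since the previous iteration lets the commit loop run to a fixed point instead of reprocessing the whole write set. Simplified, with hook dispatch reduced to a hypothetical notifyHooks helper:

    Collection<TxRecord.TxObjectRecord<?>> drained;
    while (!(drained = tx.drainNewWrites()).isEmpty()) {
        // Hooks may call curTx.put(...) while processing these records;
        // those writes become "new" and are picked up on the next iteration.
        for (var record : drained) {
            notifyHooks(record);
        }
    }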

TransactionManagerImpl.java

@@ -46,7 +46,7 @@ public class TransactionManagerImpl implements TransactionManager {
public void rollback() {
var tx = _currentTransaction.get();
// Works only before commit was called
- for (var o : tx.drainWrites()) {
+ for (var o : tx.drainNewWrites()) {
switch (o) {
case TxRecord.TxObjectRecordCopyLock<?> r -> r.original().lock().writeLock().unlock();
default -> {
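A note on why the rename is safe here: per the comment above, rollback only works before commit was called, and before commit nothing has been drained yet, so the incremental write set still equals the full write set. drainNewWrites() therefore returns every record, and each TxObjectRecordCopyLock releases its original's write lock exactly once.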

TransactionFactoryImpl.java

@@ -24,6 +24,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private final ReadTrackingObjectSource _source;
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>();
+ private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newObjects = new HashMap<>();
private TransactionImpl(long id, TransactionObjectSource source) {
_id = id;
@@ -52,6 +53,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
var copy = objectAllocator.copy(read.data());
_objects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy));
+ _newObjects.put(key, new TxRecord.TxObjectRecordOptimistic<>(read, copy));
return Optional.of(copy.wrapped());
}
case WRITE: {
@@ -61,6 +63,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
var copy = objectAllocator.copy(locked.data());
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
+ _newObjects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
return Optional.of(copy.wrapped());
}
default:
@@ -76,12 +79,15 @@ public class TransactionFactoryImpl implements TransactionFactory {
switch (got) {
case TxRecord.TxObjectRecordNew<?> created -> {
_objects.remove(key);
+ _newObjects.remove(key);
}
case TxRecord.TxObjectRecordCopyLock<?> copyLockRecord -> {
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord.original()));
+ _newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(copyLockRecord.original()));
}
case TxRecord.TxObjectRecordOptimistic<?> optimisticRecord -> {
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord.original()));
+ _newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(optimisticRecord.original()));
}
case TxRecord.TxObjectRecordDeleted<?> deletedRecord -> {
return;
@@ -95,6 +101,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
return;
}
_objects.put(key, new TxRecord.TxObjectRecordDeleted<>(read));
+ _newObjects.put(key, new TxRecord.TxObjectRecordDeleted<>(read));
}
@Override
@@ -104,17 +111,18 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
_objects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj));
+ _newObjects.put(obj.getKey(), new TxRecord.TxObjectRecordNew<>(obj));
}
@Override
- public Collection<TxRecord.TxObjectRecord<?>> drainWrites() {
- var ret = _objects;
- _objects = new HashMap<>();
+ public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
+ var ret = _newObjects;
+ _newObjects = new HashMap<>();
return ret.values();
}
@Override
- public Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> drainReads() {
+ public Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads() {
return _source.getRead();
}
}
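The pattern behind all the doubled put() calls above, as a minimal sketch (field and method names from the diff; the record helper is hypothetical): _objects keeps the cumulative write set for the whole transaction, while _newObjects only accumulates records added since the last drain, so repeated drainNewWrites() calls converge to empty once hooks stop producing writes.

    private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _objects = new HashMap<>();
    private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newObjects = new HashMap<>();

    private void record(JObjectKey key, TxRecord.TxObjectRecord<?> rec) {
        _objects.put(key, rec);    // cumulative: the transaction's full write set
        _newObjects.put(key, rec); // incremental: cleared by every drain
    }

    public Collection<TxRecord.TxObjectRecord<?>> drainNewWrites() {
        var ret = _newObjects;
        _newObjects = new HashMap<>(); // only writes made after this call are "new"
        return ret.values();
    }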

TransactionPrivate.java

@@ -7,7 +7,7 @@ import java.util.Map;
// The transaction interface actually used by user code to retrieve objects
public interface TransactionPrivate extends Transaction {
- Collection<TxRecord.TxObjectRecord<?>> drainWrites();
+ Collection<TxRecord.TxObjectRecord<?>> drainNewWrites();
- Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> drainReads();
+ Map<JObjectKey, ReadTrackingObjectSource.TxReadObject<?>> reads();
}

ChunkData.java

@@ -1,11 +1,12 @@
package com.usatiuk.dhfs.files.objects;
import com.google.protobuf.ByteString;
+ import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.objects.common.runtime.JData;
import java.io.Serializable;
- public interface ChunkData extends JData, Serializable {
+ public interface ChunkData extends JDataRefcounted, Serializable {
ByteString getData();
void setData(ByteString data);

File.java

@@ -2,6 +2,7 @@ package com.usatiuk.dhfs.files.objects;
import com.usatiuk.objects.common.runtime.JObjectKey;
+ import java.util.Collection;
import java.util.NavigableMap;
public interface File extends FsNode {
@@ -16,4 +17,9 @@ public interface File extends FsNode {
long getSize();
void setSize(long size);
+ @Override
+ default Collection<JObjectKey> collectRefsTo() {
+ return getChunks().values().stream().toList();
+ }
}

FsNode.java

@@ -1,10 +1,11 @@
package com.usatiuk.dhfs.files.objects;
+ import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.objects.common.runtime.JData;
import java.io.Serializable;
- public interface FsNode extends JData, Serializable {
+ public interface FsNode extends JDataRefcounted, Serializable {
long getMode();
void setMode(long mode);

DhfsFileServiceImpl.java

@@ -77,6 +77,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) {
var newChunk = objectAllocator.create(ChunkData.class, new JObjectKey(UUID.randomUUID().toString()));
newChunk.setData(bytes);
+ newChunk.setRefsFrom(List.of());
curTx.put(newChunk);
return newChunk;
}
@@ -159,6 +160,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
f.setCtime(f.getMtime());
f.setSymlink(false);
f.setChunks(new TreeMap<>());
+ f.setRefsFrom(List.of());
curTx.put(f);
try {
@@ -622,6 +624,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
File f = objectAllocator.create(File.class, new JObjectKey(fuuid.toString()));
f.setSymlink(true);
+ f.setRefsFrom(List.of());
ChunkData newChunkData = createChunk(UnsafeByteOperations.unsafeWrap(oldpath.getBytes(StandardCharsets.UTF_8)));
f.getChunks().put(0L, newChunkData.getKey());

JDataRefcounted.java (new file)

@@ -0,0 +1,17 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import java.util.Collection;
import java.util.List;
public interface JDataRefcounted extends JData {
Collection<JObjectKey> getRefsFrom();
void setRefsFrom(Collection<JObjectKey> refs);
default Collection<JObjectKey> collectRefsTo() {
return List.of();
}
}
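The interface splits the reference graph into two directions: refsFrom stores incoming references as plain object data (maintained by the hook below), while collectRefsTo() derives outgoing references from the object's own fields and defaults to none. File above overrides it to return its chunk keys; a freestanding sketch of the same pattern for a hypothetical type:

    // Hypothetical refcounted type: outgoing edges derived from a field.
    public interface Directory extends JDataRefcounted {
        Collection<JObjectKey> getEntries();
        void setEntries(Collection<JObjectKey> entries);

        @Override
        default Collection<JObjectKey> collectRefsTo() {
            return List.copyOf(getEntries()); // every entry is an outgoing reference
        }
    }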

RefcounterTxHook.java (new file)

@@ -0,0 +1,63 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.transaction.Transaction;
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Set;
@ApplicationScoped
public class RefcounterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;
@Inject
ObjectAllocator alloc;
@Override
public void onChange(JObjectKey key, JData old, JData cur) {
if (!(cur instanceof JDataRefcounted refCur)) {
return;
}
var refOld = (JDataRefcounted) old;
for (var newRef : CollectionUtils.subtract(refCur.collectRefsTo(), refOld.collectRefsTo())) {
var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.union(referenced.getRefsFrom(), Set.of(key)));
}
for (var removedRef : CollectionUtils.subtract(refOld.collectRefsTo(), refCur.collectRefsTo())) {
var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.subtract(referenced.getRefsFrom(), Set.of(key)));
}
}
@Override
public void onCreate(JObjectKey key, JData cur) {
if (!(cur instanceof JDataRefcounted refCur)) {
return;
}
for (var newRef : refCur.collectRefsTo()) {
var referenced = curTx.get(JDataRefcounted.class, newRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.union(referenced.getRefsFrom(), Set.of(key)));
}
}
@Override
public void onDelete(JObjectKey key, JData cur) {
if (!(cur instanceof JDataRefcounted refCur)) {
return;
}
for (var removedRef : refCur.collectRefsTo()) {
var referenced = curTx.get(JDataRefcounted.class, removedRef).orElse(null);
referenced.setRefsFrom(CollectionUtils.subtract(referenced.getRefsFrom(), Set.of(key)));
}
}
}
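The hook keeps refsFrom symmetric with collectRefsTo() by diffing outgoing edges: if a File's chunks go from {c1, c2} to {c2, c3}, subtract(new, old) = {c3} gains a back-reference and subtract(old, new) = {c1} loses one. One caveat in the committed code: curTx.get(...).orElse(null) followed by an immediate dereference throws a NullPointerException if the referenced object is missing. A hedged sketch of one update step with that case made explicit instead:

    // Sketch: fail loudly on a dangling reference rather than NPE-ing.
    var referenced = curTx.get(JDataRefcounted.class, newRef)
            .orElseThrow(() -> new IllegalStateException("Dangling reference: " + newRef));
    referenced.setRefsFrom(CollectionUtils.union(referenced.getRefsFrom(), Set.of(key)));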

JKleppmannTreeManager.java

@@ -294,8 +294,10 @@ public class JKleppmannTreeManager {
if (curTx.get(JKleppmannTreeNode.class, getRootId()).isEmpty()) {
var rootNode = objectAllocator.create(JKleppmannTreeNode.class, getRootId());
rootNode.setNode(new TreeNode<>(getRootId(), null, new JKleppmannTreeNodeMetaDirectory("")));
+ rootNode.setRefsFrom(List.of());
curTx.put(rootNode);
var trashNode = objectAllocator.create(JKleppmannTreeNode.class, getTrashId());
+ trashNode.setRefsFrom(List.of());
trashNode.setNode(new TreeNode<>(getTrashId(), null, new JKleppmannTreeNodeMetaDirectory("")));
curTx.put(trashNode);
}
@@ -303,12 +305,12 @@ public class JKleppmannTreeManager {
@Override
public JObjectKey getRootId() {
- return new JObjectKey(_treeName + "_jt_root");
+ return new JObjectKey(_treeName.name() + "_jt_root");
}
@Override
public JObjectKey getTrashId() {
- return new JObjectKey(_treeName + "_jt_trash");
+ return new JObjectKey(_treeName.name() + "_jt_trash");
}
@Override
@@ -327,6 +329,7 @@ public class JKleppmannTreeManager {
public JKleppmannTreeNodeWrapper createNewNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> node) {
var created = objectAllocator.create(JKleppmannTreeNode.class, node.getId());
created.setNode(node);
+ created.setRefsFrom(List.of());
curTx.put(created);
return new JKleppmannTreeNodeWrapper(created);
}
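The switch to _treeName.name() suggests _treeName is now a JObjectKey rather than a String, and it keeps the root/trash key format in lockstep with JKleppmannTreePersistentData.collectRefsTo() below, which rebuilds the same keys from getKey().name(). Both sides must derive identical keys for the refcounting hook to connect them:

    // Shared naming convention (assuming the persistent data's key equals the tree name):
    JObjectKey rootId  = new JObjectKey(treeName.name() + "_jt_root");
    JObjectKey trashId = new JObjectKey(treeName.name() + "_jt_trash");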

JKleppmannTreeNode.java

@@ -1,15 +1,28 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
+ import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.kleppmanntree.TreeNode;
import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
import java.io.Serializable;
+ import java.util.Collection;
import java.util.UUID;
+ import java.util.stream.Stream;
// FIXME: Ideally this is two classes?
- public interface JKleppmannTreeNode extends JData, Serializable {
+ public interface JKleppmannTreeNode extends JDataRefcounted, Serializable {
TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getNode();
void setNode(TreeNode<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> node);
+ @Override
+ default Collection<JObjectKey> collectRefsTo() {
+ return Stream.concat(getNode().getChildren().values().stream(),
+ switch (getNode().getMeta()) {
+ case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>of();
+ case JKleppmannTreeNodeMetaFile file -> Stream.<JObjectKey>of(file.getFileIno());
+ default -> throw new IllegalStateException("Unexpected value: " + getNode().getMeta());
+ }
+ ).toList();
+ }
}

JKleppmannTreePersistentData.java

@@ -1,18 +1,15 @@
package com.usatiuk.dhfs.objects.jkleppmanntree.structs;
+ import com.usatiuk.dhfs.objects.JDataRefcounted;
import com.usatiuk.kleppmanntree.AtomicClock;
import com.usatiuk.kleppmanntree.CombinedTimestamp;
import com.usatiuk.kleppmanntree.LogRecord;
import com.usatiuk.kleppmanntree.OpMove;
- import com.usatiuk.objects.common.runtime.JData;
import com.usatiuk.objects.common.runtime.JObjectKey;
- import java.util.Collection;
- import java.util.HashMap;
- import java.util.TreeMap;
- import java.util.UUID;
+ import java.util.*;
- public interface JKleppmannTreePersistentData extends JData {
+ public interface JKleppmannTreePersistentData extends JDataRefcounted {
AtomicClock getClock();
void setClock(AtomicClock clock);
@@ -50,4 +47,8 @@ public interface JKleppmannTreePersistentData extends JData {
}
}
+ @Override
+ default Collection<JObjectKey> collectRefsTo() {
+ return List.of(new JObjectKey(getKey().name() + "_jt_trash"), new JObjectKey(getKey().name() + "_jt_root"));
+ }
}
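
Taken together, user code only initializes refsFrom to an empty list and records outgoing references in its own fields; RefcounterTxHook fills in the back-references at commit time. A hedged end-to-end sketch following the DhfsFileServiceImpl changes above (variable wiring simplified):

    // Inside a transaction:
    var chunk = objectAllocator.create(ChunkData.class, new JObjectKey(UUID.randomUUID().toString()));
    chunk.setData(bytes);
    chunk.setRefsFrom(List.of());             // no incoming references yet
    curTx.put(chunk);

    var file = objectAllocator.create(File.class, new JObjectKey(fuuid.toString()));
    file.setRefsFrom(List.of());
    file.setChunks(new TreeMap<>());
    file.getChunks().put(0L, chunk.getKey()); // outgoing edge, reported by collectRefsTo()
    curTx.put(file);

    // During commit, RefcounterTxHook.onCreate(file) sees the new outgoing edge and
    // writes chunk.refsFrom = { file.getKey() }, which the next drainNewWrites()
    // iteration picks up in the same commit.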