run code cleanup

This commit is contained in:
2025-01-03 12:24:39 +01:00
parent ea0ee90776
commit dbc5230fb8
27 changed files with 186 additions and 201 deletions

View File

@@ -68,11 +68,11 @@ class AutoprotomapProcessor {
}
} catch (Throwable e) {
StringBuilder sb = new StringBuilder();
sb.append(e.toString() + "\n");
sb.append(e + "\n");
for (var el : e.getStackTrace()) {
sb.append(el.toString() + "\n");
}
System.out.println(sb.toString());
System.out.println(sb);
}
}
}

View File

@@ -3,14 +3,14 @@ package com.usatiuk.kleppmanntree;
public abstract class TestNodeMeta implements NodeMeta {
private final String _name;
public TestNodeMeta(String name) {
_name = name;
}
@Override
public String getName() {
return _name;
}
public TestNodeMeta(String name) {
_name = name;
}
abstract public NodeMeta withName(String name);
}

View File

@@ -3,15 +3,15 @@ package com.usatiuk.kleppmanntree;
public class TestNodeMetaFile extends TestNodeMeta {
private final long _inode;
public long getInode() {
return _inode;
}
public TestNodeMetaFile(String name, long inode) {
super(name);
_inode = inode;
}
public long getInode() {
return _inode;
}
@Override
public NodeMeta withName(String name) {
return new TestNodeMetaFile(name, _inode);

View File

@@ -6,7 +6,9 @@ import java.util.List;
public class TestPeerInterface implements PeerInterface<Long> {
private final long selfId;
public TestPeerInterface(long selfId) {this.selfId = selfId;}
public TestPeerInterface(long selfId) {
this.selfId = selfId;
}
@Override
public Long getSelfId() {

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>

View File

@@ -22,40 +22,18 @@ import java.util.function.Function;
// TODO: persistent tx id
@ApplicationScoped
public class JObjectManager {
private final List<PreCommitTxHook> _preCommitTxHooks;
private final DataLocker _objLocker = new DataLocker();
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
private final AtomicLong _txCounter = new AtomicLong();
@Inject
WritebackObjectPersistentStore writebackObjectPersistentStore;
@Inject
TransactionFactory transactionFactory;
private final List<PreCommitTxHook> _preCommitTxHooks;
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
_preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
}
private final DataLocker _objLocker = new DataLocker();
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
private final AtomicLong _txCounter = new AtomicLong();
private class JDataWrapper<T extends JData> extends WeakReference<JDataVersionedWrapper<T>> {
private static final Cleaner CLEANER = Cleaner.create();
public JDataWrapper(JDataVersionedWrapper<T> referent) {
super(referent);
var key = referent.data().key();
CLEANER.register(referent, () -> {
_objects.remove(key, this);
});
}
@Override
public String toString() {
return "JDataWrapper{" +
"ref=" + get() +
'}';
}
}
private <T extends JData> JDataVersionedWrapper<T> get(Class<T> type, JObjectKey key) {
while (true) {
{
@@ -93,16 +71,6 @@ public class JObjectManager {
}
}
private record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper<T>> data)
implements TransactionObject<T> {
}
private record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper<T>> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}
private <T extends JData> TransactionObjectNoLock<T> getObj(Class<T> type, JObjectKey key) {
var got = get(type, key);
return new TransactionObjectNoLock<>(Optional.ofNullable(got));
@@ -114,37 +82,6 @@ public class JObjectManager {
return new TransactionObjectLocked<>(Optional.ofNullable(got), lock);
}
private class TransactionObjectSourceImpl implements TransactionObjectSource {
private final long _txId;
private TransactionObjectSourceImpl(long txId) {
_txId = txId;
}
@Override
public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
return getObj(type, key);
// return getObj(type, key).map(got -> {
// if (got.data().getVersion() > _txId) {
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
// }
// return got;
// });
}
@Override
public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
return getObjLock(type, key);
// return getObjLock(type, key).map(got -> {
// if (got.data().getVersion() > _txId) {
// got.lock.close();
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
// }
// return got;
// });
}
}
public TransactionPrivate createTransaction() {
var counter = _txCounter.getAndIncrement();
Log.trace("Creating transaction " + counter);
@@ -292,4 +229,64 @@ public class JObjectManager {
}
});
}
private record TransactionObjectNoLock<T extends JData>
(Optional<JDataVersionedWrapper<T>> data)
implements TransactionObject<T> {
}
private record TransactionObjectLocked<T extends JData>
(Optional<JDataVersionedWrapper<T>> data, AutoCloseableNoThrow lock)
implements TransactionObject<T> {
}
private class JDataWrapper<T extends JData> extends WeakReference<JDataVersionedWrapper<T>> {
private static final Cleaner CLEANER = Cleaner.create();
public JDataWrapper(JDataVersionedWrapper<T> referent) {
super(referent);
var key = referent.data().key();
CLEANER.register(referent, () -> {
_objects.remove(key, this);
});
}
@Override
public String toString() {
return "JDataWrapper{" +
"ref=" + get() +
'}';
}
}
private class TransactionObjectSourceImpl implements TransactionObjectSource {
private final long _txId;
private TransactionObjectSourceImpl(long txId) {
_txId = txId;
}
@Override
public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
return getObj(type, key);
// return getObj(type, key).map(got -> {
// if (got.data().getVersion() > _txId) {
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
// }
// return got;
// });
}
@Override
public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
return getObjLock(type, key);
// return getObjLock(type, key).map(got -> {
// if (got.data().getVersion() > _txId) {
// got.lock.close();
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
// }
// return got;
// });
}
}
}

View File

@@ -8,11 +8,10 @@ import jakarta.inject.Inject;
@ApplicationScoped
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
@Inject
JObjectManager jObjectManager;
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
@Override
public void begin() {
if (_currentTransaction.get() != null) {

View File

@@ -13,6 +13,12 @@ public interface TxWriteback {
void fence(long bundleId);
Optional<PendingWriteEntry> getPendingWrite(JObjectKey key);
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread
void asyncFence(long bundleId, VoidFn callback);
interface PendingWriteEntry {
long bundleId();
}
@@ -22,10 +28,4 @@ public interface TxWriteback {
record PendingDelete(JObjectKey key, long bundleId) implements PendingWriteEntry {
}
Optional<PendingWriteEntry> getPendingWrite(JObjectKey key);
// Executes callback after bundle with bundleId id has been persisted
// if it was already, runs callback on the caller thread
void asyncFence(long bundleId, VoidFn callback);
}

View File

@@ -163,7 +163,7 @@ public class TxWritebackImpl implements TxWriteback {
callbacks.forEach(l -> l.forEach(VoidFn::apply));
synchronized (_flushWaitSynchronizer) {
currentSize -= ((TxBundleImpl) bundle).calculateTotalSize();
currentSize -= bundle.calculateTotalSize();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();

View File

@@ -19,14 +19,10 @@ import java.util.stream.Stream;
@ApplicationScoped
public class CachingObjectPersistentStore {
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true);
private final DataLocker _locker = new DataLocker();
@Inject
SerializingObjectPersistentStore delegate;
private record CacheEntry(Optional<JDataVersionedWrapper<?>> object, long size) {
}
private final LinkedHashMap<JObjectKey, CacheEntry> _cache = new LinkedHashMap<>(8, 0.75f, true);
@ConfigProperty(name = "dhfs.objects.lru.limit")
long sizeLimit;
@ConfigProperty(name = "dhfs.objects.lru.print-stats")
@@ -37,8 +33,6 @@ public class CachingObjectPersistentStore {
private ExecutorService _statusExecutor = null;
private final DataLocker _locker = new DataLocker();
@Startup
void init() {
if (printStats) {
@@ -110,4 +104,7 @@ public class CachingObjectPersistentStore {
}
delegate.commitTx(names);
}
private record CacheEntry(Optional<JDataVersionedWrapper<?>> object, long size) {
}
}

View File

@@ -12,23 +12,25 @@ import java.util.Optional;
@ApplicationScoped
public class TransactionFactoryImpl implements TransactionFactory {
@Override
public TransactionPrivate createTransaction(long id, TransactionObjectSource source) {
return new TransactionImpl(id, source);
}
private class TransactionImpl implements TransactionPrivate {
private final long _id;
public long getId() {
return _id;
}
private final ReadTrackingObjectSource _source;
private final Map<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new HashMap<>();
private Map<JObjectKey, TxRecord.TxObjectRecord<?>> _newWrites = new HashMap<>();
private TransactionImpl(long id, TransactionObjectSource source) {
_id = id;
_source = new ReadTrackingObjectSource(source);
}
public long getId() {
return _id;
}
@Override
public <T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy) {
switch (_writes.get(key)) {
@@ -97,9 +99,4 @@ public class TransactionFactoryImpl implements TransactionFactory {
}
}
@Override
public TransactionPrivate createTransaction(long id, TransactionObjectSource source) {
return new TransactionImpl(id, source);
}
}

View File

@@ -1,7 +1,7 @@
package com.usatiuk.dhfs.objects.transaction;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JDataVersionedWrapper;
import java.util.Optional;

View File

@@ -250,7 +250,7 @@ public class ObjectsTest {
Assertions.assertEquals("John", got.name());
}
} else {
Assertions.assertTrue(!thread2Failed.get());
Assertions.assertFalse(thread2Failed.get());
Assertions.assertEquals("John2", got.name());
}
}

View File

@@ -18,11 +18,6 @@ public class PreCommitTxHookTest {
@Inject
Transaction curTx;
@ApplicationScoped
public static class DummyPreCommitTxHook implements PreCommitTxHook {
}
@InjectSpy
private DummyPreCommitTxHook spyHook;
@@ -111,4 +106,8 @@ public class PreCommitTxHookTest {
Assertions.assertEquals(new JObjectKey("ParentEdit2"), keyCaptor.getValue());
}
@ApplicationScoped
public static class DummyPreCommitTxHook implements PreCommitTxHook {
}
}

View File

@@ -160,8 +160,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
} catch (Exception e) {
// fobj.getMeta().removeRef(newNodeId);
throw e;
} finally {
// fobj.rwUnlock();
}
return Optional.of(f.key());
});
@@ -173,8 +171,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return jObjectTxManager.executeTx(() -> {
return getTree().findParent(w -> {
if (w.meta() instanceof JKleppmannTreeNodeMetaFile f)
if (f.getFileIno().equals(ino))
return true;
return f.getFileIno().equals(ino);
return false;
});
});

View File

@@ -12,6 +12,13 @@ class JnrPtrByteOutputAccessors {
JavaNioAccess _nioAccess;
Unsafe _unsafe;
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
public JavaNioAccess getNioAccess() {
return _nioAccess;
}
@@ -19,11 +26,4 @@ class JnrPtrByteOutputAccessors {
public Unsafe getUnsafe() {
return _unsafe;
}
JnrPtrByteOutputAccessors() throws NoSuchFieldException, IllegalAccessException {
_nioAccess = SharedSecrets.getJavaNioAccess();
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
_unsafe = (Unsafe) f.get(null);
}
}

View File

@@ -58,13 +58,10 @@ public class JKleppmannTreeManager {
public class JKleppmannTree {
private final KleppmannTree<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> _tree;
private JKleppmannTreePersistentData _data;
private final JKleppmannTreeStorageInterface _storageInterface;
private final JKleppmannTreeClock _clock;
private final JObjectKey _treeName;
private JKleppmannTreePersistentData _data;
JKleppmannTree(JKleppmannTreePersistentData data) {
_treeName = data.key();
@@ -414,7 +411,7 @@ public class JKleppmannTreeManager {
@Override
public long size() {
return (long) _data.log().size();
return _data.log().size();
}
@Override

View File

@@ -10,15 +10,15 @@ import java.util.UUID;
public class JKleppmannTreeOpWrapper {
private final OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> _op;
public OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getOp() {
return _op;
}
public JKleppmannTreeOpWrapper(OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> op) {
if (op == null) throw new IllegalArgumentException("op shouldn't be null");
_op = op;
}
public OpMove<Long, UUID, JKleppmannTreeNodeMeta, JObjectKey> getOp() {
return _op;
}
// @Override
// public Collection<JObjectKey> getEscapedRefs() {
// if (_op.newMeta() instanceof JKleppmannTreeNodeMetaFile mf) {

View File

@@ -2,10 +2,15 @@ package com.usatiuk.dhfs.objects.jkleppmanntree;
import java.util.UUID;
public class JKleppmannTreePeriodicPushOp {
public class JKleppmannTreePeriodicPushOp {
private final UUID _from;
private final long _timestamp;
public JKleppmannTreePeriodicPushOp(UUID from, long timestamp) {
_from = from;
_timestamp = timestamp;
}
public UUID getFrom() {
return _from;
}
@@ -14,11 +19,6 @@ public class JKleppmannTreePeriodicPushOp {
return _timestamp;
}
public JKleppmannTreePeriodicPushOp(UUID from, long timestamp) {
_from = from;
_timestamp = timestamp;
}
// @Override
// public Collection<String> getEscapedRefs() {
// return List.of();

View File

@@ -59,7 +59,7 @@ public record JKleppmannTreeNode(JObjectKey key, PCollection<JObjectKey> refsFro
return Stream.concat(children().values().stream(),
switch (meta()) {
case JKleppmannTreeNodeMetaDirectory dir -> Stream.<JObjectKey>of();
case JKleppmannTreeNodeMetaFile file -> Stream.<JObjectKey>of(file.getFileIno());
case JKleppmannTreeNodeMetaFile file -> Stream.of(file.getFileIno());
default -> throw new IllegalStateException("Unexpected value: " + meta());
}
).toList();

View File

@@ -10,14 +10,14 @@ import java.util.Objects;
public abstract class JKleppmannTreeNodeMeta implements NodeMeta {
private final String _name;
public String getName() {
return _name;
}
public JKleppmannTreeNodeMeta(String name) {
_name = name;
}
public String getName() {
return _name;
}
public abstract JKleppmannTreeNodeMeta withName(String name);
@Override

View File

@@ -10,15 +10,15 @@ import java.util.Objects;
public class JKleppmannTreeNodeMetaFile extends JKleppmannTreeNodeMeta {
private final JObjectKey _fileIno;
public JObjectKey getFileIno() {
return _fileIno;
}
public JKleppmannTreeNodeMetaFile(String name, JObjectKey fileIno) {
super(name);
_fileIno = fileIno;
}
public JObjectKey getFileIno() {
return _fileIno;
}
@Override
public JKleppmannTreeNodeMeta withName(String name) {
return new JKleppmannTreeNodeMetaFile(name, _fileIno);

View File

@@ -9,7 +9,8 @@ import java.util.HashMap;
import java.util.Map;
abstract public class TempDataProfile implements QuarkusTestProfile {
protected void getConfigOverrides(Map<String, String> toPut) {}
protected void getConfigOverrides(Map<String, String> toPut) {
}
@Override
final public Map<String, String> getConfigOverrides() {

View File

@@ -12,10 +12,11 @@ import java.util.concurrent.TimeoutException;
public class DhfsImage implements Future<String> {
private static final DhfsImage INSTANCE = new DhfsImage();
private static String _builtImage = null;
private static DhfsImage INSTANCE = new DhfsImage();
private DhfsImage() {}
private DhfsImage() {
}
public static DhfsImage getInstance() {
return INSTANCE;

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>

View File

@@ -6,42 +6,9 @@ import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap;
public class DataLocker {
private static class LockTag {
boolean released = false;
final Thread owner = Thread.currentThread();
}
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
private class Lock implements AutoCloseableNoThrow {
private final Object _key;
private final LockTag _tag;
private static final Cleaner CLEANER = Cleaner.create();
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
CLEANER.register(this, () -> {
if (!tag.released) {
Log.error("Lock collected without release: " + key);
}
});
}
@Override
public void close() {
synchronized (_tag) {
_tag.released = true;
// Notify all because when the object is locked again,
// it's a different lock tag
_tag.notifyAll();
_locks.remove(_key, _tag);
}
}
}
private static final AutoCloseableNoThrow DUMMY_LOCK = () -> {
};
private final ConcurrentHashMap<Object, LockTag> _locks = new ConcurrentHashMap<>();
public AutoCloseableNoThrow lock(Object data) {
while (true) {
@@ -69,4 +36,36 @@ public class DataLocker {
}
}
private static class LockTag {
final Thread owner = Thread.currentThread();
boolean released = false;
}
private class Lock implements AutoCloseableNoThrow {
private static final Cleaner CLEANER = Cleaner.create();
private final Object _key;
private final LockTag _tag;
public Lock(Object key, LockTag tag) {
_key = key;
_tag = tag;
CLEANER.register(this, () -> {
if (!tag.released) {
Log.error("Lock collected without release: " + key);
}
});
}
@Override
public void close() {
synchronized (_tag) {
_tag.released = true;
// Notify all because when the object is locked again,
// it's a different lock tag
_tag.notifyAll();
_locks.remove(_key, _tag);
}
}
}
}

View File

@@ -11,15 +11,21 @@ public class HashSetDelayedBlockingQueue<T> {
private final LinkedHashMap<T, SetElement<T>> _set = new LinkedHashMap<>();
private final Object _sleepSynchronizer = new Object();
private long _delay;
private boolean _closed = false;
public HashSetDelayedBlockingQueue(long delay) {
_delay = delay;
}
public long getDelay() {
return _delay;
}
private boolean _closed = false;
public HashSetDelayedBlockingQueue(long delay) {
_delay = delay;
public void setDelay(long delay) {
synchronized (_sleepSynchronizer) {
_delay = delay;
_sleepSynchronizer.notifyAll();
}
}
// If there's object with key in the queue, don't do anything
@@ -252,13 +258,6 @@ public class HashSetDelayedBlockingQueue<T> {
return out;
}
public void setDelay(long delay) {
synchronized (_sleepSynchronizer) {
_delay = delay;
_sleepSynchronizer.notifyAll();
}
}
private record SetElement<T>(T el, long time) {
}
}