mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
a bit nicer tx object locking
This commit is contained in:
@@ -8,9 +8,9 @@ import com.usatiuk.dhfs.objects.transaction.TransactionPrivate;
|
||||
import com.usatiuk.dhfs.objects.transaction.TxRecord;
|
||||
import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import com.usatiuk.dhfs.utils.VoidFn;
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
import io.quarkus.logging.Log;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
@@ -106,6 +106,43 @@ public class JObjectManager {
|
||||
if (got == null) return Optional.empty();
|
||||
return Optional.of(new TransactionObjectImpl<>(got.getLeft(), got.getRight().lock));
|
||||
}
|
||||
|
||||
private <T extends JData> Optional<TransactionObject<T>> getLocked(Class<T> type, JObjectKey key, boolean write) {
|
||||
var read = get(type, key).orElse(null);
|
||||
if (read == null) return Optional.empty();
|
||||
var lock = write ? read.lock().writeLock() : read.lock().readLock();
|
||||
lock.lock();
|
||||
while (true) {
|
||||
try {
|
||||
var readAgain = get(type, key).orElse(null);
|
||||
if (readAgain == null) {
|
||||
lock.unlock();
|
||||
return Optional.empty();
|
||||
}
|
||||
if (!Objects.equals(read, readAgain)) {
|
||||
lock.unlock();
|
||||
read = readAgain;
|
||||
lock = write ? read.lock().writeLock() : read.lock().readLock();
|
||||
lock.lock();
|
||||
continue;
|
||||
}
|
||||
return Optional.of(new TransactionObjectImpl<>(read.data(), read.lock()));
|
||||
} catch (Throwable e) {
|
||||
lock.unlock();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<TransactionObject<T>> getReadLocked(Class<T> type, JObjectKey key) {
|
||||
return getLocked(type, key, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key) {
|
||||
return getLocked(type, key, true);
|
||||
}
|
||||
};
|
||||
|
||||
public TransactionPrivate createTransaction() {
|
||||
@@ -114,7 +151,6 @@ public class JObjectManager {
|
||||
return transactionFactory.createTransaction(counter, _objSource);
|
||||
}
|
||||
|
||||
|
||||
public void commit(TransactionPrivate tx) {
|
||||
var toUnlock = new LinkedList<VoidFn>();
|
||||
var toFlush = new LinkedList<TxRecord.TxObjectRecordWrite<?>>();
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package com.usatiuk.dhfs.objects.transaction;
|
||||
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
import com.usatiuk.objects.common.runtime.JData;
|
||||
import com.usatiuk.objects.common.runtime.JObjectKey;
|
||||
import com.usatiuk.objects.alloc.runtime.ObjectAllocator;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import lombok.AccessLevel;
|
||||
@@ -49,9 +49,12 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
|
||||
switch (strategy) {
|
||||
case READ_ONLY: {
|
||||
read.lock().readLock().lock();
|
||||
var view = objectAllocator.unmodifiable(read.data());
|
||||
_objects.put(key, new TxRecord.TxObjectRecordRead<>(read, view));
|
||||
var locked = _source.getReadLocked(type, key).orElse(null);
|
||||
if (locked == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
var view = objectAllocator.unmodifiable(locked.data());
|
||||
_objects.put(key, new TxRecord.TxObjectRecordRead<>(locked, view));
|
||||
return Optional.of(view);
|
||||
}
|
||||
case OPTIMISTIC: {
|
||||
@@ -60,28 +63,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
|
||||
return Optional.of(copy.wrapped());
|
||||
}
|
||||
case WRITE: {
|
||||
read.lock().writeLock().lock();
|
||||
while (true) {
|
||||
try {
|
||||
var readAgain = _source.get(type, key).orElse(null);
|
||||
if (readAgain == null) {
|
||||
read.lock().writeLock().unlock();
|
||||
return Optional.empty();
|
||||
}
|
||||
if (!Objects.equals(read, readAgain)) {
|
||||
read.lock().writeLock().unlock();
|
||||
read = readAgain;
|
||||
read.lock().writeLock().lock();
|
||||
continue;
|
||||
}
|
||||
var copy = objectAllocator.copy(read.data());
|
||||
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(read, copy));
|
||||
return Optional.of(copy.wrapped());
|
||||
} catch (Throwable e) {
|
||||
read.lock().writeLock().unlock();
|
||||
throw e;
|
||||
}
|
||||
var locked = _source.getWriteLocked(type, key).orElse(null);
|
||||
if (locked == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
var copy = objectAllocator.copy(locked.data());
|
||||
_objects.put(key, new TxRecord.TxObjectRecordCopyLock<>(locked, copy));
|
||||
return Optional.of(copy.wrapped());
|
||||
}
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown locking strategy");
|
||||
|
||||
@@ -14,4 +14,8 @@ public interface TransactionObjectSource {
|
||||
}
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> get(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> getReadLocked(Class<T> type, JObjectKey key);
|
||||
|
||||
<T extends JData> Optional<TransactionObject<T>> getWriteLocked(Class<T> type, JObjectKey key);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user