This commit is contained in:
2024-11-30 20:08:53 +01:00
parent 88a1d8b240
commit bbf275855c
46 changed files with 1183 additions and 30 deletions

View File

@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module "objects": transactional object store prototype for DHFS. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.usatiuk.dhfs</groupId>
        <artifactId>parent</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <artifactId>objects</artifactId>
    <properties>
        <!-- Java 21 toolchain; keep in sync with the parent's other modules. -->
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-junit5</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-arc</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-grpc</artifactId>
        </dependency>
        <!-- xx3 hashing used for the transaction-manifest checksum;
             version managed in the parent POM's dependencyManagement. -->
        <dependency>
            <groupId>net.openhft</groupId>
            <artifactId>zero-allocation-hashing</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jboss.slf4j</groupId>
            <artifactId>slf4j-jboss-logmanager</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Sibling modules of this reactor build. -->
        <dependency>
            <groupId>com.usatiuk.dhfs</groupId>
            <artifactId>utils</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.usatiuk.dhfs</groupId>
            <artifactId>supportlib</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Run test classes in parallel, each in a fresh fork (1C = one fork
                 per CPU core); reuseForks=false isolates filesystem-backed tests. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <forkCount>1C</forkCount>
                    <reuseForks>false</reuseForks>
                    <parallel>classes</parallel>
                </configuration>
            </plugin>
            <plugin>
                <groupId>${quarkus.platform.group-id}</groupId>
                <artifactId>quarkus-maven-plugin</artifactId>
                <version>${quarkus.platform.version}</version>
                <extensions>true</extensions>
                <executions>
                    <execution>
                        <id>quarkus-plugin</id>
                        <goals>
                            <goal>build</goal>
                            <goal>generate-code</goal>
                            <goal>generate-code-tests</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

View File

@@ -0,0 +1,40 @@
package com.usatiuk.dhfs.objects;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
@ApplicationScoped
public class DataLocker {
    // Maps object keys to weakly-referenced lock wrappers; entries are removed by
    // the Cleaner once a wrapper becomes unreachable.
    private final ConcurrentHashMap<JObjectKey, WeakReference<? extends LockWrapper<? extends JData>>> _locks = new ConcurrentHashMap<>();
    private final static Cleaner CLEANER = Cleaner.create();

    /**
     * Returns the canonical {@link LockWrapper} for {@code data}, creating one if
     * needed. Wrappers guarding a different instance with the same key are
     * treated as stale and replaced.
     */
    public <T extends JData> LockWrapper<T> get(T data) {
        var key = data.getKey();
        while (true) {
            var have = _locks.get(key);
            if (have != null) {
                var ret = have.get();
                if (ret != null) {
                    if (ret.sameObject(data)) {
                        @SuppressWarnings("unchecked")
                        var cast = (LockWrapper<T>) ret;
                        return cast;
                    } else {
                        Log.warn("Removed stale lock for " + key);
                        _locks.remove(key, have);
                    }
                } else {
                    // FIX: the referent was collected but the Cleaner has not run
                    // yet. Without this removal, putIfAbsent() below keeps seeing
                    // the dead entry and the loop spins until the Cleaner fires.
                    _locks.remove(key, have);
                }
            }
            var ret = new LockWrapper<>(data);
            var ref = new WeakReference<>(ret);
            if (_locks.putIfAbsent(key, ref) == null) {
                // Cleanup action must not reference `ret` itself; it only captures
                // the key and the (weak) ref, so it cannot keep the wrapper alive.
                CLEANER.register(ret, () -> _locks.remove(key, ref));
                return ret;
            }
        }
    }
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
import java.util.function.Function;
/**
 * Immutable-ish data payload of a stored object. Implementations carry the
 * object's identity key and know how to produce a transaction-bound wrapper.
 */
public interface JData {
    // Stable identity of the object within the store.
    JObjectKey getKey();

    // Returns a copy of this data suitable for binding into a JObject.
    JData bindCopy();

    // Factory producing the JObject wrapper for this data, bound to the given
    // JObjectInterface (typically a transaction).
    Function<JObjectInterface, JObject> binder();
}

View File

@@ -0,0 +1,11 @@
package com.usatiuk.dhfs.objects;
/**
 * Base class for live object wrappers. A JObject pairs a {@link JData} payload
 * (provided by subclasses via {@link #getData()}) with the
 * {@link JObjectInterface} it was loaded through, so it can resolve references
 * to other objects in the same context.
 */
public abstract class JObject {
    // The accessor (typically a transaction) this object is bound to.
    protected final JObjectInterface _jObjectInterface;

    public JObject(JObjectInterface jObjectInterface) {
        _jObjectInterface = jObjectInterface;
    }

    public abstract JData getData();
}

View File

@@ -0,0 +1,9 @@
package com.usatiuk.dhfs.objects;
import java.util.Optional;
/**
 * Read access to objects within some scope (e.g. a transaction).
 */
public interface JObjectInterface {
    // Looks up an object by key; empty if it does not exist.
    Optional<JObject> getObject(JObjectKey key);

    // Typed lookup; implementations are expected to throw if the stored object
    // exists but is not an instance of {@code type}.
    <T extends JObject> Optional<T> getObject(JObjectKey key, Class<T> type);
}

View File

@@ -0,0 +1,4 @@
package com.usatiuk.dhfs.objects;
/**
 * Identity of a stored object.
 * <p>
 * {@code toString()} is overridden to return the raw name: the file-backed
 * store derives on-disk file names from keys, and {@code findAllObjects()}
 * reconstructs keys from file names — the record-generated
 * {@code "JObjectKey[name=...]"} form could not round-trip.
 */
public record JObjectKey(String name) {
    @Override
    public String toString() {
        return name;
    }
}

View File

@@ -0,0 +1,90 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.ObjectPersistentStore;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
 * Entry point for working with stored objects; hands out {@link Transaction}
 * instances that lazily load objects from the persistent store.
 * <p>
 * NOTE(review): {@code dataLocker} is injected but never used yet, and
 * {@code commit()} below is unfinished — see the FIXMEs inline.
 */
@ApplicationScoped
public class JObjectManager {
    @Inject
    ObjectPersistentStore objectStorage;
    @Inject
    DataLocker dataLocker;

    /**
     * A unit of work: caches every object it loads so repeated lookups return
     * the same instance, and (eventually) writes changes back on commit.
     */
    public class Transaction implements JObjectInterface {
        // Transaction-local identity map: key -> the single live wrapper.
        private final Map<JObjectKey, JObject> _objects = new HashMap<>();

        // Materializes a JObject for this transaction via the data's binder.
        private JObject dataToObject(JData data) {
            return data.binder().apply(this);
        }

        @Override
        public Optional<JObject> getObject(JObjectKey key) {
            // Return the transaction-local instance if already loaded.
            if (_objects.containsKey(key)) {
                return Optional.of(_objects.get(key));
            }
            var data = objectStorage.readObject(key).orElse(null);
            if (data == null) {
                return Optional.empty();
            }
            var ret = dataToObject(data);
            _objects.put(key, ret);
            return Optional.of(ret);
        }

        @Override
        public <T extends JObject> Optional<T> getObject(JObjectKey key, Class<T> type) {
            if (_objects.containsKey(key)) {
                var got = _objects.get(key);
                if (type.isInstance(got)) {
                    return Optional.of(type.cast(got));
                } else {
                    throw new IllegalArgumentException("Object type mismatch");
                }
            }
            var data = objectStorage.readObject(key).orElse(null);
            if (data == null) {
                return Optional.empty();
            }
            var got = dataToObject(data);
            if (type.isInstance(got)) {
                // Only cache the wrapper once the type check has passed.
                _objects.put(key, got);
                return Optional.of(type.cast(got));
            } else {
                throw new IllegalArgumentException("Object type mismatch");
            }
        }

        // Writes back every changed object, using an optimistic version check.
        public void commit() {
            _objects.forEach((key, value) -> {
                // FIXME(review): casts to TestData, which lives in the test
                // sourceset — production code cannot depend on it. Version and
                // dirty tracking need to move into the JData contract.
                var data = (TestData) value.getData();
                if (!data.isChanged()) {
                    return;
                }
                // FIXME(review): `_objectStorage` is not declared anywhere (the
                // injected field is `objectStorage`, and ObjectPersistentStore
                // has no get/put) — this method does not compile as written.
                if (_objectStorage.get(key) == null) {
                    _objectStorage.put(data.copy());
                    return;
                }
                // Optimistic concurrency: refuse to overwrite a newer version.
                if (_objectStorage.get(key).getVersion() <= data.getVersion()) {
                    _objectStorage.put(data.copy());
                } else {
                    throw new IllegalArgumentException("Version mismatch");
                }
            });
        }
    }

    public Transaction beginTransaction() {
        return new Transaction();
    }
}

View File

@@ -0,0 +1,60 @@
package com.usatiuk.dhfs.objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * Pairs a {@link JData} instance with a read/write lock. Obtain scoped access
 * through {@link #read()} / {@link #write()}; the returned accessor takes the
 * lock on construction and releases it on {@code close()} (use
 * try-with-resources — close() must be called exactly once).
 */
public class LockWrapper<T extends JData> {
    // FIX: typed as T (was JData) so the stored data keeps its static type and
    // the accessor casts below are guaranteed safe for B == T.
    private final T _data;
    private final ReentrantReadWriteLock _lock = new ReentrantReadWriteLock();

    public LockWrapper(T data) {
        _data = data;
    }

    /** Identity check: true iff this wrapper guards exactly this instance. */
    public boolean sameObject(JData data) {
        return _data == data;
    }

    interface DataAccessor<T extends JData> extends AutoCloseable {
        T getData();
    }

    /** Holds the read lock from construction until {@code close()}. */
    public class ReadLocked<B extends JData> implements DataAccessor<B> {
        public ReadLocked() {
            _lock.readLock().lock();
        }

        @Override
        public void close() {
            _lock.readLock().unlock();
        }

        @SuppressWarnings("unchecked")
        @Override
        public B getData() {
            return (B) _data;
        }
    }

    public ReadLocked<T> read() {
        return new ReadLocked<>();
    }

    /** Holds the write lock from construction until {@code close()}. */
    public class WriteLocked<B extends JData> implements DataAccessor<B> {
        public WriteLocked() {
            _lock.writeLock().lock();
        }

        @Override
        public void close() {
            _lock.writeLock().unlock();
        }

        @SuppressWarnings("unchecked")
        @Override
        public B getData() {
            return (B) _data;
        }
    }

    public WriteLocked<T> write() {
        return new WriteLocked<>();
    }
}

View File

@@ -0,0 +1,33 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Optional;
/**
 * Durable key/value store for object data. Writes are two-phase: stage with
 * {@link #writeObject} then publish/delete atomically via {@link #commitTx};
 * the *Direct variants bypass staging.
 */
public interface ObjectPersistentStore {
    // All object keys currently stored (staged-but-uncommitted ones excluded).
    @Nonnull
    Collection<JObjectKey> findAllObjects();

    // Empty if no object with this key exists.
    @Nonnull
    Optional<JData> readObject(JObjectKey name);

    // Writes immediately, without transactional staging.
    void writeObjectDirect(JObjectKey name, JData object);

    // Stages a write; becomes visible only after commitTx() lists the key.
    void writeObject(JObjectKey name, JData object);

    // Atomically publishes staged writes and applies deletions per the manifest.
    void commitTx(TxManifest names);

    // Deletes object metadata and data
    void deleteObjectDirect(JObjectKey name);

    // Capacity accounting for the backing medium.
    long getTotalSpace();

    long getFreeSpace();

    long getUsableSpace();
}

View File

@@ -0,0 +1,351 @@
package com.usatiuk.dhfs.objects.persistence;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.dhfs.utils.ByteUtils;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import net.openhft.hashing.LongHashFunction;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.annotation.Nonnull;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
// File format:
// 64-bit metadata serialized size
// 64-bit offset of "rest of" metadata (if -1 then file has no data,
// if 0 then file has data and metadata fits into META_BLOCK_SIZE)
// Until META_BLOCK_SIZE - metadata (encoded as ObjectMetadataP)
// data (encoded as JObjectDataP)
// rest of metadata
@ApplicationScoped
public class SerializingFileObjectPersistentStore implements ObjectPersistentStore {
private final Path _root;
private final Path _txManifest;
private ExecutorService _flushExecutor;
private RandomAccessFile _txFile;
private volatile boolean _ready = false;
public SerializingFileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
this._root = Path.of(root).resolve("objects");
_txManifest = Path.of(root).resolve("cur-tx-manifest");
}
void init(@Observes @Priority(100) StartupEvent event) throws IOException {
if (!_root.toFile().exists()) {
Log.info("Initializing with root " + _root);
_root.toFile().mkdirs();
for (int i = 0; i < 256; i++) {
_root.resolve(String.valueOf(i)).toFile().mkdirs();
}
}
if (!Files.exists(_txManifest)) {
Files.createFile(_txManifest);
}
_txFile = new RandomAccessFile(_txManifest.toFile(), "rw");
{
BasicThreadFactory factory = new BasicThreadFactory.Builder()
.namingPattern("persistent-commit-%d")
.build();
_flushExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
}
tryReplay();
Log.info("Transaction replay done");
_ready = true;
}
void shutdown(@Observes @Priority(900) ShutdownEvent event) throws IOException {
_ready = false;
Log.debug("Deleting manifest file");
_txFile.close();
Files.delete(_txManifest);
Log.debug("Manifest file deleted");
}
private void verifyReady() {
if (!_ready) throw new IllegalStateException("Wrong service order!");
}
private void tryReplay() {
var read = readTxManifest();
if (read != null)
commitTxImpl(read, false);
}
private Path getObjPath(@Nonnull JObjectKey obj) {
int h = Objects.hash(obj);
int p1 = h & 0b00000000_00000000_11111111_00000000;
return _root.resolve(String.valueOf(p1 >> 8)).resolve(obj.toString());
}
private Path getTmpObjPath(@Nonnull JObjectKey obj) {
int h = Objects.hash(obj);
int p1 = h & 0b00000000_00000000_11111111_00000000;
return _root.resolve(String.valueOf(p1 >> 8)).resolve(obj + ".tmp");
}
private void findAllObjectsImpl(Collection<JObjectKey> out, Path path) {
var read = path.toFile().listFiles();
if (read == null) return;
for (var s : read) {
if (s.isDirectory()) {
findAllObjectsImpl(out, s.toPath());
} else {
if (s.getName().endsWith(".tmp")) continue; // FIXME:
out.add(new JObjectKey(s.getName())); // FIXME:
}
}
}
@Nonnull
@Override
public Collection<JObjectKey> findAllObjects() {
verifyReady();
ArrayList<JObjectKey> out = new ArrayList<>();
findAllObjectsImpl(out, _root);
return Collections.unmodifiableCollection(out);
}
@Nonnull
@Override
public Optional<JData> readObject(JObjectKey name) {
verifyReady();
var path = getObjPath(name);
try (var rf = new RandomAccessFile(path.toFile(), "r")) {
ByteBuffer buf = UninitializedByteBuffer.allocateUninitialized(Math.toIntExact(rf.getChannel().size()));
fillBuffer(buf, rf.getChannel());
buf.flip();
var bs = UnsafeByteOperations.unsafeWrap(buf);
// This way, the input will be considered "immutable" which would allow avoiding copies
// when parsing byte arrays
var ch = bs.newCodedInput();
ch.enableAliasing(true);
// return JObjectDataP.parseFrom(ch);
return null;
} catch (EOFException | FileNotFoundException | NoSuchFileException fx) {
return Optional.empty();
} catch (IOException e) {
Log.error("Error reading file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
private void fillBuffer(ByteBuffer dst, FileChannel src) throws IOException {
int rem = dst.remaining();
int readTotal = 0;
int readCur = 0;
while (readTotal < rem && (readCur = src.read(dst)) != -1) {
readTotal += readCur;
}
if (rem != readTotal)
throw new EOFException();
}
private void writeObjectImpl(Path path, JData data, boolean sync) throws IOException {
try (var fsb = new FileOutputStream(path.toFile(), false)) {
// int dataSize = data.getSerializedSize();
int dataSize = 0;
// if (fsb.getChannel().write(metaBb.limit(META_BLOCK_SIZE)) != META_BLOCK_SIZE)
// throw new IOException("Could not write to file");
if (sync) {
fsb.flush();
fsb.getFD().sync();
}
}
}
@Override
public void writeObjectDirect(JObjectKey name, JData data) {
verifyReady();
try {
var path = getObjPath(name);
writeObjectImpl(path, data, false);
} catch (IOException e) {
Log.error("Error writing file " + name, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
@Override
public void writeObject(JObjectKey name, JData obj) {
verifyReady();
try {
var tmpPath = getTmpObjPath(name);
writeObjectImpl(tmpPath, obj, true);
} catch (IOException e) {
Log.error("Error writing new file " + name, e);
}
}
private TxManifest readTxManifest() {
try {
var channel = _txFile.getChannel();
if (channel.size() == 0)
return null;
channel.position(0);
var buf = ByteBuffer.allocate(Math.toIntExact(channel.size()));
fillBuffer(buf, channel);
buf.flip();
long checksum = buf.getLong();
var data = buf.slice();
var hash = LongHashFunction.xx3().hashBytes(data);
if (hash != checksum)
throw new StatusRuntimeExceptionNoStacktrace(Status.DATA_LOSS.withDescription("Transaction manifest checksum mismatch!"));
return SerializationHelper.deserialize(data.array(), data.arrayOffset());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void putTxManifest(TxManifest manifest) {
try {
var channel = _txFile.getChannel();
var data = SerializationHelper.serializeArray(manifest);
channel.truncate(data.length + 8);
channel.position(0);
var hash = LongHashFunction.xx3().hashBytes(data);
if (channel.write(ByteUtils.longToBb(hash)) != 8)
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
if (channel.write(ByteBuffer.wrap(data)) != data.length)
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
channel.force(true);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void commitTx(TxManifest manifest) {
verifyReady();
commitTxImpl(manifest, true);
}
public void commitTxImpl(TxManifest manifest, boolean failIfNotFound) {
try {
if (manifest.getDeleted().isEmpty() && manifest.getWritten().isEmpty()) {
Log.debug("Empty manifest, skipping");
return;
}
putTxManifest(manifest);
var latch = new CountDownLatch(manifest.getWritten().size() + manifest.getDeleted().size());
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
for (var n : manifest.getWritten()) {
_flushExecutor.execute(() -> {
try {
Files.move(getTmpObjPath(n), getObjPath(n), ATOMIC_MOVE, REPLACE_EXISTING);
} catch (Throwable t) {
if (!failIfNotFound && (t instanceof NoSuchFileException)) return;
Log.error("Error writing " + n, t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
for (var d : manifest.getDeleted()) {
_flushExecutor.execute(() -> {
try {
deleteImpl(getObjPath(d));
} catch (Throwable t) {
Log.error("Error deleting " + d, t);
errors.add(t);
} finally {
latch.countDown();
}
});
}
latch.await();
if (!errors.isEmpty()) {
throw new RuntimeException("Errors when commiting tx!");
}
// No real need to truncate here
// try (var channel = _txFile.getChannel()) {
// channel.truncate(0);
// }
// } catch (IOException e) {
// Log.error("Failed committing transaction to disk: ", e);
// throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void deleteImpl(Path path) {
try {
Files.delete(path);
} catch (NoSuchFileException ignored) {
} catch (IOException e) {
Log.error("Error deleting file " + path, e);
throw new StatusRuntimeExceptionNoStacktrace(Status.INTERNAL);
}
}
@Override
public void deleteObjectDirect(JObjectKey name) {
verifyReady();
deleteImpl(getObjPath(name));
}
@Override
public long getTotalSpace() {
verifyReady();
return _root.toFile().getTotalSpace();
}
@Override
public long getFreeSpace() {
verifyReady();
return _root.toFile().getFreeSpace();
}
@Override
public long getUsableSpace() {
verifyReady();
return _root.toFile().getUsableSpace();
}
}

View File

@@ -0,0 +1,13 @@
package com.usatiuk.dhfs.objects.persistence;
import com.usatiuk.dhfs.objects.JObjectKey;
import java.io.Serializable;
import java.util.List;
// FIXME: Serializable — Java-native serialization of the journal is a
// placeholder; should move to an explicit format.
/**
 * Describes one transaction for {@code ObjectPersistentStore.commitTx}:
 * which staged keys to publish and which keys to delete.
 */
public interface TxManifest extends Serializable {
    List<JObjectKey> getWritten();

    List<JObjectKey> getDeleted();
}

View File

@@ -0,0 +1,97 @@
package com.usatiuk.dhfs.objects;
import com.usatiuk.dhfs.objects.persistence.FakeObjectStorage;
import com.usatiuk.dhfs.objects.test.objs.Kid;
import com.usatiuk.dhfs.objects.test.objs.Parent;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
 * Transaction-semantics tests against the in-memory FakeObjectStorage.
 * <p>
 * FIXME(review): this class does not compile against the current API:
 * JObjectManager declares no (ObjectPersistentStore) constructor, and
 * JObjectInterface.getObject(...) returns Optional, so members are called on
 * an Optional below. Either the API or these tests need updating.
 */
public class ObjectsTest {
    private final FakeObjectStorage _storage = new FakeObjectStorage();
    private final JObjectManager _tx = new JObjectManager(_storage);

    // Write in one transaction, read back in a second one.
    @Test
    void createObject() {
        {
            var tx = _tx.beginTransaction();
            var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
            parent.setName("John");
            tx.commit();
        }
        {
            var tx2 = _tx.beginTransaction();
            var parent = tx2.getObject(new JObjectKey("Parent"));
            Assertions.assertInstanceOf(Parent.class, parent);
            Assertions.assertEquals("John", ((Parent) parent).getName());
        }
    }

    // Two concurrent creations of the same key: the second commit must fail.
    @Test
    void createObjectConflict() {
        {
            var tx = _tx.beginTransaction();
            var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
            parent.setName("John");
            var tx2 = _tx.beginTransaction();
            var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class);
            parent2.setName("John");
            tx.commit();
            Assertions.assertThrows(Exception.class, tx2::commit);
        }
    }

    // Concurrent edits of an existing object: loser's commit must fail and the
    // winner's value must remain visible afterwards.
    @Test
    void editConflict() {
        {
            var tx = _tx.beginTransaction();
            var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
            parent.setName("John");
            tx.commit();
        }
        {
            var tx = _tx.beginTransaction();
            var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
            parent.setName("John2");
            var tx2 = _tx.beginTransaction();
            var parent2 = tx2.getObject(new JObjectKey("Parent"), Parent.class);
            parent2.setName("John3");
            tx.commit();
            Assertions.assertThrows(Exception.class, tx2::commit);
        }
        {
            var tx2 = _tx.beginTransaction();
            var parent = tx2.getObject(new JObjectKey("Parent"));
            Assertions.assertInstanceOf(Parent.class, parent);
            Assertions.assertEquals("John2", ((Parent) parent).getName());
        }
    }

    // Creating two objects in one transaction, with a cross-reference resolved
    // through the second transaction (Parent.getKid()).
    @Test
    void nestedCreate() {
        {
            var tx = _tx.beginTransaction();
            var parent = tx.getObject(new JObjectKey("Parent"), Parent.class);
            var kid = tx.getObject(new JObjectKey("Kid"), Kid.class);
            parent.setName("John");
            kid.setName("KidName");
            parent.setKidKey(kid.getKey());
            tx.commit();
        }
        {
            var tx2 = _tx.beginTransaction();
            var parent = tx2.getObject(new JObjectKey("Parent"));
            Assertions.assertInstanceOf(Parent.class, parent);
            Assertions.assertEquals("John", ((Parent) parent).getName());
            Assertions.assertEquals("KidName", ((Parent) parent).getKid().getName());
        }
    }
}

View File

@@ -0,0 +1,80 @@
package com.usatiuk.dhfs.objects.persistence;

import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
import com.usatiuk.dhfs.objects.test.objs.TestData;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * In-memory {@link ObjectPersistentStore} for unit tests. {@code writeObject}
 * stages entries in {@code _pending}; {@code commitTx} promotes staged entries
 * and applies deletions. All access is serialized on {@code this}.
 */
public class FakeObjectStorage implements ObjectPersistentStore {
    private final Map<JObjectKey, TestData> _objects = new HashMap<>();
    private final Map<JObjectKey, TestData> _pending = new HashMap<>();

    @Nonnull
    @Override
    public Collection<JObjectKey> findAllObjects() {
        synchronized (this) {
            // FIX: return a snapshot; the original leaked the live keySet()
            // view, which escapes the lock and mutates under the caller.
            return new ArrayList<>(_objects.keySet());
        }
    }

    @Nonnull
    @Override
    public Optional<JData> readObject(JObjectKey name) {
        synchronized (this) {
            return Optional.ofNullable(_objects.get(name));
        }
    }

    @Override
    public void writeObjectDirect(JObjectKey name, JData object) {
        synchronized (this) {
            _objects.put(name, (TestData) object);
        }
    }

    @Override
    public void writeObject(JObjectKey name, JData object) {
        synchronized (this) {
            _pending.put(name, (TestData) object);
        }
    }

    @Override
    public void commitTx(TxManifest names) {
        synchronized (this) {
            for (JObjectKey key : names.getWritten()) {
                // FIX: consume the staged entry and skip keys that were never
                // staged; the original stored _pending.get(key) unconditionally,
                // inserting nulls and leaking staged entries across commits.
                TestData staged = _pending.remove(key);
                if (staged != null) {
                    _objects.put(key, staged);
                }
            }
            for (JObjectKey key : names.getDeleted()) {
                _objects.remove(key);
            }
        }
    }

    @Override
    public void deleteObjectDirect(JObjectKey name) {
        synchronized (this) {
            _objects.remove(name);
        }
    }

    // Capacity accounting is meaningless for the in-memory fake.
    @Override
    public long getTotalSpace() {
        return 0;
    }

    @Override
    public long getFreeSpace() {
        return 0;
    }

    @Override
    public long getUsableSpace() {
        return 0;
    }
}

View File

@@ -0,0 +1,18 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
/**
 * Live wrapper for a {@link KidData} payload.
 * <p>
 * FIX: the original called a nonexistent two-argument JObject constructor and
 * returned an undeclared {@code _data} field — it did not compile. Now mirrors
 * {@link Parent}: holds its own data field and passes only the interface up.
 */
public class Kid extends JObject {
    private final KidData _data;

    public Kid(JObjectInterface jObjectInterface, KidData data) {
        super(jObjectInterface);
        _data = data;
    }

    @Override
    public JData getData() {
        return _data;
    }

    // Convenience accessor used by tests (Parent.getKid().getName()).
    public String getName() {
        return _data.getName();
    }
}

View File

@@ -0,0 +1,19 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
import java.util.function.Function;
/**
 * Mutable view of a kid object's data.
 */
public interface KidData extends JData {
    String getName();

    void setName(String name);

    // Covariant narrowing of JData.bindCopy() so binder() needs no cast.
    KidData bindCopy();

    // Binds a copied snapshot of this data into a Kid owned by the given
    // interface (typically a transaction).
    default Function<JObjectInterface, JObject> binder() {
        return jo -> new Kid(jo, bindCopy());
    }
}

View File

@@ -0,0 +1,28 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JObjectKey;
/**
 * Concrete {@link KidData} backed by {@link TestData}'s version/dirty tracking.
 */
public class KidDataImpl extends TestData implements KidData {
    private String _name;

    public KidDataImpl(long version, JObjectKey key, String name) {
        super(version, key);
        _name = name;
    }

    @Override
    public String getName() {
        return _name;
    }

    @Override
    public void setName(String name) {
        _name = name;
        onChanged();
    }

    // FIX: KidData/JData declare the abstract bindCopy(), but no implementation
    // existed — the class did not compile. A version-aware copy is exactly the
    // snapshot binder() needs.
    @Override
    public KidDataImpl bindCopy() {
        return copy();
    }

    /** Snapshot; bumps the version iff this instance was modified. */
    @Override
    public KidDataImpl copy() {
        return new KidDataImpl(isChanged() ? getVersion() + 1 : getVersion(), getKey(), _name);
    }
}

View File

@@ -0,0 +1,25 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
import lombok.experimental.Delegate;
/**
 * Live wrapper for a {@link ParentData} payload. Lombok's {@code @Delegate}
 * exposes all ParentData accessors directly on Parent.
 */
public class Parent extends JObject {
    @Delegate
    private final ParentData _data;

    public Parent(JObjectInterface jObjectInterface, ParentData data) {
        super(jObjectInterface);
        _data = data;
    }

    @Override
    public JData getData() {
        return _data;
    }

    /**
     * Resolves this parent's kid through the owning interface/transaction.
     * FIX: getObject() returns {@code Optional<Kid>}; the original returned it
     * directly where a {@code Kid} was declared, which did not compile.
     */
    public Kid getKid() {
        return _jObjectInterface.getObject(_data.getKidKey(), Kid.class)
                .orElseThrow(() -> new IllegalStateException("Kid not found: " + _data.getKidKey()));
    }
}

View File

@@ -0,0 +1,14 @@
package com.usatiuk.dhfs.objects.test.objs;

import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObject;
import com.usatiuk.dhfs.objects.JObjectInterface;
import com.usatiuk.dhfs.objects.JObjectKey;

import java.util.function.Function;

/**
 * Mutable view of a parent object's data.
 */
public interface ParentData extends JData {
    String getName();

    void setName(String name);

    JObjectKey getKidKey();

    void setKidKey(JObjectKey kid);

    // FIX (consistency with KidData): covariant bindCopy() plus a default
    // binder(); without these, JData's abstract binder()/bindCopy() left
    // concrete implementations (ParentDataImpl) uncompilable.
    ParentData bindCopy();

    // Binds a copied snapshot of this data into a Parent owned by the given
    // interface (typically a transaction).
    default Function<JObjectInterface, JObject> binder() {
        return jo -> new Parent(jo, bindCopy());
    }
}

View File

@@ -0,0 +1,41 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JObjectKey;
/**
 * Concrete {@link ParentData} backed by {@link TestData}'s version/dirty
 * tracking.
 */
public class ParentDataImpl extends TestData implements ParentData {
    private String _name;
    private JObjectKey _kidKey;

    public ParentDataImpl(long version, JObjectKey key, String name, JObjectKey kidKey) {
        super(version, key);
        _name = name;
        _kidKey = kidKey;
    }

    @Override
    public String getName() {
        return _name;
    }

    @Override
    public void setName(String name) {
        _name = name;
        onChanged();
    }

    @Override
    public JObjectKey getKidKey() {
        return _kidKey;
    }

    @Override
    public void setKidKey(JObjectKey kid) {
        _kidKey = kid;
        onChanged();
    }

    // FIX: JData declares the abstract bindCopy(), but no implementation
    // existed here (KidDataImpl had the same gap) — the class did not compile.
    // Mirrors KidDataImpl: a version-aware copy is the snapshot binders need.
    @Override
    public ParentDataImpl bindCopy() {
        return copy();
    }

    /** Snapshot; bumps the version iff this instance was modified. */
    @Override
    public ParentDataImpl copy() {
        return new ParentDataImpl(isChanged() ? getVersion() + 1 : getVersion(), getKey(), _name, _kidKey);
    }
}

View File

@@ -0,0 +1,34 @@
package com.usatiuk.dhfs.objects.test.objs;
import com.usatiuk.dhfs.objects.JData;
import com.usatiuk.dhfs.objects.JObjectKey;
/**
 * Base class for test object data: carries an identity key, a monotonically
 * increasing version, and a dirty flag that subclass setters raise via
 * {@link #onChanged()}.
 */
public abstract class TestData implements JData {
    private final long _version;
    private final JObjectKey _key;
    private boolean _modified = false;

    protected TestData(long version, JObjectKey key) {
        _version = version;
        _key = key;
    }

    /** Called by subclass setters to mark this instance as modified. */
    void onChanged() {
        _modified = true;
    }

    /** Whether any setter has touched this instance since construction. */
    public boolean isChanged() {
        return _modified;
    }

    public long getVersion() {
        return _version;
    }

    @Override
    public JObjectKey getKey() {
        return _key;
    }

    /** Snapshot copy; implementations bump the version when changed. */
    public abstract TestData copy();
}

View File

@@ -15,6 +15,8 @@
<module>kleppmanntree</module>
<module>supportlib</module>
<module>autoprotomap</module>
<module>objects</module>
<module>utils</module>
</modules>
<properties>
@@ -54,6 +56,11 @@
<version>1.18.34</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>

View File

@@ -51,7 +51,6 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -147,6 +146,16 @@
<artifactId>supportlib</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>objects</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>utils</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

View File

@@ -15,7 +15,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObject;
import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;

View File

@@ -9,7 +9,7 @@ import com.usatiuk.dhfs.objects.repository.opsupport.OpObject;
import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
import com.usatiuk.dhfs.objects.repository.opsupport.OpSender;
import com.usatiuk.kleppmanntree.*;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
public abstract class JObject<T extends JObjectData> {
public abstract ObjectMetadata getMeta();

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
import jakarta.annotation.Nullable;
import java.util.Collection;

View File

@@ -7,7 +7,7 @@ import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

View File

@@ -4,7 +4,7 @@ import com.usatiuk.autoprotomap.runtime.ProtoSerializer;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
public interface TxWriteback {
TxBundle createBundle();

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.jrepository;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.objects.repository.persistence.ObjectPersistentStore;
import com.usatiuk.utils.VoidFn;
import com.usatiuk.dhfs.utils.VoidFn;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.repository;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.ShutdownChecker;
import com.usatiuk.dhfs.objects.jrepository.*;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;

View File

@@ -10,7 +10,7 @@ import com.usatiuk.dhfs.objects.repository.autosync.AutoSyncProcessor;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.opsupport.Op;
import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.grpc.GrpcService;

View File

@@ -8,7 +8,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.repository.invalidation.InvalidationQueueService;
import com.usatiuk.dhfs.objects.repository.opsupport.OpObjectRegistry;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.repository.autosync;
import com.usatiuk.dhfs.objects.jrepository.*;
import com.usatiuk.dhfs.objects.repository.peersync.PeerDirectory;
import com.usatiuk.dhfs.objects.repository.peersync.PersistentPeerInfo;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;

View File

@@ -1,6 +1,6 @@
package com.usatiuk.dhfs.objects.repository.invalidation;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;

View File

@@ -6,7 +6,7 @@ import com.usatiuk.dhfs.objects.jrepository.JObjectManager;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;

View File

@@ -3,7 +3,7 @@ package com.usatiuk.dhfs.objects.repository.opsupport;
import com.usatiuk.dhfs.objects.jrepository.JObjectTxManager;
import com.usatiuk.dhfs.objects.repository.PeerManager;
import com.usatiuk.dhfs.objects.repository.RemoteObjectServiceClient;
import com.usatiuk.utils.HashSetDelayedBlockingQueue;
import com.usatiuk.dhfs.utils.HashSetDelayedBlockingQueue;
import io.quarkus.logging.Log;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;

View File

@@ -3,13 +3,13 @@ package com.usatiuk.dhfs.objects.repository.persistence;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.SerializationHelper;
import com.usatiuk.dhfs.utils.SerializationHelper;
import com.usatiuk.dhfs.objects.persistence.JObjectDataP;
import com.usatiuk.dhfs.objects.persistence.ObjectMetadataP;
import com.usatiuk.dhfs.supportlib.DhfsSupport;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.utils.ByteUtils;
import com.usatiuk.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.dhfs.utils.ByteUtils;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;

59
dhfs-parent/utils/pom.xml Normal file
View File

@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.usatiuk.dhfs</groupId>
<artifactId>parent</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>utils</artifactId>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.slf4j</groupId>
<artifactId>slf4j-jboss-logmanager</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,4 +1,4 @@
package com.usatiuk.utils;
package com.usatiuk.dhfs.utils;
import java.nio.ByteBuffer;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.utils;
package com.usatiuk.dhfs.utils;
import jakarta.annotation.Nullable;
import lombok.Getter;
@@ -11,9 +11,12 @@ import java.util.function.Function;
public class HashSetDelayedBlockingQueue<T> {
private final LinkedHashMap<T, SetElement<T>> _set = new LinkedHashMap<>();
private final Object _sleepSynchronizer = new Object();
@Getter
private long _delay;
public long getDelay() {
return _delay;
}
private boolean _closed = false;
public HashSetDelayedBlockingQueue(long delay) {

View File

@@ -1,8 +1,7 @@
package com.usatiuk.dhfs;
package com.usatiuk.dhfs.utils;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.dhfs.files.objects.File;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.apache.commons.lang3.SerializationUtils;
@@ -12,10 +11,9 @@ import java.io.InputStream;
import java.io.Serializable;
public abstract class SerializationHelper {
// Taken from SerializationUtils
public static <T> T deserialize(final InputStream inputStream) {
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(File.class.getClassLoader(), inputStream)) {
try (ClassLoaderObjectInputStream in = new ClassLoaderObjectInputStream(SerializationHelper.class.getClassLoader(), inputStream)) {
final T obj = (T) in.readObject();
return obj;
} catch (IOException | ClassNotFoundException e) {

View File

@@ -1,4 +1,4 @@
package com.usatiuk.utils;
package com.usatiuk.dhfs.utils;
import io.grpc.Metadata;
import io.grpc.Status;

View File

@@ -1,4 +1,4 @@
package com.usatiuk.utils;
package com.usatiuk.dhfs.utils;
@FunctionalInterface
public interface VoidFn {

View File

@@ -1,4 +1,4 @@
package com.usatiuk.utils;
package com.usatiuk.dhfs.utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;