Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ plugins {
id 'maven-publish'
id 'signing'
id 'com.gradleup.shadow'
id 'com.google.protobuf'
}

group = 'dev.lukebemish'
Expand Down Expand Up @@ -166,6 +167,8 @@ dependencies {
// For reading/writing parchment json files
implementation libs.feather.core
implementation libs.feather.gson
// For storing various binary information -- notably, zip dissassembly info
implementation libs.protobuf.java
annotationProcessor libs.picocli.codegen
implementation(project(':')) {
capabilities {
Expand Down Expand Up @@ -207,6 +210,12 @@ dependencies {
instrumentationImplementation libs.asm
}

protobuf {
protoc {
artifact = libs.protobuf.protoc.get()
}
}

// This keeps the same versions at runtime and compile time
var tools = new Properties()
file("src/main/resources/tools.properties").withInputStream {
Expand Down
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mappingio = "0.7.0"
feather = "1.1.0"
gson = "2.11.0"
oshi = "6.6.5"
protobuf = "4.33.4"

[libraries]

Expand All @@ -27,3 +28,6 @@ feather-gson = { group = "org.parchmentmc.feather", name = "io-gson", version.re
gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" }

oshi-core = { group = "com.github.oshi", name = "oshi-core", version.ref = "oshi" }

protobuf-java = { group = "com.google.protobuf", name = "protobuf-java", version.ref = "protobuf" }
protobuf-protoc = { group = "com.google.protobuf", name = "protoc", version.ref = "protobuf" }
20 changes: 11 additions & 9 deletions locks/buildscript-gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
com.fasterxml.woodstox:woodstox-core:7.1.0=classpath
com.gradleup.shadow:com.gradleup.shadow.gradle.plugin:9.3.0=classpath
com.gradleup.shadow:shadow-gradle-plugin:9.3.0=classpath
com.google.code.findbugs:jsr305:3.0.2=classpath
com.google.gradle:osdetector-gradle-plugin:1.7.3=classpath
com.google.protobuf:com.google.protobuf.gradle.plugin:0.9.6=classpath
com.google.protobuf:protobuf-gradle-plugin:0.9.6=classpath
com.gradleup.shadow:com.gradleup.shadow.gradle.plugin:9.3.1=classpath
com.gradleup.shadow:shadow-gradle-plugin:9.3.1=classpath
commons-codec:commons-codec:1.20.0=classpath
commons-io:commons-io:2.21.0=classpath
kr.motd.maven:os-maven-plugin:1.7.1=classpath
org.apache.ant:ant-launcher:1.10.15=classpath
org.apache.ant:ant:1.10.15=classpath
org.apache.logging.log4j:log4j-api:2.25.2=classpath
org.apache.logging.log4j:log4j-core:2.25.2=classpath
org.apache.logging.log4j:log4j-api:2.25.3=classpath
org.apache.logging.log4j:log4j-core:2.25.3=classpath
org.apache.maven:maven-api-annotations:4.0.0-rc-3=classpath
org.apache.maven:maven-api-xml:4.0.0-rc-3=classpath
org.apache.maven:maven-xml:4.0.0-rc-3=classpath
org.codehaus.plexus:plexus-utils:4.0.2=classpath
org.codehaus.plexus:plexus-xml:4.1.0=classpath
org.codehaus.woodstox:stax2-api:4.2.2=classpath
org.jdom:jdom2:2.0.6.1=classpath
org.jetbrains.kotlin:kotlin-metadata-jvm:2.3.0-RC2=classpath
org.jetbrains.kotlin:kotlin-stdlib:2.3.0-RC2=classpath
org.jetbrains.kotlin:kotlin-metadata-jvm:2.3.0=classpath
org.jetbrains.kotlin:kotlin-stdlib:2.3.0=classpath
org.jetbrains:annotations:13.0=classpath
org.ow2.asm:asm-commons:9.9=classpath
org.ow2.asm:asm-tree:9.9=classpath
org.ow2.asm:asm:9.9=classpath
org.vafer:jdependency:2.14=classpath
empty=
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pluginManagement {
}
plugins {
id 'com.gradleup.shadow' version '+'
id 'com.google.protobuf' version '+'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dev.lukebemish.taskgraphrunner.model.Output;
import dev.lukebemish.taskgraphrunner.runtime.util.LockManager;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -20,7 +21,13 @@ public interface Context {

Path pathFromHash(String hash, String outputType);

Path existingTaskOutput(Task task, String outputName);
String storeTaskOutput(Task task, String output) throws IOException;

String contentAddressForTaskOutput(Task task, String outputName);

Path contentAddressedTaskOutput(Task task, String outputName);

Path reassembleTaskOutput(Task task, String outputName, Path reassemblyInfo);

Path taskStatePath(Task task);

Expand Down
100 changes: 98 additions & 2 deletions src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import dev.lukebemish.taskgraphrunner.model.Output;
import dev.lukebemish.taskgraphrunner.runtime.util.JsonUtils;
import dev.lukebemish.taskgraphrunner.runtime.util.LockManager;
import dev.lukebemish.taskgraphrunner.runtime.zips.PiecewiseZips;
import org.jspecify.annotations.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
Expand All @@ -20,7 +22,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -82,8 +86,21 @@ public Path taskOutputMarkerPath(Task task, String outputName) {
return taskDirectory(task).resolve(contentsHash+"."+outputName+"."+task.outputId()+"."+outputType+".txt");
}

private static final Set<String> CAN_REASSEMBLE = Set.of(
"zip",
"jar"
);

public String contentAddressForTaskOutput(Task task, String outputName) {
var outputType = task.outputTypes().get(outputName);
if (outputType == null) {
throw new IllegalArgumentException("No such output `"+outputName+"` for task `"+task.name()+"`");
}
return task.getContentAddress(outputName);
}

@Override
public Path existingTaskOutput(Task task, String outputName) {
public Path contentAddressedTaskOutput(Task task, String outputName) {
var outputType = task.outputTypes().get(outputName);
if (outputType == null) {
throw new IllegalArgumentException("No such output `"+outputName+"` for task `"+task.name()+"`");
Expand All @@ -95,13 +112,83 @@ public Path existingTaskOutput(Task task, String outputName) {
}
try {
var contents = Files.readString(markerPath, StandardCharsets.UTF_8);
if (contents.contains("/")) {
var parts = contents.split("/");
task.setContentAddress(outputName, parts[0]);
if (CAN_REASSEMBLE.contains(parts[1])) {
var prefix = parts[0].substring(0, 2);
return contentAddressableDirectory().resolve(prefix).resolve(parts[0] + "." + outputType + ".binpb");
} else {
// Requires reassembly we cannot perform
return null;
}
}
task.setContentAddress(outputName, contents);
var prefix = contents.substring(0, 2);
return contentAddressableDirectory().resolve(prefix).resolve(contents + "." + outputType);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private final Map<String, Path> reassemblyCache = new ConcurrentHashMap<>();
private final Path reassemblyCachePath = Files.createTempDirectory("taskgraphrunner-reassembly-");

public String storeTaskOutput(Task task, String output) throws IOException {
var outputPath = taskOutputPath(task, output);
var outputType = task.outputTypes().get(output);
var hash = contentAddressForTaskOutput(task, output);
return switch (outputType) {
case "zip", "jar" -> {
var outPath = pathFromHash(hash, outputType + ".binpb");
Files.createDirectories(outPath.getParent());
new PiecewiseZips(this).disassemble(outputPath, outPath);
reassemblyCache.compute(task.name() + "/" + output + "." + outputType, (k, v) -> {
var outputPathCache = reassemblyCachePath.resolve(k);
try {
Files.createDirectories(outputPathCache.getParent());
Files.move(outputPath, outputPathCache, StandardCopyOption.ATOMIC_MOVE);
return outputPathCache;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
yield hash + "/" + outputType;
}
default -> {
var outPath = pathFromHash(hash, task.outputTypes().get(output));
Files.createDirectories(outPath.getParent());
// This is atomic because locking here is less sensible
Files.move(outputPath, outPath, StandardCopyOption.ATOMIC_MOVE);
yield hash;
}
};
}

public Path reassembleTaskOutput(Task task, String outputName, Path reassemblyInfo) {
var outputType = task.outputTypes().get(outputName);
if (outputType == null) {
throw new IllegalArgumentException("No such output `"+outputName+"` for task `"+task.name()+"`");
}
return reassemblyCache.computeIfAbsent(task.name() + "/" + outputName + "." + outputType, k -> {
var outputPath = reassemblyCachePath.resolve(k);
try {
Files.createDirectories(outputPath.getParent());
switch (outputType) {
case "zip", "jar" -> {
new PiecewiseZips(this).assemble(outputPath, reassemblyInfo);
}
default -> {
throw new IllegalArgumentException("Cannot reassemble output of type `" + outputType + "` for task `" + task.name() + "`");
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return outputPath;
});
}

@Override
public Path pathFromHash(String hash, String outputType) {
var prefix = hash.substring(0, 2);
Expand Down Expand Up @@ -212,7 +299,16 @@ public void execute(Map<Output, Path> results, @Nullable Path taskRecordJson) {
JsonArray outputs = new JsonArray();
singleTask.addProperty("state", taskStatePath(task).toAbsolutePath().toString());
for (var output : task.outputTypes().entrySet()) {
outputs.add(existingTaskOutput(task, output.getKey()).toAbsolutePath().toString());
var existingPath = contentAddressedTaskOutput(task, output.getKey());
var lastDot = existingPath.getFileName().toString().lastIndexOf('.');
if (lastDot != -1 && !"binpb".equals(output.getValue()) && "binpb".equals(existingPath.getFileName().toString().substring(lastDot + 1))) {
var reassemblyInfo = new JsonObject();
reassemblyInfo.addProperty("reassembles", existingPath.toAbsolutePath().toString());
reassemblyInfo.addProperty("type", output.getValue());
outputs.add(reassemblyInfo);
} else {
outputs.add(existingPath.toAbsolutePath().toString());
}
}
singleTask.add("outputs", outputs);
executed.add(task.name(), singleTask);
Expand Down
Loading