From 8ec07f3e488f2a8ee5ef3532b2d53b2912bc7b1d Mon Sep 17 00:00:00 2001 From: Luke Bemish Date: Fri, 30 Jan 2026 20:11:07 -0500 Subject: [PATCH 1/6] Piecewise ZIP cache implementation --- build.gradle | 9 + gradle/libs.versions.toml | 4 + locks/buildscript-gradle.lockfile | 20 +- settings.gradle | 1 + .../taskgraphrunner/runtime/Context.java | 5 + .../taskgraphrunner/runtime/Invocation.java | 80 ++++++++ .../taskgraphrunner/runtime/Task.java | 9 +- .../taskgraphrunner/runtime/TaskInput.java | 14 +- .../taskgraphrunner/runtime/TaskOutput.java | 13 +- .../runtime/mappings/MappingsSourceImpl.java | 2 +- .../runtime/tasks/ArgumentProcessor.java | 2 +- .../runtime/tasks/CompileTask.java | 2 +- .../runtime/tasks/DownloadAssetsTask.java | 2 +- .../tasks/DownloadDistributionTask.java | 2 +- .../runtime/tasks/DownloadJsonTask.java | 2 +- .../runtime/tasks/DownloadMappingsTask.java | 2 +- .../runtime/tasks/InjectSourcesTask.java | 2 +- .../runtime/tasks/InterfaceInjectionTask.java | 2 +- .../runtime/tasks/JstTask.java | 4 +- .../runtime/tasks/ListClasspathTask.java | 2 +- .../runtime/tasks/PatchSourcesTask.java | 2 +- .../runtime/tasks/RetrieveDataTask.java | 4 +- .../tasks/SplitClassesResourcesTask.java | 2 +- .../runtime/tasks/TransformMappingsTask.java | 2 +- .../runtime/util/HashUtils.java | 15 ++ .../runtime/zips/PiecewiseZips.java | 177 ++++++++++++++++++ src/main/proto/zipparts.proto | 26 +++ 27 files changed, 365 insertions(+), 42 deletions(-) create mode 100644 src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java create mode 100644 src/main/proto/zipparts.proto diff --git a/build.gradle b/build.gradle index d9ec1b2..34c63a2 100644 --- a/build.gradle +++ b/build.gradle @@ -5,6 +5,7 @@ plugins { id 'maven-publish' id 'signing' id 'com.gradleup.shadow' + id 'com.google.protobuf' } group = 'dev.lukebemish' @@ -166,6 +167,8 @@ dependencies { // For reading/writing parchment json files implementation libs.feather.core implementation libs.feather.gson + // For storing various binary information -- notably, zip dissassembly info + implementation libs.protobuf.java annotationProcessor libs.picocli.codegen implementation(project(':')) { capabilities { @@ -207,6 +210,12 @@ dependencies { instrumentationImplementation libs.asm } +protobuf { + protoc { + artifact = libs.protobuf.protoc.get() + } +} + // This keeps the same versions at runtime and compile time var tools = new Properties() file("src/main/resources/tools.properties").withInputStream { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index aa4ed8a..ba72124 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -8,6 +8,7 @@ mappingio = "0.7.0" feather = "1.1.0" gson = "2.11.0" oshi = "6.6.5" +protobuf = "4.33.4" [libraries] @@ -27,3 +28,6 @@ feather-gson = { group = "org.parchmentmc.feather", name = "io-gson", version.re gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" } oshi-core = { group = "com.github.oshi", name = "oshi-core", version.ref = "oshi" } + +protobuf-java = { group = "com.google.protobuf", name = "protobuf-java", version.ref = "protobuf" } +protobuf-protoc = { group = "com.google.protobuf", name = "protoc", version.ref = "protobuf" } diff --git a/locks/buildscript-gradle.lockfile b/locks/buildscript-gradle.lockfile index 4584c75..bb66450 100644 --- a/locks/buildscript-gradle.lockfile +++ b/locks/buildscript-gradle.lockfile @@ -2,14 +2,19 @@ # Manual edits can break the build and are not advised. 
# This file is expected to be part of source control. com.fasterxml.woodstox:woodstox-core:7.1.0=classpath -com.gradleup.shadow:com.gradleup.shadow.gradle.plugin:9.3.0=classpath -com.gradleup.shadow:shadow-gradle-plugin:9.3.0=classpath +com.google.code.findbugs:jsr305:3.0.2=classpath +com.google.gradle:osdetector-gradle-plugin:1.7.3=classpath +com.google.protobuf:com.google.protobuf.gradle.plugin:0.9.6=classpath +com.google.protobuf:protobuf-gradle-plugin:0.9.6=classpath +com.gradleup.shadow:com.gradleup.shadow.gradle.plugin:9.3.1=classpath +com.gradleup.shadow:shadow-gradle-plugin:9.3.1=classpath commons-codec:commons-codec:1.20.0=classpath commons-io:commons-io:2.21.0=classpath +kr.motd.maven:os-maven-plugin:1.7.1=classpath org.apache.ant:ant-launcher:1.10.15=classpath org.apache.ant:ant:1.10.15=classpath -org.apache.logging.log4j:log4j-api:2.25.2=classpath -org.apache.logging.log4j:log4j-core:2.25.2=classpath +org.apache.logging.log4j:log4j-api:2.25.3=classpath +org.apache.logging.log4j:log4j-core:2.25.3=classpath org.apache.maven:maven-api-annotations:4.0.0-rc-3=classpath org.apache.maven:maven-api-xml:4.0.0-rc-3=classpath org.apache.maven:maven-xml:4.0.0-rc-3=classpath @@ -17,11 +22,8 @@ org.codehaus.plexus:plexus-utils:4.0.2=classpath org.codehaus.plexus:plexus-xml:4.1.0=classpath org.codehaus.woodstox:stax2-api:4.2.2=classpath org.jdom:jdom2:2.0.6.1=classpath -org.jetbrains.kotlin:kotlin-metadata-jvm:2.3.0-RC2=classpath -org.jetbrains.kotlin:kotlin-stdlib:2.3.0-RC2=classpath +org.jetbrains.kotlin:kotlin-metadata-jvm:2.3.0=classpath +org.jetbrains.kotlin:kotlin-stdlib:2.3.0=classpath org.jetbrains:annotations:13.0=classpath -org.ow2.asm:asm-commons:9.9=classpath -org.ow2.asm:asm-tree:9.9=classpath -org.ow2.asm:asm:9.9=classpath org.vafer:jdependency:2.14=classpath empty= diff --git a/settings.gradle b/settings.gradle index f04339a..31107cf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,6 +7,7 @@ pluginManagement { } plugins { id 'com.gradleup.shadow' version '+' + id 'com.google.protobuf' version '+' } } diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java index 875886c..2d27673 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java @@ -3,6 +3,7 @@ import dev.lukebemish.taskgraphrunner.model.Output; import dev.lukebemish.taskgraphrunner.runtime.util.LockManager; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; @@ -20,8 +21,12 @@ public interface Context { Path pathFromHash(String hash, String outputType); + String storeTaskOutput(Task task, String output) throws IOException; + Path existingTaskOutput(Task task, String outputName); + Path reassembleTaskOutput(Task task, String outputName, Path reassemblyInfo); + Path taskStatePath(Task task); Path taskDirectory(Task task); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java index 915ae9c..c712ddc 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java @@ -3,8 +3,10 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import dev.lukebemish.taskgraphrunner.model.Output; +import dev.lukebemish.taskgraphrunner.runtime.util.HashUtils; import 
dev.lukebemish.taskgraphrunner.runtime.util.JsonUtils; import dev.lukebemish.taskgraphrunner.runtime.util.LockManager; +import dev.lukebemish.taskgraphrunner.runtime.zips.PiecewiseZips; import org.jspecify.annotations.Nullable; import java.io.IOException; @@ -12,6 +14,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -20,7 +23,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -82,6 +87,11 @@ public Path taskOutputMarkerPath(Task task, String outputName) { return taskDirectory(task).resolve(contentsHash+"."+outputName+"."+task.outputId()+"."+outputType+".txt"); } + private static final Set CAN_REASSEMBLE = Set.of( + "zip", + "jar" + ); + @Override public Path existingTaskOutput(Task task, String outputName) { var outputType = task.outputTypes().get(outputName); @@ -95,6 +105,17 @@ public Path existingTaskOutput(Task task, String outputName) { } try { var contents = Files.readString(markerPath, StandardCharsets.UTF_8); + if (contents.contains("/")) { + var parts = contents.split("/"); + if (CAN_REASSEMBLE.contains(parts[1])) { + var prefix = parts[0].substring(0, 2); + // TODO: check existence of component parts, perhaps? + return contentAddressableDirectory().resolve(prefix).resolve(parts[0] + "." + outputType + ".binpb"); + } else { + // Requires reassembly we cannot perform + return null; + } + } var prefix = contents.substring(0, 2); return contentAddressableDirectory().resolve(prefix).resolve(contents + "." + outputType); } catch (IOException e) { @@ -102,6 +123,64 @@ public Path existingTaskOutput(Task task, String outputName) { } } + private final Map reassemblyCache = new ConcurrentHashMap<>(); + private final Path reassemblyCachePath = Files.createTempDirectory("taskgraphrunner-reassembly-"); + + public String storeTaskOutput(Task task, String output) throws IOException { + var outputPath = taskOutputPath(task, output); + var outputType = task.outputTypes().get(output); + var hash = HashUtils.hash(outputPath, "SHA-256"); + return switch (outputType) { + case "zip", "jar" -> { + var outPath = pathFromHash(hash, outputType + ".binpb"); + Files.createDirectories(outPath.getParent()); + new PiecewiseZips(this).disassemble(outputPath, outPath); + reassemblyCache.compute(task.name() + "/" + output + "." 
+ outputType, (k, v) -> { + var outputPathCache = reassemblyCachePath.resolve(k); + try { + Files.createDirectories(outputPathCache.getParent()); + Files.move(outputPath, outputPathCache, StandardCopyOption.ATOMIC_MOVE); + return outputPathCache; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + yield hash + "/" + outputType; + } + default -> { + var outPath = pathFromHash(hash, task.outputTypes().get(output)); + Files.createDirectories(outPath.getParent()); + // This is atomic because locking here is less sensible + Files.move(outputPath, outPath, StandardCopyOption.ATOMIC_MOVE); + yield hash; + } + }; + } + + public Path reassembleTaskOutput(Task task, String outputName, Path reassemblyInfo) { + var outputType = task.outputTypes().get(outputName); + if (outputType == null) { + throw new IllegalArgumentException("No such output `"+outputName+"` for task `"+task.name()+"`"); + } + return reassemblyCache.computeIfAbsent(task.name() + "/" + outputName + "." + outputType, k -> { + var outputPath = reassemblyCachePath.resolve(k); + try { + Files.createDirectories(outputPath.getParent()); + switch (outputType) { + case "zip", "jar" -> { + new PiecewiseZips(this).assemble(outputPath, reassemblyInfo); + } + default -> { + throw new IllegalArgumentException("Cannot reassemble output of type `" + outputType + "` for task `" + task.name() + "`"); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return outputPath; + }); + } + @Override public Path pathFromHash(String hash, String outputType) { var prefix = hash.substring(0, 2); @@ -212,6 +291,7 @@ public void execute(Map results, @Nullable Path taskRecordJson) { JsonArray outputs = new JsonArray(); singleTask.addProperty("state", taskStatePath(task).toAbsolutePath().toString()); for (var output : task.outputTypes().entrySet()) { + // TODO: we ought to record reassembly info here too! 
outputs.add(existingTaskOutput(task, output.getKey()).toAbsolutePath().toString()); } singleTask.add("outputs", outputs); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java index 12add21..ce070da 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java @@ -165,7 +165,7 @@ private static void executeNode(Context context, GraphNode node) { try (var ignored = node.task.lock(context)) { node.task.execute(context); for (var entry : node.outputs.entrySet()) { - var outputPath = Objects.requireNonNull(context.existingTaskOutput(node.task, entry.getKey()), "Output did not exist"); + var outputPath = Objects.requireNonNull(new TaskOutput(node.task.name, entry.getKey()).resolvePath(context), "Output did not exist"); try { Files.copy(outputPath, entry.getValue(), StandardCopyOption.REPLACE_EXISTING); } catch (IOException e) { @@ -347,14 +347,9 @@ private void execute(Context context) { outputId--; } else { for (var output : outputTypes().keySet()) { - var outputPath = context.taskOutputPath(this, output); - var hash = HashUtils.hash(outputPath, "SHA-256"); - var outPath = context.pathFromHash(hash, outputTypes().get(output)); var markerPath = context.taskOutputMarkerPath(this, output); - Files.createDirectories(outPath.getParent()); Files.createDirectories(markerPath.getParent()); - // This is atomic because locking here is less sensible - Files.move(outputPath, outPath, StandardCopyOption.ATOMIC_MOVE); + var hash = context.storeTaskOutput(this, output); Files.writeString(markerPath, hash, StandardCharsets.UTF_8); } } diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java index 3f64942..f956b64 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java @@ -34,7 +34,7 @@ default List dependencies() { } sealed interface HasFileInput extends TaskInput { - Path path(Context context); + Path resolvePath(Context context); } sealed interface FileListInput extends TaskInput { @@ -135,7 +135,7 @@ public JsonElement recordedValue(Context context) { } @Override - public Path path(Context context) { + public Path resolvePath(Context context) { return path(); } } @@ -147,7 +147,7 @@ public void hashReference(ByteConsumer digest, Context context) {} @Override public void hashContents(ByteConsumer digest, Context context) { - HashUtils.hash(output.getPath(context), digest); + HashUtils.hash(output.resolvePath(context), digest); } @Override @@ -164,8 +164,8 @@ public JsonElement recordedValue(Context context) { } @Override - public Path path(Context context) { - return output.getPath(context); + public Path resolvePath(Context context) { + return output.resolvePath(context); } } @@ -177,7 +177,7 @@ public List dependencies() { @Override public List paths(Context context) { - try (var reader = Files.newBufferedReader(libraryFile.path(context))) { + try (var reader = Files.newBufferedReader(libraryFile.resolvePath(context))) { return reader.lines().map(line -> pathNotation(context, line)).toList(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -300,7 +300,7 @@ public JsonElement recordedValue(Context context) { @Override public List paths(Context context) { - var stream = inputs.stream().map(input -> input.path(context)); + var stream = 
inputs.stream().map(input -> input.resolvePath(context)); if (listOrdering == ListOrdering.CONTENTS) { stream = stream.sorted((a, b) -> { var aOutput = new ByteArrayOutputStream(); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java index 21ce96e..36adb95 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java @@ -4,7 +4,16 @@ import java.util.Objects; public record TaskOutput(String taskName, String name) { - public Path getPath(Context context) { - return Objects.requireNonNull(context.existingTaskOutput(context.getTask(taskName), name), "Output did not exist"); + public Path resolvePath(Context context) { + // Reassembles ZIP or the like if needed + var path = Objects.requireNonNull(context.existingTaskOutput(context.getTask(taskName), name), "Output did not exist"); + var parts = path.getFileName().toString().split("\\."); + if (parts.length > 1) { + var extension = parts[parts.length - 1]; + if (!extension.equals(context.getTask(taskName).outputTypes().get(name)) && "binpb".equals(extension)) { + return context.reassembleTaskOutput(context.getTask(taskName), name, path); + } + } + return path; } } diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/mappings/MappingsSourceImpl.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/mappings/MappingsSourceImpl.java index 1390ef5..bc2a91d 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/mappings/MappingsSourceImpl.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/mappings/MappingsSourceImpl.java @@ -324,7 +324,7 @@ public FileSource(int andIncrement, TaskInput.HasFileInput input, TaskInput.@Nul @Override public MappingTree makeMappings(Context context) { try { - var path = input.path(context); + var path = input.resolvePath(context); if (extension != null) { var extensionObj = extension.value().value(); if (extensionObj instanceof String extensionString) { diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ArgumentProcessor.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ArgumentProcessor.java index b1cddf0..dd960f7 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ArgumentProcessor.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ArgumentProcessor.java @@ -117,7 +117,7 @@ public Stream inputs() { @Override public List resolve(Path workingDirectory, String taskName, Context context, int argCount) { - return List.of(pattern.replace("{}", input.path(context).toAbsolutePath().toString())); + return List.of(pattern.replace("{}", input.resolvePath(context).toAbsolutePath().toString())); } } diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/CompileTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/CompileTask.java index b2993d2..ea6d87b 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/CompileTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/CompileTask.java @@ -79,7 +79,7 @@ public Map outputTypes() { @Override protected void run(Context context) { - var sourcesJar = this.sources.path(context); + var sourcesJar = this.sources.resolvePath(context); var workingDirectory = context.taskWorkingDirectory(this); var logFile = workingDirectory.resolve("log.txt"); diff --git 
a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java index 78c82c1..0c29f3f 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java @@ -59,7 +59,7 @@ protected boolean upToDate(long lastExecuted, Context context) { @Override protected void run(Context context) { - var versionJson = this.versionJson.path(context); + var versionJson = this.versionJson.resolvePath(context); try (var reader = Files.newBufferedReader(versionJson)) { var json = JsonUtils.GSON.fromJson(reader, JsonObject.class); var assetIndex = json.getAsJsonObject("assetIndex"); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadDistributionTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadDistributionTask.java index fbcf6d0..1cb3131 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadDistributionTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadDistributionTask.java @@ -44,7 +44,7 @@ protected void run(Context context) { if (distribution == Distribution.JOINED) { throw new IllegalArgumentException("Distribution JOINED cannot be downloaded"); } - var versionJson = this.versionJson.path(context); + var versionJson = this.versionJson.resolvePath(context); try (var reader = Files.newBufferedReader(versionJson)) { var json = JsonUtils.GSON.fromJson(reader, JsonObject.class); var downloads = json.getAsJsonObject("downloads"); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadJsonTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadJsonTask.java index a8b0c8d..cfe6139 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadJsonTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadJsonTask.java @@ -39,7 +39,7 @@ public Map outputTypes() { @Override protected void run(Context context) { - try (var reader = Files.newBufferedReader(manifest.path(context))) { + try (var reader = Files.newBufferedReader(manifest.resolvePath(context))) { var json = JsonUtils.GSON.fromJson(reader, JsonObject.class); var versions = json.getAsJsonArray("versions"); var matching = versions.asList().stream() diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadMappingsTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadMappingsTask.java index cb4f800..97e416c 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadMappingsTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadMappingsTask.java @@ -44,7 +44,7 @@ protected void run(Context context) { if (distribution == Distribution.JOINED) { throw new IllegalArgumentException("Distribution JOINED cannot be downloaded"); } - var versionJson = this.versionJson.path(context); + var versionJson = this.versionJson.resolvePath(context); try (var reader = Files.newBufferedReader(versionJson)) { var json = JsonUtils.GSON.fromJson(reader, JsonObject.class); var downloads = json.getAsJsonObject("downloads"); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InjectSourcesTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InjectSourcesTask.java index c17eb3e..d5574f8 100644 --- 
a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InjectSourcesTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InjectSourcesTask.java @@ -49,7 +49,7 @@ protected void run(Context context) { try (var os = Files.newOutputStream(context.taskOutputPath(this, "output")); var zos = new JarOutputStream(os)) { for (var input : inputs) { - try (var is = new BufferedInputStream(Files.newInputStream(input.path(context))); + try (var is = new BufferedInputStream(Files.newInputStream(input.resolvePath(context))); var zis = new ZipInputStream(is)) { ZipEntry entry; while ((entry = zis.getNextEntry()) != null) { diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InterfaceInjectionTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InterfaceInjectionTask.java index 971ff5a..44bad33 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InterfaceInjectionTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/InterfaceInjectionTask.java @@ -296,7 +296,7 @@ private static Signature parseSignature(String binaryName, String signature, Non protected void run(Context context) { var outputJar = context.taskOutputPath(this, "output"); var stubsJar = context.taskOutputPath(this, "stubs"); - var inputJar = this.input.path(context); + var inputJar = this.input.resolvePath(context); Map> injections = new HashMap<>(); try (var classFinder = new NonLoadingClassLoader(classpath.paths(context).toArray(Path[]::new)); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/JstTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/JstTask.java index 722c574..a1983fb 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/JstTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/JstTask.java @@ -102,7 +102,7 @@ public JstTask(TaskModel.Jst model, WorkItem workItem, Context context) { } private void collectArguments(ArrayList command, Context context, Path workingDirectory) { - command.add(input.path(context).toAbsolutePath().toString()); + command.add(input.resolvePath(context).toAbsolutePath().toString()); command.add(context.taskOutputPath(this, "output").toAbsolutePath().toString()); command.add("--classpath="+classpath.classpath(context)); @@ -134,7 +134,7 @@ private void collectArguments(ArrayList command, Context context, Path w mappings = MappingsUtil.fixInnerClasses(parchmentMappingsSource.makeMappings(context)); } else { try { - var path = binaryInput.path(context); + var path = binaryInput.resolvePath(context); var inheritance = MappingInheritance.read(path); mappings = MappingsUtil.fixInnerClasses(parchmentMappingsSource.makeMappingsFillInheritance(context).make(inheritance)); } catch (IOException e) { diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ListClasspathTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ListClasspathTask.java index 4ef92cb..3cc4321 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ListClasspathTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/ListClasspathTask.java @@ -45,7 +45,7 @@ public Map outputTypes() { @Override protected void run(Context context) { - var versionManifest = versionJson.path(context); + var versionManifest = versionJson.resolvePath(context); var output = context.taskOutputPath(this, "output"); try (var reader = Files.newBufferedReader(versionManifest)) { diff --git 
a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/PatchSourcesTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/PatchSourcesTask.java index 38f8fb3..1ba5a11 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/PatchSourcesTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/PatchSourcesTask.java @@ -38,7 +38,7 @@ protected void collectArguments(ArrayList command, Context context, Path command.add("-jar"); command.add(diffPatchPath.toString()); command.addAll(List.of( - input.path(context).toAbsolutePath().toString(), patches.path(context).toAbsolutePath().toString(), + input.resolvePath(context).toAbsolutePath().toString(), patches.resolvePath(context).toAbsolutePath().toString(), "--patch", "--archive", "ZIP", "--output", context.taskOutputPath(this, "output").toAbsolutePath().toString(), diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/RetrieveDataTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/RetrieveDataTask.java index 924e070..db8f116 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/RetrieveDataTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/RetrieveDataTask.java @@ -56,7 +56,7 @@ public Map outputTypes() { protected void run(Context context) { var pathString = ((Value.DirectStringValue) path.value()).value(); if (isMakingZip) { - try (var is = new BufferedInputStream(Files.newInputStream(input.path(context))); + try (var is = new BufferedInputStream(Files.newInputStream(input.resolvePath(context))); var os = Files.newOutputStream(context.taskOutputPath(this, "output")); var zis = new ZipInputStream(is); var zos = new ZipOutputStream(os) @@ -92,7 +92,7 @@ protected void run(Context context) { throw new UncheckedIOException(e); } } else { - try (var is = new BufferedInputStream(Files.newInputStream(input.path(context))); + try (var is = new BufferedInputStream(Files.newInputStream(input.resolvePath(context))); var os = Files.newOutputStream(context.taskOutputPath(this, "output")); var zis = new ZipInputStream(is)) { boolean found = false; diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/SplitClassesResourcesTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/SplitClassesResourcesTask.java index cd97919..d8132bc 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/SplitClassesResourcesTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/SplitClassesResourcesTask.java @@ -51,7 +51,7 @@ protected void run(Context context) { var deny = Pattern.compile((String) excludePattern.value().value()).asMatchPredicate(); - try (var input = new JarInputStream(new BufferedInputStream(Files.newInputStream(this.input.path(context)))); + try (var input = new JarInputStream(new BufferedInputStream(Files.newInputStream(this.input.resolvePath(context)))); var classesOutFile = new BufferedOutputStream(Files.newOutputStream(classesJar)); var resourcesOutFile = new BufferedOutputStream(Files.newOutputStream(resourcesJar)); var classesOutJar = new JarOutputStream(classesOutFile); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/TransformMappingsTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/TransformMappingsTask.java index 283bc21..67faf89 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/TransformMappingsTask.java +++ 
b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/TransformMappingsTask.java @@ -73,7 +73,7 @@ protected void run(Context context) { mappings = MappingsUtil.fixInnerClasses(source.makeMappings(context)); } else { try { - var path = sourceJarInput.path(context); + var path = sourceJarInput.resolvePath(context); var inheritance = MappingInheritance.read(path); mappings = MappingsUtil.fixInnerClasses(source.makeMappingsFillInheritance(context).make(inheritance)); } catch (IOException e) { diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java index 75d4f00..06f6c63 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java @@ -81,6 +81,21 @@ public static String hash(Path path, String algorithm) { return HexFormat.of().formatHex(output.toByteArray()); } + public static String hash(byte[] bytes, String algorithm) { + return hash(bytes, 0, bytes.length, algorithm); + } + + public static String hash(byte[] bytes, int offset, int len, String algorithm) { + MessageDigest digest; + try { + digest = MessageDigest.getInstance(algorithm); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + digest.update(bytes, offset, len); + return HexFormat.of().formatHex(digest.digest()); + } + public static String hash(String key, String algorithm) { MessageDigest digest; try { diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java new file mode 100644 index 0000000..ebffc1d --- /dev/null +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java @@ -0,0 +1,177 @@ +package dev.lukebemish.taskgraphrunner.runtime.zips; + +import com.google.protobuf.ByteString; +import dev.lukebemish.taskgraphrunner.runtime.Invocation; +import dev.lukebemish.taskgraphrunner.runtime.util.HashUtils; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; + +public class PiecewiseZips { + private final Invocation invocation; + + public PiecewiseZips(Invocation invocation) { + this.invocation = invocation; + } + + public void disassemble(Path inputZip, Path outputZipPartsFile) throws IOException { + var builder = ZipFile.newBuilder(); + byte[] zipFileBytes = Files.readAllBytes(inputZip); + var fileSize = zipFileBytes.length; + + // TODO: do we risk streaming it and assume we get "nice" jars? And then validate at the end? 
+ + record FileCompressed(int offset, int length) implements Comparable { + @Override + public int compareTo(FileCompressed o) { + return Integer.compare(this.offset, o.offset); + } + } + var files = new ArrayList(); + + var maxEocdLength = 1 << 16; // 64KB + boolean foundValidZip = false; + outer: for (int i = fileSize - 22; i >= Math.max(0, fileSize - maxEocdLength); i--) { + files.clear(); + if (zipFileBytes[i] == 0x50 && zipFileBytes[i + 1] == 0x4b && zipFileBytes[i + 2] == 0x05 && zipFileBytes[i + 3] == 0x06) { + // found EOCD signature + int commentLength = ((zipFileBytes[i + 20] & 0xFF) | ((zipFileBytes[i + 21] & 0xFF) << 8)); + int eocdLength = 22 + commentLength; + if (i + eocdLength != fileSize) { + continue; + } + int totalCDRecords = (zipFileBytes[i + 10] & 0xFF) | ((zipFileBytes[i + 11] & 0xFF) << 8); + int cdSize = ((zipFileBytes[i + 12] & 0xFF) | ((zipFileBytes[i + 13] & 0xFF) << 8) | ((zipFileBytes[i + 14] & 0xFF) << 16) | ((zipFileBytes[i + 15] & 0xFF) << 24)); + int cdOffset = ((zipFileBytes[i + 16] & 0xFF) | ((zipFileBytes[i + 17] & 0xFF) << 8) | ((zipFileBytes[i + 18] & 0xFF) << 16) | ((zipFileBytes[i + 19] & 0xFF) << 24)); + int cdStart = cdOffset; + for (int j = 0; j < totalCDRecords; j++) { + if (cdStart < 0 || cdStart + 46 > fileSize) { + continue outer; + } + if (zipFileBytes[cdStart] != 0x50 || zipFileBytes[cdStart + 1] != 0x4b || zipFileBytes[cdStart + 2] != 0x01 || zipFileBytes[cdStart + 3] != 0x02) { + continue outer; + } + int fileNameLength = ((zipFileBytes[cdStart + 28] & 0xFF) | ((zipFileBytes[cdStart + 29] & 0xFF) << 8)); + int extraFieldLength = ((zipFileBytes[cdStart + 30] & 0xFF) | ((zipFileBytes[cdStart + 31] & 0xFF) << 8)); + int fileCommentLength = ((zipFileBytes[cdStart + 32] & 0xFF) | ((zipFileBytes[cdStart + 33] & 0xFF) << 8)); + int recordSize = 46 + fileNameLength + extraFieldLength + fileCommentLength; + if (cdStart + recordSize > fileSize) { + continue outer; + } + int compressedSize = ((zipFileBytes[cdStart + 20] & 0xFF) | ((zipFileBytes[cdStart + 21] & 0xFF) << 8) | ((zipFileBytes[cdStart + 22] & 0xFF) << 16) | ((zipFileBytes[cdStart + 23] & 0xFF) << 24)); + int localHeaderStart = ((zipFileBytes[cdStart + 42] & 0xFF) | ((zipFileBytes[cdStart + 43] & 0xFF) << 8) | ((zipFileBytes[cdStart + 44] & 0xFF) << 16) | ((zipFileBytes[cdStart + 45] & 0xFF) << 24)); + if (localHeaderStart < 0 || localHeaderStart + 30 > fileSize) { + continue outer; + } + cdStart += recordSize; + + if (zipFileBytes[localHeaderStart] != 0x50 || zipFileBytes[localHeaderStart + 1] != 0x4b || zipFileBytes[localHeaderStart + 2] != 0x03 || zipFileBytes[localHeaderStart + 3] != 0x04) { + continue outer; + } + if (localHeaderStart + 30 + compressedSize > fileSize) { + continue outer; + } + // TODO: should this cutoff be configurable? 
+ if (compressedSize < 1000) { + // skip small/empty files + continue; + } + files.add(new FileCompressed(localHeaderStart, compressedSize)); + } + if (cdStart != cdOffset + cdSize) { + continue; + } + foundValidZip = true; + break; + } + } + + if (!foundValidZip) { + throw new IOException("Could not find valid EOCD record in zip file: " + inputZip); + } + + int head = 0; + for (var file : files) { + if (file.offset > head) { + builder.addEntries(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() + .setRawData(ByteString.copyFrom(zipFileBytes, head, file.offset - head)) + .build())); + head = file.offset; + } + if (file.offset == head) { + byte[] fileBytes = new byte[file.length]; + System.arraycopy(zipFileBytes, file.offset, fileBytes, 0, fileBytes.length); + var hash = HashUtils.hash(fileBytes, "SHA-256"); + var fileOutPath = invocation.pathFromHash(hash, "dat"); + if (!Files.exists(fileOutPath.getParent())) { + Files.createDirectories(fileOutPath.getParent()); + } + Files.write(fileOutPath, fileBytes); + builder.addEntries(ZipEntry.newBuilder().setReferencePart(ReferenceZipPart.newBuilder() + .setExpectedLength(file.length) + .setContentHash(hash) + .build())); + head += file.length; + } else { + throw new IOException("Overlapping local files in zip disassembly"); + } + } + if (head < fileSize) { + builder.addEntries(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() + .setRawData(ByteString.copyFrom(zipFileBytes, head, fileSize - head)) + .build())); + } + + builder.setFormat(1); + + try (var os = Files.newOutputStream(outputZipPartsFile)) { + var zipFile = builder.build(); + zipFile.writeTo(os); + } + } + + public void assemble(Path outputZip, Path zipPartsFile) throws IOException { + // TODO: try without NIO, see if it's faster? Profile? 
+ ZipFile zipFile; + try (var is = Files.newInputStream(zipPartsFile)) { + // should be .binpb file + zipFile = ZipFile.parseFrom(is); + } + if (zipFile.getFormat() != 1) { + throw new IOException("Unsupported zip part holder format: " + zipFile.getFormat()); + } + try (var os = new BufferedOutputStream(Files.newOutputStream(outputZip)); + var channel = Channels.newChannel(os)) { + for (int i = 0; i < zipFile.getEntriesCount(); i++) { + var entry = zipFile.getEntries(i); + + switch (entry.getEntryCase()) { + case SIMPLEPART -> { + var part = entry.getSimplePart(); + channel.write(part.getRawData().asReadOnlyByteBuffer()); + } + case REFERENCEPART -> { + var part = entry.getReferencePart(); + var contentPath = invocation.pathFromHash(part.getContentHash(), "dat"); + if (!Files.exists(contentPath)) { + throw new IOException("Missing zip entry content for hash " + part.getContentHash() + " at " + contentPath); + } + + try (var is = new BufferedInputStream(Files.newInputStream(contentPath))) { + long written = is.transferTo(os); + if (written != part.getExpectedLength()) { + throw new IOException("Mismatched zip entry content size for hash " + part.getContentHash() + ": expected " + part.getExpectedLength() + ", got " + written); + } + } + } + default -> throw new IOException("Unsupported zip entry case: " + entry.getEntryCase()); + } + } + } + } +} diff --git a/src/main/proto/zipparts.proto b/src/main/proto/zipparts.proto new file mode 100644 index 0000000..c6bbf11 --- /dev/null +++ b/src/main/proto/zipparts.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "dev.lukebemish.taskgraphrunner.runtime.zips"; +option java_outer_classname = "ZipPartsProtos"; + +message SimpleZipPart { + bytes rawData = 1; +} + +message ReferenceZipPart { + string contentHash = 1; + int32 expectedLength = 2; +} + +message ZipEntry { + oneof entry { + SimpleZipPart simplePart = 1; + ReferenceZipPart referencePart = 2; + } +} + +message ZipFile { + int32 format = 1; + repeated ZipEntry entries = 2; +} From 381bf4da4e4552421a56b35a0a25c21e1384de0a Mon Sep 17 00:00:00 2001 From: Luke Bemish Date: Sat, 31 Jan 2026 00:02:28 -0500 Subject: [PATCH 2/6] Make reassembly more efficient --- .../taskgraphrunner/runtime/Invocation.java | 13 +++-- .../taskgraphrunner/runtime/TaskOutput.java | 8 +-- .../runtime/zips/PiecewiseZips.java | 53 ++++++++++++++----- 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java index c712ddc..fd692b3 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java @@ -109,7 +109,6 @@ public Path existingTaskOutput(Task task, String outputName) { var parts = contents.split("/"); if (CAN_REASSEMBLE.contains(parts[1])) { var prefix = parts[0].substring(0, 2); - // TODO: check existence of component parts, perhaps? return contentAddressableDirectory().resolve(prefix).resolve(parts[0] + "." + outputType + ".binpb"); } else { // Requires reassembly we cannot perform @@ -291,8 +290,16 @@ public void execute(Map results, @Nullable Path taskRecordJson) { JsonArray outputs = new JsonArray(); singleTask.addProperty("state", taskStatePath(task).toAbsolutePath().toString()); for (var output : task.outputTypes().entrySet()) { - // TODO: we ought to record reassembly info here too! 
- outputs.add(existingTaskOutput(task, output.getKey()).toAbsolutePath().toString()); + var existingPath = existingTaskOutput(task, output.getKey()); + var lastDot = existingPath.getFileName().toString().lastIndexOf('.'); + if (lastDot != -1 && !"binpb".equals(output.getValue()) && "binpb".equals(existingPath.getFileName().toString().substring(lastDot + 1))) { + var reassemblyInfo = new JsonObject(); + reassemblyInfo.addProperty("reassembles", existingPath.toAbsolutePath().toString()); + reassemblyInfo.addProperty("type", output.getValue()); + outputs.add(reassemblyInfo); + } else { + outputs.add(existingPath.toAbsolutePath().toString()); + } } singleTask.add("outputs", outputs); executed.add(task.name(), singleTask); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java index 36adb95..1eeb87a 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java @@ -7,10 +7,10 @@ public record TaskOutput(String taskName, String name) { public Path resolvePath(Context context) { // Reassembles ZIP or the like if needed var path = Objects.requireNonNull(context.existingTaskOutput(context.getTask(taskName), name), "Output did not exist"); - var parts = path.getFileName().toString().split("\\."); - if (parts.length > 1) { - var extension = parts[parts.length - 1]; - if (!extension.equals(context.getTask(taskName).outputTypes().get(name)) && "binpb".equals(extension)) { + var lastDot = path.getFileName().toString().lastIndexOf('.'); + if (lastDot != -1) { + var extension = path.getFileName().toString().substring(lastDot + 1); + if (!"binpb".equals(context.getTask(taskName).outputTypes().get(name)) && "binpb".equals(extension)) { return context.reassembleTaskOutput(context.getTask(taskName), name, path); } } diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java index ebffc1d..bd824bb 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java @@ -4,13 +4,15 @@ import dev.lukebemish.taskgraphrunner.runtime.Invocation; import dev.lukebemish.taskgraphrunner.runtime.util.HashUtils; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.IOException; -import java.nio.channels.Channels; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.function.Consumer; public class PiecewiseZips { private final Invocation invocation; @@ -76,8 +78,8 @@ public int compareTo(FileCompressed o) { if (localHeaderStart + 30 + compressedSize > fileSize) { continue outer; } - // TODO: should this cutoff be configurable? - if (compressedSize < 1000) { + // TODO: We could make this configureable? + if (compressedSize < 1) { // skip small/empty files continue; } @@ -130,13 +132,13 @@ public int compareTo(FileCompressed o) { builder.setFormat(1); try (var os = Files.newOutputStream(outputZipPartsFile)) { + // TODO: do we set this to be last used before the parts were written? 
var zipFile = builder.build(); zipFile.writeTo(os); } } public void assemble(Path outputZip, Path zipPartsFile) throws IOException { - // TODO: try without NIO, see if it's faster? Profile? ZipFile zipFile; try (var is = Files.newInputStream(zipPartsFile)) { // should be .binpb file @@ -145,15 +147,14 @@ public void assemble(Path outputZip, Path zipPartsFile) throws IOException { if (zipFile.getFormat() != 1) { throw new IOException("Unsupported zip part holder format: " + zipFile.getFormat()); } - try (var os = new BufferedOutputStream(Files.newOutputStream(outputZip)); - var channel = Channels.newChannel(os)) { + try (var outChannel = FileChannel.open(outputZip, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { for (int i = 0; i < zipFile.getEntriesCount(); i++) { var entry = zipFile.getEntries(i); switch (entry.getEntryCase()) { case SIMPLEPART -> { var part = entry.getSimplePart(); - channel.write(part.getRawData().asReadOnlyByteBuffer()); + copyBytes(outChannel, part.getRawData().asReadOnlyByteBuffer()); } case REFERENCEPART -> { var part = entry.getReferencePart(); @@ -162,10 +163,11 @@ public void assemble(Path outputZip, Path zipPartsFile) throws IOException { throw new IOException("Missing zip entry content for hash " + part.getContentHash() + " at " + contentPath); } - try (var is = new BufferedInputStream(Files.newInputStream(contentPath))) { - long written = is.transferTo(os); - if (written != part.getExpectedLength()) { - throw new IOException("Mismatched zip entry content size for hash " + part.getContentHash() + ": expected " + part.getExpectedLength() + ", got " + written); + try (var contentChannel = FileChannel.open(contentPath, StandardOpenOption.READ)) { + var totalSize = contentChannel.size(); + copyChannel(outChannel, contentChannel); + if (totalSize != part.getExpectedLength()) { + throw new IOException("Mismatched content length for zip entry with hash " + part.getContentHash() + ": expected " + part.getExpectedLength() + ", got " + totalSize); } } } @@ -174,4 +176,29 @@ public void assemble(Path outputZip, Path zipPartsFile) throws IOException { } } } + + private static void copyBytes(FileChannel outChannel, ByteBuffer bytes) throws IOException { + int fullSize = bytes.remaining(); + int total = 0; + int transferred; + while (total < fullSize && (transferred = outChannel.write(bytes)) != 0) { + total += transferred; + } + if (total < fullSize) { + throw new IOException("Could not fully copy from byte buffer"); + } + } + + private static void copyChannel(FileChannel outChannel, SeekableByteChannel inChannel) throws IOException { + long fullSize = inChannel.size(); + long total = 0; + long transferred; + while (total < fullSize && (transferred = outChannel.transferFrom(inChannel, outChannel.position(), fullSize - total)) > 0) { + total += transferred; + outChannel.position(outChannel.position() + transferred); + } + if (total < fullSize) { + throw new IOException("Could not fully copy from channel"); + } + } } From d3feab3899bc7d2b0e864f944d00d26fe9890b8a Mon Sep 17 00:00:00 2001 From: Luke Bemish Date: Sat, 31 Jan 2026 00:46:17 -0500 Subject: [PATCH 3/6] Make disassembly more efficient --- .../runtime/util/HashUtils.java | 12 ++++++++ .../runtime/zips/PiecewiseZips.java | 29 +++++++++++-------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java index 06f6c63..fcfde59 100644 --- 
a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/util/HashUtils.java @@ -5,6 +5,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -96,6 +97,17 @@ public static String hash(byte[] bytes, int offset, int len, String algorithm) { return HexFormat.of().formatHex(digest.digest()); } + public static String hash(ByteBuffer bytes, String algorithm) { + MessageDigest digest; + try { + digest = MessageDigest.getInstance(algorithm); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + digest.update(bytes); + return HexFormat.of().formatHex(digest.digest()); + } + public static String hash(String key, String algorithm) { MessageDigest digest; try { diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java index bd824bb..79ce2f7 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java @@ -1,6 +1,6 @@ package dev.lukebemish.taskgraphrunner.runtime.zips; -import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import dev.lukebemish.taskgraphrunner.runtime.Invocation; import dev.lukebemish.taskgraphrunner.runtime.util.HashUtils; @@ -12,7 +12,6 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.function.Consumer; public class PiecewiseZips { private final Invocation invocation; @@ -78,7 +77,7 @@ public int compareTo(FileCompressed o) { if (localHeaderStart + 30 + compressedSize > fileSize) { continue outer; } - // TODO: We could make this configureable? + // TODO: We could make this configurable? 
if (compressedSize < 1) { // skip small/empty files continue; @@ -97,23 +96,24 @@ public int compareTo(FileCompressed o) { throw new IOException("Could not find valid EOCD record in zip file: " + inputZip); } + var start = System.nanoTime(); int head = 0; for (var file : files) { if (file.offset > head) { builder.addEntries(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() - .setRawData(ByteString.copyFrom(zipFileBytes, head, file.offset - head)) + .setRawData(UnsafeByteOperations.unsafeWrap(zipFileBytes, head, file.offset - head)) .build())); head = file.offset; } if (file.offset == head) { - byte[] fileBytes = new byte[file.length]; - System.arraycopy(zipFileBytes, file.offset, fileBytes, 0, fileBytes.length); - var hash = HashUtils.hash(fileBytes, "SHA-256"); + var hash = HashUtils.hash(zipFileBytes, file.offset, file.length, "SHA-256"); var fileOutPath = invocation.pathFromHash(hash, "dat"); if (!Files.exists(fileOutPath.getParent())) { Files.createDirectories(fileOutPath.getParent()); } - Files.write(fileOutPath, fileBytes); + try (var fileOutChannel = FileChannel.open(fileOutPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { + copyBytes(fileOutChannel, ByteBuffer.wrap(zipFileBytes, file.offset, file.length)); + } builder.addEntries(ZipEntry.newBuilder().setReferencePart(ReferenceZipPart.newBuilder() .setExpectedLength(file.length) .setContentHash(hash) @@ -125,9 +125,10 @@ public int compareTo(FileCompressed o) { } if (head < fileSize) { builder.addEntries(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() - .setRawData(ByteString.copyFrom(zipFileBytes, head, fileSize - head)) + .setRawData(UnsafeByteOperations.unsafeWrap(zipFileBytes, head, fileSize - head)) .build())); } + System.out.println((System.nanoTime() - start) / 1000f); builder.setFormat(1); @@ -190,14 +191,18 @@ private static void copyBytes(FileChannel outChannel, ByteBuffer bytes) throws I } private static void copyChannel(FileChannel outChannel, SeekableByteChannel inChannel) throws IOException { - long fullSize = inChannel.size(); + copyChannel(outChannel, inChannel, 0, inChannel.size()); + } + + private static void copyChannel(FileChannel outChannel, SeekableByteChannel inChannel, int offset, long length) throws IOException { + inChannel.position(offset); long total = 0; long transferred; - while (total < fullSize && (transferred = outChannel.transferFrom(inChannel, outChannel.position(), fullSize - total)) > 0) { + while (total < length && (transferred = outChannel.transferFrom(inChannel, outChannel.position(), length - total)) > 0) { total += transferred; outChannel.position(outChannel.position() + transferred); } - if (total < fullSize) { + if (total < length) { throw new IOException("Could not fully copy from channel"); } } From 6e13bfc9476d98641f3a9ba7a40fc7aca9d35765 Mon Sep 17 00:00:00 2001 From: Luke Bemish Date: Sat, 31 Jan 2026 12:28:57 -0500 Subject: [PATCH 4/6] Hash zip members in parallel --- .../runtime/zips/PiecewiseZips.java | 86 +++++++++++++------ 1 file changed, 60 insertions(+), 26 deletions(-) diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java index 79ce2f7..8e63726 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java @@ -3,8 +3,10 @@ import com.google.protobuf.UnsafeByteOperations; import 
dev.lukebemish.taskgraphrunner.runtime.Invocation; import dev.lukebemish.taskgraphrunner.runtime.util.HashUtils; +import org.jspecify.annotations.Nullable; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SeekableByteChannel; @@ -12,27 +14,36 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; public class PiecewiseZips { + private static final ThreadFactory FACTORY = Thread.ofVirtual().name("TaskGraphRunner-Zip-", 1).factory(); + private static final ExecutorService PARALLEL_EXECUTOR = Executors.newThreadPerTaskExecutor(FACTORY); + private final Invocation invocation; public PiecewiseZips(Invocation invocation) { this.invocation = invocation; } + private record FileCompressed(int offset, int length) implements Comparable { + @Override + public int compareTo(FileCompressed o) { + return Integer.compare(this.offset, o.offset); + } + } + public void disassemble(Path inputZip, Path outputZipPartsFile) throws IOException { var builder = ZipFile.newBuilder(); byte[] zipFileBytes = Files.readAllBytes(inputZip); var fileSize = zipFileBytes.length; - // TODO: do we risk streaming it and assume we get "nice" jars? And then validate at the end? - - record FileCompressed(int offset, int length) implements Comparable { - @Override - public int compareTo(FileCompressed o) { - return Integer.compare(this.offset, o.offset); - } - } var files = new ArrayList(); var maxEocdLength = 1 << 16; // 64KB @@ -96,39 +107,46 @@ public int compareTo(FileCompressed o) { throw new IOException("Could not find valid EOCD record in zip file: " + inputZip); } - var start = System.nanoTime(); int head = 0; + record OrRef(@Nullable ZipEntry entry, @Nullable Future future) {} + List partsList = new ArrayList<>(); for (var file : files) { if (file.offset > head) { - builder.addEntries(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() + partsList.add(new OrRef(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() .setRawData(UnsafeByteOperations.unsafeWrap(zipFileBytes, head, file.offset - head)) - .build())); + ).build(), null)); head = file.offset; } if (file.offset == head) { - var hash = HashUtils.hash(zipFileBytes, file.offset, file.length, "SHA-256"); - var fileOutPath = invocation.pathFromHash(hash, "dat"); - if (!Files.exists(fileOutPath.getParent())) { - Files.createDirectories(fileOutPath.getParent()); - } - try (var fileOutChannel = FileChannel.open(fileOutPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { - copyBytes(fileOutChannel, ByteBuffer.wrap(zipFileBytes, file.offset, file.length)); - } - builder.addEntries(ZipEntry.newBuilder().setReferencePart(ReferenceZipPart.newBuilder() - .setExpectedLength(file.length) - .setContentHash(hash) - .build())); + partsList.add(new OrRef(null, PARALLEL_EXECUTOR.submit(() -> { + try { + return referenceEntry(file, zipFileBytes); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }))); + head += file.length; } else { throw new IOException("Overlapping local files in zip disassembly"); } } if (head < fileSize) { - builder.addEntries(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() + 
partsList.add(new OrRef(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() .setRawData(UnsafeByteOperations.unsafeWrap(zipFileBytes, head, fileSize - head)) - .build())); + ).build(), null)); + } + for (var orRef : partsList) { + if (orRef.entry() instanceof ZipEntry entry) { + builder.addEntries(entry); + } else { + try { + builder.addEntries(Objects.requireNonNull(orRef.future()).get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } } - System.out.println((System.nanoTime() - start) / 1000f); builder.setFormat(1); @@ -139,6 +157,22 @@ public int compareTo(FileCompressed o) { } } + private ZipEntry referenceEntry(FileCompressed file, byte[] zipFileBytes) throws IOException { + var hash = HashUtils.hash(zipFileBytes, file.offset, file.length, "SHA-256"); + var fileOutPath = invocation.pathFromHash(hash, "dat"); + if (!Files.exists(fileOutPath.getParent())) { + Files.createDirectories(fileOutPath.getParent()); + } + try (var fileOutChannel = FileChannel.open(fileOutPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { + copyBytes(fileOutChannel, ByteBuffer.wrap(zipFileBytes, file.offset, file.length)); + } + + return ZipEntry.newBuilder().setReferencePart(ReferenceZipPart.newBuilder() + .setExpectedLength(file.length) + .setContentHash(hash) + ).build(); + } + public void assemble(Path outputZip, Path zipPartsFile) throws IOException { ZipFile zipFile; try (var is = Files.newInputStream(zipPartsFile)) { From f204efc1e34d6e2f40602fe139ab28afc20ce16e Mon Sep 17 00:00:00 2001 From: Luke Bemish Date: Sat, 31 Jan 2026 12:48:10 -0500 Subject: [PATCH 5/6] Reassemble zips in parallel Still need to avoid reassembly of unnecessary dependencies --- .../taskgraphrunner/runtime/TaskOutput.java | 1 + .../runtime/zips/PiecewiseZips.java | 81 ++++++++++++++----- 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java index 1eeb87a..98f72d1 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java @@ -5,6 +5,7 @@ public record TaskOutput(String taskName, String name) { public Path resolvePath(Context context) { + // TODO: avoid reassembly of unchanged dependencies if they are not needed // Reassembles ZIP or the like if needed var path = Objects.requireNonNull(context.existingTaskOutput(context.getTask(taskName), name), "Output did not exist"); var lastDot = path.getFileName().toString().lastIndexOf('.'); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java index 8e63726..825a0d9 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java @@ -20,11 +20,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; public class PiecewiseZips { - private static final ThreadFactory FACTORY = Thread.ofVirtual().name("TaskGraphRunner-Zip-", 1).factory(); - private static final ExecutorService PARALLEL_EXECUTOR = Executors.newThreadPerTaskExecutor(FACTORY); + private static final ExecutorService PARALLEL_EXECUTOR = 
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("TaskGraphRunner-Zip-", 1).factory()); private final Invocation invocation; @@ -182,60 +180,103 @@ public void assemble(Path outputZip, Path zipPartsFile) throws IOException { if (zipFile.getFormat() != 1) { throw new IOException("Unsupported zip part holder format: " + zipFile.getFormat()); } + int totalTargetSize = 0; + for (int i = 0; i < zipFile.getEntriesCount(); i++) { + var entry = zipFile.getEntries(i); + switch (entry.getEntryCase()) { + case SIMPLEPART -> { + var part = entry.getSimplePart(); + totalTargetSize += part.getRawData().size(); + } + case REFERENCEPART -> { + var part = entry.getReferencePart(); + totalTargetSize += part.getExpectedLength(); + } + default -> throw new IOException("Unsupported zip entry case: " + entry.getEntryCase()); + } + } try (var outChannel = FileChannel.open(outputZip, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { + outChannel.truncate(totalTargetSize); + int offset = 0; + var futures = new Future[zipFile.getEntriesCount()]; for (int i = 0; i < zipFile.getEntriesCount(); i++) { var entry = zipFile.getEntries(i); + var thisOffset = offset; switch (entry.getEntryCase()) { case SIMPLEPART -> { var part = entry.getSimplePart(); - copyBytes(outChannel, part.getRawData().asReadOnlyByteBuffer()); + copyBytes(outChannel, thisOffset, part.getRawData().asReadOnlyByteBuffer()); + offset += part.getRawData().size(); } case REFERENCEPART -> { var part = entry.getReferencePart(); - var contentPath = invocation.pathFromHash(part.getContentHash(), "dat"); - if (!Files.exists(contentPath)) { - throw new IOException("Missing zip entry content for hash " + part.getContentHash() + " at " + contentPath); - } - - try (var contentChannel = FileChannel.open(contentPath, StandardOpenOption.READ)) { - var totalSize = contentChannel.size(); - copyChannel(outChannel, contentChannel); - if (totalSize != part.getExpectedLength()) { - throw new IOException("Mismatched content length for zip entry with hash " + part.getContentHash() + ": expected " + part.getExpectedLength() + ", got " + totalSize); + futures[i] = PARALLEL_EXECUTOR.submit(() -> { + try { + var contentPath = invocation.pathFromHash(part.getContentHash(), "dat"); + if (!Files.exists(contentPath)) { + throw new IOException("Missing zip entry content for hash " + part.getContentHash() + " at " + contentPath); + } + + try (var contentChannel = FileChannel.open(contentPath, StandardOpenOption.READ)) { + var totalSize = contentChannel.size(); + if (totalSize != part.getExpectedLength()) { + throw new IOException("Mismatched content length for zip entry with hash " + part.getContentHash() + ": expected " + part.getExpectedLength() + ", got " + totalSize); + } + copyChannel(outChannel, thisOffset, contentChannel, 0, totalSize); + } + } catch (IOException e) { + throw new UncheckedIOException(e); } - } + }); + + offset += part.getExpectedLength(); } default -> throw new IOException("Unsupported zip entry case: " + entry.getEntryCase()); } } + for (var future : futures) { + if (future != null) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } } } private static void copyBytes(FileChannel outChannel, ByteBuffer bytes) throws IOException { + copyBytes(outChannel, outChannel.position(), bytes); + } + + private static void copyBytes(FileChannel outChannel, long outPosition, ByteBuffer bytes) throws IOException { int fullSize = bytes.remaining(); int total = 0; int transferred; - 
while (total < fullSize && (transferred = outChannel.write(bytes)) != 0) { + while (total < fullSize && (transferred = outChannel.write(bytes, outPosition)) != 0) { total += transferred; } + outChannel.position(outPosition + total); if (total < fullSize) { throw new IOException("Could not fully copy from byte buffer"); } } private static void copyChannel(FileChannel outChannel, SeekableByteChannel inChannel) throws IOException { - copyChannel(outChannel, inChannel, 0, inChannel.size()); + copyChannel(outChannel, outChannel.position(), inChannel, 0, inChannel.size()); } - private static void copyChannel(FileChannel outChannel, SeekableByteChannel inChannel, int offset, long length) throws IOException { + private static void copyChannel(FileChannel outChannel, long outPosition, SeekableByteChannel inChannel, int offset, long length) throws IOException { inChannel.position(offset); long total = 0; long transferred; - while (total < length && (transferred = outChannel.transferFrom(inChannel, outChannel.position(), length - total)) > 0) { + while (total < length && (transferred = outChannel.transferFrom(inChannel, outPosition, length - total)) > 0) { total += transferred; - outChannel.position(outChannel.position() + transferred); + outPosition += transferred; } + outChannel.position(outPosition); if (total < length) { throw new IOException("Could not fully copy from channel"); } From 8a73c1d46d724e85b6b0e43d29d6f43f9b7c4414 Mon Sep 17 00:00:00 2001 From: Luke Bemish Date: Sat, 31 Jan 2026 23:01:25 -0500 Subject: [PATCH 6/6] Avoid hashing as much as possible, and switch to length-based validation for content-addressed data as the hash is implied by the location --- .../taskgraphrunner/runtime/Context.java | 4 +- .../taskgraphrunner/runtime/Invocation.java | 17 ++++-- .../taskgraphrunner/runtime/Task.java | 59 +++++++++++++------ .../taskgraphrunner/runtime/TaskInput.java | 5 +- .../taskgraphrunner/runtime/TaskOutput.java | 4 +- .../runtime/tasks/DownloadAssetsTask.java | 2 +- .../runtime/zips/PiecewiseZips.java | 17 ++++-- 7 files changed, 74 insertions(+), 34 deletions(-) diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java index 2d27673..7297101 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Context.java @@ -23,7 +23,9 @@ public interface Context { String storeTaskOutput(Task task, String output) throws IOException; - Path existingTaskOutput(Task task, String outputName); + String contentAddressForTaskOutput(Task task, String outputName); + + Path contentAddressedTaskOutput(Task task, String outputName); Path reassembleTaskOutput(Task task, String outputName, Path reassemblyInfo); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java index fd692b3..4efd0e6 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Invocation.java @@ -3,7 +3,6 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import dev.lukebemish.taskgraphrunner.model.Output; -import dev.lukebemish.taskgraphrunner.runtime.util.HashUtils; import dev.lukebemish.taskgraphrunner.runtime.util.JsonUtils; import dev.lukebemish.taskgraphrunner.runtime.util.LockManager; import dev.lukebemish.taskgraphrunner.runtime.zips.PiecewiseZips; @@ -92,8 +91,16 @@ 
public Path taskOutputMarkerPath(Task task, String outputName) { "jar" ); + public String contentAddressForTaskOutput(Task task, String outputName) { + var outputType = task.outputTypes().get(outputName); + if (outputType == null) { + throw new IllegalArgumentException("No such output `"+outputName+"` for task `"+task.name()+"`"); + } + return task.getContentAddress(outputName); + } + @Override - public Path existingTaskOutput(Task task, String outputName) { + public Path contentAddressedTaskOutput(Task task, String outputName) { var outputType = task.outputTypes().get(outputName); if (outputType == null) { throw new IllegalArgumentException("No such output `"+outputName+"` for task `"+task.name()+"`"); @@ -107,6 +114,7 @@ public Path existingTaskOutput(Task task, String outputName) { var contents = Files.readString(markerPath, StandardCharsets.UTF_8); if (contents.contains("/")) { var parts = contents.split("/"); + task.setContentAddress(outputName, parts[0]); if (CAN_REASSEMBLE.contains(parts[1])) { var prefix = parts[0].substring(0, 2); return contentAddressableDirectory().resolve(prefix).resolve(parts[0] + "." + outputType + ".binpb"); @@ -115,6 +123,7 @@ public Path existingTaskOutput(Task task, String outputName) { return null; } } + task.setContentAddress(outputName, contents); var prefix = contents.substring(0, 2); return contentAddressableDirectory().resolve(prefix).resolve(contents + "." + outputType); } catch (IOException e) { @@ -128,7 +137,7 @@ public Path existingTaskOutput(Task task, String outputName) { public String storeTaskOutput(Task task, String output) throws IOException { var outputPath = taskOutputPath(task, output); var outputType = task.outputTypes().get(output); - var hash = HashUtils.hash(outputPath, "SHA-256"); + var hash = contentAddressForTaskOutput(task, output); return switch (outputType) { case "zip", "jar" -> { var outPath = pathFromHash(hash, outputType + ".binpb"); @@ -290,7 +299,7 @@ public void execute(Map results, @Nullable Path taskRecordJson) { JsonArray outputs = new JsonArray(); singleTask.addProperty("state", taskStatePath(task).toAbsolutePath().toString()); for (var output : task.outputTypes().entrySet()) { - var existingPath = existingTaskOutput(task, output.getKey()); + var existingPath = contentAddressedTaskOutput(task, output.getKey()); var lastDot = existingPath.getFileName().toString().lastIndexOf('.'); if (lastDot != -1 && !"binpb".equals(output.getValue()) && "binpb".equals(existingPath.getFileName().toString().substring(lastDot + 1))) { var reassemblyInfo = new JsonObject(); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java index ce070da..fa848a0 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/Task.java @@ -274,12 +274,12 @@ private void execute(Context context) { } catch (IOException e) { throw new UncheckedIOException(e); } - Map currentHashes = new HashMap<>(); + Map currentLengths = new HashMap<>(); if (context.useCached() && Files.exists(statePath)) { try (var reader = Files.newBufferedReader(statePath, StandardCharsets.UTF_8)) { JsonObject existingState = GSON.fromJson(reader, JsonObject.class); JsonElement existingInputState = existingState.get("inputs"); - var targetHashes = existingState.get("hashes").getAsJsonObject(); + var targetLengths = existingState.get("lengths").getAsJsonObject(); var lastExecutedJson = existingState.get("lastExecuted"); 
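
The up-to-date check that follows no longer re-hashes each output: the saved task state now records the byte length of every content-addressed output, and a cached result is only trusted if the file at the content-addressed path still exists with that exact length, since the hash itself is implied by the path. A rough sketch of that check, using Gson as the surrounding code does (the class and method names here are hypothetical, not from the patch):

import com.google.gson.JsonObject;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative only: decide whether a cached, content-addressed output can be
// reused by comparing its recorded length against the file currently on disk.
final class LengthCheckSketch {
    static boolean outputUnchanged(JsonObject savedState, String outputName, Path contentAddressedFile) throws IOException {
        var lengths = savedState.getAsJsonObject("lengths");
        if (lengths == null || lengths.get(outputName) == null
            || !lengths.get(outputName).isJsonPrimitive()
            || !lengths.get(outputName).getAsJsonPrimitive().isNumber()) {
            return false; // no usable record, so treat the output as changed
        }
        // The hash is implied by the content-addressed location, so a matching
        // length is enough here; no re-hashing of the output is required.
        return Files.exists(contentAddressedFile)
            && Files.size(contentAddressedFile) == lengths.get(outputName).getAsLong();
    }
}

Comparing lengths is effectively free next to re-hashing multi-megabyte jars, which is what this patch sets out to avoid.
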
this.outputId = existingState.get("outputId").getAsInt(); long lastExecuted = 0; @@ -288,20 +288,20 @@ private void execute(Context context) { } boolean allOutputsMatch = true; for (var output : outputTypes().keySet()) { - var oldHashElement = targetHashes.get(output); - if (oldHashElement == null || !oldHashElement.isJsonPrimitive() || !oldHashElement.getAsJsonPrimitive().isString()) { + var oldLengthElement = targetLengths.get(output); + if (oldLengthElement == null || !oldLengthElement.isJsonPrimitive() || !oldLengthElement.getAsJsonPrimitive().isNumber()) { allOutputsMatch = false; break; } - var oldHash = oldHashElement.getAsString(); - var outputPath = context.existingTaskOutput(this, output); - if (outputPath == null || !Files.exists(outputPath)) { + var oldLength = oldLengthElement.getAsLong(); + var contentOutputPath = context.contentAddressedTaskOutput(this, output); + if (contentOutputPath == null || !Files.exists(contentOutputPath)) { allOutputsMatch = false; break; } - var hash = HashUtils.hash(outputPath); - currentHashes.put(output, hash); - if (!hash.equals(oldHash)) { + var size = Files.size(contentOutputPath); + currentLengths.put(output, size); + if (size != oldLength) { allOutputsMatch = false; break; } @@ -330,17 +330,29 @@ private void execute(Context context) { } } boolean nothingChanged = true; + for (var output : outputTypes().keySet()) { + var newHash = HashUtils.hash(context.taskOutputPath(this, output), "SHA-256"); + setContentAddress(output, newHash); + } for (var output : outputTypes().keySet()) { var outputPath = context.taskOutputPath(this, output); - var existingHash = currentHashes.get(output); - if (existingHash == null) { + var newHash = getContentAddress(output); + var existingLength = currentLengths.get(output); + if (existingLength == null) { nothingChanged = false; break; } - var newHash = HashUtils.hash(outputPath); - if (!existingHash.equals(newHash)) { + var newLength = Files.size(outputPath); + if (newLength != existingLength) { nothingChanged = false; break; + } else { + // lengths match, check hashes. 
Old hash comes from the cache key + var oldHash = context.contentAddressForTaskOutput(this, output); + if (!newHash.equals(oldHash)) { + nothingChanged = false; + break; + } } } if (nothingChanged) { @@ -375,16 +387,25 @@ public final String type() { return type; } + private final Map contentAddress = new HashMap<>(); + void setContentAddress(String output, String address) { + contentAddress.put(output, address); + } + + String getContentAddress(String output) { + return contentAddress.get(output); + } + private void saveState(Context context) { var statePath = context.taskStatePath(this); var inputState = recordedValue(context); - JsonObject outputHashes = new JsonObject(); + JsonObject outputLengths = new JsonObject(); for (var output : outputTypes().keySet()) { - var outputPath = Objects.requireNonNull(context.existingTaskOutput(this, output), "Output did not exist"); + var outputPath = Objects.requireNonNull(context.contentAddressedTaskOutput(this, output), "Output did not exist"); if (Files.exists(outputPath)) { try { - var hash = HashUtils.hash(outputPath); - outputHashes.addProperty(output, hash); + var length = Files.size(outputPath); + outputLengths.addProperty(output, length); } catch (IOException e) { throw new RuntimeException(e); } @@ -394,7 +415,7 @@ private void saveState(Context context) { } JsonObject state = new JsonObject(); state.add("inputs", inputState); - state.add("hashes", outputHashes); + state.add("lengths", outputLengths); state.addProperty("outputId", outputId); var currentTime = System.currentTimeMillis(); state.add("lastExecuted", new JsonPrimitive(currentTime)); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java index f956b64..5c02e93 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskInput.java @@ -23,6 +23,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HexFormat; import java.util.List; import java.util.stream.Collectors; @@ -147,7 +148,9 @@ public void hashReference(ByteConsumer digest, Context context) {} @Override public void hashContents(ByteConsumer digest, Context context) { - HashUtils.hash(output.resolvePath(context), digest); + // We already have a hash from the content-address name + var contentAddress = context.contentAddressForTaskOutput(context.getTask(output.taskName()), output.name()); + digest.update(HexFormat.of().parseHex(contentAddress)); } @Override diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java index 98f72d1..95db93e 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/TaskOutput.java @@ -5,9 +5,7 @@ public record TaskOutput(String taskName, String name) { public Path resolvePath(Context context) { - // TODO: avoid reassembly of unchanged dependencies if they are not needed - // Reassembles ZIP or the like if needed - var path = Objects.requireNonNull(context.existingTaskOutput(context.getTask(taskName), name), "Output did not exist"); + var path = Objects.requireNonNull(context.contentAddressedTaskOutput(context.getTask(taskName), name), "Output did not exist"); var lastDot = path.getFileName().toString().lastIndexOf('.'); if (lastDot != -1) { var extension = 
path.getFileName().toString().substring(lastDot + 1); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java index 0c29f3f..307ab64 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/tasks/DownloadAssetsTask.java @@ -44,7 +44,7 @@ public Map outputTypes() { @Override protected boolean upToDate(long lastExecuted, Context context) { - var propertiesPath = context.existingTaskOutput(this, "properties"); + var propertiesPath = context.contentAddressedTaskOutput(this, "properties"); var properties = new Properties(); try (var reader = Files.newBufferedReader(propertiesPath, StandardCharsets.UTF_8)) { properties.load(reader); diff --git a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java index 825a0d9..7a4b7b3 100644 --- a/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java +++ b/src/main/java/dev/lukebemish/taskgraphrunner/runtime/zips/PiecewiseZips.java @@ -13,6 +13,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileTime; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -108,6 +110,7 @@ public void disassemble(Path inputZip, Path outputZipPartsFile) throws IOExcepti int head = 0; record OrRef(@Nullable ZipEntry entry, @Nullable Future future) {} List partsList = new ArrayList<>(); + var time = FileTime.from(Instant.now()); for (var file : files) { if (file.offset > head) { partsList.add(new OrRef(ZipEntry.newBuilder().setSimplePart(SimpleZipPart.newBuilder() @@ -118,7 +121,7 @@ record OrRef(@Nullable ZipEntry entry, @Nullable Future future) {} if (file.offset == head) { partsList.add(new OrRef(null, PARALLEL_EXECUTOR.submit(() -> { try { - return referenceEntry(file, zipFileBytes); + return referenceEntry(file, zipFileBytes, time); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -149,20 +152,24 @@ record OrRef(@Nullable ZipEntry entry, @Nullable Future future) {} builder.setFormat(1); try (var os = Files.newOutputStream(outputZipPartsFile)) { - // TODO: do we set this to be last used before the parts were written? 
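
The change below also refreshes timestamps instead of rewriting blobs that are already present: when a referenced chunk of the expected length exists in the content-addressed store, only its last-modified time is bumped (presumably so later cache maintenance sees it as recently used), and the parts file itself is stamped with the same timestamp, taken before the parts were written. A small sketch of that store-or-touch behaviour, with hypothetical names:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

// Illustrative only: write a blob into a content-addressed store, or just
// refresh its timestamp when a blob of the expected length is already there.
final class BlobStoreSketch {
    static void storeOrTouch(Path blobPath, byte[] data) throws IOException {
        var stamp = FileTime.from(Instant.now());
        Files.createDirectories(blobPath.getParent());
        if (Files.exists(blobPath) && Files.size(blobPath) == data.length) {
            // Same length at a content-addressed path implies the same content;
            // mark it as recently used instead of rewriting it.
            Files.setLastModifiedTime(blobPath, stamp);
        } else {
            Files.write(blobPath, data);
            Files.setLastModifiedTime(blobPath, stamp);
        }
    }
}
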
var zipFile = builder.build(); zipFile.writeTo(os); } + Files.setLastModifiedTime(outputZipPartsFile, time); } - private ZipEntry referenceEntry(FileCompressed file, byte[] zipFileBytes) throws IOException { + private ZipEntry referenceEntry(FileCompressed file, byte[] zipFileBytes, FileTime time) throws IOException { var hash = HashUtils.hash(zipFileBytes, file.offset, file.length, "SHA-256"); var fileOutPath = invocation.pathFromHash(hash, "dat"); if (!Files.exists(fileOutPath.getParent())) { Files.createDirectories(fileOutPath.getParent()); } - try (var fileOutChannel = FileChannel.open(fileOutPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { - copyBytes(fileOutChannel, ByteBuffer.wrap(zipFileBytes, file.offset, file.length)); + if (Files.exists(fileOutPath) && Files.size(fileOutPath) == file.length) { + Files.setLastModifiedTime(fileOutPath, time); + } else { + try (var fileOutChannel = FileChannel.open(fileOutPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { + copyBytes(fileOutChannel, ByteBuffer.wrap(zipFileBytes, file.offset, file.length)); + } } return ZipEntry.newBuilder().setReferencePart(ReferenceZipPart.newBuilder()