diff --git a/VERSION b/VERSION index 332e8847..d7f89d02 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -5.8.3 +5.8.4 diff --git a/gortools/src/test/java/gorsat/UTestCram.java b/gortools/src/test/java/gorsat/UTestCram.java index 9b1d20bc..8b54460e 100644 --- a/gortools/src/test/java/gorsat/UTestCram.java +++ b/gortools/src/test/java/gorsat/UTestCram.java @@ -38,10 +38,9 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Paths; -import java.util.List; import static gorsat.TestUtils.LINE_SPLIT_PATTERN; -import static org.gorpipe.gor.driver.providers.stream.datatypes.cram.CramIterator.KEY_REFERENCE_FORCE_FOLDER; +import static org.gorpipe.gor.driver.providers.stream.datatypes.cram.CramIterator.KEY_REFERENCE_PREFER_FOLDER; public class UTestCram { @@ -121,7 +120,7 @@ public void readCramWithFastaReferenceFromConfigException() throws IOException { try { TestUtils.runGorPipeCount(args); } catch (GorResourceException e) { - Assert.assertTrue(e.getMessage().startsWith("Reference does not exist.")); + Assert.assertTrue(e.getMessage().startsWith("No cram reference found")); Assert.assertTrue(e.getUri().endsWith("cram_query_sorted2.fasta")); } } @@ -129,7 +128,7 @@ public void readCramWithFastaReferenceFromConfigException() throws IOException { @Test public void readCramWithFastaReferenceAndGenerateMissingAttributes() { System.setProperty("gor.driver.cram.fastareferencesource", DataUtil.toFile("../tests/data/external/samtools/cram_query_sorted", DataType.FASTA)); - System.setProperty(KEY_REFERENCE_FORCE_FOLDER, "false"); + System.setProperty(KEY_REFERENCE_PREFER_FOLDER, "false"); String[] args = new String[] {"gor " + DataUtil.toFile("../tests/data/external/samtools/cram_query_sorted", DataType.CRAM)}; diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/CramIterator.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/CramIterator.java index fe31c7a7..be21fd41 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/CramIterator.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/CramIterator.java @@ -70,11 +70,11 @@ public class CramIterator extends BamIterator { public final static String KEY_GENERATEMISSINGATTRIBUTES = "gor.driver.cram.generatemissingattributes"; public final static String KEY_FASTAREFERENCESOURCE = "gor.driver.cram.fastareferencesource"; - public final static String KEY_REFERENCE_FORCE_FOLDER = "gor.driver.cram.reference.force.folder."; + public final static String KEY_REFERENCE_PREFER_FOLDER = "gor.driver.cram.reference.preferfolder"; private static final Logger log = LoggerFactory.getLogger(CramIterator.class); - private CramFile cramFile; + private final CramFile cramFile; private int[] columns; ChromoLookup lookup; private String fileName; @@ -164,7 +164,7 @@ public void init(GorSession session) { fileName = cramFile.getFileSource().getSourceReference().getUrl(); - referenceSource = createReferenceSource(getInitialReferenceFile(), session.getProjectContext().getRealProjectRoot()); + referenceSource = createReferenceSource(session.getProjectContext().getRealProjectRoot()); SeekableBufferedStream cramStream = new SeekableBufferedStream(new StreamSourceSeekableStream(cramFile.getFileSource())); @@ -191,14 +191,9 @@ public void init(GorSession session) { * * @return initial reference file */ - private String getInitialReferenceFile() { + private String getSourceReferenceFile() { StreamSource ref = cramFile.getReferenceSource(); - String referenceFileName = ""; - - if (ref != null) { - referenceFileName = ref.getSourceReference().getUrl(); - } - return referenceFileName; + return ref != null ? ref.getSourceReference().getUrl() : null; } private void closeReferenceFile() { @@ -211,74 +206,99 @@ private void closeReferenceFile() { } } - private CRAMReferenceSource createReferenceSource(String ref, String root) { + private CRAMReferenceSource createReferenceSource(String root) { + File file = null; + boolean forceFolder = false; + + String sourceRef = getSourceReferenceFile(); + if (!Strings.isNullOrEmpty(sourceRef)) { + file = new File(sourceRef); + } + + if (file == null) { + file = getReferenceFromReferenceLinkFile(); + } + + if (file == null) { + file = getReferenceFromGorConfig(root); + forceFolder = Boolean.parseBoolean(System.getProperty(KEY_REFERENCE_PREFER_FOLDER, "true")); + } - File file = new File(ref); - file = getReferenceFromReferenceLinkFile(file); - file = getReferenceFromGorConfig(file, root); - file = getReferenceFromGorOptions(file); + if (file == null) { + file = getReferenceFromGorOptions(); + forceFolder = Boolean.parseBoolean(System.getProperty(KEY_REFERENCE_PREFER_FOLDER, "true")); - if (!file.exists()) { - throw new GorResourceException("Reference does not exist.", file.toString()); + } + if (file == null || !file.exists()) { + throw new GorResourceException("No cram reference found: %s".formatted(file), file != null ? file.getPath() : "null"); } // This reference should be fasta but we let the htsjdk library decide - return createFileReference(file); + return createFileReference(file, forceFolder); } - private File getReferenceFromGorOptions(File file) { - if (!file.exists()) { - String refPath = System.getProperty(KEY_FASTAREFERENCESOURCE, ""); + private File getReferenceFromGorOptions() { + String refPath = System.getProperty(KEY_FASTAREFERENCESOURCE, ""); - if (!StringUtils.isEmpty(refPath)) { - return new File(refPath); - } + if (!StringUtils.isEmpty(refPath)) { + return new File(refPath); } - return file; + + return null; } - private File getReferenceFromGorConfig(File file, String root) { - if (!file.exists() && !Strings.isNullOrEmpty(projectCramReferencePath)) { + private File getReferenceFromGorConfig(String root) { + if (!Strings.isNullOrEmpty(projectCramReferencePath)) { return PathUtils.resolve(Paths.get(root), Paths.get(projectCramReferencePath)).toFile(); } - return file; + return null; } - private File getReferenceFromReferenceLinkFile(File file) { - if (!file.exists()) { - File refLinkFile = new File(this.fileName + ".ref"); + private File getReferenceFromReferenceLinkFile() { + File refLinkFile = new File(this.fileName + ".ref"); - if (refLinkFile.exists()) { - try { - List lines = FileUtils.readLines(refLinkFile, Charset.defaultCharset()); + if (refLinkFile.exists()) { + try { + List lines = FileUtils.readLines(refLinkFile, Charset.defaultCharset()); - if (lines.size() > 0) { - return new File(lines.get(0)); - } - } catch (IOException e) { - /*Do Nothing*/ + if (lines.size() > 0) { + return new File(lines.get(0)); } + } catch (IOException e) { + /*Do Nothing*/ } } - return file; + + return null; } - private CRAMReferenceSource createFileReference(File refFile) { + private CRAMReferenceSource createFileReference(File refFile, boolean preferFolder) { if (refFile.isDirectory()) { - return new CompositeReferenceSource(List.of( - new FolderReferenceSource(refFile.getPath()), - new EBIReferenceSource(refFile.getPath()))); - } else if (Boolean.getBoolean(System.getProperty(KEY_REFERENCE_FORCE_FOLDER, "true"))) { - return new CompositeReferenceSource(List.of( - new FolderReferenceSource(refFile.getParent()), - new EBIReferenceSource(refFile.getParent()))); + return createCompositeReferenceSource(refFile); + } else if (preferFolder) { + try { + return createCompositeReferenceSource(refFile.getParentFile()); + } catch (Exception e) { + // Fallback to single file, in case none of the files contains proper meta. + return createSharedFastaReferenceSource(refFile); + } } else { - referenceSequenceFile = ReferenceSequenceFileFactory.getReferenceSequenceFile(refFile); - - String referenceKey = FilenameUtils.removeExtension(refFile.getName()); - var referenceFile = ReferenceSequenceFileFactory.getReferenceSequenceFile(refFile); - return new SharedFastaReferenceSource(referenceFile, referenceKey); + return createSharedFastaReferenceSource(refFile); } } + private CRAMReferenceSource createSharedFastaReferenceSource(File refFile) { + log.debug("Using fasta reference file for CRAM: {}", refFile.getPath()); + referenceSequenceFile = ReferenceSequenceFileFactory.getReferenceSequenceFile(refFile); + + String referenceKey = FilenameUtils.removeExtension(refFile.getName()); + var referenceFile = ReferenceSequenceFileFactory.getReferenceSequenceFile(refFile); + return new SharedFastaReferenceSource(referenceFile, referenceKey); + } + private CRAMReferenceSource createCompositeReferenceSource(File refFolder) { + log.debug("Using folder reference for CRAM: {}", refFolder.getPath()); + return new CompositeReferenceSource(List.of( + new FolderReferenceSource(refFolder.getPath()), + new EBIReferenceSource(refFolder.getPath()))); + } } diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/EBIReferenceSource.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/EBIReferenceSource.java index d12dbfac..85add05a 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/EBIReferenceSource.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/EBIReferenceSource.java @@ -2,6 +2,8 @@ import htsjdk.samtools.Defaults; import htsjdk.samtools.SAMSequenceRecord; +import htsjdk.samtools.cram.io.InputStreamUtils; +import htsjdk.samtools.util.SequenceUtil; import org.gorpipe.exceptions.GorDataException; import org.gorpipe.exceptions.GorResourceException; import org.gorpipe.gor.table.util.PathUtils; @@ -9,13 +11,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; import java.io.IOException; -import java.net.HttpURLConnection; +import java.io.InputStream; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -34,6 +36,8 @@ public class EBIReferenceSource extends MD5CachedReferenceSource { protected static Map md5ToRefbases = new ConcurrentHashMap<>(); + private static final int DOWNLOAD_TRIES_BEFORE_FAILING = 2; + private Path referenceFolder; // If null we do not download. public EBIReferenceSource() { @@ -81,8 +85,10 @@ private void processRefbasesFile(Path refbases) { @Override protected byte[] loadReference(final SAMSequenceRecord record) { + var md5 = record.getMd5(); + // Load from refbases file. - Path refbasesPath = md5ToRefbases.get(record.getMd5()); + Path refbasesPath = md5ToRefbases.get(md5); if (refbasesPath != null) { try { byte[] bases = Files.readAllBytes(refbasesPath); @@ -96,49 +102,55 @@ protected byte[] loadReference(final SAMSequenceRecord record) { } // Load from EBI service. - if (Boolean.parseBoolean(System.getProperty(KEY_USE_CRAM_REF_DOWNLOAD, - Boolean.toString(Defaults.USE_CRAM_REF_DOWNLOAD)))) { + if (Boolean.parseBoolean(System.getProperty(KEY_USE_CRAM_REF_DOWNLOAD, "True"))) { try { - - byte[] bases = downloadFromEBI(record.getMd5()); - if (bases != EMPTY_BASES) { - saveRefbasesToDisk(record.getMd5(), bases); + // Just use mem, this is going into mem cache anyway. + byte[] bases = downloadFromEBI(md5); + if (bases != null) { + saveRefbasesToDisk(md5, bases); } return bases; - } catch (IOException e) { - log.warn("Could not download/save reference sequence for md5 " + record.getMd5(), e); + } catch (Exception e) { + log.warn("Could not download/save reference sequence for md5 " + md5, e); } } - return EMPTY_BASES; + return null; } /** * Download reference sequence from EBI by MD5 and store it in the reference folder. * @param md5 * @return bytes of the reference sequence, null if not found. - * @throws IOException + * @throws IOException if the sequence is not found or the download fails. */ - private byte[] downloadFromEBI(String md5) throws IOException { - log.info("Downloading reference {} from ENA", md5); - URL url = new URL(String.format(Defaults.EBI_REFERENCE_SERVICE_URL_MASK, md5)); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setConnectTimeout(15000); - conn.setReadTimeout(30000); - conn.setRequestMethod("GET"); - - if (conn.getResponseCode() != 200) { - log.warn("ENA returned {} for {}", conn.getResponseCode(), md5); - return EMPTY_BASES; - } - - byte[] bases; - try (BufferedInputStream in = new BufferedInputStream(conn.getInputStream())) { - bases = in.readAllBytes(); + private byte[] downloadFromEBI(final String md5) throws IOException { + final String url = String.format(Locale.US, Defaults.EBI_REFERENCE_SERVICE_URL_MASK, md5); + + for (int i = 0; i < DOWNLOAD_TRIES_BEFORE_FAILING; i++) { + try (final InputStream is = new URL(url).openStream()) { + if (is == null) + return null; + + log.info("Downloading reference sequence: {}", url); + final byte[] bases = InputStreamUtils.readFully(is); + log.info("Downloaded {} bytes for md5 {}", bases.length, md5); + + final String downloadedMD5 = SequenceUtil.calculateMD5String(bases); + if (md5.equals(downloadedMD5)) { + return bases; + } else { + log.error("Downloaded sequence is corrupt: requested md5={}, received md5={}", + md5, downloadedMD5); + } + return bases; + } + catch (final IOException e) { + log.warn("Failed to download reference sequence for md5 {} on try {}/{}", + md5, (i + 1), DOWNLOAD_TRIES_BEFORE_FAILING, e); + } } - if (bases.length == 0) return EMPTY_BASES; - - return bases; + throw new IOException("Giving up on downloading sequence for md5 %s".formatted(md5)); } private void saveRefbasesToDisk(String md5, byte[] bases) throws IOException { diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/FolderReferenceSource.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/FolderReferenceSource.java index 127b3ebd..016b2715 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/FolderReferenceSource.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/FolderReferenceSource.java @@ -77,7 +77,7 @@ protected byte[] loadReference(final SAMSequenceRecord record) { return rsFile.getSequence(referencePath.contig()).getBases(); } - return EMPTY_BASES; + return null; } private void scanReferenceFolder() { diff --git a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/MD5CachedReferenceSource.java b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/MD5CachedReferenceSource.java index ccd6c618..fc8c69a6 100644 --- a/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/MD5CachedReferenceSource.java +++ b/model/src/main/java/org/gorpipe/gor/driver/providers/stream/datatypes/cram/reference/MD5CachedReferenceSource.java @@ -15,14 +15,11 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public abstract class MD5CachedReferenceSource implements CRAMReferenceSource, Closeable { private static final Logger log = LoggerFactory.getLogger(MD5CachedReferenceSource.class); - public static final byte[] EMPTY_BASES = new byte[0]; - private static final Cache md5BasesCache = createMd5BasesCache(); private static Cache createMd5BasesCache () { @@ -46,13 +43,26 @@ public synchronized byte[] getReferenceBases(final SAMSequenceRecord record, throw new GorDataException("Can not load reference bases as SAMSequenceRecord does not contain MD5"); } - try { - var bases = md5BasesCache.get(md5, () -> loadReference(record)); - StringUtil.toUpperCase(bases); - return bases != EMPTY_BASES ? bases : null; - }catch (ExecutionException e) { - throw new GorDataException("Failed to load CRAM reference: " + md5, e); + byte[] bases = md5BasesCache.getIfPresent(md5); + + if (bases == null) { + // Syncrhonize on md5 string to avoid multiple threads loading the same reference at the same time. + synchronized (md5) { + // Double check if another thread has loaded the reference while we were waiting for the lock. + bases = md5BasesCache.getIfPresent(md5); + if (bases == null) { + log.debug("Loading reference for md5 {}", md5); + bases = loadReference(record); + if (bases != null) { + // Normalize to upper case (that is what HTSJDK impl does). + StringUtil.toUpperCase(bases); + md5BasesCache.put(md5, bases); + } + } + } } + + return bases; } @Override @@ -77,7 +87,7 @@ public byte[] getReferenceBasesByRegion( /** * Load reference by MD5 from disk, downloading from EBI ENA if needed. * @param record SAM record with the sequence detail (must include MD5). - * @return the bases, or EMPTY_BASES if no bases where found on disk and on EBI. + * @return the bases, or null if no bases where found on disk and on EBI. */ protected abstract byte[] loadReference(SAMSequenceRecord record);