Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 156 additions & 22 deletions datashield_opal/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from argparse import Namespace
from contextlib import suppress
from obiba_opal.core import OpalClient, UriBuilder, OpalRequest, OpalResponse, HTTPError
from datashield.interface import DSLoginInfo, DSDriver, DSConnection, DSResult, DSError
from datashield.interface import DSLoginInfo, DSDriver, DSConnection, DSResult, DSError, RSession


class OpalDSError(DSError):
Expand All @@ -23,15 +23,132 @@ def is_server_error(self) -> bool:
return isinstance(self.exception, HTTPError) and self.exception.code >= 500


class OpalRSession(RSession):
def __init__(self, client: OpalClient, profile: str = None, restore: str = None, verbose: bool = False):
self.client = client
self.profile = profile
self.restore = restore
self.verbose = verbose
self.id = None

def get_id(self) -> str:
if self.id is None:
self.start(False)
return self.id

def start(self, asynchronous: bool = True) -> None:
builder = UriBuilder(["datashield", "sessions"]).query("wait", not asynchronous)
if self.profile is not None:
builder.query("profile", self.profile)
if self.restore is not None:
builder.query("restore", self.restore)
response = self._post(builder.build()).send()
if response.code != 201:
raise OpalDSError(ValueError(f"Failed to start R session: {response.code}"))
session = response.from_json()
if "id" not in session:
raise OpalDSError(ValueError("Failed to start R session: no session id returned"))
self.id = session["id"]

def is_started(self) -> bool:
return self.id is not None

def is_ready(self) -> bool:
if self.id is None:
raise OpalDSError(ValueError("R session not started"))
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
if response.code != 200:
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
session = response.from_json()
return session.get("state", "").lower() == "running"

def is_pending(self) -> bool:
if self.id is None:
raise OpalDSError(ValueError("R session not started"))
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
if response.code != 200:
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
session = response.from_json()
return session.get("state", "").lower() == "pending"

def is_failed(self) -> bool:
if self.id is None:
raise OpalDSError(ValueError("R session not started"))
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
if response.code != 200:
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
session = response.from_json()
return session.get("state", "").lower() == "failed"

def is_terminated(self) -> bool:
if self.id is None:
raise OpalDSError(ValueError("R session not started"))
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
if response.code != 200:
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
session = response.from_json()
return session.get("state", "").lower() == "terminated"

def get_events(self) -> list:
if self.id is None:
raise OpalDSError(ValueError("R session not started"))
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
if response.code != 200:
raise OpalDSError(ValueError(f"Failed to retrieve R session events: {response.code}"))
session = response.from_json()
events = [evt.split(";") for evt in session.get("events", [])]
return events

def get_last_message(self) -> str:
events = self.get_events()
if events and len(events) > 0:
last_event = events[-1]
return last_event[2] if len(last_event) > 2 else "No message"
return "No recent events"

def close(self) -> None:
if self.id is not None:
builder = UriBuilder(["datashield", "session", self.id])
self._delete(builder.build()).send()
self.id = None

def _post(self, ws: str) -> OpalRequest:
request = self.client.new_request()
if self.verbose:
request.verbose()
return request.accept_json().post().resource(ws)

def _get(self, ws: str) -> OpalRequest:
request = self.client.new_request()
if self.verbose:
request.verbose()
return request.accept_json().get().resource(ws)

def _delete(self, ws: str) -> OpalRequest:
request = self.client.new_request()
if self.verbose:
request.verbose()
return request.accept_json().delete().resource(ws)


class OpalConnection(DSConnection):
def __init__(self, name: str, loginInfo: OpalClient.LoginInfo, profile: str = "default", restore: str = None):
self.name = name
self.client = OpalClient.build(loginInfo)
self.subject = None
self.profile = profile
self.restore = restore
self.session = None
self.verbose = False
self.rsession = None
self.rsession_started = False

def check_user(self) -> bool:
"""Check if the user can authenticate by trying to retrieve the current subject profile."""
try:
self._get("/system/subject-profile/_current").fail_on_error().send()
return True
except Exception:
return False

#
# Content listing
Expand Down Expand Up @@ -68,6 +185,34 @@ def has_resource(self, name: str) -> bool:
response = self._get(UriBuilder(["project", parts[0], "resource", parts[1]]).build()).send()
return response.code == 200

#
# R Session (server side)
#

def has_session(self) -> bool:
return self.rsession is not None

def start_session(self, asynchronous: bool = True) -> RSession:
if self.rsession is not None:
return self.rsession
self.rsession = OpalRSession(self.client, profile=self.profile, restore=self.restore, verbose=self.verbose)
self.rsession.start(asynchronous=asynchronous)
self.rsession_started = not asynchronous or not self.rsession.is_pending()
return self.rsession

def is_session_started(self) -> bool:
if self.rsession is None:
return False
if self.rsession_started:
return True
self.rsession_started = not self.rsession.is_pending()
return self.rsession_started

def get_session(self) -> RSession:
if self.rsession is None:
raise OpalDSError(ValueError("No R session established. Please start a session first."))
return self.rsession

#
# Assign
#
Expand Down Expand Up @@ -249,10 +394,8 @@ def disconnect(self) -> None:
"""
Close DataSHIELD session, and then Opal session.
"""
if self.session is not None:
builder = UriBuilder(["datashield", "session", self._get_session_id()])
self._delete(builder.build()).send()
self.session = None
if self.rsession is not None:
self.rsession.close()
self.client.close()

#
Expand All @@ -267,21 +410,8 @@ def _get_subject(self):
return self.subject

def _get_session_id(self) -> str:
return self._get_session()["id"]

def _get_session(self):
if self.session is None:
builder = UriBuilder(["datashield", "sessions"])
if self.profile is not None:
builder.query("profile", self.profile)
if self.restore is not None:
builder.query("restore", self.restore)
response = self._post(builder.build()).send()
if response.code == 201:
self.session = response.from_json()
else:
raise OpalDSError(ValueError(f"DataSHIELD session creation failed: {response.code}"))
return self.session
self.start_session(asynchronous=False)
return self.rsession.get_id()

def _get(self, ws) -> OpalRequest:
request = self.client.new_request()
Expand Down Expand Up @@ -313,7 +443,11 @@ class OpalDriver(DSDriver):
def new_connection(cls, args: DSLoginInfo, restore: str = None) -> DSConnection:
namedArgs = Namespace(opal=args.url, user=args.user, password=args.password, token=args.token)
loginInfo = OpalClient.LoginInfo.parse(namedArgs)
return OpalConnection(args.name, loginInfo, args.profile, restore)
conn = OpalConnection(args.name, loginInfo, args.profile, restore)
if not conn.check_user():
creds = f"user {args.user}" if args.user else "token"
raise OpalDSError(ValueError(f"Failed to authenticate on {args.url} with {creds}"))
return conn


class OpalResult(DSResult):
Expand Down
Loading
Loading