Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ subgrounds.egg-info/

# apple
.DS_Store
.venv*/
Binary file added data/curve_swaps.parquet
Binary file not shown.
124 changes: 81 additions & 43 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ dash = { version = "^2.3.1", optional = true }
plotly = { version = "^5.14.1", optional = true }
httpx = { extras = ["http2"], version = "^0.24.1" }
pytest-asyncio = "^0.21.0"
polars = "^0.19.12"
pyarrow = { version = "^13.0.0", optional = true }

[tool.poetry.extras]
dash = ["dash"]
plotly = ["plotly"]
polars = ["polars", "pyarrow"]
all = ["dash", "plotly"]

# https://python-poetry.org/docs/managing-dependencies/#dependency-groups
Expand Down
3 changes: 3 additions & 0 deletions subgrounds/contrib/polars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .client import PolarsSubgrounds

__all__ = ["PolarsSubgrounds"]
167 changes: 167 additions & 0 deletions subgrounds/contrib/polars/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from functools import cached_property
from json import JSONDecodeError
from typing import Any, Type

import httpx
import polars as pl
from pipe import map, traverse

from subgrounds.client import SubgroundsBase
from subgrounds.contrib.polars.utils import df_of_json
from subgrounds.errors import GraphQLError, ServerError
from subgrounds.pagination import LegacyStrategy, PaginationStrategy
from subgrounds.query import DataRequest, DataResponse, DocumentResponse
from subgrounds.subgraph import FieldPath, Subgraph
from subgrounds.utils import default_header

HTTP2_SUPPORT = True


class PolarsSubgrounds(SubgroundsBase):
"""TODO: Write comment"""

@cached_property
def _client(self):
"""Cached client"""

return httpx.Client(http2=HTTP2_SUPPORT, timeout=self.timeout)

def load(
self,
url: str,
save_schema: bool = False,
is_subgraph: bool = True,
) -> Subgraph:
try:
loader = self._load(url, save_schema, is_subgraph)
url, query = next(loader) # if this fails, schema is loaded from cache
data = self._fetch(url, {"query": query})
loader.send(data)

except StopIteration as e:
return e.value

assert False

def load_subgraph(
self, url: str, save_schema: bool = False, cache_dir: str | None = None
) -> Subgraph:
"""Performs introspection on the provided GraphQL API ``url`` to get the
schema, stores the schema if ``save_schema`` is ``True`` and returns a
generated class representing the subgraph with all its entities.

Args:
url The url of the API.
save_schema: Flag indicating whether or not the schema should be cached to
disk.

Returns:
Subgraph: A generated class representing the subgraph and its entities
"""

return self.load(url, save_schema, cache_dir, True)

def _fetch(self, url: str, blob: dict[str, Any]) -> dict[str, Any]:
resp = self._client.post(
url, json=blob, headers=default_header(url) | self.headers
)
resp.raise_for_status()

try:
raw_data = resp.json()

except JSONDecodeError:
raise ServerError(
f"Server ({url}) did not respond with proper JSON"
f"\nDid you query a proper GraphQL endpoint?"
f"\n\n{resp.content}"
)

if (data := raw_data.get("data")) is None:
raise GraphQLError(raw_data.get("errors", "Unknown Error(s) Found"))

return data

def execute(
self,
req: DataRequest,
pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
) -> DataResponse:
"""Executes a :class:`DataRequest` and returns a :class:`DataResponse`.

Args:
req: The :class:`DataRequest` object to be executed.
pagination_strategy: A Class implementing the :class:`PaginationStrategy`
``Protocol``. If ``None``, then automatic pagination is disabled.
Defaults to :class:`LegacyStrategy`.

Returns:
A :class:`DataResponse` object representing the response
"""

try:
executor = self._execute(req, pagination_strategy)

doc = next(executor)
while True:
data = self._fetch(
doc.url, {"query": doc.graphql, "variables": doc.variables}
)
doc = executor.send(DocumentResponse(url=doc.url, data=data))

except StopIteration as e:
return e.value

def query_json(
self,
fpaths: FieldPath | list[FieldPath],
pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
) -> list[dict[str, Any]]:
"""Equivalent to
``Subgrounds.execute(Subgrounds.mk_request(fpaths), pagination_strategy)``.

Args:
fpaths: One or more :class:`FieldPath` objects
that should be included in the request.
pagination_strategy: A class implementing the :class:`PaginationStrategy`
``Protocol``. If ``None``, then automatic pagination is disabled.
Defaults to :class:`LegacyStrategy`.

Returns:
The reponse data
"""

fpaths = list([fpaths] | traverse | map(FieldPath._auto_select) | traverse)
req = self.mk_request(fpaths)
data = self.execute(req, pagination_strategy)
return [doc.data for doc in data.responses]

def query_df(
self,
fpaths: FieldPath | list[FieldPath],
pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
) -> pl.DataFrame:
"""
Queries and converts raw GraphQL data to a Polars DataFrame.

Args:
fpaths: One or more FieldPath objects that
should be included in the request.
pagination_strategy: A class implementing the :class:`PaginationStrategy`
``Protocol``. If ``None``, then automatic pagination is disabled.
Defaults to :class:`LegacyStrategy`.
parquet_name: The name of the parquet file to write to.

Returns:
pl.DataFrame: A Polars DataFrame containing the queried data.
"""

# Query raw GraphQL data
fpaths = list([fpaths] | traverse | map(FieldPath._auto_select) | traverse)

# TODO: check fpaths for only one entity

json_data = self.query_json(fpaths, pagination_strategy=pagination_strategy)
data = json_data[0]

return df_of_json(data)
Loading