From 596fd83e1d6ce20b66f3ad4069a91b5cd34dec28 Mon Sep 17 00:00:00 2001 From: Alexander Goldstein Date: Tue, 17 Feb 2026 14:26:23 -0500 Subject: [PATCH 1/4] Handle bookmarks OAuth2-only auth errors --- LLMs.md | 14 +++++++------ README.md | 9 ++++++-- src/x_cli/api.py | 40 ++++++++++++++++++++++++++++++++--- tests/test_api.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 11 deletions(-) create mode 100644 tests/test_api.py diff --git a/LLMs.md b/LLMs.md index 53a06c2..9196949 100644 --- a/LLMs.md +++ b/LLMs.md @@ -44,8 +44,9 @@ Global flags: `-j`/`--json`, `-p`/`--plain`, `-md`/`--markdown` control output m `XApiClient` wraps all Twitter API v2 endpoints. Key patterns: - **Read-only endpoints** (get_tweet, search, get_user, get_timeline, get_followers, get_following) use Bearer token auth via `_bearer_get()` or direct `httpx` calls with Bearer header. -- **Write endpoints** (post_tweet, delete_tweet, like, retweet, bookmark) use OAuth 1.0a via `_oauth_request()`. -- **Authenticated read endpoints** (get_mentions, get_bookmarks) use OAuth 1.0a because they access the authenticated user's data. +- **Write endpoints** (post_tweet, delete_tweet, like, retweet) use OAuth 1.0a via `_oauth_request()`. +- **Authenticated read endpoints** like `get_mentions` use OAuth 1.0a. +- **Bookmarks endpoints** currently fail with 403 because X requires OAuth 2.0 User Context for bookmarks, while this CLI only supports OAuth 1.0a user auth + app bearer token. - `get_authenticated_user_id()` resolves and caches the current user's numeric ID (needed for like/retweet/bookmark/mentions endpoints). All methods return raw `dict` parsed from the API JSON response. Error handling is in `_handle()` -- raises `RuntimeError` on non-2xx or rate limit responses. @@ -105,9 +106,9 @@ Note: `timeline`, `followers`, `following` resolve username to numeric ID automa | Command | Args | Flags | API method | |---------|------|-------|------------| | `mentions` | | `--max N` | `get_mentions()` | -| `bookmarks` | | `--max N` | `get_bookmarks()` | -| `bookmark` | `ID_OR_URL` | | `bookmark_tweet()` | -| `unbookmark` | `ID_OR_URL` | | `unbookmark_tweet()` | +| `bookmarks` | | `--max N` | `get_bookmarks()` (currently blocked by X OAuth2-only requirement) | +| `bookmark` | `ID_OR_URL` | | `bookmark_tweet()` (currently blocked by X OAuth2-only requirement) | +| `unbookmark` | `ID_OR_URL` | | `unbookmark_tweet()` (currently blocked by X OAuth2-only requirement) | ### Top-level commands @@ -139,7 +140,7 @@ The Twitter API v2 requires numeric user IDs for timeline/followers/following en uv run pytest tests/ -v ``` -Tests cover utils (tweet ID parsing), formatters (JSON/TSV output), and auth (OAuth header generation). No live API calls in tests. +Tests cover utils (tweet ID parsing), formatters (JSON/TSV output), auth (OAuth header generation), and API error handling (including bookmarks OAuth2-required messaging). No live API calls in tests. --- @@ -148,6 +149,7 @@ Tests cover utils (tweet ID parsing), formatters (JSON/TSV output), and auth (OA | Error | Cause | Fix | |-------|-------|-----| | 403 "oauth1-permissions" | Access Token is Read-only | Enable "Read and write" in app settings, regenerate Access Token | +| 403 "Unsupported Authentication" on bookmarks | Bookmarks endpoints are OAuth2 user-context only | Not fixable with current auth model; implement OAuth2 user-context flow | | 401 Unauthorized | Bad credentials | Verify all 5 values in `.env` | | 429 Rate Limited | Too many requests | Error includes reset timestamp | | "Missing env var" | `.env` not found or incomplete | Check `~/.config/x-cli/.env` or set env vars directly | diff --git a/README.md b/README.md index 39aced4..6d257bd 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # x-cli -A CLI for X/Twitter that talks directly to the API v2. Post tweets, search, read timelines, manage bookmarks -- all from your terminal. +A CLI for X/Twitter that talks directly to the API v2. Post tweets, search, read timelines, and manage engagement from your terminal. Uses the same auth credentials as [x-mcp](https://github.com/INFATOSHI/x-mcp). If you already have x-mcp set up, x-cli works with zero additional config. @@ -16,7 +16,7 @@ Uses the same auth credentials as [x-mcp](https://github.com/INFATOSHI/x-mcp). I | **Read** | `tweet get`, `tweet search`, `user timeline`, `me mentions` | `x-cli tweet search "from:elonmusk"` | | **Users** | `user get`, `user followers`, `user following` | `x-cli user get openai` | | **Engage** | `like`, `retweet` | `x-cli like ` | -| **Bookmarks** | `me bookmarks`, `me bookmark`, `me unbookmark` | `x-cli me bookmarks --max 20` | +| **Bookmarks** | `me bookmarks`, `me bookmark`, `me unbookmark` | `Currently blocked by X auth requirements (OAuth2 user context)` | | **Analytics** | `tweet metrics` | `x-cli tweet metrics ` | Accepts tweet URLs or IDs interchangeably -- paste `https://x.com/user/status/123` or just `123`. @@ -105,6 +105,8 @@ x-cli me bookmark x-cli me unbookmark ``` +Note: X currently requires OAuth 2.0 User Context for bookmarks endpoints. This CLI's auth model is OAuth 1.0a + app bearer token, so bookmark commands are not available yet. + ### Quick actions ```bash @@ -146,6 +148,9 @@ Your Access Token was generated before you enabled write permissions. Go to the ### 401 Unauthorized Double-check all 5 credentials in your `.env`. No extra spaces or newlines. +### 403 "Unsupported Authentication" on bookmarks +X's bookmarks endpoints require OAuth 2.0 User Context. x-cli currently uses OAuth 1.0a user tokens plus app bearer token, so `me bookmarks`, `me bookmark`, and `me unbookmark` cannot be used until OAuth2 user-context auth is added. + ### 429 Rate Limited The error includes the reset timestamp. Wait until then. diff --git a/src/x_cli/api.py b/src/x_cli/api.py index 62282d9..bb37596 100644 --- a/src/x_cli/api.py +++ b/src/x_cli/api.py @@ -40,11 +40,45 @@ def _handle(self, resp: httpx.Response) -> dict[str, Any]: raise RuntimeError(f"Rate limited. Resets at {reset}.") data = resp.json() if not resp.is_success: - errors = data.get("errors", []) - msg = "; ".join(e.get("detail") or e.get("message", "") for e in errors) or resp.text[:500] + if self._is_bookmarks_oauth2_only_error(resp, data): + raise RuntimeError( + "Bookmarks endpoints require OAuth 2.0 User Context. " + "x-cli currently authenticates with OAuth 1.0a user tokens, so " + "`me bookmarks`, `me bookmark`, and `me unbookmark` are not supported yet." + ) + msg = self._extract_error_message(resp, data) raise RuntimeError(f"API error (HTTP {resp.status_code}): {msg}") return data + @staticmethod + def _extract_error_message(resp: httpx.Response, data: dict[str, Any]) -> str: + errors = data.get("errors", []) + if isinstance(errors, list): + details = [e.get("detail") or e.get("message", "") for e in errors if isinstance(e, dict)] + details = [d for d in details if d] + if details: + return "; ".join(details) + detail = data.get("detail") + if detail: + return str(detail) + title = data.get("title") + if title: + return str(title) + return resp.text[:500] + + @staticmethod + def _is_bookmarks_oauth2_only_error(resp: httpx.Response, data: dict[str, Any]) -> bool: + if resp.status_code != 403: + return False + problem_type = str(data.get("type", "")) + detail = str(data.get("detail", "")) + path = resp.request.url.path + return ( + "/bookmarks" in path + and "unsupported-authentication" in problem_type + and "OAuth 2.0 User Context" in detail + ) + def get_authenticated_user_id(self) -> str: if self._user_id: return self._user_id @@ -173,7 +207,7 @@ def retweet(self, tweet_id: str) -> dict[str, Any]: user_id = self.get_authenticated_user_id() return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/retweets", {"tweet_id": tweet_id}) - # ---- bookmarks (OAuth 1.0a -- basic tier may not support these) ---- + # ---- bookmarks (X currently requires OAuth 2.0 User Context) ---- def get_bookmarks(self, max_results: int = 10) -> dict[str, Any]: user_id = self.get_authenticated_user_id() diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..76cc583 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,53 @@ +"""Tests for x_cli.api error handling.""" + +import httpx +import pytest + +from x_cli.api import XApiClient +from x_cli.auth import Credentials + + +@pytest.fixture +def client(): + creds = Credentials( + api_key="test_key", + api_secret="test_secret", + access_token="test_token", + access_token_secret="test_token_secret", + bearer_token="test_bearer", + ) + c = XApiClient(creds) + yield c + c.close() + + +def test_bookmarks_requires_oauth2_user_context_message(client): + req = httpx.Request("GET", "https://api.x.com/2/users/123/bookmarks") + resp = httpx.Response( + 403, + request=req, + json={ + "title": "Unsupported Authentication", + "detail": ( + "Authenticating with OAuth 1.0a User Context is forbidden for this endpoint. " + "Supported authentication types are [OAuth 2.0 User Context]." + ), + "type": "https://api.twitter.com/2/problems/unsupported-authentication", + "status": 403, + }, + ) + + with pytest.raises(RuntimeError, match="Bookmarks endpoints require OAuth 2.0 User Context"): + client._handle(resp) + + +def test_non_bookmark_error_uses_api_detail(client): + req = httpx.Request("GET", "https://api.x.com/2/users/me") + resp = httpx.Response( + 401, + request=req, + json={"title": "Unauthorized", "detail": "Could not authenticate you"}, + ) + + with pytest.raises(RuntimeError, match="API error \\(HTTP 401\\): Could not authenticate you"): + client._handle(resp) From a0dc5f4ae1fa755c7768bdecc41a2a00f600be24 Mon Sep 17 00:00:00 2001 From: Alexander Goldstein Date: Tue, 17 Feb 2026 20:00:46 -0500 Subject: [PATCH 2/4] Add OAuth2 login flow to support bookmarks user-context endpoints --- .gitignore | 2 + LLMs.md | 62 +++++++++---- README.md | 59 ++++++++++-- src/x_cli/api.py | 111 +++++++++++++++++----- src/x_cli/auth.py | 39 ++++++-- src/x_cli/cli.py | 119 +++++++++++++++++++++++- src/x_cli/oauth2.py | 205 +++++++++++++++++++++++++++++++++++++++++ tests/test_api.py | 121 ++++++++++++++++++------ tests/test_cli_auth.py | 107 +++++++++++++++++++++ tests/test_oauth2.py | 138 +++++++++++++++++++++++++++ 10 files changed, 881 insertions(+), 82 deletions(-) create mode 100644 src/x_cli/oauth2.py create mode 100644 tests/test_cli_auth.py create mode 100644 tests/test_oauth2.py diff --git a/.gitignore b/.gitignore index 1e079ba..3ba3460 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ __pycache__/ dist/ build/ .pytest_cache/ +AGENTS.md +tasks/ diff --git a/LLMs.md b/LLMs.md index 9196949..661c3cb 100644 --- a/LLMs.md +++ b/LLMs.md @@ -6,7 +6,12 @@ You are an AI agent working with the x-cli codebase. This file tells you where e ## What This Is -x-cli is a Python CLI that talks directly to the Twitter/X API v2. It uses OAuth 1.0a for write operations and Bearer token auth for read operations. No third-party auth libraries -- signing is done with stdlib `hmac`/`hashlib`. +x-cli is a Python CLI that talks directly to the Twitter/X API v2. It uses: +- OAuth 1.0a for user-context write/engagement endpoints (tweet post/delete, like, retweet, mentions). +- App Bearer token for public read endpoints. +- OAuth 2.0 User Context (PKCE) for bookmarks endpoints. + +No third-party auth frameworks are used; OAuth signing/challenge logic is implemented in project code. It shares the same credentials as x-mcp (the MCP server counterpart). If a user already has x-mcp configured, they can symlink its `.env` to `~/.config/x-cli/.env`. @@ -17,11 +22,15 @@ It shares the same credentials as x-mcp (the MCP server counterpart). If a user ``` src/x_cli/ cli.py -- Click command groups and entry point - api.py -- XApiClient: one method per Twitter API v2 endpoint - auth.py -- Credential loading and OAuth 1.0a HMAC-SHA1 signing + api.py -- XApiClient: endpoint methods + auth routing + auth.py -- Env loading + OAuth 1.0a HMAC-SHA1 signing + oauth2.py -- OAuth2 PKCE helpers, token exchange/refresh, token persistence formatters.py -- Human (rich), JSON, and TSV output modes utils.py -- Tweet ID parsing from URLs, username stripping tests/ + test_api.py + test_cli_auth.py + test_oauth2.py test_utils.py test_formatters.py test_auth.py @@ -33,7 +42,7 @@ tests/ ### `cli.py` -- Start here -The entry point. Defines Click command groups: `tweet`, `user`, `me`, plus top-level `like` and `retweet`. Every command follows the same pattern: parse args, call the API client, pass the response to a formatter. +The entry point. Defines Click command groups: `auth`, `tweet`, `user`, `me`, plus top-level `like` and `retweet`. Most commands follow the same pattern: parse args, call the API client, pass the response to a formatter. The `State` object holds the output mode (`human`/`json`/`plain`/`markdown`) and verbose flag, and lazily initializes the API client. It's passed via Click's context system (`@pass_state`). @@ -43,23 +52,32 @@ Global flags: `-j`/`--json`, `-p`/`--plain`, `-md`/`--markdown` control output m `XApiClient` wraps all Twitter API v2 endpoints. Key patterns: -- **Read-only endpoints** (get_tweet, search, get_user, get_timeline, get_followers, get_following) use Bearer token auth via `_bearer_get()` or direct `httpx` calls with Bearer header. -- **Write endpoints** (post_tweet, delete_tweet, like, retweet) use OAuth 1.0a via `_oauth_request()`. -- **Authenticated read endpoints** like `get_mentions` use OAuth 1.0a. -- **Bookmarks endpoints** currently fail with 403 because X requires OAuth 2.0 User Context for bookmarks, while this CLI only supports OAuth 1.0a user auth + app bearer token. -- `get_authenticated_user_id()` resolves and caches the current user's numeric ID (needed for like/retweet/bookmark/mentions endpoints). +- **Read-only endpoints** (get_tweet, search, get_user, get_timeline, get_followers, get_following) use app Bearer token auth. +- **OAuth 1.0a endpoints** (post_tweet, delete_tweet, like, retweet, mentions) use `_oauth_request()`. +- **OAuth 2.0 user-context endpoints** (bookmarks) use `_oauth2_user_request()`. +- `get_authenticated_user_id()` caches OAuth1 user id; `get_authenticated_user_id_oauth2()` caches OAuth2 user id. +- OAuth2 refresh is automatic using stored refresh token and expiry metadata. +- OAuth2 token exchange/refresh optionally uses `X_OAUTH2_CLIENT_SECRET` for app types that require client authentication. All methods return raw `dict` parsed from the API JSON response. Error handling is in `_handle()` -- raises `RuntimeError` on non-2xx or rate limit responses. -### `auth.py` -- OAuth signing +### `auth.py` -- Env + OAuth1 signing Two responsibilities: -1. **`load_credentials()`** -- Loads 5 env vars (`X_API_KEY`, `X_API_SECRET`, `X_ACCESS_TOKEN`, `X_ACCESS_TOKEN_SECRET`, `X_BEARER_TOKEN`) with `.env` fallback from `~/.config/x-cli/.env` and current directory. +1. **`load_credentials()`** -- Loads required OAuth1/app vars plus optional OAuth2 vars from `~/.config/x-cli/.env` and current directory `.env`. 2. **`generate_oauth_header()`** -- Builds an OAuth 1.0a `Authorization` header using HMAC-SHA1. Follows the standard OAuth signature base string construction: percent-encode params, sort, concatenate with `&`, sign with consumer secret + token secret. Query string parameters from the URL are included in the signature base string (required by OAuth spec). +### `oauth2.py` -- OAuth2 PKCE + token management + +- Generates PKCE verifier/challenge/state. +- Builds browser authorization URL. +- Parses redirected URL for code/state validation (user must paste full browser address-bar URL). +- Exchanges authorization code for tokens and refreshes access token. +- Persists/removes OAuth2 token env vars in `~/.config/x-cli/.env`. + ### `formatters.py` -- Output Four modes routed by `format_output(data, mode, title, verbose)`: @@ -106,9 +124,17 @@ Note: `timeline`, `followers`, `following` resolve username to numeric ID automa | Command | Args | Flags | API method | |---------|------|-------|------------| | `mentions` | | `--max N` | `get_mentions()` | -| `bookmarks` | | `--max N` | `get_bookmarks()` (currently blocked by X OAuth2-only requirement) | -| `bookmark` | `ID_OR_URL` | | `bookmark_tweet()` (currently blocked by X OAuth2-only requirement) | -| `unbookmark` | `ID_OR_URL` | | `unbookmark_tweet()` (currently blocked by X OAuth2-only requirement) | +| `bookmarks` | | `--max N` | `get_bookmarks()` (OAuth2 login required) | +| `bookmark` | `ID_OR_URL` | | `bookmark_tweet()` (OAuth2 login required) | +| `unbookmark` | `ID_OR_URL` | | `unbookmark_tweet()` (OAuth2 login required) | + +### Auth commands (`x-cli auth `) + +| Command | Purpose | +|---------|---------| +| `login` | Run OAuth2 PKCE browser flow; store tokens | +| `status` | Show OAuth2 token presence/expiry | +| `logout` | Remove stored OAuth2 tokens | ### Top-level commands @@ -140,7 +166,7 @@ The Twitter API v2 requires numeric user IDs for timeline/followers/following en uv run pytest tests/ -v ``` -Tests cover utils (tweet ID parsing), formatters (JSON/TSV output), auth (OAuth header generation), and API error handling (including bookmarks OAuth2-required messaging). No live API calls in tests. +Tests cover utils, formatters, OAuth1 signing, OAuth2 helpers, API auth routing, and auth CLI command behavior. No live API calls in tests. --- @@ -149,7 +175,11 @@ Tests cover utils (tweet ID parsing), formatters (JSON/TSV output), auth (OAuth | Error | Cause | Fix | |-------|-------|-----| | 403 "oauth1-permissions" | Access Token is Read-only | Enable "Read and write" in app settings, regenerate Access Token | -| 403 "Unsupported Authentication" on bookmarks | Bookmarks endpoints are OAuth2 user-context only | Not fixable with current auth model; implement OAuth2 user-context flow | +| "Missing OAuth2 user token for bookmarks" | OAuth2 login not completed | Set `X_OAUTH2_CLIENT_ID` and run `x-cli auth login` | +| X consent page says "Something went wrong" | Callback URL mismatch | Configure callback exactly as `https://example.com/oauth/callback` (or set matching `X_OAUTH2_REDIRECT_URI`) | +| "OAuth2 token request failed (HTTP 401): Missing valid authorization header" | App requires OAuth2 client authentication for token exchange | Set `X_OAUTH2_CLIENT_SECRET` and retry `x-cli auth login` | +| "X_OAUTH2_ACCESS_TOKEN is not a user-context token" | Token is OAuth2 app-only bearer token | Run `x-cli auth login` and use returned user-context token | +| 401 on bookmarks after login | OAuth2 token expired/revoked and refresh failed | Re-run `x-cli auth login` | | 401 Unauthorized | Bad credentials | Verify all 5 values in `.env` | | 429 Rate Limited | Too many requests | Error includes reset timestamp | | "Missing env var" | `.env` not found or incomplete | Check `~/.config/x-cli/.env` or set env vars directly | diff --git a/README.md b/README.md index 6d257bd..593ac7f 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Uses the same auth credentials as [x-mcp](https://github.com/INFATOSHI/x-mcp). I | **Read** | `tweet get`, `tweet search`, `user timeline`, `me mentions` | `x-cli tweet search "from:elonmusk"` | | **Users** | `user get`, `user followers`, `user following` | `x-cli user get openai` | | **Engage** | `like`, `retweet` | `x-cli like ` | -| **Bookmarks** | `me bookmarks`, `me bookmark`, `me unbookmark` | `Currently blocked by X auth requirements (OAuth2 user context)` | +| **Bookmarks** | `me bookmarks`, `me bookmark`, `me unbookmark` | `x-cli auth login && x-cli me bookmarks --max 20` | | **Analytics** | `tweet metrics` | `x-cli tweet metrics ` | Accepts tweet URLs or IDs interchangeably -- paste `https://x.com/user/status/123` or just `123`. @@ -39,7 +39,7 @@ uv tool install x-cli ## Auth -You need 5 credentials from the [X Developer Portal](https://developer.x.com/en/portal/dashboard). +You need OAuth1 credentials for general commands and OAuth2 client settings for bookmarks login. ### If you already use x-mcp @@ -57,8 +57,9 @@ ln -s /path/to/x-mcp/.env ~/.config/x-cli/.env 3. Save your **Consumer Key** (API Key), **Secret Key** (API Secret), and **Bearer Token** 4. Under **User authentication settings**, set permissions to **Read and write** 5. Generate (or regenerate) **Access Token** and **Access Token Secret** +6. In OAuth2 app settings, add callback URL: `https://example.com/oauth/callback` -Put all 5 values in `~/.config/x-cli/.env`: +Put these values in `~/.config/x-cli/.env`: ``` X_API_KEY=your_consumer_key @@ -66,10 +67,39 @@ X_API_SECRET=your_secret_key X_BEARER_TOKEN=your_bearer_token X_ACCESS_TOKEN=your_access_token X_ACCESS_TOKEN_SECRET=your_access_token_secret +X_OAUTH2_CLIENT_ID=your_oauth2_client_id +X_OAUTH2_CLIENT_SECRET=your_oauth2_client_secret # optional, required by some X app types +# Optional: override callback URL if your app uses a different one. +# Must exactly match your app callback setting. +# X_OAUTH2_REDIRECT_URI=https://example.com/oauth/callback ``` x-cli also checks for a `.env` in the current directory. +### OAuth2 login for bookmarks + +Bookmarks endpoints require OAuth 2.0 User Context. Run: + +```bash +x-cli auth login +``` + +`auth login` opens a PKCE browser flow: +1. It prints an authorize URL. +2. You approve access in the browser. +3. Copy the full redirected URL from your browser address bar and paste it into the CLI. +4. x-cli stores: + - `X_OAUTH2_ACCESS_TOKEN` + - `X_OAUTH2_REFRESH_TOKEN` + - `X_OAUTH2_EXPIRES_AT` + +You can check or clear saved OAuth2 tokens: + +```bash +x-cli auth status +x-cli auth logout +``` + --- ## Usage @@ -105,8 +135,6 @@ x-cli me bookmark x-cli me unbookmark ``` -Note: X currently requires OAuth 2.0 User Context for bookmarks endpoints. This CLI's auth model is OAuth 1.0a + app bearer token, so bookmark commands are not available yet. - ### Quick actions ```bash @@ -148,8 +176,25 @@ Your Access Token was generated before you enabled write permissions. Go to the ### 401 Unauthorized Double-check all 5 credentials in your `.env`. No extra spaces or newlines. -### 403 "Unsupported Authentication" on bookmarks -X's bookmarks endpoints require OAuth 2.0 User Context. x-cli currently uses OAuth 1.0a user tokens plus app bearer token, so `me bookmarks`, `me bookmark`, and `me unbookmark` cannot be used until OAuth2 user-context auth is added. +### Bookmarks fail with "Missing OAuth2 user token" +Run `x-cli auth login`. You must set `X_OAUTH2_CLIENT_ID` first. + +### Login page says "Something went wrong" +Most common cause is callback mismatch. Ensure your X app has callback URL exactly: +`https://example.com/oauth/callback` +If you set `X_OAUTH2_REDIRECT_URI`, it must exactly match the callback in X app settings. + +### Bookmarks fail saying token is not user-context +Your `X_OAUTH2_ACCESS_TOKEN` is likely an OAuth2 app-only token. Run `x-cli auth login` to mint a user-context token. + +### `auth login` fails with "Missing valid authorization header" +Set `X_OAUTH2_CLIENT_SECRET` in your env and retry `x-cli auth login`. Some X app configurations require client authentication at token exchange time. + +### What URL should I paste into the CLI prompt? +Paste the full redirected URL from your browser address bar (the one containing `code=` and `state=`), not just the page contents. + +### Bookmarks fail with 401 after login +Your refresh token may be expired/revoked. Run `x-cli auth login` again to refresh OAuth2 credentials. ### 429 Rate Limited The error includes the reset timestamp. Wait until then. diff --git a/src/x_cli/api.py b/src/x_cli/api.py index bb37596..eb116ce 100644 --- a/src/x_cli/api.py +++ b/src/x_cli/api.py @@ -6,7 +6,8 @@ import httpx -from .auth import Credentials, generate_oauth_header +from .auth import Credentials, generate_oauth_header, get_config_env_path +from .oauth2 import expires_at_from_expires_in, persist_oauth2_tokens, refresh_access_token, token_expired API_BASE = "https://api.x.com/2" @@ -15,6 +16,7 @@ class XApiClient: def __init__(self, creds: Credentials) -> None: self.creds = creds self._user_id: str | None = None + self._oauth2_user_id: str | None = None self._http = httpx.Client(timeout=30.0) def close(self) -> None: @@ -34,18 +36,41 @@ def _oauth_request(self, method: str, url: str, json_body: dict | None = None) - resp = self._http.request(method, url, headers=headers, json=json_body if json_body else None) return self._handle(resp) + def _oauth2_user_request( + self, + method: str, + url: str, + json_body: dict | None = None, + *, + retry_on_401: bool = True, + ) -> dict[str, Any]: + self._ensure_oauth2_access_token() + headers: dict[str, str] = {"Authorization": f"Bearer {self.creds.oauth2_access_token}"} + if json_body is not None: + headers["Content-Type"] = "application/json" + resp = self._http.request(method, url, headers=headers, json=json_body if json_body else None) + if resp.status_code == 401 and retry_on_401: + self._refresh_oauth2_access_token() + return self._oauth2_user_request(method, url, json_body, retry_on_401=False) + if resp.status_code == 403: + try: + payload = resp.json() + except ValueError: + payload = {} + detail = str(payload.get("detail", "")) + if "OAuth 2.0 Application-Only" in detail: + raise RuntimeError( + "Stored X_OAUTH2_ACCESS_TOKEN is not a user-context token. " + "Run `x-cli auth login` to obtain an OAuth2 User Context token for bookmarks." + ) + return self._handle(resp) + def _handle(self, resp: httpx.Response) -> dict[str, Any]: if resp.status_code == 429: reset = resp.headers.get("x-rate-limit-reset", "unknown") raise RuntimeError(f"Rate limited. Resets at {reset}.") data = resp.json() if not resp.is_success: - if self._is_bookmarks_oauth2_only_error(resp, data): - raise RuntimeError( - "Bookmarks endpoints require OAuth 2.0 User Context. " - "x-cli currently authenticates with OAuth 1.0a user tokens, so " - "`me bookmarks`, `me bookmark`, and `me unbookmark` are not supported yet." - ) msg = self._extract_error_message(resp, data) raise RuntimeError(f"API error (HTTP {resp.status_code}): {msg}") return data @@ -66,17 +91,46 @@ def _extract_error_message(resp: httpx.Response, data: dict[str, Any]) -> str: return str(title) return resp.text[:500] - @staticmethod - def _is_bookmarks_oauth2_only_error(resp: httpx.Response, data: dict[str, Any]) -> bool: - if resp.status_code != 403: - return False - problem_type = str(data.get("type", "")) - detail = str(data.get("detail", "")) - path = resp.request.url.path - return ( - "/bookmarks" in path - and "unsupported-authentication" in problem_type - and "OAuth 2.0 User Context" in detail + def _ensure_oauth2_access_token(self) -> None: + if not self.creds.oauth2_access_token: + raise RuntimeError( + "Missing OAuth2 user token for bookmarks. Run `x-cli auth login` to set " + "X_OAUTH2_ACCESS_TOKEN/X_OAUTH2_REFRESH_TOKEN." + ) + if token_expired(self.creds.oauth2_expires_at): + self._refresh_oauth2_access_token() + + def _refresh_oauth2_access_token(self) -> None: + if not self.creds.oauth2_client_id: + raise RuntimeError( + "Missing env var X_OAUTH2_CLIENT_ID. Set it first, then run `x-cli auth login`." + ) + if not self.creds.oauth2_refresh_token: + raise RuntimeError( + "OAuth2 access token expired and no refresh token is available. Run `x-cli auth login`." + ) + data = refresh_access_token( + self._http, + client_id=self.creds.oauth2_client_id, + client_secret=self.creds.oauth2_client_secret, + refresh_token=self.creds.oauth2_refresh_token, + ) + self._persist_oauth2_tokens_from_response(data) + + def _persist_oauth2_tokens_from_response(self, data: dict[str, Any]) -> None: + access_token = str(data["access_token"]) + refresh_token = data.get("refresh_token") or self.creds.oauth2_refresh_token + expires_at = expires_at_from_expires_in(data.get("expires_in")) + + self.creds.oauth2_access_token = access_token + self.creds.oauth2_refresh_token = str(refresh_token) if refresh_token else None + self.creds.oauth2_expires_at = expires_at + + persist_oauth2_tokens( + get_config_env_path(), + access_token=access_token, + refresh_token=self.creds.oauth2_refresh_token, + expires_at=expires_at, ) def get_authenticated_user_id(self) -> str: @@ -86,6 +140,13 @@ def get_authenticated_user_id(self) -> str: self._user_id = data["data"]["id"] return self._user_id + def get_authenticated_user_id_oauth2(self) -> str: + if self._oauth2_user_id: + return self._oauth2_user_id + data = self._oauth2_user_request("GET", f"{API_BASE}/users/me") + self._oauth2_user_id = data["data"]["id"] + return self._oauth2_user_id + # ---- tweets ---- def post_tweet( @@ -207,10 +268,10 @@ def retweet(self, tweet_id: str) -> dict[str, Any]: user_id = self.get_authenticated_user_id() return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/retweets", {"tweet_id": tweet_id}) - # ---- bookmarks (X currently requires OAuth 2.0 User Context) ---- + # ---- bookmarks (OAuth 2.0 User Context) ---- def get_bookmarks(self, max_results: int = 10) -> dict[str, Any]: - user_id = self.get_authenticated_user_id() + user_id = self.get_authenticated_user_id_oauth2() max_results = max(1, min(max_results, 100)) params = { "max_results": str(max_results), @@ -221,12 +282,12 @@ def get_bookmarks(self, max_results: int = 10) -> dict[str, Any]: } qs = "&".join(f"{k}={v}" for k, v in params.items()) url = f"{API_BASE}/users/{user_id}/bookmarks?{qs}" - return self._oauth_request("GET", url) + return self._oauth2_user_request("GET", url) def bookmark_tweet(self, tweet_id: str) -> dict[str, Any]: - user_id = self.get_authenticated_user_id() - return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/bookmarks", {"tweet_id": tweet_id}) + user_id = self.get_authenticated_user_id_oauth2() + return self._oauth2_user_request("POST", f"{API_BASE}/users/{user_id}/bookmarks", {"tweet_id": tweet_id}) def unbookmark_tweet(self, tweet_id: str) -> dict[str, Any]: - user_id = self.get_authenticated_user_id() - return self._oauth_request("DELETE", f"{API_BASE}/users/{user_id}/bookmarks/{tweet_id}") + user_id = self.get_authenticated_user_id_oauth2() + return self._oauth2_user_request("DELETE", f"{API_BASE}/users/{user_id}/bookmarks/{tweet_id}") diff --git a/src/x_cli/auth.py b/src/x_cli/auth.py index 722ee01..4016ed1 100644 --- a/src/x_cli/auth.py +++ b/src/x_cli/auth.py @@ -1,4 +1,4 @@ -"""Auth: env var loading and OAuth 1.0a header generation.""" +"""Auth: env var loading and OAuth header generation.""" from __future__ import annotations @@ -22,15 +22,28 @@ class Credentials: access_token: str access_token_secret: str bearer_token: str + oauth2_client_id: str | None = None + oauth2_client_secret: str | None = None + oauth2_access_token: str | None = None + oauth2_refresh_token: str | None = None + oauth2_expires_at: int | None = None -def load_credentials() -> Credentials: - """Load credentials from env vars, with .env fallback.""" - # Try ~/.config/x-cli/.env then cwd .env - config_env = Path.home() / ".config" / "x-cli" / ".env" +def get_config_env_path() -> Path: + return Path.home() / ".config" / "x-cli" / ".env" + + +def load_env_files() -> None: + """Load ~/.config/x-cli/.env and cwd .env into process env.""" + config_env = get_config_env_path() if config_env.exists(): load_dotenv(config_env) - load_dotenv() # cwd .env + load_dotenv() + + +def load_credentials() -> Credentials: + """Load credentials from env vars, with .env fallback.""" + load_env_files() def require(name: str) -> str: val = os.environ.get(name) @@ -41,12 +54,26 @@ def require(name: str) -> str: ) return val + def optional_int(name: str) -> int | None: + val = os.environ.get(name) + if not val: + return None + try: + return int(val) + except ValueError as exc: + raise SystemExit(f"Invalid env var: {name} must be an integer unix timestamp.") from exc + return Credentials( api_key=require("X_API_KEY"), api_secret=require("X_API_SECRET"), access_token=require("X_ACCESS_TOKEN"), access_token_secret=require("X_ACCESS_TOKEN_SECRET"), bearer_token=require("X_BEARER_TOKEN"), + oauth2_client_id=os.environ.get("X_OAUTH2_CLIENT_ID"), + oauth2_client_secret=os.environ.get("X_OAUTH2_CLIENT_SECRET"), + oauth2_access_token=os.environ.get("X_OAUTH2_ACCESS_TOKEN"), + oauth2_refresh_token=os.environ.get("X_OAUTH2_REFRESH_TOKEN"), + oauth2_expires_at=optional_int("X_OAUTH2_EXPIRES_AT"), ) diff --git a/src/x_cli/cli.py b/src/x_cli/cli.py index 7816e3e..c447ffc 100644 --- a/src/x_cli/cli.py +++ b/src/x_cli/cli.py @@ -2,12 +2,27 @@ from __future__ import annotations +import os +import time import click +import httpx from .api import XApiClient -from .auth import load_credentials +from .auth import get_config_env_path, load_credentials, load_env_files from .formatters import format_output +from .oauth2 import ( + DEFAULT_REDIRECT_URI, + build_authorization_url, + clear_oauth2_tokens, + exchange_code_for_token, + expires_at_from_expires_in, + extract_code_from_redirect_url, + generate_code_challenge, + generate_code_verifier, + generate_state, + persist_oauth2_tokens, +) from .utils import parse_tweet_id, strip_at @@ -43,6 +58,108 @@ def cli(ctx, fmt, verbose): ctx.obj = State(fmt or "human", verbose=verbose) +# ============================================================ +# auth +# ============================================================ + +@cli.group() +def auth(): + """OAuth2 authentication helpers.""" + + +@auth.command("login") +def auth_login(): + """Run OAuth2 PKCE login for bookmarks endpoints.""" + load_env_files() + client_id = os.environ.get("X_OAUTH2_CLIENT_ID") + if not client_id: + raise click.ClickException("Missing env var X_OAUTH2_CLIENT_ID.") + client_secret = os.environ.get("X_OAUTH2_CLIENT_SECRET") + + redirect_uri = os.environ.get("X_OAUTH2_REDIRECT_URI", DEFAULT_REDIRECT_URI) + code_verifier = generate_code_verifier() + state = generate_state() + auth_url = build_authorization_url( + client_id=client_id, + redirect_uri=redirect_uri, + state=state, + code_challenge=generate_code_challenge(code_verifier), + ) + + click.echo("Open this URL in your browser and approve access:") + click.echo(auth_url) + click.echo("") + click.echo(f"Use a callback URL configured in your X app (current: {redirect_uri}).") + click.echo("Recommended callback URL: https://example.com/oauth/callback") + redirected_url = click.prompt("Paste the full redirected URL from your browser address bar") + + try: + code = extract_code_from_redirect_url(redirected_url, state) + with httpx.Client(timeout=30.0) as http: + token_data = exchange_code_for_token( + http, + client_id=client_id, + client_secret=client_secret, + code=code, + code_verifier=code_verifier, + redirect_uri=redirect_uri, + ) + except RuntimeError as exc: + msg = str(exc) + if "Missing valid authorization header" in msg and not client_secret: + msg += " Set X_OAUTH2_CLIENT_SECRET and retry `x-cli auth login`." + raise click.ClickException(msg) from exc + + access_token = str(token_data["access_token"]) + refresh_token = token_data.get("refresh_token") + expires_at = expires_at_from_expires_in(token_data.get("expires_in")) + persist_oauth2_tokens( + get_config_env_path(), + access_token=access_token, + refresh_token=str(refresh_token) if refresh_token else None, + expires_at=expires_at, + ) + if expires_at: + ttl = max(0, expires_at - int(time.time())) + click.echo(f"OAuth2 login successful. Token expires in about {ttl // 60} minutes.") + else: + click.echo("OAuth2 login successful.") + + +@auth.command("logout") +def auth_logout(): + """Remove stored OAuth2 tokens.""" + clear_oauth2_tokens(get_config_env_path()) + click.echo("Removed OAuth2 tokens from ~/.config/x-cli/.env") + + +@auth.command("status") +def auth_status(): + """Show OAuth2 login status.""" + load_env_files() + access = os.environ.get("X_OAUTH2_ACCESS_TOKEN") + refresh = os.environ.get("X_OAUTH2_REFRESH_TOKEN") + expires_raw = os.environ.get("X_OAUTH2_EXPIRES_AT") + if not access: + click.echo("OAuth2: not logged in") + return + click.echo("OAuth2: logged in") + click.echo(f"Refresh token: {'present' if refresh else 'missing'}") + if not expires_raw: + click.echo("Access token expiry: unknown") + return + try: + expires_at = int(expires_raw) + except ValueError: + click.echo("Access token expiry: invalid value in X_OAUTH2_EXPIRES_AT") + return + remaining = expires_at - int(time.time()) + if remaining <= 0: + click.echo("Access token expiry: expired") + else: + click.echo(f"Access token expiry: in {remaining // 60} minutes") + + # ============================================================ # tweet # ============================================================ diff --git a/src/x_cli/oauth2.py b/src/x_cli/oauth2.py new file mode 100644 index 0000000..519aa7d --- /dev/null +++ b/src/x_cli/oauth2.py @@ -0,0 +1,205 @@ +"""OAuth2 PKCE helpers and token persistence for x-cli.""" + +from __future__ import annotations + +import base64 +import hashlib +import secrets +import time +import urllib.parse +from pathlib import Path +from typing import Any + +import httpx +from dotenv import set_key, unset_key + +AUTH_URL = "https://twitter.com/i/oauth2/authorize" +TOKEN_URL = "https://api.x.com/2/oauth2/token" +DEFAULT_REDIRECT_URI = "https://example.com/oauth/callback" +DEFAULT_SCOPES = ( + "tweet.read", + "users.read", + "bookmark.read", + "bookmark.write", + "offline.access", +) +OAUTH2_ENV_KEYS = ("X_OAUTH2_ACCESS_TOKEN", "X_OAUTH2_REFRESH_TOKEN", "X_OAUTH2_EXPIRES_AT") + + +def generate_code_verifier(length: int = 64) -> str: + """Generate a PKCE verifier (43-128 chars, URL-safe).""" + if length < 43 or length > 128: + raise ValueError("PKCE code verifier length must be between 43 and 128.") + raw = base64.urlsafe_b64encode(secrets.token_bytes(length)).decode().rstrip("=") + if len(raw) < length: + raw += "A" * (length - len(raw)) + return raw[:length] + + +def generate_code_challenge(code_verifier: str) -> str: + digest = hashlib.sha256(code_verifier.encode()).digest() + return base64.urlsafe_b64encode(digest).decode().rstrip("=") + + +def generate_state(length: int = 24) -> str: + return secrets.token_urlsafe(length) + + +def build_authorization_url( + *, + client_id: str, + redirect_uri: str, + state: str, + code_challenge: str, + scopes: tuple[str, ...] = DEFAULT_SCOPES, +) -> str: + params = { + "response_type": "code", + "client_id": client_id, + "redirect_uri": redirect_uri, + "scope": " ".join(scopes), + "state": state, + "code_challenge": code_challenge, + "code_challenge_method": "S256", + } + return f"{AUTH_URL}?{urllib.parse.urlencode(params)}" + + +def extract_code_from_redirect_url(redirect_url: str, expected_state: str) -> str: + parsed = urllib.parse.urlparse(redirect_url.strip()) + if not parsed.scheme or not parsed.netloc: + raise RuntimeError("Invalid redirect URL. Paste the full URL from your browser address bar.") + query = urllib.parse.parse_qs(parsed.query) + error = (query.get("error") or [None])[0] + if error: + description = (query.get("error_description") or [""])[0] + raise RuntimeError(f"OAuth2 authorization failed: {error} {description}".strip()) + code = (query.get("code") or [None])[0] + state = (query.get("state") or [None])[0] + if not code: + raise RuntimeError("Missing `code` in redirect URL.") + if state != expected_state: + raise RuntimeError("State mismatch in redirect URL. Abort and retry login.") + return code + + +def exchange_code_for_token( + http: httpx.Client, + *, + client_id: str, + client_secret: str | None, + code: str, + code_verifier: str, + redirect_uri: str, +) -> dict[str, Any]: + resp = http.post( + TOKEN_URL, + headers=_token_headers(client_id, client_secret), + data={ + "grant_type": "authorization_code", + "code": code, + "client_id": client_id, + "redirect_uri": redirect_uri, + "code_verifier": code_verifier, + }, + ) + return _parse_token_response(resp) + + +def refresh_access_token( + http: httpx.Client, + *, + client_id: str, + client_secret: str | None, + refresh_token: str, +) -> dict[str, Any]: + resp = http.post( + TOKEN_URL, + headers=_token_headers(client_id, client_secret), + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": client_id, + }, + ) + return _parse_token_response(resp) + + +def token_expired(expires_at: int | None, buffer_seconds: int = 120) -> bool: + if not expires_at: + return False + return int(time.time()) >= (expires_at - buffer_seconds) + + +def expires_at_from_expires_in(expires_in: Any) -> int | None: + if expires_in is None: + return None + try: + return int(time.time()) + int(expires_in) + except (TypeError, ValueError): + return None + + +def ensure_env_file(path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + if not path.exists(): + path.touch(mode=0o600) + else: + path.chmod(0o600) + + +def persist_oauth2_tokens( + path: Path, + *, + access_token: str, + refresh_token: str | None, + expires_at: int | None, +) -> None: + ensure_env_file(path) + set_key(str(path), "X_OAUTH2_ACCESS_TOKEN", access_token, quote_mode="never") + if refresh_token: + set_key(str(path), "X_OAUTH2_REFRESH_TOKEN", refresh_token, quote_mode="never") + else: + unset_key(str(path), "X_OAUTH2_REFRESH_TOKEN", quote_mode="never") + if expires_at is not None: + set_key(str(path), "X_OAUTH2_EXPIRES_AT", str(expires_at), quote_mode="never") + else: + unset_key(str(path), "X_OAUTH2_EXPIRES_AT", quote_mode="never") + + +def clear_oauth2_tokens(path: Path) -> None: + if not path.exists(): + return + for key in OAUTH2_ENV_KEYS: + unset_key(str(path), key, quote_mode="never") + + +def _parse_token_response(resp: httpx.Response) -> dict[str, Any]: + try: + payload = resp.json() + except ValueError: + payload = {} + if not resp.is_success: + msg = _extract_token_error(payload) or resp.text[:500] + raise RuntimeError(f"OAuth2 token request failed (HTTP {resp.status_code}): {msg}") + if "access_token" not in payload: + raise RuntimeError("OAuth2 token response missing `access_token`.") + return payload + + +def _token_headers(client_id: str, client_secret: str | None) -> dict[str, str]: + headers = {"Content-Type": "application/x-www-form-urlencoded"} + if client_secret: + token = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode() + headers["Authorization"] = f"Basic {token}" + return headers + + +def _extract_token_error(payload: dict[str, Any]) -> str: + if not isinstance(payload, dict): + return "" + for key in ("error_description", "error", "detail", "title"): + value = payload.get(key) + if value: + return str(value) + return "" diff --git a/tests/test_api.py b/tests/test_api.py index 76cc583..28a131c 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,4 +1,4 @@ -"""Tests for x_cli.api error handling.""" +"""Tests for x_cli.api auth routing and error handling.""" import httpx import pytest @@ -21,33 +21,100 @@ def client(): c.close() -def test_bookmarks_requires_oauth2_user_context_message(client): - req = httpx.Request("GET", "https://api.x.com/2/users/123/bookmarks") - resp = httpx.Response( - 403, - request=req, - json={ - "title": "Unsupported Authentication", - "detail": ( - "Authenticating with OAuth 1.0a User Context is forbidden for this endpoint. " - "Supported authentication types are [OAuth 2.0 User Context]." - ), - "type": "https://api.twitter.com/2/problems/unsupported-authentication", - "status": 403, - }, - ) +def _set_transport(client: XApiClient, handler) -> None: + client._http.close() + client._http = httpx.Client(transport=httpx.MockTransport(handler)) - with pytest.raises(RuntimeError, match="Bookmarks endpoints require OAuth 2.0 User Context"): - client._handle(resp) +def test_bookmarks_require_oauth2_login(client): + with pytest.raises(RuntimeError, match="Missing OAuth2 user token"): + client.get_bookmarks() -def test_non_bookmark_error_uses_api_detail(client): - req = httpx.Request("GET", "https://api.x.com/2/users/me") - resp = httpx.Response( - 401, - request=req, - json={"title": "Unauthorized", "detail": "Could not authenticate you"}, - ) - with pytest.raises(RuntimeError, match="API error \\(HTTP 401\\): Could not authenticate you"): - client._handle(resp) +def test_bookmarks_use_oauth2_bearer_token(client): + client.creds.oauth2_access_token = "oauth2_user_token" + + def handler(request: httpx.Request) -> httpx.Response: + assert request.headers["Authorization"] == "Bearer oauth2_user_token" + if request.url.path == "/2/users/me": + return httpx.Response(200, request=request, json={"data": {"id": "42"}}) + if request.url.path == "/2/users/42/bookmarks": + return httpx.Response(200, request=request, json={"data": []}) + return httpx.Response(404, request=request, json={"detail": "not found"}) + + _set_transport(client, handler) + data = client.get_bookmarks(max_results=10) + assert data["data"] == [] + + +def test_mentions_still_use_oauth1(client): + def handler(request: httpx.Request) -> httpx.Response: + assert request.headers["Authorization"].startswith("OAuth ") + if request.url.path == "/2/users/me": + return httpx.Response(200, request=request, json={"data": {"id": "100"}}) + if request.url.path == "/2/users/100/mentions": + return httpx.Response(200, request=request, json={"data": [{"id": "1"}]}) + return httpx.Response(404, request=request, json={"detail": "not found"}) + + _set_transport(client, handler) + data = client.get_mentions(max_results=5) + assert data["data"][0]["id"] == "1" + + +def test_oauth2_request_refreshes_on_401(client, monkeypatch): + client.creds.oauth2_client_id = "client-id" + client.creds.oauth2_access_token = "old-access" + client.creds.oauth2_refresh_token = "refresh-1" + + calls = {"users_me": 0, "refresh": 0, "persist": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/2/users/me": + calls["users_me"] += 1 + if calls["users_me"] == 1: + assert request.headers["Authorization"] == "Bearer old-access" + return httpx.Response(401, request=request, json={"detail": "expired"}) + assert request.headers["Authorization"] == "Bearer new-access" + return httpx.Response(200, request=request, json={"data": {"id": "77"}}) + return httpx.Response(404, request=request, json={"detail": "not found"}) + + def fake_refresh(http, *, client_id: str, client_secret: str | None, refresh_token: str): + calls["refresh"] += 1 + assert client_id == "client-id" + assert client_secret is None + assert refresh_token == "refresh-1" + return {"access_token": "new-access", "refresh_token": "refresh-2", "expires_in": 3600} + + def fake_persist(*args, **kwargs): + calls["persist"] += 1 + + monkeypatch.setattr("x_cli.api.refresh_access_token", fake_refresh) + monkeypatch.setattr("x_cli.api.persist_oauth2_tokens", fake_persist) + _set_transport(client, handler) + + user_id = client.get_authenticated_user_id_oauth2() + assert user_id == "77" + assert calls["refresh"] == 1 + assert calls["persist"] == 1 + assert client.creds.oauth2_access_token == "new-access" + assert client.creds.oauth2_refresh_token == "refresh-2" + + +def test_oauth2_app_only_token_shows_actionable_error(client): + client.creds.oauth2_access_token = "app-only-token" + + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response( + 403, + request=request, + json={ + "detail": ( + "Authenticating with OAuth 2.0 Application-Only is forbidden for this endpoint. " + "Supported authentication types are [OAuth 1.0a User Context, OAuth 2.0 User Context]." + ), + }, + ) + + _set_transport(client, handler) + with pytest.raises(RuntimeError, match="not a user-context token"): + client.get_bookmarks() diff --git a/tests/test_cli_auth.py b/tests/test_cli_auth.py new file mode 100644 index 0000000..e7bbd08 --- /dev/null +++ b/tests/test_cli_auth.py @@ -0,0 +1,107 @@ +"""Tests for OAuth2 auth CLI commands.""" + +from pathlib import Path + +from click.testing import CliRunner + +from x_cli.cli import cli + + +def test_auth_status_not_logged_in(monkeypatch): + monkeypatch.setattr("x_cli.cli.load_env_files", lambda: None) + monkeypatch.delenv("X_OAUTH2_ACCESS_TOKEN", raising=False) + monkeypatch.delenv("X_OAUTH2_REFRESH_TOKEN", raising=False) + monkeypatch.delenv("X_OAUTH2_EXPIRES_AT", raising=False) + + runner = CliRunner() + result = runner.invoke(cli, ["auth", "status"]) + assert result.exit_code == 0 + assert "OAuth2: not logged in" in result.output + + +def test_auth_login_requires_client_id(monkeypatch): + monkeypatch.setattr("x_cli.cli.load_env_files", lambda: None) + monkeypatch.delenv("X_OAUTH2_CLIENT_ID", raising=False) + + runner = CliRunner() + result = runner.invoke(cli, ["auth", "login"]) + assert result.exit_code != 0 + assert "Missing env var X_OAUTH2_CLIENT_ID" in result.output + + +def test_auth_login_success(monkeypatch): + monkeypatch.setattr("x_cli.cli.load_env_files", lambda: None) + monkeypatch.setenv("X_OAUTH2_CLIENT_ID", "client-123") + monkeypatch.setattr("x_cli.cli.generate_code_verifier", lambda: "verifier-1") + monkeypatch.setattr("x_cli.cli.generate_state", lambda: "state-1") + monkeypatch.setattr("x_cli.cli.generate_code_challenge", lambda _: "challenge-1") + monkeypatch.setattr("x_cli.cli.build_authorization_url", lambda **kwargs: "https://auth.example") + monkeypatch.setattr("x_cli.cli.extract_code_from_redirect_url", lambda *_: "code-1") + captured = {} + + def fake_exchange(http, **kwargs): + captured.update(kwargs) + return {"access_token": "a1", "refresh_token": "r1", "expires_in": 3600} + + monkeypatch.setattr( + "x_cli.cli.exchange_code_for_token", + fake_exchange, + ) + + saved = {} + + def fake_persist(path, *, access_token, refresh_token, expires_at): + saved["path"] = path + saved["access_token"] = access_token + saved["refresh_token"] = refresh_token + saved["expires_at"] = expires_at + + monkeypatch.setattr("x_cli.cli.persist_oauth2_tokens", fake_persist) + monkeypatch.setattr("x_cli.cli.get_config_env_path", lambda: Path("/tmp/fake.env")) + + runner = CliRunner() + result = runner.invoke(cli, ["auth", "login"], input="https://example.com/oauth/callback?code=x&state=y\n") + assert result.exit_code == 0 + assert "OAuth2 login successful" in result.output + assert saved["path"] == Path("/tmp/fake.env") + assert saved["access_token"] == "a1" + assert saved["refresh_token"] == "r1" + assert isinstance(saved["expires_at"], int) + assert captured["client_secret"] is None + + +def test_auth_login_401_hints_client_secret(monkeypatch): + monkeypatch.setattr("x_cli.cli.load_env_files", lambda: None) + monkeypatch.setenv("X_OAUTH2_CLIENT_ID", "client-123") + monkeypatch.delenv("X_OAUTH2_CLIENT_SECRET", raising=False) + monkeypatch.setattr("x_cli.cli.generate_code_verifier", lambda: "verifier-1") + monkeypatch.setattr("x_cli.cli.generate_state", lambda: "state-1") + monkeypatch.setattr("x_cli.cli.generate_code_challenge", lambda _: "challenge-1") + monkeypatch.setattr("x_cli.cli.build_authorization_url", lambda **kwargs: "https://auth.example") + monkeypatch.setattr("x_cli.cli.extract_code_from_redirect_url", lambda *_: "code-1") + monkeypatch.setattr( + "x_cli.cli.exchange_code_for_token", + lambda http, **kwargs: (_ for _ in ()).throw( + RuntimeError("OAuth2 token request failed (HTTP 401): Missing valid authorization header") + ), + ) + + runner = CliRunner() + result = runner.invoke(cli, ["auth", "login"], input="https://example.com/oauth/callback?code=x&state=y\n") + assert result.exit_code != 0 + assert "Set X_OAUTH2_CLIENT_SECRET" in result.output + + +def test_auth_logout_clears_tokens(monkeypatch): + cleared = {"called": False} + + def fake_clear(path): + cleared["called"] = True + assert str(path).endswith("/.config/x-cli/.env") + + monkeypatch.setattr("x_cli.cli.clear_oauth2_tokens", fake_clear) + + runner = CliRunner() + result = runner.invoke(cli, ["auth", "logout"]) + assert result.exit_code == 0 + assert cleared["called"] is True diff --git a/tests/test_oauth2.py b/tests/test_oauth2.py new file mode 100644 index 0000000..2330a73 --- /dev/null +++ b/tests/test_oauth2.py @@ -0,0 +1,138 @@ +"""Tests for x_cli.oauth2 helpers.""" + +import base64 +import urllib.parse + +import httpx + +from pathlib import Path + +from x_cli.oauth2 import ( + build_authorization_url, + clear_oauth2_tokens, + exchange_code_for_token, + extract_code_from_redirect_url, + generate_code_challenge, + generate_code_verifier, + refresh_access_token, + persist_oauth2_tokens, +) + + +def test_generate_code_verifier_length(): + verifier = generate_code_verifier(64) + assert len(verifier) == 64 + + +def test_generate_code_challenge_known_value(): + # PKCE sample verifier from RFC 7636 Appendix B + verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk" + assert generate_code_challenge(verifier) == "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM" + + +def test_build_authorization_url_contains_required_params(): + url = build_authorization_url( + client_id="cid", + redirect_uri="https://example.com/oauth/callback", + state="state-123", + code_challenge="challenge-abc", + ) + assert "response_type=code" in url + assert "client_id=cid" in url + assert "state=state-123" in url + assert "code_challenge=challenge-abc" in url + + +def test_extract_code_from_redirect_url_valid(): + code = extract_code_from_redirect_url( + "https://example.com/oauth/callback?code=abc123&state=s1", + "s1", + ) + assert code == "abc123" + + +def test_extract_code_from_redirect_url_state_mismatch(): + try: + extract_code_from_redirect_url( + "https://example.com/oauth/callback?code=abc123&state=wrong", + "expected", + ) + assert False, "expected RuntimeError" + except RuntimeError as exc: + assert "State mismatch" in str(exc) + + +def test_persist_and_clear_oauth2_tokens(tmp_path: Path): + env_path = tmp_path / ".env" + persist_oauth2_tokens( + env_path, + access_token="access", + refresh_token="refresh", + expires_at=1234, + ) + text = env_path.read_text() + assert "X_OAUTH2_ACCESS_TOKEN=access" in text + assert "X_OAUTH2_REFRESH_TOKEN=refresh" in text + assert "X_OAUTH2_EXPIRES_AT=1234" in text + + clear_oauth2_tokens(env_path) + text2 = env_path.read_text() + assert "X_OAUTH2_ACCESS_TOKEN" not in text2 + assert "X_OAUTH2_REFRESH_TOKEN" not in text2 + assert "X_OAUTH2_EXPIRES_AT" not in text2 + + +def test_persist_oauth2_tokens_unsets_optional_fields_when_missing(tmp_path: Path): + env_path = tmp_path / ".env" + persist_oauth2_tokens( + env_path, + access_token="access-1", + refresh_token="refresh-1", + expires_at=1234, + ) + persist_oauth2_tokens( + env_path, + access_token="access-2", + refresh_token=None, + expires_at=None, + ) + text = env_path.read_text() + assert "X_OAUTH2_ACCESS_TOKEN=access-2" in text + assert "X_OAUTH2_REFRESH_TOKEN" not in text + assert "X_OAUTH2_EXPIRES_AT" not in text + + +def test_exchange_code_for_token_uses_basic_header_when_secret_present(): + def handler(request: httpx.Request) -> httpx.Response: + expected = base64.b64encode(b"cid:csecret").decode() + assert request.headers.get("Authorization") == f"Basic {expected}" + body = urllib.parse.parse_qs(request.content.decode()) + assert body["client_id"][0] == "cid" + return httpx.Response(200, request=request, json={"access_token": "a"}) + + with httpx.Client(transport=httpx.MockTransport(handler)) as http: + data = exchange_code_for_token( + http, + client_id="cid", + client_secret="csecret", + code="abc", + code_verifier="verifier", + redirect_uri="https://example.com/oauth/callback", + ) + assert data["access_token"] == "a" + + +def test_refresh_access_token_uses_basic_header_when_secret_present(): + def handler(request: httpx.Request) -> httpx.Response: + expected = base64.b64encode(b"cid:csecret").decode() + assert request.headers.get("Authorization") == f"Basic {expected}" + return httpx.Response(200, request=request, json={"access_token": "a2"}) + + with httpx.Client(transport=httpx.MockTransport(handler)) as http: + data = refresh_access_token( + http, + client_id="cid", + client_secret="csecret", + refresh_token="r1", + ) + assert data["access_token"] == "a2" From 76ced2d6a063a6fb3dd204b7fe3645ad91c44dfc Mon Sep 17 00:00:00 2001 From: Alexander Goldstein Date: Wed, 18 Feb 2026 00:23:46 -0500 Subject: [PATCH 3/4] Split mutable OAuth2 tokens into .env.auth2 with safe auto-migration Move rotating OAuth2 token state (access/refresh/expiry) out of ~/.config/x-cli/.env into ~/.config/x-cli/.env.auth2 while keeping static OAuth and app config in .env. Implementation details: - add get_config_auth2_env_path() and load order that overlays .env.auth2 over .env - auto-migrate legacy token keys from .env -> .env.auth2 - make migration best-effort so commands do not fail when .env.auth2 is not writable - persist/clear OAuth2 tokens from auth login/logout and refresh flows in .env.auth2 - update docs for static vs mutable env files and migration behavior - add tests for env precedence, migration, and unwritable destination fallback --- LLMs.md | 9 ++++---- README.md | 16 ++++++++++++- src/x_cli/api.py | 4 ++-- src/x_cli/auth.py | 11 ++++++++- src/x_cli/cli.py | 8 +++---- src/x_cli/oauth2.py | 35 ++++++++++++++++++++++++++++- tests/test_auth.py | 20 ++++++++++++++++- tests/test_cli_auth.py | 6 ++--- tests/test_oauth2.py | 51 ++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 143 insertions(+), 17 deletions(-) diff --git a/LLMs.md b/LLMs.md index 661c3cb..c440aba 100644 --- a/LLMs.md +++ b/LLMs.md @@ -13,7 +13,7 @@ x-cli is a Python CLI that talks directly to the Twitter/X API v2. It uses: No third-party auth frameworks are used; OAuth signing/challenge logic is implemented in project code. -It shares the same credentials as x-mcp (the MCP server counterpart). If a user already has x-mcp configured, they can symlink its `.env` to `~/.config/x-cli/.env`. +It shares static credentials with x-mcp via `~/.config/x-cli/.env`. Mutable OAuth2 token keys are stored in `~/.config/x-cli/.env.auth2`. --- @@ -65,7 +65,7 @@ All methods return raw `dict` parsed from the API JSON response. Error handling Two responsibilities: -1. **`load_credentials()`** -- Loads required OAuth1/app vars plus optional OAuth2 vars from `~/.config/x-cli/.env` and current directory `.env`. +1. **`load_credentials()`** -- Loads static vars from `~/.config/x-cli/.env` and current directory `.env`, then overlays mutable OAuth2 token vars from `~/.config/x-cli/.env.auth2`. 2. **`generate_oauth_header()`** -- Builds an OAuth 1.0a `Authorization` header using HMAC-SHA1. Follows the standard OAuth signature base string construction: percent-encode params, sort, concatenate with `&`, sign with consumer secret + token secret. Query string parameters from the URL are included in the signature base string (required by OAuth spec). @@ -76,7 +76,8 @@ Query string parameters from the URL are included in the signature base string ( - Builds browser authorization URL. - Parses redirected URL for code/state validation (user must paste full browser address-bar URL). - Exchanges authorization code for tokens and refreshes access token. -- Persists/removes OAuth2 token env vars in `~/.config/x-cli/.env`. +- Persists/removes OAuth2 token env vars in `~/.config/x-cli/.env.auth2`. +- Auto-migrates legacy token keys from `~/.config/x-cli/.env` to `.env.auth2`. ### `formatters.py` -- Output @@ -182,5 +183,5 @@ Tests cover utils, formatters, OAuth1 signing, OAuth2 helpers, API auth routing, | 401 on bookmarks after login | OAuth2 token expired/revoked and refresh failed | Re-run `x-cli auth login` | | 401 Unauthorized | Bad credentials | Verify all 5 values in `.env` | | 429 Rate Limited | Too many requests | Error includes reset timestamp | -| "Missing env var" | `.env` not found or incomplete | Check `~/.config/x-cli/.env` or set env vars directly | +| "Missing env var" | Static `.env` missing required keys | Check `~/.config/x-cli/.env` (and optional cwd `.env`) | | `RuntimeError: API error` | Twitter API returned an error | Check the error message for details (usually permissions or invalid IDs) | diff --git a/README.md b/README.md index 593ac7f..cff9561 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,19 @@ X_OAUTH2_CLIENT_SECRET=your_oauth2_client_secret # optional, required by some X x-cli also checks for a `.env` in the current directory. +Mutable OAuth2 token values are stored separately in: + +```bash +~/.config/x-cli/.env.auth2 +``` + +Managed keys: +- `X_OAUTH2_ACCESS_TOKEN` +- `X_OAUTH2_REFRESH_TOKEN` +- `X_OAUTH2_EXPIRES_AT` + +If these keys already exist in `~/.config/x-cli/.env`, x-cli auto-migrates them to `.env.auth2`. + ### OAuth2 login for bookmarks Bookmarks endpoints require OAuth 2.0 User Context. Run: @@ -92,6 +105,7 @@ x-cli auth login - `X_OAUTH2_ACCESS_TOKEN` - `X_OAUTH2_REFRESH_TOKEN` - `X_OAUTH2_EXPIRES_AT` + in `~/.config/x-cli/.env.auth2` You can check or clear saved OAuth2 tokens: @@ -200,7 +214,7 @@ Your refresh token may be expired/revoked. Run `x-cli auth login` again to refre The error includes the reset timestamp. Wait until then. ### "Missing env var" on startup -x-cli looks for credentials in `~/.config/x-cli/.env`, then the current directory's `.env`, then environment variables. Make sure at least one source has all 5 values. +x-cli loads static credentials from `~/.config/x-cli/.env` and current `.env`, then overlays mutable OAuth2 token keys from `~/.config/x-cli/.env.auth2`. --- diff --git a/src/x_cli/api.py b/src/x_cli/api.py index eb116ce..330080a 100644 --- a/src/x_cli/api.py +++ b/src/x_cli/api.py @@ -6,7 +6,7 @@ import httpx -from .auth import Credentials, generate_oauth_header, get_config_env_path +from .auth import Credentials, generate_oauth_header, get_config_auth2_env_path from .oauth2 import expires_at_from_expires_in, persist_oauth2_tokens, refresh_access_token, token_expired API_BASE = "https://api.x.com/2" @@ -127,7 +127,7 @@ def _persist_oauth2_tokens_from_response(self, data: dict[str, Any]) -> None: self.creds.oauth2_expires_at = expires_at persist_oauth2_tokens( - get_config_env_path(), + get_config_auth2_env_path(), access_token=access_token, refresh_token=self.creds.oauth2_refresh_token, expires_at=expires_at, diff --git a/src/x_cli/auth.py b/src/x_cli/auth.py index 4016ed1..a02b1a9 100644 --- a/src/x_cli/auth.py +++ b/src/x_cli/auth.py @@ -13,6 +13,7 @@ from pathlib import Path from dotenv import load_dotenv +from .oauth2 import migrate_legacy_oauth2_tokens @dataclass @@ -33,12 +34,20 @@ def get_config_env_path() -> Path: return Path.home() / ".config" / "x-cli" / ".env" +def get_config_auth2_env_path() -> Path: + return Path.home() / ".config" / "x-cli" / ".env.auth2" + + def load_env_files() -> None: - """Load ~/.config/x-cli/.env and cwd .env into process env.""" + """Load env files with auth2 token precedence over static config.""" config_env = get_config_env_path() if config_env.exists(): load_dotenv(config_env) load_dotenv() + auth2_env = get_config_auth2_env_path() + migrate_legacy_oauth2_tokens(config_env, auth2_env) + if auth2_env.exists(): + load_dotenv(auth2_env, override=True) def load_credentials() -> Credentials: diff --git a/src/x_cli/cli.py b/src/x_cli/cli.py index c447ffc..6d54907 100644 --- a/src/x_cli/cli.py +++ b/src/x_cli/cli.py @@ -9,7 +9,7 @@ import httpx from .api import XApiClient -from .auth import get_config_env_path, load_credentials, load_env_files +from .auth import get_config_auth2_env_path, load_credentials, load_env_files from .formatters import format_output from .oauth2 import ( DEFAULT_REDIRECT_URI, @@ -114,7 +114,7 @@ def auth_login(): refresh_token = token_data.get("refresh_token") expires_at = expires_at_from_expires_in(token_data.get("expires_in")) persist_oauth2_tokens( - get_config_env_path(), + get_config_auth2_env_path(), access_token=access_token, refresh_token=str(refresh_token) if refresh_token else None, expires_at=expires_at, @@ -129,8 +129,8 @@ def auth_login(): @auth.command("logout") def auth_logout(): """Remove stored OAuth2 tokens.""" - clear_oauth2_tokens(get_config_env_path()) - click.echo("Removed OAuth2 tokens from ~/.config/x-cli/.env") + clear_oauth2_tokens(get_config_auth2_env_path()) + click.echo("Removed OAuth2 tokens from ~/.config/x-cli/.env.auth2") @auth.command("status") diff --git a/src/x_cli/oauth2.py b/src/x_cli/oauth2.py index 519aa7d..35fcd23 100644 --- a/src/x_cli/oauth2.py +++ b/src/x_cli/oauth2.py @@ -11,7 +11,7 @@ from typing import Any import httpx -from dotenv import set_key, unset_key +from dotenv import dotenv_values, set_key, unset_key AUTH_URL = "https://twitter.com/i/oauth2/authorize" TOKEN_URL = "https://api.x.com/2/oauth2/token" @@ -148,6 +148,39 @@ def ensure_env_file(path: Path) -> None: path.chmod(0o600) +def migrate_legacy_oauth2_tokens(config_env_path: Path, auth2_env_path: Path) -> None: + """Move mutable OAuth2 token keys from .env to .env.auth2.""" + if not config_env_path.exists(): + return + + config_values = dotenv_values(config_env_path) + legacy_values = { + key: config_values.get(key) + for key in OAUTH2_ENV_KEYS + if key in config_values + } + if not legacy_values: + return + + try: + auth2_values = dotenv_values(auth2_env_path) if auth2_env_path.exists() else {} + ensure_env_file(auth2_env_path) + for key, value in legacy_values.items(): + if auth2_values.get(key): + continue + if value: + set_key(str(auth2_env_path), key, str(value), quote_mode="never") + + merged_auth2 = dotenv_values(auth2_env_path) + for key, value in legacy_values.items(): + # Remove from .env once value exists in .env.auth2 (or was empty in .env). + if merged_auth2.get(key) or not value: + unset_key(str(config_env_path), key, quote_mode="never") + except OSError: + # Best-effort migration: keep legacy values in .env if auth2 file is not writable. + return + + def persist_oauth2_tokens( path: Path, *, diff --git a/tests/test_auth.py b/tests/test_auth.py index e146644..57b36e8 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,9 +1,11 @@ """Tests for x_cli.auth.""" +import os +from pathlib import Path import pytest -from x_cli.auth import generate_oauth_header, Credentials +from x_cli.auth import Credentials, generate_oauth_header, load_env_files @pytest.fixture @@ -49,3 +51,19 @@ def test_url_with_query_params(self, creds): url = "https://api.x.com/2/tweets/123?tweet.fields=created_at,public_metrics" header = generate_oauth_header("GET", url, creds) assert header.startswith("OAuth ") + + +def test_load_env_files_prefers_auth2_file(monkeypatch, tmp_path: Path): + config_env = tmp_path / ".env" + auth2_env = tmp_path / ".env.auth2" + config_env.write_text("X_OAUTH2_ACCESS_TOKEN=config-token\n") + auth2_env.write_text("X_OAUTH2_ACCESS_TOKEN=auth2-token\n") + + monkeypatch.chdir(tmp_path) + monkeypatch.setattr("x_cli.auth.get_config_env_path", lambda: config_env) + monkeypatch.setattr("x_cli.auth.get_config_auth2_env_path", lambda: auth2_env) + monkeypatch.delenv("X_OAUTH2_ACCESS_TOKEN", raising=False) + + load_env_files() + + assert os.environ.get("X_OAUTH2_ACCESS_TOKEN") == "auth2-token" diff --git a/tests/test_cli_auth.py b/tests/test_cli_auth.py index e7bbd08..45404fa 100644 --- a/tests/test_cli_auth.py +++ b/tests/test_cli_auth.py @@ -57,13 +57,13 @@ def fake_persist(path, *, access_token, refresh_token, expires_at): saved["expires_at"] = expires_at monkeypatch.setattr("x_cli.cli.persist_oauth2_tokens", fake_persist) - monkeypatch.setattr("x_cli.cli.get_config_env_path", lambda: Path("/tmp/fake.env")) + monkeypatch.setattr("x_cli.cli.get_config_auth2_env_path", lambda: Path("/tmp/fake.env.auth2")) runner = CliRunner() result = runner.invoke(cli, ["auth", "login"], input="https://example.com/oauth/callback?code=x&state=y\n") assert result.exit_code == 0 assert "OAuth2 login successful" in result.output - assert saved["path"] == Path("/tmp/fake.env") + assert saved["path"] == Path("/tmp/fake.env.auth2") assert saved["access_token"] == "a1" assert saved["refresh_token"] == "r1" assert isinstance(saved["expires_at"], int) @@ -97,7 +97,7 @@ def test_auth_logout_clears_tokens(monkeypatch): def fake_clear(path): cleared["called"] = True - assert str(path).endswith("/.config/x-cli/.env") + assert str(path).endswith("/.config/x-cli/.env.auth2") monkeypatch.setattr("x_cli.cli.clear_oauth2_tokens", fake_clear) diff --git a/tests/test_oauth2.py b/tests/test_oauth2.py index 2330a73..ed6eb8c 100644 --- a/tests/test_oauth2.py +++ b/tests/test_oauth2.py @@ -14,6 +14,7 @@ extract_code_from_redirect_url, generate_code_challenge, generate_code_verifier, + migrate_legacy_oauth2_tokens, refresh_access_token, persist_oauth2_tokens, ) @@ -136,3 +137,53 @@ def handler(request: httpx.Request) -> httpx.Response: refresh_token="r1", ) assert data["access_token"] == "a2" + + +def test_migrate_legacy_oauth2_tokens_moves_mutable_keys(tmp_path: Path): + config_env = tmp_path / ".env" + auth2_env = tmp_path / ".env.auth2" + config_env.write_text( + "X_API_KEY=static\n" + "X_OAUTH2_ACCESS_TOKEN=old-access\n" + "X_OAUTH2_REFRESH_TOKEN=old-refresh\n" + "X_OAUTH2_EXPIRES_AT=1234\n" + ) + + migrate_legacy_oauth2_tokens(config_env, auth2_env) + + config_text = config_env.read_text() + auth2_text = auth2_env.read_text() + assert "X_API_KEY=static" in config_text + assert "X_OAUTH2_ACCESS_TOKEN" not in config_text + assert "X_OAUTH2_REFRESH_TOKEN" not in config_text + assert "X_OAUTH2_EXPIRES_AT" not in config_text + assert "X_OAUTH2_ACCESS_TOKEN=old-access" in auth2_text + assert "X_OAUTH2_REFRESH_TOKEN=old-refresh" in auth2_text + assert "X_OAUTH2_EXPIRES_AT=1234" in auth2_text + + +def test_migrate_legacy_oauth2_tokens_keeps_auth2_precedence(tmp_path: Path): + config_env = tmp_path / ".env" + auth2_env = tmp_path / ".env.auth2" + config_env.write_text("X_OAUTH2_ACCESS_TOKEN=old-access\n") + auth2_env.write_text("X_OAUTH2_ACCESS_TOKEN=new-access\n") + + migrate_legacy_oauth2_tokens(config_env, auth2_env) + + assert "X_OAUTH2_ACCESS_TOKEN=new-access" in auth2_env.read_text() + assert "X_OAUTH2_ACCESS_TOKEN" not in config_env.read_text() + + +def test_migrate_legacy_oauth2_tokens_best_effort_on_unwritable_auth2(monkeypatch, tmp_path: Path): + config_env = tmp_path / ".env" + auth2_env = tmp_path / ".env.auth2" + config_env.write_text("X_OAUTH2_ACCESS_TOKEN=old-access\n") + + def fail_ensure(_: Path): + raise PermissionError("no write access") + + monkeypatch.setattr("x_cli.oauth2.ensure_env_file", fail_ensure) + migrate_legacy_oauth2_tokens(config_env, auth2_env) + + # Legacy token remains in .env when migration cannot write destination. + assert "X_OAUTH2_ACCESS_TOKEN=old-access" in config_env.read_text() From b18896e6de857e9e6ff6d6b248fdcf3af141f771 Mon Sep 17 00:00:00 2001 From: Alexander Goldstein Date: Wed, 18 Feb 2026 01:04:06 -0500 Subject: [PATCH 4/4] Refactor OAuth2 code paths to reduce redundancy Consolidate repeated OAuth2 request, URL query, and user-id plumbing across API helpers and CLI command handlers while preserving existing behavior. What changed: - oauth2.py: split legacy token migration into focused read/write/remove helpers and simplify orchestrating flow. - api.py: add shared _request, _query_url, and _get_user_id helpers; route bookmark and related endpoints through common logic. - cli.py: add helper wrappers for repeated command glue (output formatting, tweet-id resolution, identity lookup, OAuth2 status lines). Why: - Reduce duplicate code and branching complexity in the OAuth2 branch. - Make bookmark-oriented OAuth2 login/usage paths easier to maintain. Verification: - UV_CACHE_DIR=.uv-cache uv run ruff check - UV_CACHE_DIR=.uv-cache uv run pytest -q (51 passed) --- src/x_cli/api.py | 93 ++++++++++++++++++++++++++++------------ src/x_cli/cli.py | 100 +++++++++++++++++++++----------------------- src/x_cli/oauth2.py | 56 +++++++++++++++---------- 3 files changed, 149 insertions(+), 100 deletions(-) diff --git a/src/x_cli/api.py b/src/x_cli/api.py index 330080a..c6ceb98 100644 --- a/src/x_cli/api.py +++ b/src/x_cli/api.py @@ -24,8 +24,30 @@ def close(self) -> None: # ---- internal ---- + def _request( + self, + method: str, + url: str, + *, + headers: dict[str, str], + params: dict[str, str] | None = None, + json_body: dict | None = None, + ) -> httpx.Response: + return self._http.request( + method, + url, + headers=headers, + params=params, + json=json_body if json_body is not None else None, + ) + + @staticmethod + def _query_url(base_url: str, params: dict[str, str]) -> str: + qs = "&".join(f"{k}={v}" for k, v in params.items()) + return f"{base_url}?{qs}" if qs else base_url + def _bearer_get(self, url: str) -> dict[str, Any]: - resp = self._http.get(url, headers={"Authorization": f"Bearer {self.creds.bearer_token}"}) + resp = self._request("GET", url, headers={"Authorization": f"Bearer {self.creds.bearer_token}"}) return self._handle(resp) def _oauth_request(self, method: str, url: str, json_body: dict | None = None) -> dict[str, Any]: @@ -33,7 +55,7 @@ def _oauth_request(self, method: str, url: str, json_body: dict | None = None) - headers: dict[str, str] = {"Authorization": auth_header} if json_body is not None: headers["Content-Type"] = "application/json" - resp = self._http.request(method, url, headers=headers, json=json_body if json_body else None) + resp = self._request(method, url, headers=headers, json_body=json_body) return self._handle(resp) def _oauth2_user_request( @@ -48,7 +70,7 @@ def _oauth2_user_request( headers: dict[str, str] = {"Authorization": f"Bearer {self.creds.oauth2_access_token}"} if json_body is not None: headers["Content-Type"] = "application/json" - resp = self._http.request(method, url, headers=headers, json=json_body if json_body else None) + resp = self._request(method, url, headers=headers, json_body=json_body) if resp.status_code == 401 and retry_on_401: self._refresh_oauth2_access_token() return self._oauth2_user_request(method, url, json_body, retry_on_401=False) @@ -147,6 +169,11 @@ def get_authenticated_user_id_oauth2(self) -> str: self._oauth2_user_id = data["data"]["id"] return self._oauth2_user_id + def _get_user_id(self, *, oauth2: bool = False) -> str: + if oauth2: + return self.get_authenticated_user_id_oauth2() + return self.get_authenticated_user_id() + # ---- tweets ---- def post_tweet( @@ -176,8 +203,7 @@ def get_tweet(self, tweet_id: str) -> dict[str, Any]: "user.fields": "name,username,verified,profile_image_url,public_metrics", "media.fields": "url,preview_image_url,type,width,height,alt_text", } - qs = "&".join(f"{k}={v}" for k, v in params.items()) - return self._bearer_get(f"{API_BASE}/tweets/{tweet_id}?{qs}") + return self._bearer_get(self._query_url(f"{API_BASE}/tweets/{tweet_id}", params)) def search_tweets(self, query: str, max_results: int = 10) -> dict[str, Any]: max_results = max(10, min(max_results, 100)) @@ -190,18 +216,32 @@ def search_tweets(self, query: str, max_results: int = 10) -> dict[str, Any]: "media.fields": "url,preview_image_url,type", } url = f"{API_BASE}/tweets/search/recent" - resp = self._http.get(url, params=params, headers={"Authorization": f"Bearer {self.creds.bearer_token}"}) + resp = self._request( + "GET", + url, + headers={"Authorization": f"Bearer {self.creds.bearer_token}"}, + params=params, + ) return self._handle(resp) def get_tweet_metrics(self, tweet_id: str) -> dict[str, Any]: - params = "tweet.fields=public_metrics,non_public_metrics,organic_metrics" - return self._oauth_request("GET", f"{API_BASE}/tweets/{tweet_id}?{params}") + return self._oauth_request( + "GET", + self._query_url( + f"{API_BASE}/tweets/{tweet_id}", + {"tweet.fields": "public_metrics,non_public_metrics,organic_metrics"}, + ), + ) # ---- users ---- def get_user(self, username: str) -> dict[str, Any]: - fields = "user.fields=created_at,description,public_metrics,verified,profile_image_url,url,location,pinned_tweet_id" - return self._bearer_get(f"{API_BASE}/users/by/username/{username}?{fields}") + return self._bearer_get( + self._query_url( + f"{API_BASE}/users/by/username/{username}", + {"user.fields": "created_at,description,public_metrics,verified,profile_image_url,url,location,pinned_tweet_id"}, + ) + ) def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]: max_results = max(5, min(max_results, 100)) @@ -212,10 +252,11 @@ def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]: "user.fields": "name,username,verified", "media.fields": "url,preview_image_url,type", } - resp = self._http.get( + resp = self._request( + "GET", f"{API_BASE}/users/{user_id}/tweets", - params=params, headers={"Authorization": f"Bearer {self.creds.bearer_token}"}, + params=params, ) return self._handle(resp) @@ -225,10 +266,11 @@ def get_followers(self, user_id: str, max_results: int = 100) -> dict[str, Any]: "max_results": str(max_results), "user.fields": "created_at,description,public_metrics,verified,profile_image_url", } - resp = self._http.get( + resp = self._request( + "GET", f"{API_BASE}/users/{user_id}/followers", - params=params, headers={"Authorization": f"Bearer {self.creds.bearer_token}"}, + params=params, ) return self._handle(resp) @@ -238,15 +280,16 @@ def get_following(self, user_id: str, max_results: int = 100) -> dict[str, Any]: "max_results": str(max_results), "user.fields": "created_at,description,public_metrics,verified,profile_image_url", } - resp = self._http.get( + resp = self._request( + "GET", f"{API_BASE}/users/{user_id}/following", - params=params, headers={"Authorization": f"Bearer {self.creds.bearer_token}"}, + params=params, ) return self._handle(resp) def get_mentions(self, max_results: int = 10) -> dict[str, Any]: - user_id = self.get_authenticated_user_id() + user_id = self._get_user_id() max_results = max(5, min(max_results, 100)) params = { "max_results": str(max_results), @@ -254,24 +297,23 @@ def get_mentions(self, max_results: int = 10) -> dict[str, Any]: "expansions": "author_id", "user.fields": "name,username,verified", } - qs = "&".join(f"{k}={v}" for k, v in params.items()) - url = f"{API_BASE}/users/{user_id}/mentions?{qs}" + url = self._query_url(f"{API_BASE}/users/{user_id}/mentions", params) return self._oauth_request("GET", url) # ---- engagement ---- def like_tweet(self, tweet_id: str) -> dict[str, Any]: - user_id = self.get_authenticated_user_id() + user_id = self._get_user_id() return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/likes", {"tweet_id": tweet_id}) def retweet(self, tweet_id: str) -> dict[str, Any]: - user_id = self.get_authenticated_user_id() + user_id = self._get_user_id() return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/retweets", {"tweet_id": tweet_id}) # ---- bookmarks (OAuth 2.0 User Context) ---- def get_bookmarks(self, max_results: int = 10) -> dict[str, Any]: - user_id = self.get_authenticated_user_id_oauth2() + user_id = self._get_user_id(oauth2=True) max_results = max(1, min(max_results, 100)) params = { "max_results": str(max_results), @@ -280,14 +322,13 @@ def get_bookmarks(self, max_results: int = 10) -> dict[str, Any]: "user.fields": "name,username,verified,profile_image_url", "media.fields": "url,preview_image_url,type", } - qs = "&".join(f"{k}={v}" for k, v in params.items()) - url = f"{API_BASE}/users/{user_id}/bookmarks?{qs}" + url = self._query_url(f"{API_BASE}/users/{user_id}/bookmarks", params) return self._oauth2_user_request("GET", url) def bookmark_tweet(self, tweet_id: str) -> dict[str, Any]: - user_id = self.get_authenticated_user_id_oauth2() + user_id = self._get_user_id(oauth2=True) return self._oauth2_user_request("POST", f"{API_BASE}/users/{user_id}/bookmarks", {"tweet_id": tweet_id}) def unbookmark_tweet(self, tweet_id: str) -> dict[str, Any]: - user_id = self.get_authenticated_user_id_oauth2() + user_id = self._get_user_id(oauth2=True) return self._oauth2_user_request("DELETE", f"{API_BASE}/users/{user_id}/bookmarks/{tweet_id}") diff --git a/src/x_cli/cli.py b/src/x_cli/cli.py index 6d54907..bc1442f 100644 --- a/src/x_cli/cli.py +++ b/src/x_cli/cli.py @@ -46,6 +46,39 @@ def output(self, data, title: str = "") -> None: pass_state = click.make_pass_decorator(State) +def _call_and_output(state: State, title: str, fn, *args, **kwargs) -> None: + data = fn(*args, **kwargs) + state.output(data, title) + + +def _call_with_tweet_id(state: State, id_or_url: str, title: str, fn) -> None: + tid = parse_tweet_id(id_or_url) + _call_and_output(state, title, fn, tid) + + +def _resolve_user_identity(state: State, username: str) -> tuple[str, str]: + uname = strip_at(username) + user_data = state.client.get_user(uname) + return uname, user_data["data"]["id"] + + +def _oauth2_status_lines(access: str | None, refresh: str | None, expires_raw: str | None) -> list[str]: + if not access: + return ["OAuth2: not logged in"] + lines = ["OAuth2: logged in", f"Refresh token: {'present' if refresh else 'missing'}"] + if not expires_raw: + lines.append("Access token expiry: unknown") + return lines + try: + expires_at = int(expires_raw) + except ValueError: + lines.append("Access token expiry: invalid value in X_OAUTH2_EXPIRES_AT") + return lines + remaining = expires_at - int(time.time()) + lines.append("Access token expiry: expired" if remaining <= 0 else f"Access token expiry: in {remaining // 60} minutes") + return lines + + @click.group() @click.option("--json", "-j", "fmt", flag_value="json", help="JSON output") @click.option("--plain", "-p", "fmt", flag_value="plain", help="TSV output for piping") @@ -140,24 +173,8 @@ def auth_status(): access = os.environ.get("X_OAUTH2_ACCESS_TOKEN") refresh = os.environ.get("X_OAUTH2_REFRESH_TOKEN") expires_raw = os.environ.get("X_OAUTH2_EXPIRES_AT") - if not access: - click.echo("OAuth2: not logged in") - return - click.echo("OAuth2: logged in") - click.echo(f"Refresh token: {'present' if refresh else 'missing'}") - if not expires_raw: - click.echo("Access token expiry: unknown") - return - try: - expires_at = int(expires_raw) - except ValueError: - click.echo("Access token expiry: invalid value in X_OAUTH2_EXPIRES_AT") - return - remaining = expires_at - int(time.time()) - if remaining <= 0: - click.echo("Access token expiry: expired") - else: - click.echo(f"Access token expiry: in {remaining // 60} minutes") + for line in _oauth2_status_lines(access, refresh, expires_raw): + click.echo(line) # ============================================================ @@ -187,8 +204,7 @@ def tweet_post(state, text, poll, poll_duration): def tweet_get(state, id_or_url): """Fetch a tweet by ID or URL.""" tid = parse_tweet_id(id_or_url) - data = state.client.get_tweet(tid) - state.output(data, f"Tweet {tid}") + _call_and_output(state, f"Tweet {tid}", state.client.get_tweet, tid) @tweet.command("delete") @@ -196,9 +212,7 @@ def tweet_get(state, id_or_url): @pass_state def tweet_delete(state, id_or_url): """Delete a tweet.""" - tid = parse_tweet_id(id_or_url) - data = state.client.delete_tweet(tid) - state.output(data, "Deleted") + _call_with_tweet_id(state, id_or_url, "Deleted", state.client.delete_tweet) @tweet.command("reply") @@ -239,8 +253,7 @@ def tweet_search(state, query, max_results): def tweet_metrics(state, id_or_url): """Get tweet engagement metrics.""" tid = parse_tweet_id(id_or_url) - data = state.client.get_tweet_metrics(tid) - state.output(data, f"Metrics {tid}") + _call_and_output(state, f"Metrics {tid}", state.client.get_tweet_metrics, tid) # ============================================================ @@ -267,11 +280,8 @@ def user_get(state, username): @pass_state def user_timeline(state, username, max_results): """Fetch a user's recent tweets.""" - uname = strip_at(username) - user_data = state.client.get_user(uname) - uid = user_data["data"]["id"] - data = state.client.get_timeline(uid, max_results) - state.output(data, f"@{uname} timeline") + uname, uid = _resolve_user_identity(state, username) + _call_and_output(state, f"@{uname} timeline", state.client.get_timeline, uid, max_results) @user.command("followers") @@ -280,11 +290,8 @@ def user_timeline(state, username, max_results): @pass_state def user_followers(state, username, max_results): """List a user's followers.""" - uname = strip_at(username) - user_data = state.client.get_user(uname) - uid = user_data["data"]["id"] - data = state.client.get_followers(uid, max_results) - state.output(data, f"@{uname} followers") + uname, uid = _resolve_user_identity(state, username) + _call_and_output(state, f"@{uname} followers", state.client.get_followers, uid, max_results) @user.command("following") @@ -293,11 +300,8 @@ def user_followers(state, username, max_results): @pass_state def user_following(state, username, max_results): """List who a user follows.""" - uname = strip_at(username) - user_data = state.client.get_user(uname) - uid = user_data["data"]["id"] - data = state.client.get_following(uid, max_results) - state.output(data, f"@{uname} following") + uname, uid = _resolve_user_identity(state, username) + _call_and_output(state, f"@{uname} following", state.client.get_following, uid, max_results) # ============================================================ @@ -332,9 +336,7 @@ def me_bookmarks(state, max_results): @pass_state def me_bookmark(state, id_or_url): """Bookmark a tweet.""" - tid = parse_tweet_id(id_or_url) - data = state.client.bookmark_tweet(tid) - state.output(data, "Bookmarked") + _call_with_tweet_id(state, id_or_url, "Bookmarked", state.client.bookmark_tweet) @me.command("unbookmark") @@ -342,9 +344,7 @@ def me_bookmark(state, id_or_url): @pass_state def me_unbookmark(state, id_or_url): """Remove a bookmark.""" - tid = parse_tweet_id(id_or_url) - data = state.client.unbookmark_tweet(tid) - state.output(data, "Unbookmarked") + _call_with_tweet_id(state, id_or_url, "Unbookmarked", state.client.unbookmark_tweet) # ============================================================ @@ -356,9 +356,7 @@ def me_unbookmark(state, id_or_url): @pass_state def like(state, id_or_url): """Like a tweet.""" - tid = parse_tweet_id(id_or_url) - data = state.client.like_tweet(tid) - state.output(data, "Liked") + _call_with_tweet_id(state, id_or_url, "Liked", state.client.like_tweet) @cli.command("retweet") @@ -366,9 +364,7 @@ def like(state, id_or_url): @pass_state def retweet(state, id_or_url): """Retweet a tweet.""" - tid = parse_tweet_id(id_or_url) - data = state.client.retweet(tid) - state.output(data, "Retweeted") + _call_with_tweet_id(state, id_or_url, "Retweeted", state.client.retweet) def main(): diff --git a/src/x_cli/oauth2.py b/src/x_cli/oauth2.py index 35fcd23..872669b 100644 --- a/src/x_cli/oauth2.py +++ b/src/x_cli/oauth2.py @@ -150,37 +150,49 @@ def ensure_env_file(path: Path) -> None: def migrate_legacy_oauth2_tokens(config_env_path: Path, auth2_env_path: Path) -> None: """Move mutable OAuth2 token keys from .env to .env.auth2.""" - if not config_env_path.exists(): - return - - config_values = dotenv_values(config_env_path) - legacy_values = { - key: config_values.get(key) - for key in OAUTH2_ENV_KEYS - if key in config_values - } + legacy_values = _read_legacy_oauth2_values(config_env_path) if not legacy_values: return try: - auth2_values = dotenv_values(auth2_env_path) if auth2_env_path.exists() else {} - ensure_env_file(auth2_env_path) - for key, value in legacy_values.items(): - if auth2_values.get(key): - continue - if value: - set_key(str(auth2_env_path), key, str(value), quote_mode="never") - - merged_auth2 = dotenv_values(auth2_env_path) - for key, value in legacy_values.items(): - # Remove from .env once value exists in .env.auth2 (or was empty in .env). - if merged_auth2.get(key) or not value: - unset_key(str(config_env_path), key, quote_mode="never") + merged_auth2 = _write_missing_auth2_values(auth2_env_path, legacy_values) + _remove_migrated_legacy_values(config_env_path, legacy_values, merged_auth2) except OSError: # Best-effort migration: keep legacy values in .env if auth2 file is not writable. return +def _read_legacy_oauth2_values(config_env_path: Path) -> dict[str, str | None]: + if not config_env_path.exists(): + return {} + config_values = dotenv_values(config_env_path) + return {key: config_values.get(key) for key in OAUTH2_ENV_KEYS if key in config_values} + + +def _write_missing_auth2_values( + auth2_env_path: Path, + legacy_values: dict[str, str | None], +) -> dict[str, str | None]: + auth2_values = dotenv_values(auth2_env_path) if auth2_env_path.exists() else {} + ensure_env_file(auth2_env_path) + for key, value in legacy_values.items(): + if auth2_values.get(key) or not value: + continue + set_key(str(auth2_env_path), key, str(value), quote_mode="never") + return dotenv_values(auth2_env_path) + + +def _remove_migrated_legacy_values( + config_env_path: Path, + legacy_values: dict[str, str | None], + merged_auth2: dict[str, str | None], +) -> None: + for key, value in legacy_values.items(): + # Remove from .env once value exists in .env.auth2 (or was empty in .env). + if merged_auth2.get(key) or not value: + unset_key(str(config_env_path), key, quote_mode="never") + + def persist_oauth2_tokens( path: Path, *,