diff --git a/.coverage b/.coverage index d5f46e1..6586674 100644 Binary files a/.coverage and b/.coverage differ diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 0000000..4032ec6 --- /dev/null +++ b/.cursorignore @@ -0,0 +1 @@ +.git/ \ No newline at end of file diff --git a/.env.example b/.env.example index 5d6a6ab..1403447 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ BOT_TOKEN=your_bot_token_here DATABASE_URL=sqlite:///data/duty_teller.db MINI_APP_BASE_URL= +# HTTP_HOST=127.0.0.1 # use 0.0.0.0 to bind all interfaces HTTP_PORT=8080 # Access: roles are assigned in the DB by an admin via /set_role. When a user has no role in DB, diff --git a/docs/configuration.md b/docs/configuration.md index 7c75828..130049d 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -7,6 +7,7 @@ All configuration is read from the environment (e.g. `.env` via python-dotenv). | **BOT_TOKEN** | string | *(empty)* | Telegram bot token from [@BotFather](https://t.me/BotFather). Required for the bot to run; if unset, the entry point exits with a clear message. The server that serves the Mini App API must use the **same** token as the bot; otherwise initData validation returns `hash_mismatch`. | | **DATABASE_URL** | string (SQLAlchemy URL) | `sqlite:///data/duty_teller.db` | Database connection URL. Example: `sqlite:///data/duty_teller.db`. | | **MINI_APP_BASE_URL** | string (URL, no trailing slash) | *(empty)* | Base URL of the miniapp (for documentation and CORS). Trailing slash is stripped. Example: `https://your-domain.com/app`. | +| **HTTP_HOST** | string | `127.0.0.1` | Host to bind the HTTP server to. Use `127.0.0.1` to listen only on localhost; use `0.0.0.0` to accept connections from all interfaces (e.g. when behind a reverse proxy on another machine). | | **HTTP_PORT** | integer | `8080` | Port for the HTTP server (FastAPI + static webapp). | | **ALLOWED_USERNAMES** | comma-separated list | *(empty)* | **Not used for access.** Kept for reference only. Access to the miniapp is controlled by **roles in the DB** (assigned by an admin via `/set_role`). | | **ADMIN_USERNAMES** | comma-separated list | *(empty)* | Telegram usernames treated as **admin fallback** when the user has **no role in the DB**. If a user has a role in the DB, only that role applies. Example: `admin1,admin2`. | diff --git a/duty_teller/api/calendar_ics.py b/duty_teller/api/calendar_ics.py index 3c25753..31f8320 100644 --- a/duty_teller/api/calendar_ics.py +++ b/duty_teller/api/calendar_ics.py @@ -2,11 +2,13 @@ import logging from datetime import date, datetime, timedelta -from urllib.request import Request, urlopen +from urllib.request import Request from urllib.error import URLError from icalendar import Calendar +from duty_teller.utils.http_client import safe_urlopen + log = logging.getLogger(__name__) # In-memory cache: url -> (cached_at_timestamp, raw_ics_bytes) @@ -16,11 +18,14 @@ FETCH_TIMEOUT_SECONDS = 15 def _fetch_ics(url: str) -> bytes | None: - """GET url, return response body or None on error.""" + """GET url, return response body or None on error. Only https/http schemes allowed.""" try: req = Request(url, headers={"User-Agent": "DutyTeller/1.0"}) - with urlopen(req, timeout=FETCH_TIMEOUT_SECONDS) as resp: + with safe_urlopen(req, timeout=FETCH_TIMEOUT_SECONDS) as resp: return resp.read() + except ValueError: + log.warning("ICS URL scheme not allowed (only https/http): %s", url) + return None except URLError as e: log.warning("Failed to fetch ICS from %s: %s", url, e) return None diff --git a/duty_teller/config.py b/duty_teller/config.py index bcf0297..d9a4987 100644 --- a/duty_teller/config.py +++ b/duty_teller/config.py @@ -53,6 +53,7 @@ class Settings: bot_token: str database_url: str mini_app_base_url: str + http_host: str http_port: int allowed_usernames: set[str] admin_usernames: set[str] @@ -90,10 +91,13 @@ class Settings: if raw_cors and raw_cors != "*" else ["*"] ) + raw_host = (os.getenv("HTTP_HOST") or "127.0.0.1").strip() + http_host = raw_host if raw_host else "127.0.0.1" return cls( bot_token=bot_token, database_url=os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db"), mini_app_base_url=os.getenv("MINI_APP_BASE_URL", "").rstrip("/"), + http_host=http_host, http_port=int(os.getenv("HTTP_PORT", "8080")), allowed_usernames=allowed, admin_usernames=admin, @@ -120,6 +124,7 @@ _settings = Settings.from_env() BOT_TOKEN = _settings.bot_token DATABASE_URL = _settings.database_url MINI_APP_BASE_URL = _settings.mini_app_base_url +HTTP_HOST = _settings.http_host HTTP_PORT = _settings.http_port ALLOWED_USERNAMES = _settings.allowed_usernames ADMIN_USERNAMES = _settings.admin_usernames diff --git a/duty_teller/run.py b/duty_teller/run.py index 3966ec5..dcb730f 100644 --- a/duty_teller/run.py +++ b/duty_teller/run.py @@ -11,6 +11,7 @@ from telegram.ext import ApplicationBuilder from duty_teller import config from duty_teller.config import require_bot_token from duty_teller.handlers import group_duty_pin, register_handlers +from duty_teller.utils.http_client import safe_urlopen logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", @@ -39,7 +40,7 @@ def _set_default_menu_button_webapp() -> None: method="POST", ) try: - with urllib.request.urlopen(req, timeout=10) as resp: + with safe_urlopen(req, timeout=10) as resp: if resp.status == 200: logger.info("Default menu button set to Web App: %s", menu_url) else: @@ -54,7 +55,7 @@ def _run_uvicorn(web_app, port: int) -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) server = uvicorn.Server( - uvicorn.Config(web_app, host="0.0.0.0", port=port, log_level="info"), + uvicorn.Config(web_app, host=config.HTTP_HOST, port=port, log_level="info"), ) loop.run_until_complete(server.serve()) diff --git a/duty_teller/utils/http_client.py b/duty_teller/utils/http_client.py new file mode 100644 index 0000000..83532fa --- /dev/null +++ b/duty_teller/utils/http_client.py @@ -0,0 +1,36 @@ +"""Safe HTTP client: open URLs only for allowed schemes (e.g. https, http) to avoid path traversal (CWE-22).""" + +from urllib.request import Request, urlopen +from urllib.parse import urlparse + + +def safe_urlopen( + request: Request, + timeout: float = 10, + allowed_schemes: tuple[str, ...] = ("https", "http"), +) -> "urlopen": + """Open URL only if its scheme is in allowed_schemes; otherwise raise ValueError. + + Use this instead of raw urlopen() to satisfy Bandit B310 and prevent opening + file:// or other non-HTTP schemes. + + Args: + request: urllib.request.Request (has .full_url or .get_full_url()). + timeout: Timeout in seconds for the request. + allowed_schemes: Tuple of lowercase scheme names, e.g. ("https", "http"). + + Returns: + Context manager from urlopen (use with 'with'). + + Raises: + ValueError: If the request URL scheme is not in allowed_schemes. + """ + url = request.get_full_url() + parsed = urlparse(url) + scheme = (parsed.scheme or "").lower() + if scheme not in allowed_schemes: + raise ValueError( + f"URL scheme not allowed: {scheme!r} (allowed: {allowed_schemes})" + ) + # Scheme validated above; only https/http reach here (B310). + return urlopen(request, timeout=timeout) # nosec B310 diff --git a/tests/test_calendar_ics.py b/tests/test_calendar_ics.py index 14cb196..45b30dc 100644 --- a/tests/test_calendar_ics.py +++ b/tests/test_calendar_ics.py @@ -129,6 +129,17 @@ class TestGetCalendarEvents: def test_empty_url_returns_empty(self): assert mod.get_calendar_events("", "2025-01-01", "2025-01-31") == [] + def test_disallowed_url_scheme_returns_empty(self): + """get_calendar_events: file:// or ftp:// URL does not call urlopen, returns [].""" + result = mod.get_calendar_events( + "file:///etc/passwd", "2025-01-01", "2025-01-31" + ) + assert result == [] + result = mod.get_calendar_events( + "ftp://example.com/cal.ics", "2025-01-01", "2025-01-31" + ) + assert result == [] + def test_from_after_to_returns_empty(self): assert ( mod.get_calendar_events( diff --git a/tests/test_http_client.py b/tests/test_http_client.py new file mode 100644 index 0000000..a05b891 --- /dev/null +++ b/tests/test_http_client.py @@ -0,0 +1,56 @@ +"""Tests for duty_teller.utils.http_client (safe_urlopen scheme check).""" + +from unittest.mock import MagicMock, patch + +import pytest +from urllib.request import Request + +from duty_teller.utils.http_client import safe_urlopen + + +def test_safe_urlopen_rejects_file_scheme(): + """safe_urlopen: file:// URL raises ValueError and does not call urlopen.""" + req = Request("file:///etc/passwd") + with patch("duty_teller.utils.http_client.urlopen") as mock_urlopen: + with pytest.raises(ValueError, match="scheme not allowed"): + with safe_urlopen(req, timeout=5): + pass + mock_urlopen.assert_not_called() + + +def test_safe_urlopen_rejects_ftp_scheme(): + """safe_urlopen: ftp:// URL raises ValueError and does not call urlopen.""" + req = Request("ftp://example.com/file.txt") + with patch("duty_teller.utils.http_client.urlopen") as mock_urlopen: + with pytest.raises(ValueError, match="scheme not allowed"): + with safe_urlopen(req, timeout=5): + pass + mock_urlopen.assert_not_called() + + +def test_safe_urlopen_allows_https(): + """safe_urlopen: https:// URL is allowed and urlopen is called.""" + req = Request("https://api.telegram.org/bot123/setChatMenuButton") + mock_resp = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + with patch( + "duty_teller.utils.http_client.urlopen", return_value=mock_resp + ) as mock_urlopen: + with safe_urlopen(req, timeout=10) as resp: + assert resp is mock_resp + mock_urlopen.assert_called_once_with(req, timeout=10) + + +def test_safe_urlopen_allows_http_when_in_allowed_schemes(): + """safe_urlopen: http:// URL is allowed when allowed_schemes includes http.""" + req = Request("http://localhost:8080/health") + mock_resp = MagicMock() + mock_resp.__enter__ = MagicMock(return_value=mock_resp) + mock_resp.__exit__ = MagicMock(return_value=False) + with patch( + "duty_teller.utils.http_client.urlopen", return_value=mock_resp + ) as mock_urlopen: + with safe_urlopen(req, timeout=5, allowed_schemes=("https", "http")) as resp: + assert resp is mock_resp + mock_urlopen.assert_called_once_with(req, timeout=5) diff --git a/tests/test_run.py b/tests/test_run.py index f711d10..1b9d2f7 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -76,32 +76,32 @@ def test_set_default_menu_button_skips_when_not_https(): def test_set_default_menu_button_calls_telegram_api(): - """_set_default_menu_button_webapp: when URL is https, calls Telegram API (mocked).""" + """_set_default_menu_button_webapp: when URL is https, calls safe_urlopen (Telegram API mocked).""" with patch("duty_teller.run.config") as mock_cfg: mock_cfg.MINI_APP_BASE_URL = "https://example.com/" mock_cfg.BOT_TOKEN = "123:ABC" - with patch("urllib.request.urlopen") as mock_urlopen: + with patch("duty_teller.run.safe_urlopen") as mock_safe_urlopen: mock_resp = MagicMock() mock_resp.status = 200 mock_resp.__enter__ = MagicMock(return_value=mock_resp) mock_resp.__exit__ = MagicMock(return_value=False) - mock_urlopen.return_value = mock_resp + mock_safe_urlopen.return_value = mock_resp _set_default_menu_button_webapp() - mock_urlopen.assert_called_once() - req = mock_urlopen.call_args[0][0] - assert "setChatMenuButton" in req.full_url - assert "123:ABC" in req.full_url + mock_safe_urlopen.assert_called_once() + req = mock_safe_urlopen.call_args[0][0] + assert "setChatMenuButton" in req.get_full_url() + assert "123:ABC" in req.get_full_url() def test_set_default_menu_button_handles_urlopen_error(): - """_set_default_menu_button_webapp: on urlopen error, logs and does not raise.""" + """_set_default_menu_button_webapp: on safe_urlopen error, logs and does not raise.""" with patch("duty_teller.run.config") as mock_cfg: mock_cfg.MINI_APP_BASE_URL = "https://example.com/" mock_cfg.BOT_TOKEN = "123:ABC" - with patch("urllib.request.urlopen") as mock_urlopen: - mock_urlopen.side_effect = OSError("network error") + with patch("duty_teller.run.safe_urlopen") as mock_safe_urlopen: + mock_safe_urlopen.side_effect = OSError("network error") _set_default_menu_button_webapp() - mock_urlopen.assert_called_once() + mock_safe_urlopen.assert_called_once() def test_set_default_menu_button_handles_non_200_response(): @@ -109,11 +109,25 @@ def test_set_default_menu_button_handles_non_200_response(): with patch("duty_teller.run.config") as mock_cfg: mock_cfg.MINI_APP_BASE_URL = "https://example.com/" mock_cfg.BOT_TOKEN = "123:ABC" - with patch("urllib.request.urlopen") as mock_urlopen: + with patch("duty_teller.run.safe_urlopen") as mock_safe_urlopen: mock_resp = MagicMock() mock_resp.status = 400 mock_resp.__enter__ = MagicMock(return_value=mock_resp) mock_resp.__exit__ = MagicMock(return_value=False) - mock_urlopen.return_value = mock_resp + mock_safe_urlopen.return_value = mock_resp _set_default_menu_button_webapp() - mock_urlopen.assert_called_once() + mock_safe_urlopen.assert_called_once() + + +def test_run_uvicorn_uses_config_host(): + """_run_uvicorn: passes config.HTTP_HOST to uvicorn.Config.""" + with patch("uvicorn.Server") as mock_server_class: + mock_server = MagicMock() + mock_server.serve = MagicMock(return_value=asyncio.sleep(0)) + mock_server_class.return_value = mock_server + with patch("duty_teller.run.config") as mock_cfg: + mock_cfg.HTTP_HOST = "127.0.0.1" + _run_uvicorn(MagicMock(), 8080) + call_kwargs = mock_server_class.call_args[0][0] + assert call_kwargs.host == "127.0.0.1" + assert call_kwargs.port == 8080