feat: enhance HTTP handling and configuration
All checks were successful
CI / lint-and-test (push) Successful in 24s

- Introduced a new utility function `safe_urlopen` to ensure only allowed URL schemes (http, https) are opened, enhancing security against path traversal vulnerabilities.
- Updated the `run.py` and `calendar_ics.py` files to utilize `safe_urlopen` for HTTP requests, improving error handling and security.
- Added `HTTP_HOST` configuration to the settings, allowing dynamic binding of the HTTP server host.
- Revised the `.env.example` file to include the new `HTTP_HOST` variable with a description.
- Enhanced tests for `safe_urlopen` to validate behavior with disallowed URL schemes and ensure proper integration in existing functionality.
This commit is contained in:
2026-02-24 14:16:34 +03:00
parent e6bc60b436
commit d5da265b5f
11 changed files with 150 additions and 19 deletions

View File

@@ -2,11 +2,13 @@
import logging
from datetime import date, datetime, timedelta
from urllib.request import Request, urlopen
from urllib.request import Request
from urllib.error import URLError
from icalendar import Calendar
from duty_teller.utils.http_client import safe_urlopen
log = logging.getLogger(__name__)
# In-memory cache: url -> (cached_at_timestamp, raw_ics_bytes)
@@ -16,11 +18,14 @@ FETCH_TIMEOUT_SECONDS = 15
def _fetch_ics(url: str) -> bytes | None:
"""GET url, return response body or None on error."""
"""GET url, return response body or None on error. Only https/http schemes allowed."""
try:
req = Request(url, headers={"User-Agent": "DutyTeller/1.0"})
with urlopen(req, timeout=FETCH_TIMEOUT_SECONDS) as resp:
with safe_urlopen(req, timeout=FETCH_TIMEOUT_SECONDS) as resp:
return resp.read()
except ValueError:
log.warning("ICS URL scheme not allowed (only https/http): %s", url)
return None
except URLError as e:
log.warning("Failed to fetch ICS from %s: %s", url, e)
return None

View File

@@ -53,6 +53,7 @@ class Settings:
bot_token: str
database_url: str
mini_app_base_url: str
http_host: str
http_port: int
allowed_usernames: set[str]
admin_usernames: set[str]
@@ -90,10 +91,13 @@ class Settings:
if raw_cors and raw_cors != "*"
else ["*"]
)
raw_host = (os.getenv("HTTP_HOST") or "127.0.0.1").strip()
http_host = raw_host if raw_host else "127.0.0.1"
return cls(
bot_token=bot_token,
database_url=os.getenv("DATABASE_URL", "sqlite:///data/duty_teller.db"),
mini_app_base_url=os.getenv("MINI_APP_BASE_URL", "").rstrip("/"),
http_host=http_host,
http_port=int(os.getenv("HTTP_PORT", "8080")),
allowed_usernames=allowed,
admin_usernames=admin,
@@ -120,6 +124,7 @@ _settings = Settings.from_env()
BOT_TOKEN = _settings.bot_token
DATABASE_URL = _settings.database_url
MINI_APP_BASE_URL = _settings.mini_app_base_url
HTTP_HOST = _settings.http_host
HTTP_PORT = _settings.http_port
ALLOWED_USERNAMES = _settings.allowed_usernames
ADMIN_USERNAMES = _settings.admin_usernames

View File

@@ -11,6 +11,7 @@ from telegram.ext import ApplicationBuilder
from duty_teller import config
from duty_teller.config import require_bot_token
from duty_teller.handlers import group_duty_pin, register_handlers
from duty_teller.utils.http_client import safe_urlopen
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
@@ -39,7 +40,7 @@ def _set_default_menu_button_webapp() -> None:
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
with safe_urlopen(req, timeout=10) as resp:
if resp.status == 200:
logger.info("Default menu button set to Web App: %s", menu_url)
else:
@@ -54,7 +55,7 @@ def _run_uvicorn(web_app, port: int) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
server = uvicorn.Server(
uvicorn.Config(web_app, host="0.0.0.0", port=port, log_level="info"),
uvicorn.Config(web_app, host=config.HTTP_HOST, port=port, log_level="info"),
)
loop.run_until_complete(server.serve())

View File

@@ -0,0 +1,36 @@
"""Safe HTTP client: open URLs only for allowed schemes (e.g. https, http) to avoid path traversal (CWE-22)."""
from urllib.request import Request, urlopen
from urllib.parse import urlparse
def safe_urlopen(
request: Request,
timeout: float = 10,
allowed_schemes: tuple[str, ...] = ("https", "http"),
) -> "urlopen":
"""Open URL only if its scheme is in allowed_schemes; otherwise raise ValueError.
Use this instead of raw urlopen() to satisfy Bandit B310 and prevent opening
file:// or other non-HTTP schemes.
Args:
request: urllib.request.Request (has .full_url or .get_full_url()).
timeout: Timeout in seconds for the request.
allowed_schemes: Tuple of lowercase scheme names, e.g. ("https", "http").
Returns:
Context manager from urlopen (use with 'with').
Raises:
ValueError: If the request URL scheme is not in allowed_schemes.
"""
url = request.get_full_url()
parsed = urlparse(url)
scheme = (parsed.scheme or "").lower()
if scheme not in allowed_schemes:
raise ValueError(
f"URL scheme not allowed: {scheme!r} (allowed: {allowed_schemes})"
)
# Scheme validated above; only https/http reach here (B310).
return urlopen(request, timeout=timeout) # nosec B310