- Added a TTLCache class for in-memory caching of duty-related data, improving performance by reducing database queries.
- Integrated caching into the group duty pin functionality, allowing efficient retrieval of message text and next shift end times.
- Introduced new methods to invalidate caches when relevant data changes, ensuring data consistency.
- Created a new Alembic migration to add indexes on the duties table for improved query performance.
- Updated tests to cover the new caching behavior and ensure proper functionality.
159 lines
5.0 KiB
Python
"""Fetch and parse external ICS calendar; in-memory cache with 7-day TTL."""
|
|
|
|
import logging
|
|
from datetime import date, datetime, timedelta
|
|
from urllib.request import Request
|
|
from urllib.error import URLError
|
|
|
|
from icalendar import Calendar
|
|
|
|
from duty_teller.cache import TTLCache
|
|
from duty_teller.utils.http_client import safe_urlopen
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Raw ICS bytes cache: url -> (cached_at_timestamp, raw_ics_bytes)
|
|
_ics_cache: dict[str, tuple[float, bytes]] = {}
|
|
# Parsed events cache: url -> list of {date, summary}. TTL 7 days.
|
|
_parsed_events_cache = TTLCache(ttl_seconds=7 * 24 * 3600, max_size=100)
|
|
CACHE_TTL_SECONDS = 7 * 24 * 3600 # 1 week
|
|
FETCH_TIMEOUT_SECONDS = 15
|
|
|
|
|
|
def _fetch_ics(url: str) -> bytes | None:
    """Download the ICS document at *url* and return its raw bytes.

    Only http/https schemes are accepted (enforced by ``safe_urlopen``,
    which raises ValueError otherwise).  All failures are logged and
    reported as None rather than raised.
    """
    try:
        # Request construction stays inside the try: it can raise
        # ValueError for malformed URLs too.
        request = Request(url, headers={"User-Agent": "DutyTeller/1.0"})
        with safe_urlopen(request, timeout=FETCH_TIMEOUT_SECONDS) as response:
            return response.read()
    except ValueError:
        log.warning("ICS URL scheme not allowed (only https/http): %s", url)
    except URLError as e:
        # URLError subclasses OSError, so it must be caught first.
        log.warning("Failed to fetch ICS from %s: %s", url, e)
    except OSError as e:
        log.warning("Error fetching ICS from %s: %s", url, e)
    return None
|
def _to_date(dt) -> date | None:
|
|
"""Convert icalendar DATE or DATE-TIME to date. Return None if invalid."""
|
|
if isinstance(dt, datetime):
|
|
return dt.date()
|
|
if isinstance(dt, date):
|
|
return dt
|
|
return None
|
|
|
|
|
|
def _event_date_range(component) -> tuple[date | None, date | None]:
    """Return the inclusive (first_day, last_day) span of a VEVENT.

    iCalendar DTEND is exclusive, so the event's last day is the DTEND
    date minus one day.  A VEVENT without a usable DTSTART yields
    (None, None); a missing or unusable DTEND collapses the event to a
    single day (its start day).
    """
    dtstart = component.get("dtstart")
    if not dtstart:
        return (None, None)
    first_day = _to_date(dtstart.dt)
    if not first_day:
        return (None, None)

    dtend = component.get("dtend")
    if dtend:
        exclusive_end = _to_date(dtend.dt)
        if exclusive_end:
            # Exclusive end -> inclusive last day.
            return (first_day, exclusive_end - timedelta(days=1))
    return (first_day, first_day)
|
def _parse_ics_to_events(raw: bytes) -> list[dict]:
    """Parse raw ICS bytes into per-day event records.

    Each record is ``{"date": "YYYY-MM-DD", "summary": str}``; a
    multi-day event is expanded into one record per day.  Recurring
    events (those carrying an RRULE) are skipped.  Unparseable input
    is logged and yields an empty list.
    """
    events: list[dict] = []
    try:
        calendar = Calendar.from_ical(raw)
        if not calendar:
            return events
    except Exception as e:
        log.warning("Failed to parse ICS: %s", e)
        return events

    for vevent in calendar.walk():
        if vevent.name != "VEVENT":
            continue
        if vevent.get("rrule"):
            continue  # skip recurring in first iteration
        first_day, last_day = _event_date_range(vevent)
        if not first_day or not last_day:
            continue
        raw_summary = vevent.get("summary")
        title = str(raw_summary) if raw_summary else ""

        # Expand the [first_day, last_day] span into one record per day.
        span_days = (last_day - first_day).days + 1
        for offset in range(span_days):
            day = first_day + timedelta(days=offset)
            events.append({"date": day.strftime("%Y-%m-%d"), "summary": title})

    return events
|
def _filter_events_by_range(
|
|
events: list[dict], from_date: str, to_date: str
|
|
) -> list[dict]:
|
|
"""Filter events list to [from_date, to_date] range."""
|
|
from_d = date.fromisoformat(from_date)
|
|
to_d = date.fromisoformat(to_date)
|
|
return [e for e in events if from_d <= date.fromisoformat(e["date"]) <= to_d]
|
|
|
|
|
|
def _get_events_from_ics(raw: bytes, from_date: str, to_date: str) -> list[dict]:
    """Parse ICS bytes and keep only events within [from_date, to_date].

    Thin composition of parse + filter, kept as a separate seam so tests
    can exercise the pipeline without the fetch/caching layer.
    """
    return _filter_events_by_range(_parse_ics_to_events(raw), from_date, to_date)
|
def get_calendar_events(
    url: str,
    from_date: str,
    to_date: str,
) -> list[dict]:
    """Fetch ICS from URL and return events in the given date range.

    Uses two in-memory caches with a 7-day TTL: raw ICS bytes per URL,
    and the parsed event list (avoids repeated Calendar.from_ical() +
    walk()).  Recurring events are skipped.  On fetch or parse error
    returns an empty list.

    Args:
        url: URL of the ICS calendar.
        from_date: Start date YYYY-MM-DD.
        to_date: End date YYYY-MM-DD.

    Returns:
        List of dicts with keys "date" (YYYY-MM-DD) and "summary". Empty on error.
    """
    # ISO-8601 dates order correctly as strings, so plain > is valid here.
    if not url or from_date > to_date:
        return []

    now = datetime.now().timestamp()
    cache_key = (url,)

    raw: bytes | None = None
    cached = _ics_cache.get(url)
    if cached is not None:
        cached_at, cached_raw = cached
        if now - cached_at < CACHE_TTL_SECONDS:
            raw = cached_raw

    freshly_fetched = False
    if raw is None:
        raw = _fetch_ics(url)
        if raw is None:
            return []
        freshly_fetched = True
        # Prune expired entries before storing: unlike the parsed cache
        # (max_size=100), this dict had no size bound and grew by one
        # entry per distinct URL forever.
        expired = [
            cached_url
            for cached_url, (ts, _) in _ics_cache.items()
            if now - ts >= CACHE_TTL_SECONDS
        ]
        for cached_url in expired:
            del _ics_cache[cached_url]
        _ics_cache[url] = (now, raw)

    # Parsed-events cache avoids re-running Calendar.from_ical() + walk().
    # When the raw bytes were just refetched, skip the lookup and
    # overwrite: a parsed entry written later than the old raw entry
    # (e.g. re-set after an eviction) could still be "fresh" and would
    # otherwise serve events from the previous download.
    events = None
    if not freshly_fetched:
        cached_events, found = _parsed_events_cache.get(cache_key)
        if found:
            events = cached_events
    if events is None:
        events = _parse_ics_to_events(raw)
        _parsed_events_cache.set(cache_key, events)

    return _filter_events_by_range(events, from_date, to_date)