#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File: /home/ywatanabe/proj/scitex-code/src/scitex/scholar/core/oa_cache.py
"""
Open Access Sources Cache.
Caches OA journal/source information from OpenAlex API with daily refresh.
Provides fast local lookups without per-paper API calls.
"""
from __future__ import annotations
import asyncio
import json
import os
import time
from pathlib import Path
from typing import Dict, Optional, Set
import aiohttp
import scitex_logging as logging
logger = logging.getLogger(__name__)
# Cache settings
CACHE_TTL_SECONDS = 86400 # 1 day
OPENALEX_OA_SOURCES_URL = "https://api.openalex.org/sources"
OPENALEX_POLITE_EMAIL = "research@scitex.io"
def _get_default_cache_dir() -> Path:
"""Get default cache directory via PathManager (runtime/cache)."""
from scitex_scholar.config import ScholarConfig
return ScholarConfig().path_manager.cache_dir
[docs]
class OASourcesCache:
"""
Manages cached Open Access sources from OpenAlex.
Features:
- Lazy loading on first access
- 1-day TTL with automatic refresh
- Thread-safe singleton pattern
- Fallback to config YAML if API fails
- Journal name normalization via ISSN-L
- Handles abbreviations, variants, and historical names
"""
_instance: Optional["OASourcesCache"] = None
_lock = asyncio.Lock() if hasattr(asyncio, "Lock") else None
[docs]
def __init__(self, cache_dir: Optional[Path] = None):
self._cache_dir = cache_dir or _get_default_cache_dir()
self._cache_file = self._cache_dir / "oa_sources_cache.json"
self._oa_source_ids: Set[str] = set() # OpenAlex source IDs
self._oa_source_names: Set[str] = set() # Lowercase source names
self._oa_issns: Set[str] = set() # ISSNs for journal lookup
self._issn_l_map: Dict[str, str] = {} # ISSN → ISSN-L mapping
self._name_to_issn_l: Dict[str, str] = {} # name variant → ISSN-L
self._issn_l_to_canonical: Dict[str, str] = {} # ISSN-L → canonical name
self._last_updated: float = 0
self._loaded = False
[docs]
@classmethod
def get_instance(cls, cache_dir: Optional[Path] = None) -> "OASourcesCache":
"""Get singleton instance."""
if cls._instance is None:
cls._instance = cls(cache_dir)
return cls._instance
[docs]
def _is_cache_valid(self) -> bool:
"""Check if cache exists and is within TTL."""
if not self._cache_file.exists():
return False
try:
with open(self._cache_file, "r") as f:
data = json.load(f)
cached_time = data.get("timestamp", 0)
return (time.time() - cached_time) < CACHE_TTL_SECONDS
except (json.JSONDecodeError, IOError):
return False
[docs]
def _load_from_cache(self) -> bool:
"""Load cached data from file."""
if not self._cache_file.exists():
return False
try:
with open(self._cache_file, "r") as f:
data = json.load(f)
self._oa_source_names = set(data.get("source_names", []))
self._oa_issns = set(data.get("issns", []))
self._last_updated = data.get("timestamp", 0)
self._loaded = True
logger.info(f"Loaded {len(self._oa_source_names)} OA sources from cache")
return True
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load OA cache: {e}")
return False
[docs]
def _save_to_cache(self) -> None:
"""Save current data to cache file."""
try:
self._cache_dir.mkdir(parents=True, exist_ok=True)
data = {
"timestamp": time.time(),
"source_names": list(self._oa_source_names),
"issns": list(self._oa_issns),
"count": len(self._oa_source_names),
}
with open(self._cache_file, "w") as f:
json.dump(data, f)
logger.info(f"Saved {len(self._oa_source_names)} OA sources to cache")
except IOError as e:
logger.warning(f"Failed to save OA cache: {e}")
[docs]
async def _fetch_oa_sources_async(self, max_pages: int = 100) -> None:
"""
Fetch OA sources from OpenAlex API.
Args:
max_pages: Maximum pages to fetch (200 sources per page)
"""
source_names: Set[str] = set()
issns: Set[str] = set()
per_page = 200
cursor = "*"
pages_fetched = 0
async with aiohttp.ClientSession() as session:
while pages_fetched < max_pages:
url = (
f"{OPENALEX_OA_SOURCES_URL}"
f"?filter=is_oa:true"
f"&per_page={per_page}"
f"&cursor={cursor}"
f"&mailto={OPENALEX_POLITE_EMAIL}"
f"&select=display_name,issn"
)
try:
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=30)
) as resp:
if resp.status != 200:
logger.warning(f"OpenAlex API returned {resp.status}")
break
data = await resp.json()
results = data.get("results", [])
if not results:
break
for source in results:
name = source.get("display_name", "")
if name:
source_names.add(name.lower())
# Also store ISSNs for precise matching
source_issns = source.get("issn", []) or []
for issn in source_issns:
if issn:
issns.add(issn)
# Get next cursor
meta = data.get("meta", {})
next_cursor = meta.get("next_cursor")
if not next_cursor or next_cursor == cursor:
break
cursor = next_cursor
pages_fetched += 1
# Progress log every 10 pages
if pages_fetched % 10 == 0:
logger.info(
f"Fetched {pages_fetched} pages, {len(source_names)} sources so far..."
)
except asyncio.TimeoutError:
logger.warning("OpenAlex API timeout")
break
except Exception as e:
logger.error(f"Error fetching OA sources: {e}")
break
if source_names:
self._oa_source_names = source_names
self._oa_issns = issns
self._last_updated = time.time()
self._loaded = True
self._save_to_cache()
logger.info(f"Fetched {len(source_names)} OA sources from OpenAlex")
[docs]
def _fetch_oa_sources_sync(self, max_pages: int = 100) -> None:
"""Synchronous wrapper for fetching OA sources."""
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._fetch_oa_sources_async(max_pages))
[docs]
def ensure_loaded(self, force_refresh: bool = False) -> None:
"""
Ensure cache is loaded, fetching from API if needed.
Args:
force_refresh: Force refresh even if cache is valid
"""
if self._loaded and not force_refresh and self._is_cache_valid():
return
# Try loading from cache first
if not force_refresh and self._load_from_cache() and self._is_cache_valid():
return
# Fetch from API
logger.info("Refreshing OA sources cache from OpenAlex...")
self._fetch_oa_sources_sync()
[docs]
def is_oa_source(self, source_name: str) -> bool:
"""
Check if a source/journal name is in the OA list.
Args:
source_name: Journal or source name to check
Returns:
True if source is known to be Open Access
"""
self.ensure_loaded()
if not source_name:
return False
return source_name.lower() in self._oa_source_names
[docs]
def is_oa_issn(self, issn: str) -> bool:
"""
Check if an ISSN belongs to an OA journal.
Args:
issn: ISSN to check
Returns:
True if ISSN belongs to an OA journal
"""
self.ensure_loaded()
if not issn:
return False
# Normalize ISSN format
issn = issn.replace("-", "").upper()
return issn in self._oa_issns or f"{issn[:4]}-{issn[4:]}" in self._oa_issns
@property
def source_count(self) -> int:
"""Get number of cached OA sources."""
self.ensure_loaded()
return len(self._oa_source_names)
@property
def cache_age_hours(self) -> float:
"""Get cache age in hours."""
if self._last_updated == 0:
return float("inf")
return (time.time() - self._last_updated) / 3600
# Convenience functions
[docs]
def get_oa_cache(cache_dir: Optional[Path] = None) -> OASourcesCache:
"""Get the OA sources cache singleton."""
return OASourcesCache.get_instance(cache_dir)
[docs]
def is_oa_journal_cached(journal_name: str) -> bool:
"""Check if journal is OA using cached OpenAlex data."""
return get_oa_cache().is_oa_source(journal_name)
[docs]
def refresh_oa_cache() -> None:
"""Force refresh the OA sources cache."""
get_oa_cache().ensure_loaded(force_refresh=True)
# EOF