Unified data cache layer #1323
base: main
Conversation
Reviewer's Guide

This PR introduces a complete unified async data cache layer replacing the legacy apicache/imagecache modules. It adds a storage backend with TTL, metadata, queueing and maintenance; a high-level HTTP client with rate limiting, caching and retries; provider interfaces for MusicBrainz, images and generic APIs; token-bucket rate limiting utilities; background workers for queued requests; migration utilities for existing caches; plus comprehensive unit and integration tests.

Sequence diagram for immediate cache fetch and fallback to HTTP fetch

```mermaid
sequenceDiagram
    participant Plugin
    participant Providers
    participant Client
    participant Storage
    participant HTTP
    Plugin->>Providers: cache_artist_thumbnail(url, ... immediate=True)
    Providers->>Client: get_or_fetch(url, ... immediate=True)
    Client->>Storage: retrieve_by_url(url)
    Storage-->>Client: cache miss (None)
    Client->>HTTP: fetch(url)
    HTTP-->>Client: data
    Client->>Storage: store(url, ...)
    Storage-->>Client: success
    Client-->>Providers: (data, metadata)
    Providers-->>Plugin: (data, metadata)
```
Entity relationship diagram for unified datacache tables

```mermaid
erDiagram
    CACHED_DATA {
        TEXT url PK
        TEXT cache_key
        TEXT identifier
        TEXT data_type
        TEXT provider
        BLOB data_value
        TEXT metadata
        INTEGER created_at
        INTEGER expires_at
        INTEGER access_count
        INTEGER last_accessed
        INTEGER data_size
    }
    PENDING_REQUESTS {
        TEXT request_id PK
        TEXT provider
        TEXT request_key
        TEXT params
        INTEGER priority
        INTEGER created_at
        INTEGER attempts
        INTEGER last_attempt
        TEXT status
    }
    CACHED_DATA ||--o{ PENDING_REQUESTS : "related via provider"
```
Class diagram for unified datacache core types

```mermaid
classDiagram
    class DataStorage {
        +database_path: Path
        +initialize()
        +store(url, identifier, data_type, provider, data_value, ttl_seconds, metadata)
        +retrieve_by_url(url)
        +retrieve_by_identifier(identifier, data_type, provider, random)
        +get_cache_keys_for_identifier(identifier, data_type, provider)
        +queue_request(provider, request_key, params, priority)
        +get_next_request(provider)
        +complete_request(request_id, success)
        +cleanup_expired()
        +maintenance()
        +vacuum()
        +close()
    }
    class DataCacheClient {
        +storage: DataStorage
        +rate_limiters: RateLimiterManager
        +get_or_fetch(url, identifier, data_type, provider, ...)
        +get_random_image(identifier, data_type, provider)
        +get_cache_keys_for_identifier(identifier, data_type, provider)
        +queue_url_fetch(url, identifier, data_type, provider, ...)
        +process_queue(provider)
        +close()
    }
    class RateLimiter {
        +provider: str
        +acquire(timeout)
        +available_tokens()
        +time_until_token()
    }
    class RateLimiterManager {
        +get_limiter(provider)
    }
    class MusicBrainzProvider {
        +search_artists(query, limit, immediate)
        +get_artist(artist_id, includes, immediate)
        +search_recordings(query, limit, immediate)
    }
    class ImageProvider {
        +cache_artist_thumbnail(url, artist_identifier, provider, immediate, metadata)
        +cache_artist_logo(url, artist_identifier, provider, immediate, metadata)
        +cache_artist_banner(url, artist_identifier, provider, immediate, metadata)
        +cache_artist_fanart(url, artist_identifier, provider, immediate, metadata)
        +get_random_image(artist_identifier, image_type, provider)
        +get_cache_keys_for_identifier(artist_identifier, image_type, provider)
    }
    class APIProvider {
        +cache_api_response(url, identifier, data_type, provider, ...)
        +cache_artist_bio(url, artist_identifier, provider, language, immediate, metadata)
    }
    class DataCacheProviders {
        +musicbrainz: MusicBrainzProvider
        +images: ImageProvider
        +api: APIProvider
        +process_queue(provider)
        +initialize()
        +close()
    }
    class DataCacheWorker {
        +client: DataCacheClient
        +start()
        +shutdown()
    }
    class DataCacheWorkerManager {
        +workers: list[DataCacheWorker]
        +start()
        +shutdown()
        +get_stats()
    }
    DataCacheClient --> DataStorage
    DataCacheClient --> RateLimiterManager
    DataCacheProviders --> MusicBrainzProvider
    DataCacheProviders --> ImageProvider
    DataCacheProviders --> APIProvider
    DataCacheWorker --> DataCacheClient
    DataCacheWorkerManager --> DataCacheWorker
```
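For orientation, here is a minimal usage sketch of the flow the diagrams describe. Constructor arguments, the `initialize()` call and the exact return shape are assumptions inferred from the class diagram and the client error message, not verified against the PR.

```python
from nowplaying.datacache.client import DataCacheClient  # path assumed from nowplaying/datacache/client.py


async def fetch_thumbnail(url: str, artist_id: str) -> bytes | None:
    """Check the unified cache first; fall back to an HTTP fetch and store the result."""
    client = DataCacheClient()  # constructor arguments (cache dir, session, ...) omitted in this sketch
    await client.initialize()
    try:
        result = await client.get_or_fetch(
            url=url,
            identifier=artist_id,
            data_type="thumbnail",
            provider="test",
        )
        if result is None:
            return None
        data, _metadata = result
        return data
    finally:
        await client.close()


# Usage (hypothetical): asyncio.run(fetch_thumbnail("https://example.com/artist.jpg", "some-artist-id"))
```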
File-Level Changes
Hey there - I've reviewed your changes - here's some feedback:
Blocking issues:
- Avoiding SQL string concatenation: untrusted input concatenated with a raw SQL query can result in SQL injection. To execute a raw query safely, use a prepared statement. SQLAlchemy provides TextualSQL to easily use prepared statements with named parameters. For complex SQL composition, use the SQL Expression Language or Schema Definition Language. In most cases, the SQLAlchemy ORM will be a better option. (link)
General comments:
- Consider reusing SQLite connections or introducing a simple connection-pool in DataStorage to avoid opening/closing a new DB connection on every store/retrieve, which can reduce overhead and improve performance under load.
- Most methods in storage and client catch broad Exception; narrowing exception handling to specific error types can prevent silent masking of unexpected bugs and aid debugging.
- The RateLimiter uses time.time() for token refill and timeouts—switching to a monotonic clock (e.g. time.monotonic()) would avoid issues if system time jumps unexpectedly.
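On the monotonic-clock point, a minimal token-bucket sketch for illustration only; the PR's `RateLimiter` exposes a richer async interface with `acquire(timeout)`, `available_tokens()` and `time_until_token()`, so this is not a drop-in replacement.

```python
import time


class MonotonicTokenBucket:
    """Token bucket refilled against a monotonic clock, immune to wall-clock jumps."""

    def __init__(self, requests_per_second: float, burst: int = 1) -> None:
        self.rate = requests_per_second
        self.capacity = burst
        self.tokens = float(burst)
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def try_acquire(self) -> bool:
        """Consume one token if available; never blocks."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```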
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider reusing SQLite connections or introducing a simple connection-pool in DataStorage to avoid opening/closing a new DB connection on every store/retrieve, which can reduce overhead and improve performance under load.
- Most methods in storage and client catch broad Exception; narrowing exception handling to specific error types can prevent silent masking of unexpected bugs and aid debugging.
- The RateLimiter uses time.time() for token refill and timeouts—switching to a monotonic clock (e.g. time.monotonic()) would avoid issues if system time jumps unexpectedly.
## Individual Comments
### Comment 1
<location> `tests/datacache/test_queue.py:15-24` </location>
<code_context>
+ await client.close()
+
+
+@pytest.mark.asyncio
+async def test_client_initialization(temp_client):
+ """Test client initializes properly"""
</code_context>
<issue_to_address>
**suggestion (testing):** Rate limiter timeout is tested, but edge case for negative rate is missing.
Add a test to verify that RateLimiter with a negative or zero rate does not allow token acquisition and fails gracefully.
</issue_to_address>
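A test along those lines might look like the sketch below. Whether a non-positive rate should never refill, return `False` from `acquire()`, or raise `ValueError` at construction is a design choice the PR still needs to pin down, so the assertions here are assumptions.

```python
import pytest

import nowplaying.datacache.queue


@pytest.mark.asyncio
async def test_rate_limiter_zero_rate_does_not_refill():
    """With a zero refill rate, tokens should never replenish after any initial burst."""
    limiter = nowplaying.datacache.queue.RateLimiter("test", requests_per_second=0.0)
    # Drain whatever initial burst the limiter starts with.
    while limiter.available_tokens() >= 1:
        assert await limiter.acquire(timeout=0.1)
    # With no refill, a further acquire should time out rather than succeed.
    assert not await limiter.acquire(timeout=0.1)
```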
### Comment 2
<location> `nowplaying/datacache/storage.py:266-273` </location>
<code_context>
await connection.execute(
    f"""
    UPDATE cached_data
    SET access_count = access_count + 1, last_accessed = ?
    WHERE url IN ({placeholders})
    """,
    [now] + urls,
)
</code_context>
<issue_to_address>
**security (python.sqlalchemy.security.sqlalchemy-execute-raw-query):** Avoiding SQL string concatenation: untrusted input concatenated with a raw SQL query can result in SQL injection. To execute a raw query safely, use a prepared statement. SQLAlchemy provides TextualSQL to easily use prepared statements with named parameters. For complex SQL composition, use the SQL Expression Language or Schema Definition Language. In most cases, the SQLAlchemy ORM will be a better option.
*Source: opengrep*
</issue_to_address>
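The `IN ({placeholders})` string is presumably built from generated `?` markers, since the values travel in the parameter list, so this reads more like a scanner pattern match than real injection. Still, one way to satisfy the rule is a fixed, fully parameterized statement run once per URL; the sketch below assumes an aiosqlite-style connection and an illustrative helper name.

```python
import aiosqlite


async def bump_access_counts(connection: aiosqlite.Connection, urls: list[str], now: int) -> None:
    """Increment access_count and refresh last_accessed for each cached URL."""
    # A constant SQL string, executed once per URL, avoids any dynamic SQL construction.
    await connection.executemany(
        """
        UPDATE cached_data
        SET access_count = access_count + 1, last_accessed = ?
        WHERE url = ?
        """,
        [(now, url) for url in urls],
    )
    await connection.commit()
```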
### Comment 3
<location> `tests/datacache/test_queue.py:168` </location>
<code_context>
successful = sum(1 for result in results if result)
</code_context>
<issue_to_address>
**suggestion (code-quality):** Simplify constant sum() call ([`simplify-constant-sum`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/simplify-constant-sum))
```suggestion
successful = sum(bool(result) for result in results)
```
Explanation: as `sum` adds the values, it treats `True` as `1` and `False` as `0`. We make use of this fact to simplify the generator expression inside the `sum` call.
</issue_to_address>
### Comment 4
<location> `nowplaying/datacache/client.py:130` </location>
<code_context>
async def _fetch_and_store( # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
self,
url: str,
identifier: str,
data_type: str,
provider: str,
timeout: float,
retries: int,
ttl_seconds: int | None,
metadata: dict | None,
) -> tuple[Any, dict] | None:
"""
Fetch data from URL and store in cache.
Handles rate limiting and retries automatically.
"""
# Apply rate limiting
rate_limiter = self.rate_limiters.get_limiter(provider)
await rate_limiter.acquire()
if not self._session:
raise RuntimeError("DataCacheClient not initialized - call initialize() first")
# Default TTL based on provider and data type
if ttl_seconds is None:
ttl_seconds = self._get_default_ttl(provider, data_type)
# Fetch with retries
for attempt in range(retries + 1):
try:
# Type checker: self._session is guaranteed non-None after the check above
async with self._session.get( # pylint: disable=not-async-context-manager
url, timeout=aiohttp.ClientTimeout(total=timeout)
) as response:
if response.status == 200:
# Determine data format based on content type
content_type = response.headers.get("content-type", "").lower()
if content_type.startswith("application/json"):
data = await response.json()
elif content_type.startswith(("image/", "audio/", "video/")):
data = await response.read() # Binary data
else:
data = await response.text() # Text data
# Store in cache
success = await self.storage.store(
url=url,
identifier=identifier,
data_type=data_type,
provider=provider,
data_value=data,
ttl_seconds=ttl_seconds,
metadata=metadata,
)
if success:
logging.debug("Cached data from URL: %s", url)
else:
logging.warning("Failed to cache data from URL: %s", url)
return data, metadata or {}
if response.status == 429:
# Rate limit - wait and retry
retry_after = int(response.headers.get("Retry-After", 60))
logging.warning(
"Rate limited by %s, waiting %d seconds", provider, retry_after
)
await asyncio.sleep(retry_after)
continue
logging.warning("HTTP %d error fetching %s", response.status, url)
if attempt < retries:
wait_time = (2**attempt) + (
time.time() % 1
) # Exponential backoff with jitter
await asyncio.sleep(wait_time)
continue
return None
except asyncio.TimeoutError:
logging.warning(
"Timeout fetching URL (attempt %d/%d): %s", attempt + 1, retries + 1, url
)
if attempt < retries:
wait_time = (2**attempt) + (time.time() % 1)
await asyncio.sleep(wait_time)
continue
return None
except Exception as error: # pylint: disable=broad-except
logging.error(
"Error fetching URL (attempt %d/%d): %s - %s",
attempt + 1,
retries + 1,
url,
error,
)
if attempt < retries:
wait_time = (2**attempt) + (time.time() % 1)
await asyncio.sleep(wait_time)
continue
return None
return None
</code_context>
<issue_to_address>
**issue (code-quality):** Low code quality found in DataCacheClient._fetch_and_store - 19% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))

Explanation: the quality score for this function is below the quality threshold of 25%. This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this? It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into their own functions. This is the most important thing you can do - ideally a function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts sits together within the function rather than being scattered.
</issue_to_address>
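Two extractions fall straight out of the existing branches: the content-type handling and the backoff sleep. Method names below are illustrative, not taken from the PR.

```python
import asyncio
import time

import aiohttp


class DataCacheClient:  # excerpt-style sketch; only the extracted helpers are shown
    @staticmethod
    async def _read_response_body(response: aiohttp.ClientResponse):
        """Decode a successful response as JSON, raw bytes, or text based on content type."""
        content_type = response.headers.get("content-type", "").lower()
        if content_type.startswith("application/json"):
            return await response.json()
        if content_type.startswith(("image/", "audio/", "video/")):
            return await response.read()  # binary data
        return await response.text()  # text data

    @staticmethod
    async def _backoff(attempt: int) -> None:
        """Exponential backoff with sub-second jitter, matching the original retry loops."""
        await asyncio.sleep((2**attempt) + (time.time() % 1))
```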
### Comment 5
<location> `nowplaying/datacache/storage.py:594` </location>
<code_context>
def run_datacache_maintenance(cache_dir: Path | None = None) -> dict[str, int]:
"""
Run datacache maintenance at system startup (sync version).
Args:
cache_dir: Optional custom cache directory. If None, uses Qt standard cache location.
Returns:
Dictionary with maintenance statistics
"""
database_path = get_datacache_path(cache_dir)
stats = {"expired_cleaned": 0, "requests_cleaned": 0, "vacuum_performed": False, "errors": 0}
try:
# Ensure database exists with proper schema
_ensure_datacache_schema(database_path)
# Clean up expired entries and old requests
with sqlite3.connect(str(database_path), timeout=30.0) as conn:
now = int(time.time())
# Clean expired cache data
cursor = conn.execute("DELETE FROM cached_data WHERE expires_at <= ?", (now,))
stats["expired_cleaned"] = cursor.rowcount
# Clean completed/failed requests older than 24 hours
one_day_ago = now - (24 * 3600)
cursor = conn.execute(
"DELETE FROM pending_requests WHERE status IN ('completed', 'failed') "
"AND created_at <= ?",
(one_day_ago,),
)
stats["requests_cleaned"] = cursor.rowcount
conn.commit()
if stats["expired_cleaned"] > 0:
logging.info("Cleaned up %d expired datacache entries", stats["expired_cleaned"])
if stats["requests_cleaned"] > 0:
logging.info("Cleaned up %d old request records", stats["requests_cleaned"])
# Vacuum database to reclaim space
conn.execute("VACUUM")
stats["vacuum_performed"] = True
except Exception as error: # pylint: disable=broad-exception-caught
logging.error("Datacache maintenance failed: %s", error)
stats["errors"] += 1
return stats
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
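A sketch of that extraction, pulling the row cleanup out of `run_datacache_maintenance` so the outer function is left with schema setup, vacuuming and error handling. The helper name is hypothetical.

```python
import sqlite3
import time


def _cleanup_stale_rows(conn: sqlite3.Connection, stats: dict[str, int]) -> None:
    """Delete expired cache rows and day-old completed/failed requests, recording counts."""
    now = int(time.time())
    cursor = conn.execute("DELETE FROM cached_data WHERE expires_at <= ?", (now,))
    stats["expired_cleaned"] = cursor.rowcount
    one_day_ago = now - (24 * 3600)
    cursor = conn.execute(
        "DELETE FROM pending_requests WHERE status IN ('completed', 'failed') "
        "AND created_at <= ?",
        (one_day_ago,),
    )
    stats["requests_cleaned"] = cursor.rowcount
    conn.commit()
```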
### Comment 6
<location> `tests/datacache/test_storage.py:70` </location>
<code_context>
def test_maintenance_cleanup_expired():
"""Test that maintenance cleans up expired entries"""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
db_path = temp_path / "datacache.sqlite"
# Create database with expired entry
with sqlite3.connect(str(db_path)) as conn:
# Create schema
conn.execute("""
CREATE TABLE cached_data (
url TEXT PRIMARY KEY,
cache_key TEXT NOT NULL,
identifier TEXT NOT NULL,
data_type TEXT NOT NULL,
provider TEXT NOT NULL,
data_value BLOB,
metadata TEXT,
created_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL,
access_count INTEGER DEFAULT 1,
last_accessed INTEGER NOT NULL,
data_size INTEGER NOT NULL
)
""")
conn.execute("""
CREATE TABLE pending_requests (
request_id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
request_key TEXT NOT NULL,
params TEXT NOT NULL,
priority INTEGER NOT NULL,
created_at INTEGER NOT NULL,
attempts INTEGER DEFAULT 0,
last_attempt INTEGER DEFAULT 0,
status TEXT DEFAULT 'pending'
)
""")
now = int(time.time())
# Add expired entry
conn.execute(
"""
INSERT INTO cached_data
(url, cache_key, identifier, data_type, provider, data_value, metadata,
created_at, expires_at, last_accessed, data_size)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"https://example.com/expired.jpg",
"test_thumbnail_test_expired",
"test",
"thumbnail",
"test",
b"expired_data",
"{}",
now - 3600,
now - 1800, # Expired 30 minutes ago
now - 3600,
len(b"expired_data"),
),
)
# Add valid entry
conn.execute(
"""
INSERT INTO cached_data
(url, cache_key, identifier, data_type, provider, data_value, metadata,
created_at, expires_at, last_accessed, data_size)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"https://example.com/valid.jpg",
"test_thumbnail_test_valid",
"test",
"thumbnail",
"test",
b"valid_data",
"{}",
now,
now + 3600, # Expires in 1 hour
now,
len(b"valid_data"),
),
)
conn.commit()
# Run maintenance
stats = nowplaying.datacache.storage.run_datacache_maintenance(temp_path)
# Should have cleaned up 1 expired entry
assert stats["expired_cleaned"] == 1
# Verify cleanup
with sqlite3.connect(str(db_path)) as conn:
cursor = conn.execute("SELECT COUNT(*) FROM cached_data")
count = cursor.fetchone()[0]
assert count == 1 # Only valid entry remains
cursor = conn.execute("SELECT url FROM cached_data")
url = cursor.fetchone()[0]
assert url == "https://example.com/valid.jpg"
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
- Extract duplicate code into function ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>
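The two near-identical INSERT blocks could collapse into a small test helper, roughly as below, so the expired and valid entries become two short calls. The helper name and argument order are illustrative.

```python
import sqlite3


def _insert_cache_row(
    conn: sqlite3.Connection,
    url: str,
    cache_key: str,
    data: bytes,
    created_at: int,
    expires_at: int,
) -> None:
    """Insert a cached_data row using the fixed test identifier/type/provider."""
    conn.execute(
        """
        INSERT INTO cached_data
        (url, cache_key, identifier, data_type, provider, data_value, metadata,
         created_at, expires_at, last_accessed, data_size)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (url, cache_key, "test", "thumbnail", "test", data, "{}",
         created_at, expires_at, created_at, len(data)),
    )
```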
### Comment 7
<location> `tests/datacache/test_storage.py:558` </location>
<code_context>
@pytest.mark.asyncio
async def test_concurrent_access():
"""Test concurrent access to storage (multiprocess simulation)"""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create multiple storage instances (simulates multiple processes)
storages = []
for i in range(3):
storage = nowplaying.datacache.storage.DataStorage(temp_path)
await storage.initialize()
storages.append(storage)
try:
# Concurrent operations
tasks = []
for i, storage in enumerate(storages):
# Each "process" stores different data
task = storage.store(
url=f"https://example.com/concurrent_{i}.jpg",
identifier=f"artist_{i}",
data_type="thumbnail",
provider="test",
data_value=f"data_{i}".encode(),
ttl_seconds=3600,
)
tasks.append(task)
# Wait for all operations
results = await asyncio.gather(*tasks)
assert all(results) # All should succeed
# Verify all data is stored correctly
for i, storage in enumerate(storages):
result = await storage.retrieve_by_url(f"https://example.com/concurrent_{i}.jpg")
assert result is not None
data, metadata = result
assert data == f"data_{i}".encode()
finally:
# Cleanup
for storage in storages:
await storage.close()
</code_context>
<issue_to_address>
**suggestion (code-quality):** Replace unused for index with underscore ([`for-index-underscore`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/for-index-underscore/))
```suggestion
for _ in range(3):
```
</issue_to_address>
```python
@pytest.mark.asyncio
async def test_rate_limiter_basic_functionality():
    """Test basic rate limiter token acquisition"""
    limiter = nowplaying.datacache.queue.RateLimiter("test", requests_per_second=2.0)

    # Should have tokens available initially
    assert limiter.available_tokens() > 0

    # Should be able to acquire token immediately
    success = await limiter.acquire(timeout=1.0)
```
suggestion (testing): Rate limiter timeout is tested, but edge case for negative rate is missing.
Add a test to verify that RateLimiter with a negative or zero rate does not allow token acquisition and fails gracefully.
Codecov Report

❌ Patch coverage is 56.08%.
❌ Your patch check has failed because the patch coverage (56.08%) is below the target coverage (70.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #1323      +/-   ##
==========================================
- Coverage   61.72%   61.42%   -0.30%
==========================================
  Files         101      108       +7
  Lines       15280    16134     +854
==========================================
+ Hits         9431     9911     +480
- Misses       5849     6223     +374
```
Flags with carried forward coverage won't be shown.
Summary by Sourcery
Implement a unified asynchronous data caching system under nowplaying.datacache, including storage, HTTP client with rate limiting, provider interfaces, background workers, migration utilities, and integrated maintenance calls, accompanied by extensive tests and documentation.
New Features:
Enhancements:
Documentation:
Tests: