Source code for pynenc.client_data_store.sqlite_client_data_store
"""
SQLite-based client data store for cross-process testing.
Provides a SQLite-backed implementation that enables true cross-process
data sharing for testing process runners.
Key components:
- SQLiteClientDataStore: SQLite-backed client data store
.. warning::
Designed for testing purposes only. Not recommended for production.
"""
import sqlite3
from functools import cached_property
from typing import TYPE_CHECKING
from pynenc.client_data_store.base_client_data_store import BaseClientDataStore
from pynenc.conf.config_client_data_store import ConfigClientDataStoreSQLite
from pynenc.util.sqlite_utils import (
delete_tables_with_prefix,
get_sqlite_sqlite_db_path,
)
if TYPE_CHECKING:
from pynenc.app import Pynenc
[docs]
class Tables:
"""Table names for client data store."""
STORE = "client_data"
[docs]
class SQLiteClientDataStore(BaseClientDataStore):
"""
SQLite-backed client data store for cross-process testing.
Uses SQLite to allow multiple processes to share cached client data.
Implements the three abstract methods from BaseClientDataStore.
.. warning::
Designed for testing purposes only. Uses temporary SQLite files.
"""
def __init__(self, app: "Pynenc") -> None:
super().__init__(app)
self.sqlite_db_path = get_sqlite_sqlite_db_path(self.conf.sqlite_db_path)
self._init_tables()
[docs]
def _init_tables(self) -> None:
"""Create the client_data table if it doesn't exist."""
with sqlite3.connect(self.sqlite_db_path) as conn:
conn.execute(f"""
CREATE TABLE IF NOT EXISTS {Tables.STORE} (
data_key TEXT PRIMARY KEY,
data_value BLOB NOT NULL,
created_at REAL NOT NULL DEFAULT (julianday('now'))
)
""")
conn.execute(
f"CREATE INDEX IF NOT EXISTS idx_client_data_created "
f"ON {Tables.STORE}(created_at)"
)
conn.commit()
@cached_property
def conf(self) -> ConfigClientDataStoreSQLite:
"""Get SQLite-specific configuration."""
return ConfigClientDataStoreSQLite(
config_values=self.app.config_values,
config_filepath=self.app.config_filepath,
)
[docs]
def _store(self, key: str, value: str) -> None:
"""Store a key-value pair in SQLite."""
with sqlite3.connect(self.sqlite_db_path) as conn:
conn.execute(
f"INSERT OR REPLACE INTO {Tables.STORE} "
f"(data_key, data_value) VALUES (?, ?)",
(key, value.encode("utf-8")),
)
conn.commit()
[docs]
def _retrieve(self, key: str) -> str:
"""
Retrieve a stored value by key.
:param str key: The reference key
:return: The stored serialized string
:raises KeyError: If key not found
"""
with sqlite3.connect(self.sqlite_db_path) as conn:
cursor = conn.execute(
f"SELECT data_value FROM {Tables.STORE} WHERE data_key = ?",
(key,),
)
if row := cursor.fetchone():
return row[0].decode("utf-8")
raise KeyError(f"Key {key} not found in client data store")
[docs]
def _purge(self) -> None:
"""Clear all stored client data."""
delete_tables_with_prefix(self.sqlite_db_path, "client_")
self._init_tables()