saigon.orm
saigon.orm.config
- class saigon.orm.config.BaseDbEnv(var_prefix, credentials_type=<class 'saigon.orm.config.PostgreSQLCredentials'>, secret_vault=None, **kwargs)
Bases:
Environment,GenericManages database environment variables, optionally loading credentials from a concrete SecretVault.
This class extends the Environment class to specifically handle database connection details. It can retrieve credentials from the following methods, and this order:
full json object specified in {var_prefix}_DATABASE_CREDENTIALS
Secret with key specified in {var_prefix}_DATABASE_CREDENTIALS_SECRET and fetch through the specified concrete SecretVault
environment variables (prefixed by var_prefix + _DB_)
- It also defines variables for the following parameters:
= {var_prefix}_DB_SCHEMA: database target schema. Defaults None.
Examples
- Scenario 1: Loading from a JSON variable::
# Assume an environment variable MYAPP_DATABASE_CREDENTIALS is defined with # the following content # json({ # “endpoint”: “db.example.com”, # “port”: 5432, # “database”: “prod_db”, # “username”: “prod_user”, # “password”: “super_secret_password” # })
- class MyAppDbEnv(BaseDbEnv):
MYAPP_DATABASE_CREDENTIALS: str
prod_env = MyAppDbEnv(“MY_APP”) print(prod_env.db_credentials.db_url) # Expected Output: # postgresql+psycopg://prod_user:super_secret_password@db.example.com:5432/prod_db?sslmode=prefe
- Scenario 2: Loading from SecretVault::
# Consider the same credentials json from the previous scenario that is now # stored in a SecretVault, whose secret name is provided through the # environment variable MYAPP_DATABASE_CREDENTIALS_SECRET:
- class MyAppDbEnv(BaseDbEnv):
MYAPP_DATABASE_CREDENTIALS_SECRET: str = “my/database/credentials”
prod_env = MyAppDbEnv(“PROD_APP”, AwsSecretVault()) print(prod_env.db_credentials.db_url) # Expected Output: # postgresql+psycopg://prod_user:super_secret_password@db.example.com:5432/prod_db?sslmode=prefer
Scenario 3: Loading from environment variables:
# Assuming environment variables are set # export MYAPP_DB_ENDPOINT="localhost" # export MYAPP_DB_PORT="5432" # export MYAPP_DB_DATABASE="mydata" # export MYAPP_DB_USERNAME="appuser" # export MYAPP_DB_PASSWORD="secure_password" class MyPostgreSqlEnv(BaseDbEnv): pass db_env = MyPostgreSqlEnv(var_prefix="MYAPP") print(db_env.db_credentials.db_url) # Expected Output: # postgresql+psycopg://appuser:secure_password@localhost:5432/mydata?sslmode=prefer In this scenario, the password can be optionally be supplied as a secret name using the variable MYAPP_DB_PASSWORD_SECRET. In such case, the password value is retrieved via SecretVault
- property db_credentials: CredentialsType
Provides the database credentials as an object of credentials_type.
This property dynamically constructs the DbCredentials object from the environment variables currently loaded into this BaseDbEnv instance.
- Returns:
- An instance of the configured
credentials_type populated with database details.
- Return type:
- get_credentials_from_secret(secret_key)
Retrieves database credentials from the installed SecretVault.
The secret’s value is expected to be a JSON string that can be parsed into the specified credentials_type.
Requires a SecretVault implementation.
- Parameters:
secret_key (str) – The secret key identifier to fetch the credentials from
- Returns:
- An instance of credentials_type populated
from the secret.
- Return type:
- Raises:
Exception – If no SecretVault is set
- class saigon.orm.config.DbCredentials(**data)
Bases:
ABC,BaseModelAbstract base class for database credentials.
Defines the common attributes and abstract methods for various database types, ensuring a consistent interface for accessing connection details.
- endpoint
The database server’s hostname or IP address. Defaults to ‘127.0.0.1’.
- Type:
str
- port
The port number for the database connection. Defaults to 1024.
- Type:
int
- database
The name of the database. Defaults to ‘test-db’.
- Type:
str
- username
The username for database access. Defaults to ‘test-user’.
- Type:
str
- password
The password for the database user. Defaults to ‘test-pass’.
- Type:
str
- abstract property db_url: str
Abstract method to construct the full database connection URL.
This method must be implemented by concrete subclasses to provide the specific URL format for their respective database types.
- Raises:
NotImplementedError – If the method is not implemented by a subclass.
- Returns:
The full database connection URL.
- Return type:
str
- property host_url: str
port) for the database.
- Returns:
The host URL, e.g., “127.0.0.1:1024”.
- Return type:
str
- Type:
Constructs the host URL (endpoint
- class saigon.orm.config.MySQLCredentials(**data)
Bases:
DbCredentialsConcrete implementation of DbCredentials for MySQL databases.
Provides the correct db_url format for MySQL connections.
- port
The default MySQL port. Defaults to 3306.
- Type:
int
Example:
mysql_creds = MySQLSecretCredentials( endpoint="my-mysql-server", database="mysq_db", username="mysqluser", password="mysqlpass" ) print(mysql_creds.db_url) # Expected Output: mysql+mysqlconnector://mysqluser:mysqlpass@my-mysql-server:3306/mysq_db
- property db_url: str
Constructs the MySQL database connection URL.
- Returns:
The full MySQL connection URL.
- Return type:
str
- class saigon.orm.config.PostgreSQLCredentials(**data)
Bases:
DbCredentialsConcrete implementation of DbCredentials for PostgreSQL databases.
Adds PostgreSQL-specific attributes like ssl_mode and provides the correct db_url format for PostgreSQL connections.
- port
The default PostgreSQL port. Defaults to 5432.
- Type:
int
- ssl_mode
The SSL mode for the PostgreSQL connection. Defaults to ‘prefer’.
- Type:
str
Example:
pg_creds = PostgreSQLSecretCredentials( endpoint="my-pg-server", database="mydb", username="pguser", password="pgpass", port=5433, ssl_mode="require" ) print(pg_creds.db_url) # Expected Output: postgresql+psycopg://pguser:pgpass@my-pg-server:5433/mydb?sslmode=require
- property db_url: str
Constructs the PostgreSQL database connection URL.
- Returns:
The full PostgreSQL connection URL, including SSL mode.
- Return type:
str
saigon.orm.connection
- class saigon.orm.connection.AbstractDbManager(db_connector, retries=5, schema=None, reflect=True)
Provides a uniform interface for interacting with a service’s database model.
This abstract class encapsulates common database operations such as transaction management, pagination, and entity retrieval/deletion, working with a DbConnector instance.
- delete_entity(delete_statement)
Executes a SQLAlchemy Delete statement to remove entities from the database.
- Parameters:
delete_statement (sqlalchemy.Delete) – The SQLAlchemy Delete statement to execute.
- get_entity(model_type, select_statement)
Fetches a single entity from the database and converts it to a Pydantic model.
- Parameters:
model_type (Type[ModelTypeDef]) – The Pydantic model type to convert the fetched row into.
select_statement (sqlalchemy.Select) – The SQLAlchemy Select statement to execute, expected to return at most one row.
- Returns:
- An instance of model_type if a row is found,
otherwise None.
- Return type:
Optional[ModelTypeDef]
- classmethod meta()
Returns the SQLAlchemy MetaData object containing reflected database schema.
This is a class method because the MetaData is typically shared across all instances of a DbManager subclass.
- Returns:
The reflected database metadata.
- Return type:
sqlalchemy.MetaData
- paginate(query_selection_type, query_params, build_select, single_row_to_data=None, multirow_to_data=None, **kwargs)
Paginates database queries based on provided parameters and converts results to models.
This method handles the logic for applying limits and offsets, decoding pagination tokens, executing the query, and converting the raw database rows into Pydantic models.
- Parameters:
query_selection_type (Type[QuerySelection]) – The Pydantic model type representing the query selection criteria.
query_params (QueryDataParams[QuerySelection]) – An object containing pagination and query selection parameters.
build_select (Callable[[Optional[QuerySelection]], sqlalchemy.Select]) – A callable that takes an optional QuerySelection object and returns a SQLAlchemy Select statement. This function defines the base query.
single_row_to_data (Optional[Callable[[RowMapping, ...], ModelType]]) – A callable that converts a single RowMapping (from SQLAlchemy) into an instance of ModelType. Required if multirow_to_data is None.
multirow_to_data (Optional[Callable[[Sequence[Row], ...], List[ModelType]]]) – A callable that converts a sequence of Row objects into a list of ModelType instances. Required if single_row_to_data is None.
**kwargs – Additional keyword arguments to pass to the single_row_to_data or multirow_to_data conversion functions.
- Returns:
- An object containing the fetched data
(list of ModelType instances) and an updated pagination token (if more data is available).
- Return type:
QueryDataResult[ModelType]
- Raises:
ValueError – If neither single_row_to_data nor multirow_to_data is provided.
Example:
Consider a User model and a UserQuery for selection:
from pydantic import BaseModel from sqlalchemy import Table, Column, Integer, String, MetaData, select # Assume 'users_table' is reflected via manager.meta() metadata = MetaData() users_table = Table( "users", metadata, Column("id", Integer, primary_key=True), Column("name", String), Column("email", String) ) class User(BaseModel): id: int name: str email: str class UserQuery(BaseModel): name_starts_with: Optional[str] = None def build_user_select(query_selection: Optional[UserQuery]) -> sqlalchemy.Select: stmt = select(users_table) if query_selection and query_selection.name_starts_with: stmt = stmt.where(users_table.c.name.startswith( query_selection.name_starts_with) ) return stmt def row_to_user_model(row_mapping: RowMapping) -> User: return User( id=row_mapping['id'], name=row_mapping['name'], email=row_mapping['email'] ) # Assuming db_manager is an instance of AbstractDbManager # db_manager = MyDbManager(db_connector=DbConnector(credentials)) # Query for users with name starting with 'J', limit 2 query_params = QueryDataParams[UserQuery]( query_selection=UserQuery(name_starts_with="J"), max_count=2 ) result = db_manager.paginate( query_selection_type=UserQuery, query_params=query_params, build_select=build_user_select, single_row_to_data=row_to_user_model ) print(f"Fetched users: {[u.name for u in result.data]}") if result.pagination_token: print(f"Next token available: {result.pagination_token.query_id}") # To fetch next page: # next_query_params = QueryDataParams[UserQuery]( # pagination_token=result.pagination_token, # max_count=2 # ) # next_result = db_manager.paginate( # query_selection_type=UserQuery, # query_params=next_query_params, # build_select=build_user_select, # single_row_to_data=row_to_user_model # ) # print(f"Fetched next users: {[u.name for u in next_result.data]}")
- transaction()
Returns a context manager for managing a database transaction.
Usage with with self.transaction(): ensures that all database operations within the block run within a single transaction.
- Returns:
- A context manager that yields a SQLAlchemy
connection for transactional operations.
- Return type:
AbstractContextManager
- class saigon.orm.connection.DbConnector(credentials, **kwargs)
Provides a thin wrapper around a SQLAlchemy engine for database interactions.
Manages the SQLAlchemy engine and provides methods for executing queries, fetching results, and reflecting database metadata. It also integrates with a context variable for managing transactional connections.
- property connection: Connection | None
Retrieves the current database connection from the context variable.
This property allows access to a connection that might be bound to a transaction via transaction_context or transactional decorator.
- Returns:
The current connection if set, otherwise None.
- Return type:
Optional[sqlalchemy.Connection]
- property engine: Engine
Obtains a reference to the underlying SQLAlchemy engine.
- Returns:
The SQLAlchemy engine instance.
- Return type:
sqlalchemy.engine.Engine
- execute(obj, **kwargs)
Executes the given SQLAlchemy callable object or literal SQL statement.
This method will acquire a connection from the pool, execute the given statement, and return the result. If a connection is already bound to the current context (e.g., by a transaction), that connection will be used.
- Parameters:
obj (sqlalchemy.Executable) – Statement to execute. See SQLAlchemy’s docs for the full list of supported types. For literal statements, prepare this value with sqlalchemy.text(‘SELECT … FROM’).
**kwargs – Keyword arguments for the execution, such as parameters (Union[Dict, Iterable]) for bind parameter values.
- Returns:
The statement result.
- Return type:
sqlalchemy.ResultProxy
- Raises:
DbExecutionError – All SQLAlchemy exceptions are caught and re-raised as DbExecutionError.
- fetch_all(selectable, **kwargs)
Executes the given SQLAlchemy selectable and returns all rows.
An empty list is returned if no rows match the selection.
- Parameters:
selectable (sqlalchemy.Executable) – Any object considered “selectable” by SQLAlchemy (e.g., a sqlalchemy.Select statement).
**kwargs – Additional keyword arguments to pass to the execute method, such as parameters for bind values.
- Returns:
- A sequence of all rows from the
result set.
- Return type:
Sequence[sqlalchemy.engine.result.Row]
- fetch_one(selectable, **kwargs)
Executes the given SQLAlchemy selectable and returns the first row.
- Parameters:
selectable (sqlalchemy.Executable) – Any object considered “selectable” by SQLAlchemy (e.g., a sqlalchemy.Select statement).
**kwargs – Additional keyword arguments to pass to the execute method, such as parameters for bind values.
- Returns:
- The first row of the result set,
or None if no rows are found.
- Return type:
Optional[sqlalchemy.engine.result.Row]
- reflect(retries, schema=None)
Reflects all database objects (tables, etc.) into a SQLAlchemy MetaData object.
This method is typically called at service startup. It includes a retry mechanism to handle transient connection issues or timing problems between service and database startup.
- Parameters:
retries (int) – The number of retries before raising an exception. Each retry uses an exponential back-off.
schema (Optional[str]) – Optional target schema. Defaults to None.
- Returns:
- A MetaData object containing the reflected
database schema.
- Return type:
sqlalchemy.MetaData
- Raises:
DbExecutionError – If reflection fails after all retries.
- refresh_engine(**kwargs)
Refreshes the database connection by re-creating the SQLAlchemy engine.
This method is useful for scenarios where connection parameters might change or to explicitly dispose of old connections. It relies on Python’s garbage collection to close previous engine’s connections.
- Raises:
DbExecutionError – If there’s an error during engine creation.
- Return type:
None
- exception saigon.orm.connection.DbExecutionError(*args)
Custom exception raised for database execution errors.
This exception wraps underlying SQLAlchemy exceptions to provide a consistent error handling mechanism within the application.
- saigon.orm.connection.transactional(func)
A decorator that ensures a method executes within a database transaction.
If the decorated method is called and a database connection is already bound to the current context (meaning it’s already within a transaction), the method will use that existing connection. Otherwise, it will create a new transaction context using transaction_context for the duration of the method’s execution.
- Parameters:
func (Callable) – The method to be decorated. This method is expected to be an instance method of a class that inherits from AbstractDbManager, and its first argument should be self (the manager instance).
- Returns:
The wrapped function, which now executes within a transaction.
- Return type:
Callable
Example:
class MyManager(AbstractDbManager): def __init__(self, db_connector: DbConnector): super().__init__(db_connector) # Assume 'users_table' is reflected and available via self.meta() @transactional def add_user_and_log(self, user_name: str, log_message: str): # Both operations will be part of the same transaction insert_stmt = sqlalchemy.text( "INSERT INTO users (name) VALUES (:name)" ).bindparams(name=user_name) self.db_connector.execute(insert_stmt) log_stmt = sqlalchemy.text( "INSERT INTO logs (message) VALUES (:message)" ).bindparams(message=log_message) self.db_connector.execute(log_stmt) print(f"User '{user_name}' added and log '{log_message}' recorded.") # Usage: # db_connector = DbConnector(credentials) # manager = MyManager(db_connector) # manager.add_user_and_log("Bob", "New user registered") # If any error occurs during add_user_and_log, both inserts are rolled back.
saigon.orm.model
- saigon.orm.model.filter_unknown_model_fields(model_type, model_data)
Filters a dictionary to include only keys that are fields in a given Pydantic model.
Also excludes values that are None. This is useful when preparing data from a database row or an external source to be validated against a Pydantic model, preventing errors from unexpected fields.
- Parameters:
model_type (Type[ModelType]) – The Pydantic model class to filter against.
model_data (Mapping[str, Any]) – A dictionary of data, typically from a database row or similar source.
- Returns:
- A new dictionary containing only the keys that
match the model_type’s fields and have non-None values.
- Return type:
Mapping[str, Any]
Example:
from pydantic import BaseModel class User(BaseModel): id: int name: str email: str age: int | None = None data = { "id": 1, "name": "Alice", "email": "alice@example.com", "unknown_field": "some_value", "age": None } filtered_data = filter_unknown_model_fields(User, data) print(filtered_data) # Expected output: {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'}
- saigon.orm.model.model_data_to_row_values(model_data, include=None, exclude=None, exclude_unset=True, exclude_none=True, **extra)
Converts a Pydantic model instance into a dictionary suitable for database row insertion/update.
Serializes the model’s data into a dictionary, with all values converted to strings, except for dictionaries, booleans, and lists which are kept as their native Python types (as they might be JSONB fields or similar). Allows for inclusion/exclusion of specific fields and adds extra key-value pairs.
- Parameters:
model_data (ModelType) – An instance of a Pydantic BaseModel.
include (Set[str], optional) – A set of field names to include. If None, all are included.
exclude (Set[str], optional) – A set of field names to exclude. If None, none are excluded.
exclude_unset (bool) – If True, fields that were not explicitly set on the model (even if they have a default value) are excluded. Defaults to True.
exclude_none (bool) – If True, fields whose value is None are excluded. Defaults to True.
**extra (Any) – Additional keyword arguments to include in the resulting dictionary.
- Returns:
- A dictionary representing the model data, with values
converted to strings (or kept as native types for dict, bool, list), ready for database operations.
- Return type:
Dict[str, str]
Example:
from pydantic import BaseModel from typing import Dict, List, Optional class Product(BaseModel): id: int name: str price: float metadata: Dict[str, Any] tags: List[str] description: Optional[str] = None product_instance = Product( id=101, name="Laptop", price=1200.50, metadata={"weight_kg": 2.5, "color": "silver"}, tags=["electronics", "portable"] ) row_values = model_data_to_row_values(product_instance, extra={"created_by": "system"}) print(row_values) # Expected output (order might vary): # { # 'id': '101', 'name': 'Laptop', 'price': '1200.5', # 'metadata': {'weight_kg': 2.5, 'color': 'silver'}, # 'tags': ['electronics', 'portable'], 'created_by': 'system' # } # Example with exclude_unset (if description was not passed in __init__) product_instance_2 = Product(id=102, name="Monitor", price=300.0, metadata={}, tags=[]) row_values_unset_excluded = model_data_to_row_values(product_instance_2, exclude_unset=True) print(row_values_unset_excluded) # Expected: {'id': '102', 'name': 'Monitor', 'price': '300.0', 'metadata': {}, 'tags': []} # 'description' is not included because it was not explicitly set and exclude_unset is True
- saigon.orm.model.row_mapping_to_model_data(model_type, row_mapping, **kwargs)
Converts a SQLAlchemy RowMapping object into an instance of a Pydantic model.
It first filters out any keys in the row_mapping that are not defined as fields in the model_type and removes None values, then instantiates the Pydantic model.
- Parameters:
model_type (Type[ModelType]) – The Pydantic model class to convert the row mapping into.
row_mapping (sqlalchemy.RowMapping) – A SQLAlchemy RowMapping object, which is a dictionary-like view of a database row.
**kwargs (Any) – Additional keyword arguments to pass directly to the Pydantic model’s constructor, which will override values from row_mapping if there are key conflicts.
- Returns:
- An instance of the specified Pydantic model populated with
data from the row_mapping and kwargs.
- Return type:
ModelType
Example:
from pydantic import BaseModel import sqlalchemy class User(BaseModel): id: int name: str email: str # Simulate a SQLAlchemy RowMapping # In a real scenario, this would come from db_connector.fetch_one(...)._mapping mock_row_mapping = sqlalchemy.RowMapping({ "id": 1, "name": "Charlie", "email": "charlie@example.com", "created_at": "2023-01-01" # Extra field not in User model }) user_model = row_mapping_to_model_data(User, mock_row_mapping) print(user_model) # Expected output: id=1 name='Charlie' email='charlie@example.com' # Example with overriding kwargs user_model_override = row_mapping_to_model_data(User, mock_row_mapping, name="Charles") print(user_model_override) # Expected output: id=1 name='Charles' email='charlie@example.com'