flow

rest

class saigon.aws.flow.rest.FetchResourceHandler(fetch_resource, build_query, dump_to_file=True)

Bases: EventHandler, Generic

on_event(input_data)

An abstract context manager method for handling events.

This method defines the core logic for an event handler, providing a context for processing input data and yielding output data.

Parameters:

input_data (TypeVar(InputData)) – The input data to be processed by the event handler.

Yields:

OutputData – The output data generated by the event handler.

Return type:

Generator[TypeVar(Result, bound=Union[Path, List[BaseModel]]), None, None]
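
The context-manager shape described above can be sketched as follows. This is a minimal illustration, not saigon's implementation: EchoHandler is a hypothetical class, and the use of contextlib.contextmanager is an assumption chosen to show how a handler might yield its output inside a with block and clean up afterwards.

```python
from contextlib import contextmanager
from typing import Generator, List


class EchoHandler:
    """Hypothetical handler used only for illustration; not part of saigon."""

    def __init__(self) -> None:
        self.closed = False

    @contextmanager
    def on_event(self, input_data: List[int]) -> Generator[List[int], None, None]:
        # Build the output, hand it to the caller, then clean up.
        output = [value * 2 for value in input_data]
        try:
            yield output
        finally:
            # Runs when the caller leaves the `with` block, even on error.
            self.closed = True


handler = EchoHandler()
with handler.on_event([1, 2, 3]) as result:
    doubled = list(result)
```

The caller only sees the yielded value; any cleanup the handler needs stays inside on_event.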

s3

class saigon.aws.flow.s3.S3WriterHandler

Bases: EventHandler[WriterHandlerInput, None]

on_event(input_data)

An abstract context manager method for handling events.

This method defines the core logic for an event handler, providing a context for processing input data and yielding output data.

Parameters:

input_data (WriterHandlerInput) – The input data to be processed by the event handler.

Yields:

OutputData – The output data generated by the event handler.

class saigon.aws.flow.s3.WriterHandlerInput(**data)

Bases: BaseModel

sqs

class saigon.aws.flow.sqs.SqlStatementBuilder(*args, **kwargs)

Bases: Protocol, Generic

A protocol defining the interface for building SQLAlchemy SQL statements and extracting parameters from a message.

This protocol ensures that any class implementing it can prepare an executable SQLAlchemy statement based on a message type and then extract the necessary parameters from a specific message instance for that statement.

TypeVar ModelTypeDef is bound to saigon.model.ModelTypeDef, implying that the messages processed by this builder are Pydantic-like models.

get_statement_params(message)

Extracts parameters from a message for use with a prepared SQL statement.

This method takes an instance of a message and transforms it into a format suitable for parameter binding in a SQLAlchemy statement (e.g., a dictionary for INSERT or UPDATE statements, or a list of dictionaries for INSERT with multiple rows).

Parameters:

message (ModelTypeDef) – An instance of the Pydantic model representing an SQS message.

Returns:

A dictionary or an iterable of dictionaries containing the parameters to be bound to the SQL statement.

Return type:

Union[Dict, Iterable]

prepare(message_type)

Prepares an executable SQLAlchemy statement for a given message type.

This method should define the SQL operation (e.g., INSERT, UPDATE) and specify how it interacts with the database schema, often based on the structure of message_type. The returned sqlalchemy.Executable object can then be executed by a SQLAlchemy connection.

Parameters:

message_type (Type[ModelTypeDef]) – The Pydantic model type representing the structure of the SQS message.

Returns:

An executable SQLAlchemy statement object.

Return type:

sqlalchemy.Executable
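
To illustrate the two-step contract (prepare once per message type, then extract parameters per message), here is a minimal sketch. It is not saigon's code: OrderMessage, OrderInsertBuilder, and the orders table are hypothetical, a dataclass stands in for the Pydantic model, and a plain SQL string executed through the standard-library sqlite3 module stands in for the sqlalchemy.Executable, so the example runs without third-party packages.

```python
import sqlite3
from dataclasses import asdict, dataclass, fields
from typing import Dict, Type


@dataclass
class OrderMessage:
    """Hypothetical message model (stand-in for a Pydantic model)."""

    order_id: int
    customer: str


class OrderInsertBuilder:
    """Hypothetical builder mirroring prepare() and get_statement_params()."""

    def prepare(self, message_type: Type[OrderMessage]) -> str:
        # Derive the column list from the fields of the message type.
        columns = [field.name for field in fields(message_type)]
        placeholders = ", ".join(f":{name}" for name in columns)
        return f"INSERT INTO orders ({', '.join(columns)}) VALUES ({placeholders})"

    def get_statement_params(self, message: OrderMessage) -> Dict:
        # One dictionary of named parameters per message.
        return asdict(message)


builder = OrderInsertBuilder()
statement = builder.prepare(OrderMessage)
params = builder.get_statement_params(OrderMessage(order_id=1, customer="ada"))

# Execute the prepared statement with the extracted parameters.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (order_id INTEGER, customer TEXT)")
connection.execute(statement, params)
rows = connection.execute("SELECT order_id, customer FROM orders").fetchall()
```

Keeping statement construction separate from parameter extraction means the statement can be built once and reused for every message of that type.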

class saigon.aws.flow.sqs.SqsToRdsForwarder(message_type, sqs_queue_url, db_connector, sql_statement_builder)

Bases: Generic

A generic class to forward messages from an SQS queue to an RDS database.

This class continuously polls an SQS queue for messages of a specific type, deserializes them, and then uses a SqlStatementBuilder to convert them into SQL statements that are executed against an RDS database via a DbConnector.

TypeVar ModelTypeDef is bound to saigon.model.ModelTypeDef.

forward(**kwargs)

Receives messages from the SQS queue and forwards them to RDS.

This method performs a receive_message call to SQS. For each message received, it calls forward_message to process it. Additional keyword arguments can be passed to customize the SQS receive_message call (e.g., MaxNumberOfMessages, WaitTimeSeconds).

Parameters:

**kwargs – Arbitrary keyword arguments to be passed directly to the SQSClient.receive_message method (e.g., MaxNumberOfMessages, VisibilityTimeout, WaitTimeSeconds, AttributeNames).
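
The receive-then-dispatch behavior of forward can be sketched as below. Everything here is an illustrative assumption rather than saigon's implementation: StubSqsClient is hypothetical and imitates only the response shape of an SQS receive_message call (a dictionary with a Messages list whose items carry a Body), the queue URL is a placeholder, and forward is written as a free function that hands each body to a callback.

```python
from typing import Any, Callable, Dict, List


class StubSqsClient:
    """Hypothetical stand-in for an SQS client; records the kwargs it receives."""

    def __init__(self, bodies: List[str]) -> None:
        self._bodies = bodies
        self.last_kwargs: Dict[str, Any] = {}

    def receive_message(self, **kwargs: Any) -> Dict[str, Any]:
        self.last_kwargs = kwargs
        limit = kwargs.get("MaxNumberOfMessages", 1)
        batch, self._bodies = self._bodies[:limit], self._bodies[limit:]
        # In this stub, the "Messages" key is absent when nothing is available.
        return {"Messages": [{"Body": body} for body in batch]} if batch else {}


def forward(
    client: StubSqsClient,
    queue_url: str,
    forward_message: Callable[[str], Any],
    **kwargs: Any,
) -> int:
    # Extra keyword arguments go straight through to receive_message.
    response = client.receive_message(QueueUrl=queue_url, **kwargs)
    messages = response.get("Messages", [])
    for message in messages:
        forward_message(message["Body"])
    return len(messages)


client = StubSqsClient(['{"id": 1}', '{"id": 2}', '{"id": 3}'])
received: List[str] = []
count = forward(
    client,
    "https://sqs.example.invalid/queue",  # placeholder URL
    received.append,
    MaxNumberOfMessages=2,
    WaitTimeSeconds=20,
)
```

With MaxNumberOfMessages=2, a single call hands at most two message bodies to the callback; the third stays queued for the next call.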

forward_message(message_body_json)

Processes a single SQS message by deserializing it and inserting it into RDS.

This method is responsible for taking the raw JSON body of an SQS message, validating it against the message_type, extracting parameters using the sql_statement_builder, and then executing the prepared SQL statement with those parameters against the database.

Parameters:

message_body_json (str) – The JSON string content of the SQS message body.
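
The steps described above (deserialize, validate, extract parameters, execute) can be sketched as follows. This is an illustration under stated assumptions, not saigon's implementation: EventMessage, the events table, and the free function forward_message are hypothetical; a dataclass replaces the Pydantic model, and sqlite3 replaces the SQLAlchemy connection so that the example needs only the standard library.

```python
import json
import sqlite3
from dataclasses import asdict, dataclass


@dataclass
class EventMessage:
    """Hypothetical message model (stand-in for a Pydantic model)."""

    event_id: int
    kind: str


# Stand-in for the statement a builder's prepare() would return.
INSERT_EVENT = "INSERT INTO events (event_id, kind) VALUES (:event_id, :kind)"


def forward_message(connection: sqlite3.Connection, message_body_json: str) -> None:
    # 1. Deserialize the raw JSON body.
    payload = json.loads(message_body_json)
    # 2. Validate against the message type; missing or extra keys raise TypeError.
    message = EventMessage(**payload)
    # 3. Extract the parameters to bind.
    params = asdict(message)
    # 4. Execute the statement; the `with` block commits on success.
    with connection:
        connection.execute(INSERT_EVENT, params)


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE events (event_id INTEGER, kind TEXT)")
forward_message(connection, '{"event_id": 7, "kind": "created"}')
stored = connection.execute("SELECT event_id, kind FROM events").fetchall()

# A body that does not match the message type is rejected before any SQL runs.
try:
    forward_message(connection, '{"event_id": 8}')
    rejected = False
except TypeError:
    rejected = True
```

Validating before building parameters keeps malformed messages out of the database; how the real forwarder reports such failures is not specified here.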