August 5, 2021

Auto Initialize SQLAlchemy Models from A Python Dictionary

As an avid FastAPI and Pydantic user, I’ve stumbled into a problem that is common with any web framework: how do I turn JSON into a dictionary and then into an entry in my SQL database? Initially, I worked through this problem mostly manually by defining the __init__ method on my SQLAlchemy models to take in an unpacked dictionary and assign the corresponding values. Something like the code below.

class ShoppingListItem(SqlAlchemyBase, BaseMixins):
    """A single entry of a shopping list, initialized by hand.

    Illustrates the "manual" approach: every model spells out an
    ``__init__`` that copies the wanted keys of an unpacked dictionary
    onto the instance and swallows the rest via ``**_``.
    """

    __tablename__ = "shopping_list_items"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("shopping_lists.id"))
    position = Column(Integer, nullable=False)

    # Must be an assignment: `title: Column(String)` is only an annotation
    # and would leave the column unmapped.
    title = Column(String)
    text = Column(String)
    quantity = Column(Integer)
    checked = Column(Boolean)

    def __init__(self, title, text, quantity, checked, **_) -> None:
        """Assign the user-supplied fields; unknown keyword arguments are ignored."""
        # `self.title: title` would be a no-op annotation, so assign explicitly.
        self.title = title
        self.text = text
        self.quantity = quantity
        self.checked = checked

While this works, it’s more verbose than I’d like it to be. Defining the __init__ method in each class is cumbersome and prone to errors. After some experimenting, I came across a thread with some code that I was able to adapt and extend in order to implement the decorator pattern to initialize the values of an SQLAlchemy model from a Python dictionary (or Pydantic model). This decorator supports Many To Many, One To Many, and Many To One relationships, as well as listing attributes to be excluded from automatic assignment, allowing for custom initialization as needed.

Quick Note:

This code isn’t well tested and is mostly experimental. I’ve used it extensively in a few projects, but can’t speak to the scalability of the approach. That said, along with applying the decorator you’ll also need to do a few things to prepare.

  1. Models that are used in a relationship must have a get_ref method that takes in an id and returns None or a matching Model. This is used to set relationships without re-creating entries that already exist in the table.
  2. Models must have an ‘id’ column.

The Code

from functools import wraps
from typing import Union

from sqlalchemy.orm import MANYTOMANY, MANYTOONE, ONETOMANY


def handle_one_to_many_list(relation_cls, all_elements: list[dict]):
    """Turn a list of child dictionaries into model instances.

    Each element is looked up through ``relation_cls.get_ref`` using its
    ``"id"`` key (``None`` when the key is absent). Elements that resolve to
    an existing instance are updated in place with the dictionary's values;
    the remaining elements are instantiated as ``relation_cls(**elem)``.

    Args:
        relation_cls: Class on the "many" side of the relationship. Must
            provide ``get_ref(match_value=...)`` returning an instance or
            ``None`` and accept the dictionary keys as keyword arguments.
        all_elements: Dictionaries describing the children.

    Returns:
        A list with the newly created instances followed by the updated
        existing instances.
    """
    elems_to_create = []
    updated_elems = []

    for elem in all_elements:
        existing_elem = relation_cls.get_ref(match_value=elem.get("id"))

        if existing_elem is None:
            elems_to_create.append(elem)
            continue

        for key, value in elem.items():
            setattr(existing_elem, key, value)
        updated_elems.append(existing_elem)

    # Only instantiate the elements that do not exist yet; existing ones were
    # updated above and must stay part of the returned collection.
    new_elems = [relation_cls(**elem) for elem in elems_to_create]

    return new_elems + updated_elems


def auto_init(exclude: Union[set, list, None] = None):  # sourcery no-metrics
    """Build a decorator that auto-assigns mapped attributes before ``__init__``.

    The decorated ``__init__`` is wrapped so that, before it runs, every
    keyword argument naming a mapped column or relationship of the model is
    set on the instance. The original ``__init__`` is then called with the
    unchanged arguments and may perform any custom initialization.

    Args:
        exclude: Attribute names that must not be assigned automatically.
            ``"id"`` is always excluded. The collection is copied, so lists
            are accepted and the caller's object is never mutated.

    Returns:
        A decorator to apply to a model's ``__init__`` method.
    """

    # Copy into a fresh set: supports list input and avoids mutating the
    # caller's collection when adding "id".
    excluded = set(exclude or ())
    excluded.add("id")

    def decorator(init):
        @wraps(init)
        def wrapper(self, *args, **kwargs):  # sourcery no-metrics
            """
            Custom initializer that allows nested children initialization.
            Only keys that are present as instance's class attributes are allowed.
            These could be, for example, any mapped columns or relationships.
            Keys that are excluded or unknown to the class are silently skipped.

            Code inspired from GitHub.
            Ref: https://github.com/tiangolo/fastapi/issues/2194

            Raises:
                ValueError: If a many-to-one value is a dict without an "id",
                    or a many-to-many value is not a list.
            """
            cls = self.__class__
            model_columns = self.__mapper__.columns
            relationships = self.__mapper__.relationships

            for key, val in kwargs.items():
                if key in excluded or not hasattr(cls, key):
                    continue

                if key in model_columns:
                    setattr(self, key, val)
                    continue

                if key not in relationships:
                    continue

                relation = relationships[key]
                relation_dir = relation.direction.name
                relation_cls = relation.mapper.entity
                use_list = relation.uselist

                if relation_dir == ONETOMANY.name:
                    if use_list:
                        instances = handle_one_to_many_list(relation_cls, val)
                        setattr(self, key, instances)
                    else:
                        setattr(self, key, relation_cls(**val))

                elif relation_dir == MANYTOONE.name and not use_list:
                    if isinstance(val, dict):
                        val = val.get("id")

                        if val is None:
                            raise ValueError(
                                f"Expected 'id' to be provided for {key}"
                            )

                    # Values that are neither dict, str nor int are ignored.
                    if isinstance(val, (str, int)):
                        instance = relation_cls.get_ref(match_value=val)
                        setattr(self, key, instance)

                elif relation_dir == MANYTOMANY.name:
                    if not isinstance(val, list):
                        raise ValueError(
                            f"Expected many to many input to be of type list for {key}"
                        )

                    # Guard against an empty list before peeking at val[0].
                    if val and isinstance(val[0], dict):
                        val = [elem.get("id") for elem in val]
                    instances = [relation_cls.get_ref(elem) for elem in val]
                    setattr(self, key, instances)

            return init(self, *args, **kwargs)

        return wrapper

    return decorator

Usage

class AdminModel(SqlAlchemyBase, BaseMixins):
    """Example model whose columns are populated by ``auto_init``.

    ``is_superuser`` is excluded from automatic assignment so that it can
    never be set from incoming data; ``__init__`` forces it to ``False``.
    """

    name = Column(String, index=True)
    email = Column(String, unique=True, index=True)
    password = Column(String)
    is_superuser = Column(Boolean(), default=False)

    @auto_init(exclude={'is_superuser'})
    def __init__(self, **_):
        # Python spells these `self` and `False`; `this`/`false` raise NameError.
        self.is_superuser = False

Get Ref Examples


@classmethod
def get_ref(cls, match_value: str, match_attr: str = "id"):
    """Fetch the single row of ``cls`` whose ``match_attr`` equals ``match_value``.

    Opens a ``SessionLocal`` session for the duration of the lookup and
    returns the result of ``one_or_none()`` on the filtered query.
    """
    with SessionLocal() as db:
        target_column = getattr(cls, match_attr)
        lookup = db.query(cls).filter(target_column == match_value)
        return lookup.one_or_none()