Reference

pydantic_redis

A simple declarative ORM for redis based on pydantic.

Provides:

  1. A subclass-able Model class that maps objects to redis hashes
  2. A redis Store class to mutate and query the Models registered in it
  3. A RedisConfig class to pass to the Store constructor to connect to a redis instance
  4. A synchronous interface (syncio) and an asynchronous interface (asyncio) to the above classes
  5. Parent-child relationships, allowing models to be nested within models.

pydantic_redis.syncio

Synchronous API for pydantic-redis ORM.

Typical usage example:

# from pydantic_redis import Store, Model, RedisConfig
from pydantic_redis.syncio import Store, Model, RedisConfig

class Book(Model):
    _primary_key_field = 'title'
    title: str
    author: str  # declared because the calls below set `author`

if __name__ == '__main__':
    store = Store(name="sample", redis_config=RedisConfig())
    store.register_model(Book)

    Book.insert(Book(title="Oliver Twist", author="Charles Dickens"))
    Book.update(
        _id="Oliver Twist", data={"author": "Jane Austen"}, life_span_seconds=3600
    )
    results = Book.select()
    Book.delete(ids=["Oliver Twist", "Great Expectations"])

Model

Bases: AbstractModel

The base class for all synchronous models.

Inherit from this class when creating a new model. The new model should have _primary_key_field defined. All interaction with redis goes through Model subclasses.

Source code in pydantic_redis/syncio/model.py
class Model(AbstractModel):
    """
    The Base class for all Synchronous models.

    Inherit this class when creating a new model.
    The new model should have `_primary_key_field` defined.
    Any interaction with redis is done through `Model`'s.
    """

    _store: Store

    @classmethod
    def insert(
        cls,
        data: Union[List[AbstractModel], AbstractModel],
        life_span_seconds: Optional[float] = None,
    ):
        """Inserts a given record or list of records into the redis.

        Can add a single record or multiple records into redis.
        The records must be instances of this class. i.e. a `Book`
        model can only insert `Book` instances.

        Args:
            data: a model instance or list of model instances to put
                into the redis store
            life_span_seconds: the time-to-live in seconds of the records
                to be inserted. If not specified, it defaults to the `Store`'s
                life_span_seconds.
        """
        store = cls.get_store()

        life_span = (
            life_span_seconds
            if life_span_seconds is not None
            else store.life_span_in_seconds
        )
        with store.redis_store.pipeline(transaction=True) as pipeline:
            data_list = []

            if isinstance(data, list):
                data_list = data
            elif isinstance(data, AbstractModel):
                data_list = [data]

            for record in data_list:
                insert_on_pipeline(
                    model=cls,
                    _id=None,
                    pipeline=pipeline,
                    record=record,
                    life_span=life_span,
                )

            return pipeline.execute()

    @classmethod
    def update(
        cls, _id: Any, data: Dict[str, Any], life_span_seconds: Optional[float] = None
    ):
        """Updates the record whose primary key is `_id`.

        Updates the record of this Model in redis whose primary key is equal to the `_id` provided.
        The record is partially updated from the `data`.
        If `life_span_seconds` is provided, it will also update the time-to-live of
        the record.

        Args:
            _id: the primary key of record to be updated.
            data: the new changes
            life_span_seconds: the new time-to-live for the record
        """
        store = cls.get_store()
        life_span = (
            life_span_seconds
            if life_span_seconds is not None
            else store.life_span_in_seconds
        )
        with store.redis_store.pipeline(transaction=True) as pipeline:
            if isinstance(data, dict):
                insert_on_pipeline(
                    model=cls,
                    _id=_id,
                    pipeline=pipeline,
                    record=data,
                    life_span=life_span,
                )

            return pipeline.execute()

    @classmethod
    def delete(cls, ids: Union[Any, List[Any]]):
        """Removes a list of this Model's records from redis

        Removes all the records for the current Model whose primary keys
        have been included in the `ids` passed.

        Args:
            ids: list of primary keys of the records to remove
        """
        store = cls.get_store()
        with store.redis_store.pipeline() as pipeline:
            delete_on_pipeline(model=cls, pipeline=pipeline, ids=ids)
            return pipeline.execute()

    @classmethod
    def select(
        cls,
        columns: Optional[List[str]] = None,
        ids: Optional[List[Any]] = None,
        skip: int = 0,
        limit: Optional[int] = None,
        **kwargs,
    ) -> Union["Model", Dict[str, Any]]:
        """Retrieves records of this Model from redis.

        Retrieves the records for this Model from redis.

        Args:
            columns: the fields to return for each record
            ids: the primary keys of the records to return
            skip: the number of records to skip. (default: 0)
            limit: the maximum number of records to return

        Returns:
            By default, it returns all records that belong to current Model.

            If `ids` are specified, it returns only records whose primary keys
            have been listed in `ids`.

            If `skip` and `limit` are specified WITHOUT `ids`, a slice of
            all records are returned.

            If `limit` and `ids` are specified, `limit` is ignored.

            If `columns` are specified, a list of dictionaries containing only
            the fields specified in `columns` is returned. Otherwise, instances
            of the current Model are returned.
        """
        if columns is None and ids is None:
            response = select_all_fields_all_ids(model=cls, skip=skip, limit=limit)

        elif columns is None and isinstance(ids, list):
            response = select_all_fields_some_ids(model=cls, ids=ids)

        elif isinstance(columns, list) and ids is None:
            response = select_some_fields_all_ids(
                model=cls, fields=columns, skip=skip, limit=limit
            )

        elif isinstance(columns, list) and isinstance(ids, list):
            response = select_some_fields_some_ids(model=cls, fields=columns, ids=ids)

        else:
            raise ValueError(
                f"columns {columns}, ids: {ids} should be either None or lists"
            )

        return parse_select_response(
            model=cls, response=response, as_models=(columns is None)
        )

delete(ids) classmethod

Removes a list of this Model's records from redis

Removes all the records for the current Model whose primary keys have been included in the ids passed.

Parameters:

Name | Type | Description | Default
ids | Union[Any, List[Any]] | list of primary keys of the records to remove | required
Source code in pydantic_redis/syncio/model.py
@classmethod
def delete(cls, ids: Union[Any, List[Any]]):
    """Removes a list of this Model's records from redis

    Removes all the records for the current Model whose primary keys
    have been included in the `ids` passed.

    Args:
        ids: list of primary keys of the records to remove
    """
    store = cls.get_store()
    with store.redis_store.pipeline() as pipeline:
        delete_on_pipeline(model=cls, pipeline=pipeline, ids=ids)
        return pipeline.execute()

insert(data, life_span_seconds=None) classmethod

Inserts a given record or list of records into redis.

Can add a single record or multiple records. The records must be instances of this class, i.e. a Book model can only insert Book instances.

Parameters:

Name | Type | Description | Default
data | Union[List[AbstractModel], AbstractModel] | a model instance or list of model instances to put into the redis store | required
life_span_seconds | Optional[float] | the time-to-live in seconds of the records to be inserted. If not specified, it defaults to the Store's life_span_in_seconds. | None
Source code in pydantic_redis/syncio/model.py
@classmethod
def insert(
    cls,
    data: Union[List[AbstractModel], AbstractModel],
    life_span_seconds: Optional[float] = None,
):
    """Inserts a given record or list of records into the redis.

    Can add a single record or multiple records into redis.
    The records must be instances of this class. i.e. a `Book`
    model can only insert `Book` instances.

    Args:
        data: a model instance or list of model instances to put
            into the redis store
        life_span_seconds: the time-to-live in seconds of the records
            to be inserted. If not specified, it defaults to the `Store`'s
            life_span_seconds.
    """
    store = cls.get_store()

    life_span = (
        life_span_seconds
        if life_span_seconds is not None
        else store.life_span_in_seconds
    )
    with store.redis_store.pipeline(transaction=True) as pipeline:
        data_list = []

        if isinstance(data, list):
            data_list = data
        elif isinstance(data, AbstractModel):
            data_list = [data]

        for record in data_list:
            insert_on_pipeline(
                model=cls,
                _id=None,
                pipeline=pipeline,
                record=record,
                life_span=life_span,
            )

        return pipeline.execute()
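
The time-to-live fallback used by insert can be looked at in isolation. The following is a minimal sketch, not the library's code path: `resolve_life_span` is a hypothetical helper that only mirrors the `life_span_seconds if life_span_seconds is not None else store.life_span_in_seconds` expression from the listing above.

```python
from typing import Optional


def resolve_life_span(
    life_span_seconds: Optional[float],
    store_default: Optional[float],
) -> Optional[float]:
    """Pick the per-call TTL if one was given, else the store-wide default.

    Hypothetical helper; it mirrors the conditional expression in `insert`.
    """
    return life_span_seconds if life_span_seconds is not None else store_default


# A per-call value wins over the store default.
per_call = resolve_life_span(3600, 86400)
# None falls back to the store default.
fallback = resolve_life_span(None, 86400)
# 0 is not None, so it is passed through rather than replaced.
explicit_zero = resolve_life_span(0, 86400)
```

Because the check is `is not None` rather than a truthiness test, an explicit 0 is treated as a caller-supplied value and does not fall back to the store default.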

select(columns=None, ids=None, skip=0, limit=None, **kwargs) classmethod

Retrieves records of this Model from redis.

Parameters:

Name | Type | Description | Default
columns | Optional[List[str]] | the fields to return for each record | None
ids | Optional[List[Any]] | the primary keys of the records to return | None
skip | int | the number of records to skip | 0
limit | Optional[int] | the maximum number of records to return | None

Returns:

Type: Union[Model, Dict[str, Any]]

By default, it returns all records that belong to current Model.

If ids are specified, it returns only records whose primary keys have been listed in ids.

If skip and limit are specified WITHOUT ids, a slice of all records are returned.

If limit and ids are specified, limit is ignored.

If columns are specified, a list of dictionaries containing only the fields specified in columns is returned. Otherwise, instances of the current Model are returned.

Source code in pydantic_redis/syncio/model.py
@classmethod
def select(
    cls,
    columns: Optional[List[str]] = None,
    ids: Optional[List[Any]] = None,
    skip: int = 0,
    limit: Optional[int] = None,
    **kwargs,
) -> Union["Model", Dict[str, Any]]:
    """Retrieves records of this Model from redis.

    Retrieves the records for this Model from redis.

    Args:
        columns: the fields to return for each record
        ids: the primary keys of the records to return
        skip: the number of records to skip. (default: 0)
        limit: the maximum number of records to return

    Returns:
        By default, it returns all records that belong to current Model.

        If `ids` are specified, it returns only records whose primary keys
        have been listed in `ids`.

        If `skip` and `limit` are specified WITHOUT `ids`, a slice of
        all records are returned.

        If `limit` and `ids` are specified, `limit` is ignored.

        If `columns` are specified, a list of dictionaries containing only
        the fields specified in `columns` is returned. Otherwise, instances
        of the current Model are returned.
    """
    if columns is None and ids is None:
        response = select_all_fields_all_ids(model=cls, skip=skip, limit=limit)

    elif columns is None and isinstance(ids, list):
        response = select_all_fields_some_ids(model=cls, ids=ids)

    elif isinstance(columns, list) and ids is None:
        response = select_some_fields_all_ids(
            model=cls, fields=columns, skip=skip, limit=limit
        )

    elif isinstance(columns, list) and isinstance(ids, list):
        response = select_some_fields_some_ids(model=cls, fields=columns, ids=ids)

    else:
        raise ValueError(
            f"columns {columns}, ids: {ids} should be either None or lists"
        )

    return parse_select_response(
        model=cls, response=response, as_models=(columns is None)
    )
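
The branching in select depends only on whether columns and ids are None or lists. The sketch below restates that if/elif chain as a standalone function so the four cases can be checked without a redis server. `choose_select_strategy` is a hypothetical name; it returns the name of the helper the listing above would call, plus the `as_models` flag passed to `parse_select_response`.

```python
from typing import Any, List, Optional, Tuple


def choose_select_strategy(
    columns: Optional[List[str]] = None,
    ids: Optional[List[Any]] = None,
) -> Tuple[str, bool]:
    """Return (helper name, as_models) for the given arguments.

    Hypothetical function mirroring the if/elif chain in `select`.
    """
    if columns is None and ids is None:
        helper = "select_all_fields_all_ids"
    elif columns is None and isinstance(ids, list):
        helper = "select_all_fields_some_ids"
    elif isinstance(columns, list) and ids is None:
        helper = "select_some_fields_all_ids"
    elif isinstance(columns, list) and isinstance(ids, list):
        helper = "select_some_fields_some_ids"
    else:
        raise ValueError(
            f"columns {columns}, ids: {ids} should be either None or lists"
        )
    # Model instances are produced only when no column subset was requested.
    return helper, columns is None


everything = choose_select_strategy()
subset = choose_select_strategy(columns=["title"], ids=["Oliver Twist"])
```

Note that the checks use `isinstance(..., list)`, so passing a tuple or a set for columns or ids falls through to the ValueError branch.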

update(_id, data, life_span_seconds=None) classmethod

Updates the record whose primary key is _id.

Updates the record of this Model in redis whose primary key is equal to the _id provided. The record is partially updated from the data. If life_span_seconds is provided, it will also update the time-to-live of the record.

Parameters:

Name | Type | Description | Default
_id | Any | the primary key of the record to be updated | required
data | Dict[str, Any] | the new changes | required
life_span_seconds | Optional[float] | the new time-to-live for the record | None
Source code in pydantic_redis/syncio/model.py
@classmethod
def update(
    cls, _id: Any, data: Dict[str, Any], life_span_seconds: Optional[float] = None
):
    """Updates the record whose primary key is `_id`.

    Updates the record of this Model in redis whose primary key is equal to the `_id` provided.
    The record is partially updated from the `data`.
    If `life_span_seconds` is provided, it will also update the time-to-live of
    the record.

    Args:
        _id: the primary key of record to be updated.
        data: the new changes
        life_span_seconds: the new time-to-live for the record
    """
    store = cls.get_store()
    life_span = (
        life_span_seconds
        if life_span_seconds is not None
        else store.life_span_in_seconds
    )
    with store.redis_store.pipeline(transaction=True) as pipeline:
        if isinstance(data, dict):
            insert_on_pipeline(
                model=cls,
                _id=_id,
                pipeline=pipeline,
                record=data,
                life_span=life_span,
            )

        return pipeline.execute()
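
What "partially updated" means can be pictured with a plain dictionary standing in for the stored record. This is a sketch under assumptions, not the library's implementation: `apply_partial_update` is a hypothetical helper, and the `in_stock` field is made up for illustration. Only the keys present in data change; every other field keeps its previous value.

```python
from typing import Any, Dict


def apply_partial_update(record: Dict[str, Any], data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `record` with only the keys in `data` overwritten.

    Hypothetical helper; a dict stands in for the stored record.
    """
    return {**record, **data}


# `in_stock` is an illustrative field, not part of the Book model in the docs.
stored = {"title": "Oliver Twist", "author": "Charles Dickens", "in_stock": True}
updated = apply_partial_update(stored, {"author": "Jane Austen"})
```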

RedisConfig

Bases: BaseModel

Configuration for connecting to a redis database.

Connecting to a redis database requires a number of settings, including the server's host address and port. RedisConfig computes a redis URL of the form redis://:password@host:port/db (or rediss://... when ssl is True).

Attributes:

Name | Type | Description
host | str | the host address where the redis server is found (default: 'localhost')
port | int | the port on which the redis server is running (default: 6379)
db | int | the redis database identifier (default: 0)
password | Optional[str] | the password for connecting to the redis server (default: None)
ssl | bool | whether the connection to the redis server is to be via TLS (default: False)
encoding | Optional[str] | the string encoding used with the redis database (default: utf-8)

Source code in pydantic_redis/config.py
class RedisConfig(BaseModel):
    """Configuration for connecting to redis database.

    In order to connect to a redis database, there are a number of
    configurations that are needed including the server's host address
    and port. `RedisConfig` computes a redis-url similar to
    `redis://:password@host:port/db`

    Attributes:
        host (str): the host address where the redis server is found (default: 'localhost').
        port (int): the port on which the redis server is running (default: 6379).
        db (int): the redis database identifier (default: 0).
        password (Optional[str]): the password for connecting to the
            redis server (default: None).
        ssl (bool): whether the connection to the redis server is to be via TLS (default: False)
        encoding (Optional[str]): the string encoding used with the redis database
            (default: utf-8)
    """

    model_config = ConfigDict(from_attributes=True)

    host: str = "localhost"
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
    ssl: bool = False
    encoding: Optional[str] = "utf-8"

    @property
    def redis_url(self) -> str:
        """a redis URL of form `redis://:password@host:port/db`. (`rediss://..` if TLS)."""
        proto = "rediss" if self.ssl else "redis"
        if self.password is None:
            return f"{proto}://{self.host}:{self.port}/{self.db}"
        return f"{proto}://:{self.password}@{self.host}:{self.port}/{self.db}"

redis_url: str property

a redis URL of form redis://:password@host:port/db. (rediss://.. if TLS).
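
The URL rule can be tried out without pydantic or a redis server. The sketch below is a stand-in, not the real class: `UrlParts` is a hypothetical dataclass that copies the field names and defaults of RedisConfig, and `build_redis_url` repeats the logic of the `redis_url` property shown in the listing above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class UrlParts:
    """Stand-in for RedisConfig with the same field names and defaults (not the real class)."""

    host: str = "localhost"
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
    ssl: bool = False


def build_redis_url(cfg: UrlParts) -> str:
    """Mirror of the `redis_url` property: scheme depends on ssl, password is optional."""
    proto = "rediss" if cfg.ssl else "redis"
    if cfg.password is None:
        return f"{proto}://{cfg.host}:{cfg.port}/{cfg.db}"
    return f"{proto}://:{cfg.password}@{cfg.host}:{cfg.port}/{cfg.db}"


plain_url = build_redis_url(UrlParts())
tls_url = build_redis_url(UrlParts(host="example.com", db=2, password="secret", ssl=True))
```

With all defaults the result is `redis://localhost:6379/0`; with TLS and a password it becomes `rediss://:secret@example.com:6379/2`.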

Store

Bases: AbstractStore

Manages a collection of Models, connecting them to a redis database.

A Model can only interact with a redis database when it is registered with a Store that is connected to that database.

Attributes:

Name | Type | Description
models | Dict[str, Type[Model]] | a mapping of registered Models, with the keys being the Model name
name | str | the name of this Store
redis_config | RedisConfig | the configuration for connecting to a redis database
life_span_in_seconds | Optional[int] | the default time-to-live for the records inserted in this store (default: None)

Source code in pydantic_redis/syncio/store.py
class Store(AbstractStore):
    """Manages a collection of Model's, connecting them to a redis database

    A Model can only interact with a redis database when it is registered
    with a `Store` that is connected to that database.

    Attributes:
        models (Dict[str, Type[pydantic_redis.syncio.Model]]): a mapping of registered `Model`'s, with the keys being the
            Model name
        name (str): the name of this Store
        redis_config (pydantic_redis.syncio.RedisConfig): the configuration for connecting to a redis database
        life_span_in_seconds (Optional[int]): the default time-to-live for the records inserted in this store
            (default: None)
    """

    models: Dict[str, Type["Model"]] = {}

    def _connect_to_redis(self) -> redis.Redis:
        """Connects the store to redis.

        See base class.
        """
        return redis.from_url(
            self.redis_config.redis_url,
            encoding=self.redis_config.encoding,
            decode_responses=True,
        )
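
The relationship between a Store and its registered models can be sketched with two small stand-in classes. Everything here is hypothetical: `TinyStore` and `TinyModel` are not part of pydantic_redis, and using the bare class name as the registry key is an assumption (the real key format may differ). The sketch only illustrates why a model must be registered before `get_store()` can succeed.

```python
from typing import Dict, Optional, Type


class TinyModel:
    """Hypothetical stand-in for a Model; it only tracks which store owns it."""

    _store: Optional["TinyStore"] = None

    @classmethod
    def get_store(cls) -> "TinyStore":
        if cls._store is None:
            raise RuntimeError(f"{cls.__name__} is not registered with any store")
        return cls._store


class TinyStore:
    """Hypothetical stand-in for a Store: a named registry of model classes."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.models: Dict[str, Type[TinyModel]] = {}

    def register_model(self, model_class: Type[TinyModel]) -> None:
        # Assumption: the class name is the registry key.
        self.models[model_class.__name__] = model_class
        # The back-reference is set on the subclass only, not on TinyModel itself.
        model_class._store = self


class Book(TinyModel):
    pass


class Author(TinyModel):
    pass


store = TinyStore(name="sample")
store.register_model(Book)
```

Here `Book.get_store()` returns `store`, while `Author.get_store()` raises because Author was never registered.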

pydantic_redis.asyncio

Asynchronous API for pydantic-redis ORM.

Typical usage example:

import asyncio
from pydantic_redis.asyncio import Store, Model, RedisConfig

class Book(Model):
    _primary_key_field = 'title'
    title: str
    author: str  # declared because the calls below set `author`

async def main():
    store = Store(name="sample", redis_config=RedisConfig())
    store.register_model(Book)

    await Book.insert(Book(title="Oliver Twist", author="Charles Dickens"))
    await Book.update(
        _id="Oliver Twist", data={"author": "Jane Austen"}, life_span_seconds=3600
    )
    results = await Book.select()
    await Book.delete(ids=["Oliver Twist", "Great Expectations"])

if __name__ == "__main__":
    asyncio.run(main())

Model

Bases: AbstractModel

The base class for all asynchronous models.

Inherit from this class when creating a new model. The new model should have _primary_key_field defined. All interaction with redis goes through Model subclasses.

Source code in pydantic_redis/asyncio/model.py
class Model(AbstractModel):
    """The Base class for all Asynchronous models.

    Inherit this class when creating a new model.
    The new model should have `_primary_key_field` defined.
    Any interaction with redis is done through `Model`'s.
    """

    _store: Store

    @classmethod
    async def insert(
        cls,
        data: Union[List[AbstractModel], AbstractModel],
        life_span_seconds: Optional[float] = None,
    ):
        """Inserts a given record or list of records into the redis.

        Can add a single record or multiple records into redis.
        The records must be instances of this class. i.e. a `Book`
        model can only insert `Book` instances.

        Args:
            data: a model instance or list of model instances to put
                into the redis store
            life_span_seconds: the time-to-live in seconds of the records
                to be inserted. If not specified, it defaults to the `Store`'s
                life_span_seconds.
        """
        store = cls.get_store()
        life_span = (
            life_span_seconds
            if life_span_seconds is not None
            else store.life_span_in_seconds
        )

        async with store.redis_store.pipeline(transaction=True) as pipeline:
            data_list = []

            if isinstance(data, list):
                data_list = data
            elif isinstance(data, AbstractModel):
                data_list = [data]

            for record in data_list:
                insert_on_pipeline(
                    model=cls,
                    _id=None,
                    pipeline=pipeline,
                    record=record,
                    life_span=life_span,
                )

            return await pipeline.execute()

    @classmethod
    async def update(
        cls, _id: Any, data: Dict[str, Any], life_span_seconds: Optional[float] = None
    ):
        """Updates the record whose primary key is `_id`.

        Updates the record of this Model in redis whose primary key is equal to the `_id` provided.
        The record is partially updated from the `data`.
        If `life_span_seconds` is provided, it will also update the time-to-live of
        the record.

        Args:
            _id: the primary key of record to be updated.
            data: the new changes
            life_span_seconds: the new time-to-live for the record
        """
        store = cls.get_store()
        life_span = (
            life_span_seconds
            if life_span_seconds is not None
            else store.life_span_in_seconds
        )
        async with store.redis_store.pipeline(transaction=True) as pipeline:
            if isinstance(data, dict):
                insert_on_pipeline(
                    model=cls,
                    _id=_id,
                    pipeline=pipeline,
                    record=data,
                    life_span=life_span,
                )

            return await pipeline.execute()

    @classmethod
    async def delete(cls, ids: Union[Any, List[Any]]):
        """Removes a list of this Model's records from redis

        Removes all the records for the current Model whose primary keys
        have been included in the `ids` passed.

        Args:
            ids: list of primary keys of the records to remove
        """
        store = cls.get_store()

        async with store.redis_store.pipeline() as pipeline:
            delete_on_pipeline(model=cls, pipeline=pipeline, ids=ids)
            return await pipeline.execute()

    @classmethod
    async def select(
        cls,
        columns: Optional[List[str]] = None,
        ids: Optional[List[Any]] = None,
        skip: int = 0,
        limit: Optional[int] = None,
        **kwargs,
    ) -> Union["Model", Dict[str, Any]]:
        """Retrieves records of this Model from redis.

        Retrieves the records for this Model from redis.

        Args:
            columns: the fields to return for each record
            ids: the primary keys of the records to return
            skip: the number of records to skip. (default: 0)
            limit: the maximum number of records to return

        Returns:
            By default, it returns all records that belong to current Model.

            If `ids` are specified, it returns only records whose primary keys
            have been listed in `ids`.

            If `skip` and `limit` are specified WITHOUT `ids`, a slice of
            all records are returned.

            If `limit` and `ids` are specified, `limit` is ignored.

            If `columns` are specified, a list of dictionaries containing only
            the fields specified in `columns` is returned. Otherwise, instances
            of the current Model are returned.
        """
        if columns is None and ids is None:
            response = await select_all_fields_all_ids(
                model=cls, skip=skip, limit=limit
            )

        elif columns is None and isinstance(ids, list):
            response = await select_all_fields_some_ids(model=cls, ids=ids)

        elif isinstance(columns, list) and ids is None:
            response = await select_some_fields_all_ids(
                model=cls, fields=columns, skip=skip, limit=limit
            )

        elif isinstance(columns, list) and isinstance(ids, list):
            response = await select_some_fields_some_ids(
                model=cls, fields=columns, ids=ids
            )

        else:
            raise ValueError(
                f"columns {columns}, ids: {ids} should be either None or lists"
            )

        return parse_select_response(
            model=cls, response=response, as_models=(columns is None)
        )

delete(ids) async classmethod

Removes a list of this Model's records from redis

Removes all the records for the current Model whose primary keys have been included in the ids passed.

Parameters:

Name | Type | Description | Default
ids | Union[Any, List[Any]] | list of primary keys of the records to remove | required
Source code in pydantic_redis/asyncio/model.py
@classmethod
async def delete(cls, ids: Union[Any, List[Any]]):
    """Removes a list of this Model's records from redis

    Removes all the records for the current Model whose primary keys
    have been included in the `ids` passed.

    Args:
        ids: list of primary keys of the records to remove
    """
    store = cls.get_store()

    async with store.redis_store.pipeline() as pipeline:
        delete_on_pipeline(model=cls, pipeline=pipeline, ids=ids)
        return await pipeline.execute()

insert(data, life_span_seconds=None) async classmethod

Inserts a given record or list of records into redis.

Can add a single record or multiple records. The records must be instances of this class, i.e. a Book model can only insert Book instances.

Parameters:

Name | Type | Description | Default
data | Union[List[AbstractModel], AbstractModel] | a model instance or list of model instances to put into the redis store | required
life_span_seconds | Optional[float] | the time-to-live in seconds of the records to be inserted. If not specified, it defaults to the Store's life_span_in_seconds. | None
Source code in pydantic_redis/asyncio/model.py
@classmethod
async def insert(
    cls,
    data: Union[List[AbstractModel], AbstractModel],
    life_span_seconds: Optional[float] = None,
):
    """Inserts a given record or list of records into the redis.

    Can add a single record or multiple records into redis.
    The records must be instances of this class. i.e. a `Book`
    model can only insert `Book` instances.

    Args:
        data: a model instance or list of model instances to put
            into the redis store
        life_span_seconds: the time-to-live in seconds of the records
            to be inserted. If not specified, it defaults to the `Store`'s
            life_span_seconds.
    """
    store = cls.get_store()
    life_span = (
        life_span_seconds
        if life_span_seconds is not None
        else store.life_span_in_seconds
    )

    async with store.redis_store.pipeline(transaction=True) as pipeline:
        data_list = []

        if isinstance(data, list):
            data_list = data
        elif isinstance(data, AbstractModel):
            data_list = [data]

        for record in data_list:
            insert_on_pipeline(
                model=cls,
                _id=None,
                pipeline=pipeline,
                record=record,
                life_span=life_span,
            )

        return await pipeline.execute()
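
The way insert accepts either one record or a list can be shown on its own. This is a minimal sketch: `Record` is a hypothetical stand-in for AbstractModel and `normalize_records` is a hypothetical helper that repeats the `data_list` branching from the listing above.

```python
from typing import Any, List


class Record:
    """Hypothetical stand-in for AbstractModel."""

    def __init__(self, title: str) -> None:
        self.title = title


def normalize_records(data: Any) -> List[Any]:
    """Mirror of the `data_list` logic in `insert`."""
    if isinstance(data, list):
        return data  # a list is used as-is
    if isinstance(data, Record):
        return [data]  # a single record is wrapped in a list
    return []  # any other type leaves the list empty, so nothing is queued


single = normalize_records(Record("Oliver Twist"))
many_input = [Record("Oliver Twist"), Record("Great Expectations")]
many = normalize_records(many_input)
ignored = normalize_records({"title": "Oliver Twist"})
```

One consequence of this branching is that an argument that is neither a list nor a model instance, such as a plain dict, produces an empty list, so the call completes without inserting anything.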

select(columns=None, ids=None, skip=0, limit=None, **kwargs) async classmethod

Retrieves records of this Model from redis.

Parameters:

Name | Type | Description | Default
columns | Optional[List[str]] | the fields to return for each record | None
ids | Optional[List[Any]] | the primary keys of the records to return | None
skip | int | the number of records to skip | 0
limit | Optional[int] | the maximum number of records to return | None

Returns:

Type: Union[Model, Dict[str, Any]]

By default, it returns all records that belong to current Model.

If ids are specified, it returns only records whose primary keys have been listed in ids.

If skip and limit are specified WITHOUT ids, a slice of all records are returned.

If limit and ids are specified, limit is ignored.

If columns are specified, a list of dictionaries containing only the fields specified in columns is returned. Otherwise, instances of the current Model are returned.

Source code in pydantic_redis/asyncio/model.py
@classmethod
async def select(
    cls,
    columns: Optional[List[str]] = None,
    ids: Optional[List[Any]] = None,
    skip: int = 0,
    limit: Optional[int] = None,
    **kwargs,
) -> Union["Model", Dict[str, Any]]:
    """Retrieves records of this Model from redis.

    Retrieves the records for this Model from redis.

    Args:
        columns: the fields to return for each record
        ids: the primary keys of the records to return
        skip: the number of records to skip. (default: 0)
        limit: the maximum number of records to return

    Returns:
        By default, it returns all records that belong to current Model.

        If `ids` are specified, it returns only records whose primary keys
        have been listed in `ids`.

        If `skip` and `limit` are specified WITHOUT `ids`, a slice of
        all records are returned.

        If `limit` and `ids` are specified, `limit` is ignored.

        If `columns` are specified, a list of dictionaries containing only
        the fields specified in `columns` is returned. Otherwise, instances
        of the current Model are returned.
    """
    if columns is None and ids is None:
        response = await select_all_fields_all_ids(
            model=cls, skip=skip, limit=limit
        )

    elif columns is None and isinstance(ids, list):
        response = await select_all_fields_some_ids(model=cls, ids=ids)

    elif isinstance(columns, list) and ids is None:
        response = await select_some_fields_all_ids(
            model=cls, fields=columns, skip=skip, limit=limit
        )

    elif isinstance(columns, list) and isinstance(ids, list):
        response = await select_some_fields_some_ids(
            model=cls, fields=columns, ids=ids
        )

    else:
        raise ValueError(
            f"columns {columns}, ids: {ids} should be either None or lists"
        )

    return parse_select_response(
        model=cls, response=response, as_models=(columns is None)
    )

update(_id, data, life_span_seconds=None) async classmethod

Updates the record whose primary key is _id.

Updates the record of this Model in redis whose primary key is equal to the _id provided. The record is partially updated from the data. If life_span_seconds is provided, it will also update the time-to-live of the record.

Parameters:

Name Type Description Default
_id Any

the primary key of the record to be updated.

required
data Dict[str, Any]

the new changes

required
life_span_seconds Optional[float]

the new time-to-live for the record

None
Source code in pydantic_redis/asyncio/model.py
@classmethod
async def update(
    cls, _id: Any, data: Dict[str, Any], life_span_seconds: Optional[float] = None
):
    """Updates the record whose primary key is `_id`.

    Updates the record of this Model in redis whose primary key is equal to the `_id` provided.
    The record is partially updated from the `data`.
    If `life_span_seconds` is provided, it also updates the time-to-live of
    the record; otherwise the store's default `life_span_in_seconds` is used.

    Args:
        _id: the primary key of the record to be updated.
        data: the new changes
        life_span_seconds: the new time-to-live for the record
    """
    store = cls.get_store()
    life_span = (
        life_span_seconds
        if life_span_seconds is not None
        else store.life_span_in_seconds
    )
    async with store.redis_store.pipeline(transaction=True) as pipeline:
        if isinstance(data, dict):
            insert_on_pipeline(
                model=cls,
                _id=_id,
                pipeline=pipeline,
                record=data,
                life_span=life_span,
            )

        return await pipeline.execute()
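
The time-to-live fallback at the top of `update` is easy to misread, so here is a minimal sketch of just that step. `resolve_life_span` and `store_default` are hypothetical names used for illustration. The sketch mirrors the `is not None` check from the source above and does not touch redis.

```python
from typing import Optional


def resolve_life_span(
    life_span_seconds: Optional[float], store_default: Optional[float]
) -> Optional[float]:
    """Pick the explicit TTL if one was passed, else the store-wide default."""
    # `is not None` (rather than a truthiness check) keeps an explicit 0.
    return life_span_seconds if life_span_seconds is not None else store_default


explicit = resolve_life_span(3600, store_default=60)
fallback = resolve_life_span(None, store_default=60)
zero_kept = resolve_life_span(0, store_default=60)
```

Because the check is `is not None`, an explicit value of 0 is passed through rather than being replaced by the store default. What a zero time-to-live then does downstream is not covered by this sketch.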

RedisConfig

Bases: BaseModel

Configuration for connecting to redis database.

In order to connect to a redis database, a number of configurations are needed, including the server's host address and port. RedisConfig computes a redis URL of the form redis://:password@host:port/db

Attributes:

Name Type Description
host str

the host address where the redis server is found (default: 'localhost').

port int

the port on which the redis server is running (default: 6379).

db int

the redis database identifier (default: 0).

password Optional[str]

the password for connecting to the redis server (default: None).

ssl bool

whether the connection to the redis server is to be via TLS (default: False).

encoding Optional[str]

the string encoding used with the redis database (default: utf-8).

Source code in pydantic_redis/config.py
class RedisConfig(BaseModel):
    """Configuration for connecting to redis database.

    In order to connect to a redis database, a number of
    configurations are needed, including the server's host address
    and port. `RedisConfig` computes a redis URL of the form
    `redis://:password@host:port/db`

    Attributes:
        host (str): the host address where the redis server is found (default: 'localhost').
        port (int): the port on which the redis server is running (default: 6379).
        db (int): the redis database identifier (default: 0).
        password (Optional[str]): the password for connecting to the
            redis server (default: None).
        ssl (bool): whether the connection to the redis server is to be via TLS (default: False)
        encoding (Optional[str]): the string encoding used with the redis database
            (default: utf-8)
    """

    model_config = ConfigDict(from_attributes=True)

    host: str = "localhost"
    port: int = 6379
    db: int = 0
    password: Optional[str] = None
    ssl: bool = False
    encoding: Optional[str] = "utf-8"

    @property
    def redis_url(self) -> str:
        """a redis URL of form `redis://:password@host:port/db`. (`rediss://..` if TLS)."""
        proto = "rediss" if self.ssl else "redis"
        if self.password is None:
            return f"{proto}://{self.host}:{self.port}/{self.db}"
        return f"{proto}://:{self.password}@{self.host}:{self.port}/{self.db}"

redis_url: str property

a redis URL of form redis://:password@host:port/db. (rediss://.. if TLS).
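
To make the URL format concrete, the following sketch mirrors the logic of the `redis_url` property shown above as a plain function, so it runs without pydantic or a redis server. `build_redis_url` is a hypothetical name, and the host `cache.example` and password `s3cret` are made-up values.

```python
from typing import Optional


def build_redis_url(
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
    ssl: bool = False,
) -> str:
    """Assemble a redis URL the same way the `redis_url` property does."""
    # TLS connections use the `rediss` scheme.
    proto = "rediss" if ssl else "redis"
    if password is None:
        return f"{proto}://{host}:{port}/{db}"
    # The username part is left empty, hence the leading colon.
    return f"{proto}://:{password}@{host}:{port}/{db}"


default_url = build_redis_url()
secure_url = build_redis_url(host="cache.example", password="s3cret", db=2, ssl=True)
```

With the defaults this yields `redis://localhost:6379/0`, and the second call yields `rediss://:s3cret@cache.example:6379/2`. As in the source, the password is interpolated as-is, so a password containing characters such as `@` or `/` would presumably need percent-encoding first.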

Store

Bases: AbstractStore

Manages a collection of Model's, connecting them to a redis database

A Model can only interact with a redis database when it is registered with a Store that is connected to that database.

Attributes:

Name Type Description
models Dict[str, Type[Model]]

a mapping of registered Model's, with the keys being the Model name

name str

the name of this Store

redis_config RedisConfig

the configuration for connecting to a redis database

life_span_in_seconds Optional[int]

the default time-to-live for the records inserted in this store (default: None)

Source code in pydantic_redis/asyncio/store.py
class Store(AbstractStore):
    """Manages a collection of Model's, connecting them to a redis database

    A Model can only interact with a redis database when it is registered
    with a `Store` that is connected to that database.

    Attributes:
        models (Dict[str, Type[pydantic_redis.asyncio.Model]]): a mapping of registered `Model`'s, with the keys being the
            Model name
        name (str): the name of this Store
        redis_config (pydantic_redis.asyncio.RedisConfig): the configuration for connecting to a redis database
        life_span_in_seconds (Optional[int]): the default time-to-live for the records inserted in this store
            (default: None)
    """

    models: Dict[str, Type["Model"]] = {}

    def _connect_to_redis(self) -> redis.Redis:
        """Connects the store to redis.

        See the base class.
        """
        return redis.from_url(
            self.redis_config.redis_url,
            encoding=self.redis_config.encoding,
            decode_responses=True,
        )