SchedulerConfig

Up to version 3.2.3, Esmerald supported only Asyncz as its internal scheduler. From version 3.2.3 onwards, that is still the case, but Esmerald also makes the scheduler modular, like everything else in it.

What does this even mean?

Well, this means that if you don't want to use Asyncz, for personal or application-specific reasons, you can simply build your own configuration and plug your scheduler into Esmerald.

This is possible because Esmerald now implements the SchedulerConfig.

How to import it

You can import the configuration from the following:

from esmerald.contrib.schedulers import SchedulerConfig

The SchedulerConfig class

When implementing a scheduler configuration, you must implement two functions.

  1. async def start()
  2. async def shutdown()

This is what makes the SchedulerConfig modular. There are plenty of schedulers out there, each with many different options and configurations, but the one thing they all have in common is that they must start and shut down at some point. The only thing Esmerald "cares" about is that this functionality is encapsulated in those two simple functions.
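As a rough illustration of that contract, the sketch below defines a custom configuration that implements only `start` and `shutdown`. It rests on some assumptions: the `SchedulerConfig` base here is a local stand-in so the snippet runs without Esmerald installed (in an application you would subclass the one imported from `esmerald.contrib.schedulers`), and `IntervalSchedulerConfig`, `job` and `interval` are hypothetical names for a toy scheduler that runs one coroutine repeatedly.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, List, Optional


class SchedulerConfig(ABC):
    """Local stand-in (assumption) mirroring the two-function contract."""

    @abstractmethod
    async def start(self, **kwargs: Any) -> None: ...

    @abstractmethod
    async def shutdown(self, **kwargs: Any) -> None: ...


class IntervalSchedulerConfig(SchedulerConfig):
    """Hypothetical toy scheduler: calls `job` every `interval` seconds."""

    def __init__(self, job: Callable[[], Awaitable[None]], interval: float) -> None:
        self.job = job
        self.interval = interval
        self._runner: Optional[asyncio.Task] = None

    async def _loop(self) -> None:
        while True:
            await self.job()
            await asyncio.sleep(self.interval)

    async def start(self, **kwargs: Any) -> None:
        # Launch the background loop on the running event loop.
        self._runner = asyncio.create_task(self._loop())

    async def shutdown(self, **kwargs: Any) -> None:
        # Cancel the background loop and wait until it has stopped.
        if self._runner is not None:
            self._runner.cancel()
            try:
                await self._runner
            except asyncio.CancelledError:
                pass
            self._runner = None


async def demo() -> int:
    runs: List[int] = []

    async def job() -> None:
        runs.append(1)

    config = IntervalSchedulerConfig(job=job, interval=0.01)
    await config.start()
    await asyncio.sleep(0.05)
    await config.shutdown()
    return len(runs)
```

Whatever the scheduler does internally, only the two coroutine functions are part of the contract; everything else in the class is free-form.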

The start function

The start function, as the name suggests, is the function that Esmerald calls internally to start the scheduler for you. This is important because when the enable_scheduler flag is set, Esmerald looks for the scheduler config and calls start on startup.

The shutdown function

The shutdown function, as the name suggests, is the function that Esmerald calls internally to shut down the scheduler for you. This is important because when the enable_scheduler flag is set, Esmerald looks for the scheduler config and calls shutdown on shutdown (usually when the application stops).
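The call order described in these two sections can be pictured with a small simulation. This is not Esmerald's internal code: `MiniApp`, its `startup`/`shutdown` methods and `RecordingConfig` are hypothetical stand-ins, used only to show that the configuration's `start` and `shutdown` are invoked when the flag is enabled and skipped otherwise.

```python
import asyncio
from typing import Any, List, Optional


class RecordingConfig:
    """Hypothetical scheduler config that records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def start(self, **kwargs: Any) -> None:
        self.calls.append("start")

    async def shutdown(self, **kwargs: Any) -> None:
        self.calls.append("shutdown")


class MiniApp:
    """Hypothetical stand-in for an application with a scheduler flag."""

    def __init__(
        self,
        enable_scheduler: bool,
        scheduler_config: Optional[RecordingConfig] = None,
    ) -> None:
        self.enable_scheduler = enable_scheduler
        self.scheduler_config = scheduler_config

    async def startup(self) -> None:
        # Only touch the scheduler when the flag is set and a config exists.
        if self.enable_scheduler and self.scheduler_config is not None:
            await self.scheduler_config.start()

    async def shutdown(self) -> None:
        if self.enable_scheduler and self.scheduler_config is not None:
            await self.scheduler_config.shutdown()


async def run_lifecycle(enable_scheduler: bool) -> List[str]:
    config = RecordingConfig()
    app = MiniApp(enable_scheduler=enable_scheduler, scheduler_config=config)
    await app.startup()
    await app.shutdown()
    return config.calls
```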

How to use it

Esmerald already implements this interface with the custom AsynczConfig. This functionality is very handy since Asyncz has a lot of configurations that can be passed and used within an Esmerald application.

Let us see what the implementation looks like.

import warnings
from uuid import uuid4
from datetime import datetime
from datetime import timezone as dtimezone
from typing import Any, Callable, Dict, Union, cast, Type

from asyncz.schedulers import AsyncIOScheduler
from asyncz.schedulers.types import SchedulerType
from asyncz.triggers.types import TriggerType
from asyncz.tasks.base import Task as AsynczTask
from asyncz.typing import undefined, UndefinedType
from esmerald.conf import settings
from esmerald.contrib.schedulers.base import SchedulerConfig
from esmerald.exceptions import ImproperlyConfigured
from esmerald.utils.module_loading import import_string


class AsynczConfig(SchedulerConfig):
    """
    Implements an integration with Asyncz, allowing to
    customise the scheduler with the provided configurations.
    """

    def __init__(
        self,
        scheduler_class: Type[SchedulerType] = AsyncIOScheduler,
        tasks: Union[Dict[str, str], None] = None,
        timezone: Union[dtimezone, str, None] = None,
        configurations: Union[Dict[str, Dict[str, str]], None] = None,
        **kwargs: Dict[str, Any],
    ):
        """
        Initializes the AsynczConfig object.

        Args:
            scheduler_class: The class of the scheduler to be used.
            tasks: A dictionary of tasks to be registered in the scheduler.
            timezone: The timezone to be used by the scheduler.
            configurations: Extra configurations to be passed to the scheduler.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)
        self.scheduler_class = scheduler_class
        # Fall back to an empty dict so the default of None does not break .items()
        self.tasks = tasks or {}
        self.timezone = timezone
        self.configurations = configurations
        self.options = kwargs

        for task, module in self.tasks.items():
            if not isinstance(task, str) or not isinstance(module, str):
                raise ImproperlyConfigured("The dict of tasks must be Dict[str, str].")

        if not self.tasks:
            warnings.warn(
                "Esmerald is starting the scheduler, yet there are no tasks declared.",
                UserWarning,
                stacklevel=2,
            )

        # Load the scheduler object
        self.handler = self.get_scheduler(
            scheduler=self.scheduler_class,
            timezone=self.timezone,
            configurations=self.configurations,
            **self.options,
        )

        self.register_tasks(tasks=self.tasks)

    def register_tasks(self, tasks: Dict[str, str]) -> None:
        """
        Registers the tasks in the Scheduler.

        Args:
            tasks: A dictionary of tasks to be registered in the scheduler.
        """
        for task, _module in tasks.items():
            imported_task = f"{_module}.{task}"
            scheduled_task: "Task" = import_string(imported_task)

            if not scheduled_task.is_enabled:
                continue

            try:
                scheduled_task.add_task(self.handler)
            except Exception as e:
                raise ImproperlyConfigured(str(e)) from e

    def get_scheduler(
        self,
        scheduler: Type[SchedulerType],
        timezone: Union[dtimezone, str, None] = None,
        configurations: Union[Dict[str, Any], None] = None,
        **options: Dict[str, Any],
    ) -> SchedulerType:
        """
        Initiates the scheduler from the given time.
        If no value is provided, it will default to AsyncIOScheduler.

        The value of `scheduler_class` can be overwritten by any esmerald custom settings.

        Args:
            scheduler: The class of the scheduler to be used.
            timezone: The timezone instance.
            configurations: A dictionary with extra configurations to be passed to the scheduler.
            **options: Additional options.

        Returns:
            SchedulerType: An instance of a Scheduler.
        """
        if not timezone:
            timezone = settings.timezone

        if not configurations:
            return scheduler(timezone=timezone, **options)

        return scheduler(global_config=configurations, timezone=timezone, **options)

    async def start(self, **kwargs: Dict[str, Any]) -> None:
        """
        Starts the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.start(**kwargs)

    async def shutdown(self, **kwargs: Dict[str, Any]) -> None:
        """
        Shuts down the scheduler.

        Args:
            **kwargs: Additional keyword arguments.
        """
        self.handler.shutdown(**kwargs)


class Task:
    """
    Base for the scheduler decorator that will auto discover the
    tasks in the application and add them to the internal scheduler.
    """

    def __init__(
        self,
        *,
        name: Union[str, None] = None,
        trigger: Union[TriggerType, None] = None,
        id: Union[str, None] = None,
        mistrigger_grace_time: Union[int, UndefinedType, None] = undefined,
        coalesce: Union[bool, UndefinedType] = undefined,
        max_instances: Union[int, UndefinedType, None] = undefined,
        next_run_time: Union[datetime, str, UndefinedType, None] = undefined,
        store: str = "default",
        executor: str = "default",
        replace_existing: bool = False,
        args: Union[Any, None] = None,
        kwargs: Union[Dict[str, Any], None] = None,
        is_enabled: bool = True,
    ) -> None:
        """
        Initializes a new instance of the `Task` class for the  Scheduler.

        Args:
            name (str, optional): Textual description of the task.
            trigger (TriggerType, optional): An instance of a trigger class.
            id (str, optional): Explicit identifier for the task.
            mistrigger_grace_time (int, optional): Seconds after the designated runtime that the task is still allowed to be run
                (or None to allow the task to run no matter how late it is).
            coalesce (bool, optional): Run once instead of many times if the scheduler determines that the task should be run more than once in succession.
            max_instances (int, optional): Maximum number of concurrently running instances allowed for this task.
            next_run_time (datetime, optional): When to first run the task, regardless of the trigger (pass None to add the task as paused).
            store (str, optional): Alias of the task store to store the task in.
            executor (str, optional): Alias of the executor to run the task with.
            replace_existing (bool, optional): True to replace an existing task with the same id
                (but retain the number of runs from the existing one).
            args (Any, optional): List of positional arguments to call func with.
            kwargs (Dict[str, Any], optional): Dict of keyword arguments to call func with.
            is_enabled (bool, optional): True if the task is to be added to the scheduler.
        """
        self.name = name
        self.trigger = trigger
        self.id = id
        self.mistrigger_grace_time = mistrigger_grace_time
        self.coalesce = coalesce
        self.max_instances = max_instances
        self.next_run_time = next_run_time
        self.store = store
        self.executor = executor
        self.replace_existing = replace_existing
        self.args = args
        self.kwargs = kwargs
        self.is_enabled = is_enabled
        self.fn = None

    def add_task(self, scheduler: SchedulerType) -> None:
        try:
            scheduler.add_task(
                self.fn,
                trigger=self.trigger,
                args=self.args,
                kwargs=self.kwargs,
                id=self.id,
                name=self.name,
                mistrigger_grace_time=self.mistrigger_grace_time,
                coalesce=self.coalesce,
                max_instances=self.max_instances,
                next_run_time=self.next_run_time,
                store=self.store,
                executor=self.executor,
                replace_existing=self.replace_existing,
            )
        except Exception as e:
            raise ImproperlyConfigured(str(e)) from e

We won't dwell on the technicalities of this configuration because it is specific to the Asyncz integration provided by Esmerald. Using it is not mandatory: you can build your own configuration and pass it to the Esmerald scheduler_config parameter.
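One detail of the listing is worth spelling out: in `tasks`, each key is a task name and each value is the module that contains it, and `register_tasks` joins the two as `module.task` before importing. The sketch below reproduces only that validation and path-building step using the standard library. The helper `build_task_paths` and the `accounts.tasks` / `collect_data` names are hypothetical, and a plain `TypeError` stands in for `ImproperlyConfigured`.

```python
from typing import Dict, List


def build_task_paths(tasks: Dict[str, str]) -> List[str]:
    """Return the dotted import path for each task-name -> module entry."""
    paths: List[str] = []
    for task, module in tasks.items():
        # Same shape check as in the listing: both sides must be strings.
        if not isinstance(task, str) or not isinstance(module, str):
            raise TypeError("The dict of tasks must be Dict[str, str].")
        paths.append(f"{module}.{task}")
    return paths


# Hypothetical task name and module, for illustration only.
example_paths = build_task_paths({"collect_data": "accounts.tasks"})
```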

SchedulerConfig and application

To use the SchedulerConfig in an application, like the Asyncz one shown above, you can simply do this:

Note

We use the existing AsynczConfig as an example, but feel free to use your own if you require something else.

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore with a MongoDBStore
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default to be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Esmerald(routes=[...], scheduler_config=get_scheduler_config())

If you want to know more about how to use the AsynczConfig, check out the corresponding section of the documentation.

Application lifecycle

The Esmerald scheduler is tied to the application lifecycle, which means the on_startup/on_shutdown events and the lifespan. You can read more about this in the appropriate section of the documentation.

By default, the scheduler is linked to the on_startup/on_shutdown events, which are automatically managed for you. If you require the lifespan instead, you must make the appropriate adjustments.

The following example serves as a suggestion but feel free to use your own design. Let us check how we could manage this using the lifespan instead.

from contextlib import asynccontextmanager
from functools import lru_cache

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import Esmerald
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


@asynccontextmanager
async def lifespan(app: Esmerald):
    # What happens on startup
    await get_scheduler_config().start()
    yield
    # What happens on shutdown
    await get_scheduler_config().shutdown()


@lru_cache
def get_scheduler_config() -> AsynczConfig:
    # Define the stores
    # Override the default MemoryStore with a MongoDBStore
    stores = {"default": MongoDBStore()}

    # Define the executors
    # Override the default to be the AsyncIOExecutor
    executors = {
        "default": AsyncIOExecutor(),
        "threadpool": ThreadPoolExecutor(max_workers=20),
    }

    # Set the defaults
    task_defaults = {"coalesce": False, "max_instances": 4}

    return AsynczConfig(
        tasks=...,
        timezone="UTC",
        stores=stores,
        executors=executors,
        task_defaults=task_defaults,
    )


app = Esmerald(
    routes=[...],
    lifespan=lifespan,
    scheduler_config=get_scheduler_config(),
)

Pretty easy, right? Esmerald then understands what needs to be done as normal.
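A note on the design of the example above: `get_scheduler_config` is wrapped in `lru_cache`, so the instance started in the lifespan, the one shut down, and the one passed to the application are the same object. The following sketch shows that behaviour with a hypothetical `DummyConfig` in place of `AsynczConfig`, so that it runs without Esmerald or Asyncz installed. The `try`/`finally` is an optional addition, not part of the original example.

```python
import asyncio
from contextlib import asynccontextmanager
from functools import lru_cache
from typing import Any, AsyncIterator, Tuple


class DummyConfig:
    """Hypothetical stand-in for a scheduler config."""

    def __init__(self) -> None:
        self.running = False

    async def start(self, **kwargs: Any) -> None:
        self.running = True

    async def shutdown(self, **kwargs: Any) -> None:
        self.running = False


@lru_cache
def get_scheduler_config() -> DummyConfig:
    # Cached: every call returns the same DummyConfig instance.
    return DummyConfig()


@asynccontextmanager
async def lifespan(app: Any) -> AsyncIterator[None]:
    await get_scheduler_config().start()
    try:
        yield
    finally:
        # Runs even if the application body raises.
        await get_scheduler_config().shutdown()


async def demo() -> Tuple[bool, bool]:
    async with lifespan(app=None):
        during = get_scheduler_config().running
    after = get_scheduler_config().running
    return during, after
```

Without the cache, each call would build a fresh configuration, and `shutdown` would act on a scheduler that was never started.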

The SchedulerConfig and the settings

Like everything in Esmerald, the SchedulerConfig can also be made available via settings.

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.stores.mongo import MongoDBStore
from esmerald import EsmeraldAPISettings
from esmerald.contrib.schedulers import SchedulerConfig
from esmerald.contrib.schedulers.asyncz.config import AsynczConfig


class CustomSettings(EsmeraldAPISettings):
    @property
    def scheduler_config(self) -> SchedulerConfig:
        stores = {"default": MongoDBStore()}

        # Define the executors
        # Override the default to be the AsyncIOExecutor
        executors = {
            "default": AsyncIOExecutor(),
            "threadpool": ThreadPoolExecutor(max_workers=20),
        }

        # Set the defaults
        task_defaults = {"coalesce": False, "max_instances": 4}

        return AsynczConfig(
            tasks=...,
            timezone="UTC",
            stores=stores,
            executors=executors,
            task_defaults=task_defaults,
        )

Important Notes

  • You can create your own custom scheduler config.
  • You must implement the start/shutdown functions in any scheduler configuration.
  • You can use either the on_startup/on_shutdown events or the lifespan. The former is automatically managed for you.