Distributed Task Processing in Python with Celery

Table of Contents

  1. Introduction
  2. Prerequisites
  3. Installation
  4. Setting up a Celery Project
  5. Defining Tasks
  6. Running Workers
  7. Calling Tasks
  8. Monitoring and Managing Tasks
  9. Conclusion

Introduction

In this tutorial, we will learn how to use Celery, a distributed task queue library, to process tasks asynchronously in Python. Celery allows you to distribute tasks across multiple workers, making it ideal for handling computationally intensive or time-consuming operations.

By the end of this tutorial, you will be able to:

  • Install and set up Celery in a Python project
  • Define tasks that can be executed asynchronously
  • Run Celery workers to process tasks
  • Call and monitor tasks from your Python code

Prerequisites

To follow along with this tutorial, you should have a basic understanding of Python programming. You will also need the following:

  • Python 3 installed on your system
  • A message broker such as RabbitMQ installed and running locally (the examples assume RabbitMQ on localhost with the default guest/guest credentials)
  • A text editor or integrated development environment (IDE) for writing Python code

Installation

Before we can start using Celery, we need to install it. Open your terminal or command prompt and enter the following command to install Celery using pip:

```shell
pip install celery
```

This will install the latest version of Celery and its dependencies.
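If you want to confirm that the installation succeeded, the celery command-line tool can report its version. The version pin below is optional and only an illustrative choice:

```shell
# Optional: pin to the 5.x series for reproducibility, then verify.
pip install "celery>=5.0,<6.0"
celery --version
```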

Setting up a Celery Project

To use Celery in a Python project, we first need to define a Celery application. Create a new directory for your project and navigate to it in the terminal. Inside this directory, create a new Python script called tasks.py.

In tasks.py, import the Celery class and create a Celery application. Add the following code to your tasks.py file:

```python
from celery import Celery

app = Celery(
    "myapp",
    broker="amqp://guest:guest@localhost",
    backend="rpc://"
)

@app.task
def add(x, y):
    return x + y
```

In the code above, we imported the `Celery` class from the `celery` module and created a new Celery application by instantiating it. The `broker` argument specifies the message broker URL, which Celery uses to send and receive messages between the application and the workers. In this example, we are using RabbitMQ as our message broker, with the default guest/guest credentials on the local host. You can replace this with the URL of your own message broker.

The backend argument specifies the result backend URL, which is used to store and retrieve the results of executed tasks. In this example, we are using the RPC (Remote Procedure Call) backend, which allows us to retrieve task results synchronously.
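If you would rather not run RabbitMQ, Celery also supports Redis as both broker and result backend. The sketch below is one possible alternative configuration, assuming a Redis server on localhost:6379 and the `celery[redis]` extra installed; it also shows `app.conf.update`, Celery's hook for applying further settings:

```python
from celery import Celery

# Alternative setup: Redis as both broker and result backend.
# Assumes a local Redis server on the default port 6379.
app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# Additional settings can be applied in bulk via app.conf.update.
app.conf.update(
    task_serializer="json",  # serialize task payloads as JSON
    result_expires=3600,     # discard stored results after one hour
)
```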

Next, we defined a task called `add`. The `@app.task` decorator registers the function as a Celery task. Here, the `add` task takes two arguments, `x` and `y`, and returns their sum.
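One detail worth knowing: the decorator does not change how the function behaves when called directly, so you can still test it in-process without a broker or worker:

```python
from tasks import add

# Calling the task function directly runs it synchronously in the
# current process; no message broker is involved.
assert add(2, 3) == 5
```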

Running Workers

Now that we have defined our Celery application and tasks, we can start running Celery workers to process the tasks. Open a new terminal or command prompt window and navigate to the project directory.

To start a Celery worker, use the following command:

```shell
celery -A tasks worker --loglevel=info
```

  • The -A (or --app) option tells Celery where to find the application; here it points at the tasks module we created.
  • The worker command tells Celery to start a worker process.
  • The --loglevel=info option sets the logging level to info, which will display detailed logs of task execution.
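The worker accepts further options as well. The flags below are standard Celery CLI options; the concurrency value of 4 and the node name are just illustrative choices:

```shell
# Run four worker processes and give the node a distinct name
# (%h expands to the machine's hostname).
celery -A tasks worker --loglevel=info --concurrency=4 -n worker1@%h
```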

You should see output similar to the following:

```
 -------------- celery@your-hostname v5.x.x (singularity)
--- ***** -----
-- ******* ---- [config]
- *** --- * --- .> app:         myapp:0x10abf70f0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     rpc://
- *** --- * ---
-- ******* ---- [queues]
--- ***** ----- .> celery           exchange=celery(direct) key=celery
 --------------

[tasks]
    . tasks.add

[2022-01-01 20:00:00,000: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2022-01-01 20:00:00,000: INFO/MainProcess] mingle: searching for neighbors
[2022-01-01 20:00:01,000: INFO/MainProcess] mingle: sync with 1 nodes
[2022-01-01 20:00:01,000: INFO/MainProcess] mingle: sync complete
[2022-01-01 20:00:01,000: INFO/MainProcess] celery@your-hostname ready.
```

This means that the Celery worker is up and running, waiting for tasks to be dispatched. Keep this terminal window open, as the worker needs to continue running.

Calling Tasks

Now that our Celery worker is running, we can call our tasks from Python code. Create a new Python script in the project directory called main.py. Add the following code to your main.py file:

```python
from time import sleep

from tasks import add

result = add.delay(4, 6)
print(f"Task ID: {result.task_id}")

# Poll until the worker finishes; sleep briefly to avoid a busy loop.
while not result.ready():
    sleep(0.1)

print(f"Task Result: {result.get()}")
```

In the code above, we import the `add` task from our `tasks.py` module. We then call the task with its `delay` method, passing in the arguments `4` and `6`. The `delay` method returns an `AsyncResult` object, which represents the eventual result of the task.

We print the task ID assigned to the result, which is useful for tracking and monitoring tasks. We then poll `result.ready()` until the task completes, sleeping briefly between checks so the loop does not spin at full speed. Finally, we retrieve the result of the task with the `get` method and print it.
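Note that `delay` is shorthand for the more general `apply_async` method, which accepts execution options alongside the task arguments. The ten-second countdown below is just an illustrative value:

```python
# apply_async takes the positional arguments as a tuple, plus
# execution options such as countdown (seconds to wait before running).
result = add.apply_async((4, 6), countdown=10)
```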

To execute the main.py script, open a new terminal or command prompt window and navigate to the project directory. Run the following command:

```shell
python main.py
```

You should see output similar to the following:

```
Task ID: some-task-id
Task Result: 10
```

This indicates that the task was successfully executed by the Celery worker and returned the expected result.

Monitoring and Managing Tasks

A popular companion tool for monitoring and managing Celery is Flower. Flower allows you to monitor the status and progress of your Celery tasks via a web-based user interface.

To install Flower, run the following command:

```shell
pip install flower
```

Once Flower is installed, you can start it by running the following command in a new terminal window, from the project directory:

```shell
celery -A tasks flower
```

As with the worker, the -A option points at the module that defines your Celery application.

After starting Flower, open your web browser and navigate to http://localhost:5555 to access the Flower user interface. Here, you can view task details, monitor active and completed tasks, and access various statistics and metrics.
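Worker state can also be queried programmatically through the application's control interface. Here is a minimal sketch using Celery's inspect API, run from the project directory so tasks.py is importable:

```python
from tasks import app

# The inspect API queries running workers over the broker.
inspector = app.control.inspect()
print(inspector.active())      # tasks currently being executed
print(inspector.registered())  # tasks each worker has registered
```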

Conclusion

In this tutorial, we learned how to use Celery, a distributed task processing library, in Python. We covered the installation of Celery, setting up a Celery project, defining tasks, running Celery workers, and calling tasks from Python code. We also explored monitoring and managing tasks using Flower.

Celery is a powerful tool for processing tasks asynchronously and distributing them across multiple workers. It can greatly improve the scalability and performance of your Python applications by offloading computationally intensive or time-consuming operations.

We encourage you to experiment with Celery and explore its advanced features, such as task scheduling, retries, and task prioritization. By leveraging the capabilities of Celery, you can build robust and efficient distributed systems in Python.
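As a taste of those features, here is a sketch of automatic retries using Celery's bind/retry mechanism. The `fetch_url` task, the `requests` dependency, and the retry timings are all hypothetical choices for illustration; the task would live alongside `add` in tasks.py:

```python
import requests  # hypothetical dependency, used only for this example

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_url(self, url):
    """Fetch a URL, retrying up to three times on failure."""
    try:
        return requests.get(url, timeout=10).text
    except requests.RequestException as exc:
        # self.retry re-queues the task after default_retry_delay seconds.
        raise self.retry(exc=exc)
```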

Remember to refer to the official Celery documentation for more detailed information and guidelines on using Celery in your projects. Happy task processing!