Learning Celery

Background tasks

Maintaining background tasks, especially in Python/Django, is really tough. Spawning daemon threads ourselves and keeping them healthy inside the main process is hard to get right, so instead we should use Celery, which starts completely separate worker processes and keeps nothing inside the main process.

Celery

What is Celery used for, and how does it compare to the alternatives?

asyncio.create_task() : creates a task and schedules it on the event loop, so everything still runs inside one process (the event loop just manages the tasks efficiently)
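
For contrast, here is a minimal asyncio sketch (the function names are illustrative): create_task() schedules the coroutine on the running event loop, and everything stays inside the one process.

# asyncio_example.py -- minimal sketch for contrast (names are illustrative)
import asyncio

async def add(x, y):
    await asyncio.sleep(1)  # pretend this is I/O-bound work
    return x + y

async def main():
    # create_task() schedules the coroutine on the current event loop
    task = asyncio.create_task(add(4, 6))
    # ...the event loop is free to interleave other work here...
    print(await task)  # -> 10

asyncio.run(main())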

celery : multiprocessing; it creates multiple worker processes so tasks run on the CPU outside the main process, as shown in the sketch below
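
A minimal Celery sketch of that idea, assuming a local Redis instance (the module name tasks.py, the broker URL and the backend URL are assumptions, not from the original notes):

# tasks.py -- minimal Celery sketch (broker/backend URLs are assumptions)
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",    # where task messages go
    backend="redis://localhost:6379/1",   # where results are stored
)

@app.task
def add(x, y):
    return x + y

@app.task
def multiply(x, y):
    return x * y

Starting the worker with the default prefork (multiprocessing) pool, e.g. celery -A tasks worker --loglevel=INFO --concurrency=4, gives four separate worker processes pulling from the queue.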

Task Queue : the input is a task; worker processes constantly monitor the task queue for new tasks (how do they monitor it? in the hand-rolled worker below, the monitoring is just a blocking pop, BLPOP, on a Redis list)

Broker : the intermediary between clients and workers; clients push their tasks to it and workers are handed tasks from it, e.g. a Redis broker or a RabbitMQ broker
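
On the client side, talking to the broker looks like this (a sketch, assuming the tasks.py app defined above):

# client.py -- pushes tasks to the broker; workers pick them up (sketch)
from tasks import add, multiply

result = add.delay(4, 6)        # sends a message to the broker and returns immediately
print(result.id)                # task id, usable to fetch the result later
print(result.get(timeout=10))   # -> 10 (needs the result backend configured above)

multiply.delay(3, 7)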

Producer.py

# producer.py

import redis
import json
import uuid

# Connect to Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def send_task(task_name, *args):
    task = {
        "id": str(uuid.uuid4()),  # Unique task ID
        "name": task_name,
        "args": args
    }
    redis_client.rpush("task_queue", json.dumps(task))  # Push task to Redis
    print(f"Task {task['id']} sent: {task}")

# Example usage
send_task("add", 4, 6)
send_task("multiply", 3, 7)

Worker.py

# worker.py

import json
import time

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Task implementations the worker knows how to run
def add(x, y):
    return x + y

def multiply(x, y):
    return x * y

TASKS = {
    "add": add,
    "multiply": multiply,
}

def main(worker_id=1):
    while True:
        # BLPOP blocks until a task arrives or the timeout expires
        popped = redis_client.blpop("task_queue", timeout=5)
        if popped:
            _, json_task = popped
            task = json.loads(json_task)
            func = TASKS.get(task["name"])
            if func:
                result = func(*task["args"])  # Execute function
                redis_client.set(
                    f"result:{task['id']}",
                    json.dumps({"status": "success", "result": result}),
                )
                print(f"[Worker {worker_id}] Task {task['id']} completed. Result: {result}")
            else:
                print(f"[Worker {worker_id}] Unknown task: {task['name']}")

            time.sleep(1)

if __name__ == "__main__":
    main()
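
To mirror Celery's multiple worker processes with this hand-rolled setup, several copies of main() could be spawned, each with its own worker_id (a sketch using the standard library; the process count of 4 is arbitrary):

# run_workers.py -- sketch: spawn several worker processes (count is arbitrary)
from multiprocessing import Process

from worker import main

if __name__ == "__main__":
    workers = [Process(target=main, args=(i,)) for i in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()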


Completion fetcher


output = redis_client.get(f"result:{task['id']}")
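
Since the worker writes the result asynchronously, the fetch usually has to poll until the key appears (a sketch; the timeout value is arbitrary):

# fetch_result.py -- sketch: poll Redis until the worker has written the result
import json
import time

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def fetch_result(task_id, timeout=10):
    deadline = time.time() + timeout
    while time.time() < deadline:
        raw = redis_client.get(f"result:{task_id}")
        if raw:
            return json.loads(raw)
        time.sleep(0.1)
    return None  # no result within the timeout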

Written on March 29, 2025