Dynamic queues and conditional routing in Celery

I have around 1.5 million URLs to GET every day. If an HTTP request times out, I repeat it up to two times. At the end of the procedure, I run a simple processing step on the response body and then store the result in a Postgres instance.
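For reference, here is roughly what each URL goes through today, written as a plain single-process sketch; process and insert_row are placeholders for my real parsing and Postgres code:

import requests

def handle(url, max_attempts=3):
    # first attempt plus up to two repeats on timeout
    for _ in range(max_attempts):
        try:
            body = requests.get(url, timeout=10).text
            break
        except requests.Timeout:
            continue
    else:
        return  # gave up: all attempts timed out
    insert_row(process(body))  # simple processing, then INSERT into Postgres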

I'm new to Celery. I implemented one task to perform the HTTP request and one task to take care of the INSERT into the DB (although I'm not sure this is the best way to split the work). For the first one, I considered using dynamic tasks, as specified here. For performance reasons, I'd like first-time requests to go to the requests queue, timed-out requests to the timeouts queue, and successful results, which need to be INSERTed, to the persist queue.

First, I declare the exchanges, queues, and routes in celeryconfig.py:

from kombu import Exchange, Queue

# Redis result backend and RabbitMQ broker URLs are configured here too.

# Queue, exchange and routing key names.
default_queue_name = 'default'
requests_queue_name = 'requests'
timeouts_queue_name = 'timeouts'
persist_queue_name = 'persist'

default_exchange_name = 'default'
timeouts_exchange_name = 'timeouts'

default_routing_key = 'default'
requests_routing_key = 'requests'
timeouts_routing_key = 'timeouts'
persist_routing_key = 'persist'

default_exchange = Exchange(default_exchange_name, type='direct')
timeouts_exchange = Exchange(timeouts_exchange_name, type='direct')

task_queues = (
    Queue(default_queue_name, default_exchange, routing_key=default_routing_key),
    Queue(requests_queue_name, default_exchange, routing_key=requests_routing_key),
    Queue(timeouts_queue_name, timeouts_exchange, routing_key=timeouts_routing_key),
    Queue(persist_queue_name, default_exchange, routing_key=persist_routing_key),
)

task_default_queue = default_queue_name
task_default_exchange = default_exchange_name
task_default_routing_key = default_routing_key

# Route the persist task to the persist queue explicitly.
task_routes = [
    ('playground.to_db', {
        'queue': persist_queue_name,
        'routing_key': persist_routing_key,
    }),
]

Then, the tasks look more or less like this:

import requests

@celery.dynamic_task
def request(url, counter):
    try:
        response = requests.get(url, timeout=10)
    except requests.Timeout:
        if counter < 2:
            # Re-enqueue the retry on the timeouts queue; the signature is
            # returned so the dynamic task can run it as the next step.
            return request.si(url, counter + 1).set(queue='timeouts')
        return None
    return response.text

@app.task
def to_db(data):
    # persist `data` in Postgres
    ...

The main routine:

from celery import group

tasks_group = []
for url in all_urls:
    tasks_group.append(
        # fetch first, then hand the result to the persisting task
        request.s(url, 0).set(queue='requests') |
        to_db.s().set(queue='persist')
    )
group(tasks_group).delay().get()

I then run a Celery worker in three separate terminals, each consuming a different queue via the -Q parameter, but all the tasks seem to end up in the requests queue.
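For completeness, these are the three worker commands; only the -Q value differs between them:

celery -A playground worker -Q requests -l debug
celery -A playground worker -Q timeouts -l debug
celery -A playground worker -Q persist -l debug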

  1. Is set(queue='X') the right way to specify a queue for a task?
  2. Is it possible to assign the same task to different queues dynamically? I.e., the request task first goes to the requests queue and, on retry, to the timeouts queue (see the snippet after this list).
  3. Should I avoid having tasks insert data into a separate DB?
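To make question 2 concrete, this is the kind of runtime routing I have in mind; apply_async accepts a queue argument, and the queue names below are the ones declared in my config:

request.apply_async(args=(url, 0), queue='requests')   # first attempt
request.apply_async(args=(url, 1), queue='timeouts')   # retry after a timeout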

Tags: python, celery, celery-task
