1 year ago
#363013
giada
Dynamic queues in Celery conditional routing
I have around 1500K URLs to GET every day. If an HTTP request times out, I retry it up to two times. At the end of the procedure, I run a simple processing task on the response body and then store the result in a Postgres instance.
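Before wiring this into Celery, the retry-then-process flow can be sketched broker-free. This is only an illustration of the intended per-URL pipeline; `get_with_retries` and `process` are names I'm introducing, and the `fetch` callable is injected so the retry logic can be exercised without the network:

```python
import socket

MAX_RETRIES = 2  # one initial attempt plus up to two retries, as described above

def get_with_retries(fetch, url, retries=MAX_RETRIES):
    """Call fetch(url); on timeout, retry up to `retries` more times.

    In production `fetch` would be something like
    lambda u: urllib.request.urlopen(u, timeout=10).read().
    Returns None if every attempt times out.
    """
    for _ in range(1 + retries):
        try:
            return fetch(url)
        except (socket.timeout, TimeoutError):
            continue  # timed out: try again
    return None  # every attempt timed out

def process(body):
    # placeholder for the "simple processing task" on the body
    return len(body) if body is not None else None
```

The result of `process` would then be handed to whatever persists it in Postgres.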
I'm new to Celery. I implemented one task to perform the HTTP request and one task to take care of the INSERT into the DB (although I'm not sure this is the best way to deal with it). For the first one, I considered using dynamic tasks, as specified here. For performance reasons, I'd like to have first-time requests in the `requests` queue, timed-out requests in the `timeouts` queue, and successful tasks, which need to be INSERTed, in the `persist` queue.
First I declare the exchanges, queues and routes in `celeryconfig.py`:
```python
from kombu import Exchange, Queue

# declare here Redis backend, RabbitMQ broker
# declare queue, exchange and routing key variables...

task_queues = (
    Queue(default_queue_name, default_exchange, routing_key=default_routing_key),
    Queue(requests_queue_name, default_exchange, routing_key=requests_routing_key),
    Queue(timeouts_queue_name, timeouts_exchange, routing_key=timeouts_routing_key),
    Queue(persist_queue_name, default_exchange, routing_key=persist_routing_key),
)

task_default_queue = default_queue_name
task_default_exchange = default_exchange_name
task_default_routing_key = default_routing_key

task_routes = [
    ('playground.to_db', {
        'queue': persist_queue_name,
        'routing_key': persist_routing_key,
    }),
]
```
Then, the tasks look more or less like this:

```python
@celery.dynamic_task
def request(url, counter):
    # perform the GET request; suppose 'timeout' records whether the
    # request timed out and 'data' holds the response body
    if timeout and counter < 2:
        # a dynamic task re-schedules itself by returning a new signature;
        # note the queue name must match the declared 'timeouts' queue
        return request.si(url, counter + 1).set(queue='timeouts')
    else:
        return data


@app.task
def to_db(data):
    # persist 'data' in Postgres
    ...
```
The main routine:

```python
tasks_group = []
for url in all_urls:
    tasks_group.append(
        request.s(url, 0).set(queue='requests') |
        to_db.s().set(queue='persist')
    )
group(tasks_group).delay().get()
```
I then run `celery` in three separate terminals, each with a different `-Q` parameter (e.g., `celery -A playground worker -Q persist -l debug`), but it seems that the tasks all end up in the `requests` queue.
- Is `set(queue='X')` the right way of specifying queues for tasks?
- Is it possible to assign the same task to different queues dynamically? I.e., the `request` task first goes in the `requests` queue and then in the `timeouts` one.
- Should I avoid having tasks insert data into a separate DB?
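To make the second question concrete, here is a toy in-memory dispatcher showing the routing behaviour I'm after. This is not how Celery or RabbitMQ work internally, only an illustration of the same task hopping between named queues:

```python
from collections import defaultdict, deque

# toy stand-in for a broker: one in-memory FIFO per named queue
queues = defaultdict(deque)

def send_task(func, args, queue):
    queues[queue].append((func, args))

def request(url, counter):
    timed_out = counter < 1  # pretend the first attempt always times out
    if timed_out and counter < 2:
        # re-enqueue the same task, but on a different queue
        send_task(request, (url, counter + 1), queue='timeouts')
        return None
    return 'body of ' + url

send_task(request, ('http://example.com', 0), queue='requests')
func, args = queues['requests'].popleft()
func(*args)  # times out and re-enqueues itself on 'timeouts'
func, args = queues['timeouts'].popleft()
body = func(*args)  # succeeds on the retry
```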
Tags: python, celery, celery-task