This has been explained in Celery docs.
And we bumped into this issue after rolling out django-q into production recently. Let say we have a function like this:-
@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
msg = Message.objects.get_or_create(msgid=msgid)
if queue:
async('send_msg', recipient, text, msg.msgid, queue=False)
msg.status = 'QUEUED'
msg.save()
return msg
# pass msg to backend gateway
send_to_gateway(recipient, msg, ....)
Once we had the qcluster running, we start seeing "Duplicate entry error" in the log. Look like when we call async(), the cluster picked it up before the transaction get committed, so get_or_create() there will create new object with the same msgid instead of getting an existing one. So we fix that using transaction.on_commit()
in django 1.9.
@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
msg = Message.objects.get_or_create(msgid=msgid)
if queue:
def _add_to_queue():
async('send_msg', recipient, text, msg.msgid, queue=False)
msg.status = 'QUEUED'
msg.save()
transaction.on_commit(_add_to_queue)
return msg
# pass msg to backend gateway
send_to_gateway(recipient, msg, ....)
We use SQS but it doesn't matter what broker you use. It's depend whether your task depend on something in db. In our case above, the flow basically:-
Web request
- User send message
- We create a record in the db to represent the sent message with unique id
- async() the function to send the message, with the message id as one of the arguments.
- commit the transaction.
Queue worker
- The send message function invoked from qcluster
- Try to get the message by message id or create new one if doesn't exists.
- Send the message and update status as SENT.
- Commit transaction.
This issue happened between no. 3 in web request and no. 2 in the queue worker. Basically the rule of thumb if you're using transaction, async should be called from the outside of the transaction, not inside.
Originally posted at issue on Github.
Top comments (3)
What if instead of hooking to the transaction callbacks you send the message on the queue with a signal on
post_save
?This way if there is an error the message won't be saved or queued, if the message is saved in the database you can be sure that it is sent to the queue after being saved
Hmm, that make sense. I'll look back into our code and see if that feasible. Thanks for the suggestion !
Ok, on a second thought, I try to avoid using signal as it make code hard to follow.
He has a point, thanks for telling me! I'll consider it :-)