DEV Community

Kamal Mustafa
Kamal Mustafa

Posted on

Django: Fixing race condition when queuing with on_commit hook

This has been explained in Celery docs.

And we bumped into this issue after rolling out django-q into production recently. Let say we have a function like this:-

@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
    msg = Message.objects.get_or_create(msgid=msgid)
    if queue:
        async('send_msg', recipient, text, msg.msgid, queue=False)
        msg.status = 'QUEUED'
        msg.save()
        return msg

    # pass msg to backend gateway
    send_to_gateway(recipient, msg, ....)

Once we had the qcluster running, we start seeing "Duplicate entry error" in the log. Look like when we call async(), the cluster picked it up before the transaction get committed, so get_or_create() there will create new object with the same msgid instead of getting an existing one. So we fix that using transaction.on_commit() in django 1.9.

@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
    msg = Message.objects.get_or_create(msgid=msgid)
    if queue:
        def _add_to_queue():
            async('send_msg', recipient, text, msg.msgid, queue=False)

        msg.status = 'QUEUED'
        msg.save()
        transaction.on_commit(_add_to_queue)
        return msg

    # pass msg to backend gateway
    send_to_gateway(recipient, msg, ....)

We use SQS but it doesn't matter what broker you use. It's depend whether your task depend on something in db. In our case above, the flow basically:-

Web request

  1. User send message
  2. We create a record in the db to represent the sent message with unique id
  3. async() the function to send the message, with the message id as one of the arguments.
  4. commit the transaction.

Queue worker

  1. The send message function invoked from qcluster
  2. Try to get the message by message id or create new one if doesn't exists.
  3. Send the message and update status as SENT.
  4. Commit transaction.

This issue happened between no. 3 in web request and no. 2 in the queue worker. Basically the rule of thumb if you're using transaction, async should be called from the outside of the transaction, not inside.


Originally posted at issue on Github.

Top comments (3)

Collapse
 
rhymes profile image
rhymes

What if instead of hooking to the transaction callbacks you send the message on the queue with a signal on post_save ?

This way if there is an error the message won't be saved or queued, if the message is saved in the database you can be sure that it is sent to the queue after being saved

Collapse
 
k4ml profile image
Kamal Mustafa

Hmm, that make sense. I'll look back into our code and see if that feasible. Thanks for the suggestion !

Ok, on a second thought, I try to avoid using signal as it make code hard to follow.

Collapse
 
rhymes profile image
rhymes

He has a point, thanks for telling me! I'll consider it :-)