Mock a SQS queue with moto

drewmullen ・2 min read

Below is simple example code to mock the aws sqs api. This can be useful for development so you dont have to actually manage a real SQS queue / connectivity / IAM permissions until you're ready.

A simple function to write a message to a preexisting queue

QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')
def write_message(data):
    sqs = boto3.client('sqs', region_name = 'us-east-1')
    r = sqs.send_message(
        MessageBody = json.dumps(data),
        QueueUrl = QUEUE_URL
Pytest fixtures to mock up the aws sqs API. aws_credentials() also ensures that your pytest functions will not actually write to aws resources.

def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'

def sqs_client(aws_credentials):
    # setup
    with mock_sqs():
        yield boto3.client('sqs', region_name=REGION)
    # teardown
An example test function. Create a queue using the mock client from (notice sqs_client parameter matches the conftest function name sqs_client), invoke your python module function app.write_message(). Validate the returned message matches what you sent

def test_write_message(sqs_client):
    queue = sqs_client.create_queue(QueueName='test-msg-sender')
    queue_url = queue['QueueUrl']
    # override function global URL variable
    app.QUEUE_URL = queue_url
    expected_msg = str({'msg':f'this is a test'})
    sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)

    assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
In case you wanted to see my file structure:

├── requirements.txt
├── requirements_dev.txt
└── tests
    └── unit
Thank you

kudos to Arunkumar Muralidharan who got me started

