Sometimes you just want parallelism
Learn how parallelism can dramatically improve performance when processing messages and storing data in S3, with Python code examples.

Background
At the enterprise I am working with, we consume messages from Confluent Cloud topics. These messages need to end up in S3 so they are available for further processing. Due to enterprise regulations (network technicalities, but also restrictions on which services we are allowed to use), we cannot consume messages from Confluent Cloud via serverless options like EventBridge Pipes. We also cannot use managed solutions provided by Confluent Cloud (SaaS), such as the S3 sink connector, because of our network setup.
So, long story short, we are back to my good old friend EC2 with a custom Python consumer script.
To make the setup more robust, we run the EC2 instance inside an Auto Scaling group. This ensures that when a glitch in the Matrix occurs, the instance is recreated and starts running the consumer script again.
Real-world scenario
When we scaled up the number of messages on the topics, we noticed that the consumer script on the EC2 instance was falling behind in processing messages: the so-called consumer lag.
After investigating, we pinpointed the problem to two factors:
- The number of messages (quantity)
- The latency of the API call that stores an object in S3 (put_object)
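Consumer lag grows whenever messages arrive faster than the consumer can store them. A back-of-the-envelope sketch makes that concrete. It assumes a steady produce rate and a single sequential consumer with a fixed cost per message. The 20 messages/s load below is a hypothetical figure; the ~73 ms per message matches the sequential S3 baseline from the performance test.

```python
def consumer_lag(produce_rate_per_s: float, seconds_per_message: float, duration_s: float) -> float:
    """Return the number of unprocessed messages after duration_s seconds.

    Simplified model (an assumption): messages arrive at a steady rate and one
    sequential consumer spends seconds_per_message on every message.
    """
    produced = produce_rate_per_s * duration_s
    consumed_capacity = duration_s / seconds_per_message
    # Lag cannot be negative: the consumer never processes more than was produced.
    return max(0.0, produced - consumed_capacity)


# Hypothetical load of 20 messages/s against ~73 ms of work per message
print(round(consumer_lag(20, 0.073, 60)))  # 378 messages behind after one minute
```

Because the lag term grows linearly with `duration_s` whenever the produce rate exceeds `1 / seconds_per_message`, the only ways out are fewer messages or less time per message. That matches the two factors listed above.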
So how can we fix this? Almost every AWS service offers some form of scaling, and whether it is horizontal or vertical, a solution is often available.
Unfortunately, Auto Scaling, vertical or horizontal, does not cut it for the consumer script. Scaling causes problems with maintaining the consumer's state, the so-called offset.
So tackling the number of messages is not possible for now. That leaves problem number two: we will try to reduce the latency of the API call, which is the main reason our consumer is lagging.
Performance testing
So how can we optimize this API behavior? First, we need numbers. Let us create a test that generates 1,000 unique JSON messages and stores them in S3.
```python
import json
import time

import boto3

s3 = boto3.client("s3", region_name="eu-central-1")
bucket_name = "my-test-bucket"  # placeholder; S3 bucket names cannot contain underscores

s3_start_time = time.time()
# Upload 1,000 small JSON objects one after another (sequential baseline)
for i in range(1000):
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    object_key = f"message_{i}.json"
    object_body = json.dumps(message_body)
    s3.put_object(Bucket=bucket_name, Key=object_key, Body=object_body)
s3_end_time = time.time()

s3_elapsed_time = s3_end_time - s3_start_time
print(f"Elapsed time S3 copy: {s3_elapsed_time:.2f} seconds")
```

Running the above Python code takes just over 73 seconds, or about 73 ms per put_object call. This is our baseline to beat.
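Turning a total elapsed time into per-message latency and throughput makes results easier to compare with the lag arithmetic. The small helper below is a hypothetical illustration that simply divides the measured total by the number of calls.

```python
def per_message_stats(elapsed_s: float, message_count: int) -> tuple[float, float]:
    """Return (milliseconds per message, messages per second) for a sequential run."""
    ms_per_message = elapsed_s / message_count * 1000
    messages_per_second = message_count / elapsed_s
    return ms_per_message, messages_per_second


# Measured sequential S3 baseline: 73.57 s for 1,000 put_object calls
ms, rate = per_message_stats(73.57, 1000)
print(f"{ms:.1f} ms per put_object, {rate:.1f} messages/s")
```

A sequential consumer with this latency tops out at under 14 stored messages per second, regardless of how fast the topic fills up.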
What about SQS? Let us test how quickly SQS accepts messages:

```python
import json
import time

import boto3

sqs = boto3.resource("sqs", region_name="eu-central-1")
queue_name = "my_test_sqs_queue"
queue = sqs.get_queue_by_name(QueueName=queue_name)

sqs_start_time = time.time()
# Send 1,000 small JSON messages one after another
for i in range(1000):
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    queue.send_message(MessageBody=json.dumps(message_body))
sqs_end_time = time.time()

sqs_elapsed_time = sqs_end_time - sqs_start_time
print(f"Elapsed time SQS: {sqs_elapsed_time:.2f} seconds")
```

Results:
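```
Elapsed time SQS: 34.14 seconds
Elapsed time S3 copy: 73.57 seconds
S3 time: 73.57 is 215.51% slower than SQS time: 34.14
```

Wow, that is a big difference: S3 takes about 2.15 times as long as SQS. Note that the printed percentage is the ratio of the two times, so S3 is actually about 115% slower. Still, neither result shows exciting per-message latency: roughly 34 ms per SQS message and 73 ms per S3 object. So how can we optimize further? Well, what about parallelism?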
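"Takes N times as long" and "is X% slower" are easy to mix up. The hypothetical helper below computes both from two measured times. Percent slower is the ratio minus one, times 100.

```python
def compare_times(slow_s: float, fast_s: float) -> tuple[float, float]:
    """Return (ratio, percent slower) of slow_s relative to fast_s."""
    ratio = slow_s / fast_s
    percent_slower = (ratio - 1) * 100
    return ratio, percent_slower


ratio, slower = compare_times(73.57, 34.14)  # sequential S3 vs SQS
print(f"S3 takes {ratio:.2f}x as long as SQS ({slower:.1f}% slower)")
```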
According to Wikipedia:
Parallel computing is a type of computation in which many calculations or processes are carried out simultaneously.
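In Python, threads do not execute Python bytecode in parallel because of the global interpreter lock (GIL). However, a thread that is blocked on network I/O releases the GIL, so I/O-bound work such as waiting for S3 responses still overlaps. The self-contained sketch below replaces put_object with a hypothetical `fake_put` that just sleeps for an assumed 50 ms round trip. It compares a sequential loop with a `ThreadPoolExecutor` without touching AWS.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SIMULATED_LATENCY_S = 0.05  # assumed stand-in for one S3 round trip


def fake_put(i: int) -> int:
    """Hypothetical stand-in for s3.put_object: blocks like a network call."""
    time.sleep(SIMULATED_LATENCY_S)  # sleep releases the GIL, as blocking socket I/O does
    return i


def run_sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        fake_put(i)
    return time.perf_counter() - start


def run_threaded(n: int, workers: int) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # list() waits for every result and re-raises the first exception, if any
        list(executor.map(fake_put, range(n)))
    return time.perf_counter() - start


sequential = run_sequential(40)
threaded = run_threaded(40, workers=20)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

With 20 workers, the 40 simulated calls finish in about two "waves" instead of 40 back-to-back waits. The same effect drives the real S3 result.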
So what if we store the objects in S3 again, but this time upload them in parallel instead of sequentially?
```python
import concurrent.futures
import json
import time

import boto3

# boto3 low-level clients are thread-safe, so all threads can share one client
s3 = boto3.client("s3", region_name="eu-central-1")
bucket_name = "my-test-bucket"  # placeholder; S3 bucket names cannot contain underscores


def upload_message(i):
    """Store a single JSON message as its own S3 object."""
    message_body = {"message_id": i, "message_content": f"This is message {i}"}
    object_key = f"message_{i}.json"
    object_body = json.dumps(message_body)
    s3.put_object(Bucket=bucket_name, Key=object_key, Body=object_body)


s3_parallelism_start_time = time.time()
# Default worker count (Python 3.8+) is min(32, os.cpu_count() + 4)
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(upload_message, i) for i in range(1000)]
    # result() re-raises any exception, so failed uploads are not silently ignored
    for future in futures:
        future.result()
s3_parallelism_end_time = time.time()

s3_parallelism_elapsed_time = s3_parallelism_end_time - s3_parallelism_start_time
print(f"Elapsed time S3_parallelism: {s3_parallelism_elapsed_time:.2f} seconds")
```

The result
Running the test_performance.py script with parallelism enabled gives the following results:
```
Elapsed time S3_parallelism: 4.93 seconds
Elapsed time SQS_parallelism: 2.26 seconds
S3 time: 4.93 is 218.22% slower than SQS time: 2.26
```

With parallelism, S3 still takes about 2.2 times as long as SQS, the same ratio as in the sequential test. But parallelism itself makes a huge difference. The 1,000 S3 uploads drop from 73.57 to 4.93 seconds, and the SQS sends drop from 34.14 to 2.26 seconds. Both are roughly 15 times faster.
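You can sanity-check the speedup by computing it directly and comparing it with a simple best-case model. In this model, every call takes the same latency and W worker threads process the calls in waves, so the speedup cannot exceed W. The 16-worker figure below is a hypothetical example. The real default of `ThreadPoolExecutor` depends on the machine's CPU count.

```python
import math


def speedup(sequential_s: float, parallel_s: float) -> float:
    """How many times faster the parallel run finished."""
    return sequential_s / parallel_s


def ideal_parallel_time(n: int, latency_s: float, workers: int) -> float:
    """Best-case duration if every call takes latency_s and workers run in lockstep waves."""
    return math.ceil(n / workers) * latency_s


print(f"S3 speedup:  {speedup(73.57, 4.93):.1f}x")
print(f"SQS speedup: {speedup(34.14, 2.26):.1f}x")
# Hypothetical: 1,000 calls at ~73.6 ms each spread over 16 workers
print(f"ideal with 16 workers: {ideal_parallel_time(1000, 0.0736, 16):.2f}s")
```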
Is this the final solution for our consumer Python script? No, it is not. More code optimization is possible. But for now, one simple change, adding parallelism to our put_object calls, makes the uploads more than 10x faster. That is enough to keep our consumer from lagging.
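One possible follow-up optimization, sketched under assumptions rather than taken from our consumer, is to upload a whole batch of consumed messages in parallel and commit the offset only after every upload has succeeded. That way parallelism does not break the offset state mentioned earlier. The `(offset, payload)` message shape, the single-partition assumption, and the `upload`/`commit` callables are all hypothetical. With confluent-kafka, the batch could come from `Consumer.consume(num_messages=...)` and the commit from `Consumer.commit(...)` with auto-commit disabled.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Sequence, Tuple

# Hypothetical message shape: (offset, payload) from a single partition
Message = Tuple[int, bytes]


def store_batch_then_commit(
    batch: Sequence[Message],
    upload: Callable[[int, bytes], None],
    commit: Callable[[int], None],
    max_workers: int = 10,
) -> Optional[int]:
    """Upload a batch in parallel and commit its offset only if every upload succeeded.

    Returns the committed offset, or None for an empty batch. The committed value
    is the last offset + 1, following Kafka's "next message to read" convention.
    If any upload raises, the exception propagates and nothing is committed, so the
    batch is read again after a restart (at-least-once delivery).
    """
    if not batch:
        return None
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload, offset, payload) for offset, payload in batch]
        for future in futures:
            future.result()  # re-raises an upload's exception, so the commit below is skipped
    next_offset = max(offset for offset, _ in batch) + 1
    commit(next_offset)
    return next_offset


# Usage with hypothetical in-memory stand-ins for S3 and the Kafka commit
stored: Dict[int, bytes] = {}
committed: List[int] = []


def fake_upload(offset: int, payload: bytes) -> None:
    stored[offset] = payload


batch = [(100 + i, f"message {i}".encode()) for i in range(5)]
print(store_batch_then_commit(batch, fake_upload, committed.append))  # 105
```

With boto3, `upload` would wrap `s3.put_object`. By default, botocore keeps at most 10 pooled connections per client (`max_pool_connections` in `botocore.config.Config`). Raising `max_workers` well beyond that may therefore also call for a larger pool. Treat `max_workers=10` as a starting point, not a tuned value.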
Have questions about Python performance or AWS? Find me on Twitter or LinkedIn.