Some days you win. Other days you have to process a continuous stream of thousands of incoming events per second from a PubSub subscription as quickly as possible. Even if your transformations are simple, it is very likely that your business logic depends on some external, dynamic data. Let me show you how to cache these values efficiently and save yourself hours of debugging by uncovering a bug in the Python library of Apache Beam itself.
If your configuration comes from an environment variable that changes once a year, you can get away with updating your Dataflow job every time, but keep in mind that this usually takes a long time to execute, since it drains your existing pipelines and starts new ones.
If it's more dynamic and can change every minute or so (for example, you want to filter messages based on criteria that a user can configure through a UI), then instead of an env variable you will probably fetch a value from an external resource (a file from a bucket, a database query, an API call, you name it...) and cache it for a certain amount of time to prevent hammering the attached resource.
The naive implementation
Pipelines in Apache Beam are self-contained and isolated from each other, so they can safely be executed concurrently. But this also means that they can't rely on shared state.
Depending on the load, the Dataflow runner may start anywhere from 1 to 10 worker processes, and those workers can spawn hundreds or thousands of threads, each independent from the others. With a naive implementation every thread uses its own cache, so even with a TTL as high as 60 seconds you can end up with ~10,000 API calls per minute. While in certain cases that's still better than making a call for every PubSub message, it's far from ideal.
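To make the problem concrete, here is a rough sketch of such a naive implementation. Everything in it is illustrative: `NaiveFilterFn`, `fetch_config` and the `allowed_types` key are made-up stand-ins for your own logic.

```python
import json
import time

import apache_beam as beam


def fetch_config():
    # Placeholder for the external call (bucket read, DB query, HTTP request...).
    return '{"allowed_types": ["click", "purchase"]}'


class NaiveFilterFn(beam.DoFn):
    """Naive caching: every DoFn instance (i.e. every thread) keeps its own copy."""

    _TTL_IN_SECONDS = 60

    def setup(self):
        self._cache = None
        self._fetched_at = 0.0

    def _get_config(self):
        if self._cache is None or time.time() - self._fetched_at > self._TTL_IN_SECONDS:
            # With thousands of threads, this line alone can fire
            # thousands of API calls per TTL window.
            self._cache = json.loads(fetch_config())
            self._fetched_at = time.time()
        return self._cache

    def process(self, element):
        if element.get("type") in self._get_config()["allowed_types"]:
            yield element
```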
Shared handle in Python
`Shared` is a serializable object that can be shared by all threads of each worker process, and an instance of it acts as a shared handle that can be used in your transform functions. This handle encapsulates a weak reference to a singleton object, which will be our shared cache in the form of a `dict`, and which can be accessed through the handle's `acquire` function.
Please note that most built-in types in Python cannot be weakly referenced, but a `dict` can easily be made to support it through subclassing:
```python
class _SideInputContainer(dict):
    pass
```
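For context, here is a minimal sketch of the mechanics, assuming you create the handle once and pass it into your transforms; `load_config` and its contents are placeholders:

```python
from apache_beam.utils.shared import Shared


class _SideInputContainer(dict):
    pass


def load_config():
    # Only called on a cache miss; fill the container however you like.
    return _SideInputContainer(cached_response={"feature_enabled": True})


shared_handle = Shared()                     # create once, pass to your DoFns
config = shared_handle.acquire(load_config)  # every thread gets the same object
```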
Watch out for a bug in Apache Beam versions <= 2.35.0
According to the documentation, the `acquire` function accepts a `constructor_fn` and a `tag` parameter. The first one defines how you want to load your data on a cache miss, and the tag is an identifier you use in subsequent calls to access the cached object.
Whenever the tag changes, the cached data is reloaded. This can ultimately serve as our TTL: we just need a function that returns a different identifier every 60 seconds:
```python
import time

CONFIG_TTL_IN_SECONDS = 60


# Dummy (but usable!) implementation :)
def get_cache_key():
    return str(int(time.time() / CONFIG_TTL_IN_SECONDS))
```
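A quick sanity check of the arithmetic (plain Python, nothing Beam-specific): every timestamp inside the same 60-second window maps to the same key, and crossing a window boundary produces a new one.

```python
>>> str(int(119.9 / 60))  # still inside the second window
'1'
>>> str(int(120.1 / 60))  # boundary crossed: new key, so the cache reloads
'2'
```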
You might think that writing

```python
return shared_handle.acquire(load_sideinput, get_cache_key())
```

will do the job, but due to a bug in versions up to and including 2.35.0 the `tag` parameter is ignored. The cached object is reloaded even if you provide the same identifier, rendering the whole mechanism useless: our transformation will hit the attached resources every single time.
The workaround
The forward- and backward-compatible solution is to store this identifier on the cached `dict` itself, manually check whether a reload is needed, and only then call `acquire` again with a new tag.
```python
import json
import time


class _SideInputContainer(dict):
    pass


def get_sideinput(shared_handle):
    def get_cache_key():
        # A new key every 60 seconds acts as our TTL.
        return str(int(time.time() / 60))

    def load_sideinput():
        sideinput_container = _SideInputContainer()
        rawJson = ...  # your API call that returns the JSON you want to cache
        sideinput_container["cached_response"] = json.loads(rawJson)
        sideinput_container["cache_key"] = get_cache_key()
        return sideinput_container

    # Returns the cached container (and loads it on the very first call).
    sideinput_data = shared_handle.acquire(load_sideinput)
    current_cache_key = get_cache_key()
    if sideinput_data["cache_key"] != current_cache_key:
        # The stored key is stale: force a reload by passing a new tag.
        sideinput_data = shared_handle.acquire(load_sideinput, current_cache_key)
    return sideinput_data
```
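To show where `get_sideinput` fits, here is one way the handle might be wired into a streaming pipeline. The DoFn, the subscription path and the `allowed_types` key are assumptions for illustration, and the streaming pipeline options are omitted for brevity:

```python
import json

import apache_beam as beam
from apache_beam.utils.shared import Shared


class FilterWithCachedConfig(beam.DoFn):
    def __init__(self, shared_handle):
        self._shared_handle = shared_handle

    def process(self, element):
        # Cheap lookup: at most one reload per worker process per TTL window.
        config = get_sideinput(self._shared_handle)["cached_response"]
        message = json.loads(element)
        if message.get("type") in config.get("allowed_types", []):
            yield message


with beam.Pipeline() as pipeline:
    shared_handle = Shared()  # serialized once, shared by all threads of a worker
    (
        pipeline
        | beam.io.ReadFromPubSub(subscription="projects/<project>/subscriptions/<sub>")
        | beam.ParDo(FilterWithCachedConfig(shared_handle))
    )
```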
As always, your likes and feedback are very much appreciated!
Top comments (2)
"object that can be shared by all threads of each worker process", in latest 2.41 version, looks like that's not the case, the object is different from other thread, something changed lately?
I'm not sure about a change; the documentation for 2.41 still says that Shared "is a helper class for managing a single instance of an object shared by multiple threads within the same process."