DEV Community

Márton Papp


Beginners Guide to Caching Inside an Apache Beam Dataflow Streaming Pipeline Using Python

Some days you win. Other days you have to process a continuous stream of thousands of incoming events per second from a PubSub subscription as quickly as possible. Even if your transformations are simple, it is very likely that your business logic depends on some external, dynamic data. Let me show you how to cache these values efficiently, and save you hours of debugging by pointing out a bug in the Python library of Apache Beam itself.

If your configuration comes from an environment variable that changes once a year, you can get away with redeploying your Dataflow job every time, but keep in mind that this usually takes a lot of time, because you have to drain your existing pipelines and start new ones.

If it’s more dynamic and can change every minute or so - for example, you want to filter messages based on a criterion that a user can configure through a UI - you will probably fetch the value from an external resource instead of an env variable (e.g. a file from a bucket, a database query or an API call, you name it...) and cache it for a certain amount of time to avoid hammering that resource.

The naive implementation

Pipelines in Apache Beam are self-contained and isolated from each other, so they can be safely executed concurrently. But it also means that they can’t rely on shared state.

Depending on the load, the Dataflow runner may start 1 to 10 worker processes, and those workers can spawn hundreds or thousands of threads that are independent of each other. With a naive implementation each thread uses its own cache, so even with a TTL as high as 60 seconds you can end up with ~10,000 API calls per minute. While in certain cases that’s better than making a call for every PubSub message, it’s still far from ideal.
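To make the problem tangible, here is a minimal sketch of such a naive per-thread cache using only the standard library. It is not Beam code: load_config is a hypothetical stand-in for the external call, and simulate just starts plain Python threads to count how often the "resource" gets hit.

```python
import threading
import time

TTL_SECONDS = 60  # same TTL as in the text above

load_count = 0  # how many times the "external resource" was hit
load_count_lock = threading.Lock()
_thread_cache = threading.local()  # every thread sees its own attributes


def load_config():
    """Hypothetical stand-in for the expensive external call."""
    global load_count
    with load_count_lock:
        load_count += 1
    return {"min_value": 10}


def get_config_naive():
    """Return this thread's cached config, reloading it after the TTL."""
    now = time.monotonic()
    expires_at = getattr(_thread_cache, "expires_at", None)
    if expires_at is None or now >= expires_at:
        _thread_cache.value = load_config()
        _thread_cache.expires_at = now + TTL_SECONDS
    return _thread_cache.value


def simulate(thread_count, calls_per_thread=100):
    """Run the naive lookup on many threads and return the number of loads."""
    global load_count
    load_count = 0

    def work():
        for _ in range(calls_per_thread):
            get_config_naive()

    threads = [threading.Thread(target=work) for _ in range(thread_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return load_count
```

Every thread pays for its own load no matter how many lookups it makes, so simulate(50) reports 50 loads. Scale that to 10 workers with 1,000 threads each and you arrive at the ~10,000 calls per TTL window mentioned above.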

Shared handle in Python

Shared is a helper class from the apache_beam.utils.shared module. An instance of it is a serializable handle that can be shared by all threads of each worker process and used in your transform functions. The handle encapsulates a weak reference to a singleton object, which will be our shared cache in the form of a dict, and that object can be accessed through the handle’s acquire function.

Please note that most built-in types in Python cannot be weakly referenced, but a dict can easily be made to support it through subclassing:

class _SideInputContainer(dict):
    pass

Watch out for a bug in Apache Beam versions <= 2.35.0

According to the documentation, the acquire function accepts a constructor_fn and a tag parameter. The first one defines how you want to load your data on a cache miss, and the tag is an identifier you want to use in subsequent calls to access the cached object.

Whenever the tag changes the cached data will be reloaded, which can ultimately serve as our TTL by writing a function that returns a different identifier every 60 seconds:

import time
CONFIG_TTL_IN_SECONDS = 60  # the key changes once per TTL window

# Dummy (but usable!) implementation :)
def get_cache_key():
    return str(int(time.time() / CONFIG_TTL_IN_SECONDS))
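To see how the key behaves over time, here is a variant of the same function that takes the timestamp as an argument instead of reading the clock. The now parameter is my addition for the sake of the demonstration; the TTL of 60 seconds is the one used throughout this post.

```python
CONFIG_TTL_IN_SECONDS = 60


def get_cache_key(now):
    """Map a Unix timestamp (in seconds) to the identifier of its TTL window."""
    return str(int(now / CONFIG_TTL_IN_SECONDS))


print(get_cache_key(120.0))  # first second of a window
print(get_cache_key(179.9))  # same window, same key
print(get_cache_key(180.0))  # next window, new key
```

Note that the windows are aligned to the clock and not to the moment of loading: a value loaded near the end of a window is refreshed shortly afterwards, so the TTL is an upper bound on the age of the cached data.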

You might think that writing

return shared_handle.acquire(load_sideinput, get_cache_key())

will do the job, but due to a bug in versions up to 2.35.0 (fixed in a later commit) the tag parameter is ignored. The cached object is reloaded even if you provide the same identifier, which renders the whole mechanism useless: our transformation hits the attached resource every time.

The workaround

The forward and backward compatible solution is to store this identifier on the cached dict itself, manually check whether a reload is needed, and call acquire again with a new tag only in that case.

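import json
import time

class _SideInputContainer(dict):
    """A dict subclass, so that it can be weakly referenced."""

def fetch_raw_json():
    # Placeholder, not a library function: put the API call that returns
    # the JSON you want to cache here.
    raise NotImplementedError

def get_sideinput(shared_handle):
    def get_cache_key():
        # Changes once every 60 seconds, which gives us the TTL.
        return str(int(time.time() / 60))

    def load_sideinput():
        sideinput_container = _SideInputContainer()
        raw_json = fetch_raw_json()
        sideinput_container["cached_response"] = json.loads(raw_json)
        sideinput_container["cache_key"] = get_cache_key()
        return sideinput_container

    # No tag here: this returns the cached object if there is one.
    sideinput_data = shared_handle.acquire(load_sideinput)
    current_cache_key = get_cache_key()
    if sideinput_data["cache_key"] != current_cache_key:
        # The cached object is stale, so force a reload with a new tag.
        sideinput_data = shared_handle.acquire(load_sideinput, current_cache_key)
    return sideinput_data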
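If you want to step through this logic without deploying a pipeline, the sketch below runs it against a fake handle and a fake clock. Everything around get_sideinput is an assumption of mine: StandInHandle is a hypothetical stand-in that only mimics the documented contract of acquire (it is not Beam's implementation and keeps a plain reference to the cached object), and the fetch_raw_json and now parameters exist only to make the function testable. In a real pipeline the handle would be an instance of Beam's Shared class.

```python
import json
import time

TTL_SECONDS = 60


class _SideInputContainer(dict):
    """A dict subclass, so that it can be weakly referenced."""


class StandInHandle:
    """Hypothetical stand-in for a Shared handle (NOT Beam's code).

    It reuses the cached object while the tag is unchanged and calls
    constructor_fn again when the tag differs from the last one.
    """

    def __init__(self):
        self._obj = None
        self._tag = None

    def acquire(self, constructor_fn, tag=None):
        if self._obj is None or tag != self._tag:
            self._obj = constructor_fn()
            self._tag = tag
        return self._obj


def get_sideinput(shared_handle, fetch_raw_json, now=time.time):
    def get_cache_key():
        return str(int(now() / TTL_SECONDS))

    def load_sideinput():
        container = _SideInputContainer()
        container["cached_response"] = json.loads(fetch_raw_json())
        container["cache_key"] = get_cache_key()
        return container

    data = shared_handle.acquire(load_sideinput)
    current_cache_key = get_cache_key()
    if data["cache_key"] != current_cache_key:
        data = shared_handle.acquire(load_sideinput, current_cache_key)
    return data


# Usage with a fake clock and a fake API call that counts its invocations.
clock = [0.0]
calls = []


def fake_fetch():
    calls.append(1)
    return '{"min_value": 10}'


handle = StandInHandle()
first = get_sideinput(handle, fake_fetch, now=lambda: clock[0])
clock[0] = 30.0  # still inside the first 60 second window
second = get_sideinput(handle, fake_fetch, now=lambda: clock[0])
clock[0] = 61.0  # the next window has started
third = get_sideinput(handle, fake_fetch, now=lambda: clock[0])
```

Inside the first window both calls return the very same object and the fake API is hit once. The call at second 61 notices the stale cache_key, reloads, and returns a fresh container for the new window.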

As always, your likes and feedback are very much appreciated!

Latest comments (2)

ddst0111

"object that can be shared by all threads of each worker process": in the latest 2.41 version that doesn't seem to be the case, the object differs from thread to thread. Did something change lately?

Márton Papp

I'm not sure about the change. The documentation for 2.41 still says that Shared is a helper class for managing a single instance of an object shared by multiple threads within the same process.