
First steps with Streammachine.io

Bart van Deenen · 6 min read

This post aims to show some first steps with the Stream Machine platform. The target audience is developers, data engineers, and possibly data scientists. The examples use Python, but the same can be done with Stream Machine's Java driver.

About me: I'm Bart van Deenen, lead engineer of Stream Machine, so I should sort of know what I'm talking about :-). Nevertheless, this post is meant to be unbiased and written from the perspective of a developer.

Stream Machine

Stream Machine promises to provide "lightning fast, privacy secured, customer data - you can actually use".

So what does this actually mean? It means that Stream Machine is a system that...

  • ...accepts events in a strictly defined serialization format (currently Apache Avro and JSON Schema are supported). Any valid schema can be used, but it needs to be registered with Stream Machine.
  • ...requires each event to refer to a meta-schema (the so-called schema definition) that defines which fields in the event schema (the so-called serialization schema) contain Personally Identifiable Information (for convenience referred to as PII or PII data).
  • ...lets events refer to customizable validation rules that define which field values are valid.
  • ...processes the events with a highly available, fault-tolerant stream processing system that encrypts all PII field values. The encryption keys are rotated every 24 hours, which leads to a GDPR-compliant stream of event data that can be used by everyone in your company. Within each 24-hour window the encrypted values remain static (see the sketch after this list).
  • ...attaches consent-level information to each event; only events whose consent allows decryption of PII data for certain purposes are decrypted into decrypted stream(s) that can only be used by those inside your company who are allowed to see them.
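
To get an intuition for why the encrypted values stay stable within a key-rotation window, here's a minimal, purely illustrative Python sketch. This is not Stream Machine's actual encryption scheme; it only shows how deterministic, keyed pseudonymization behaves when the key is derived per 24-hour window.

# Illustrative only -- NOT Stream Machine's actual encryption scheme.
# Shows why a deterministically encrypted PII value stays static within
# a 24-hour key-rotation window and changes once the key rotates.
import hashlib
import hmac
import time

MASTER_SECRET = b"example-master-secret"  # hypothetical secret

def key_for_window(now: float) -> bytes:
    """Derive a key that changes every 24 hours."""
    window = int(now // 86400)  # number of whole days since the epoch
    return hmac.new(MASTER_SECRET, str(window).encode(), hashlib.sha256).digest()

def pseudonymize(value: str, now: float) -> str:
    """Deterministically pseudonymize a PII value for the current window."""
    key = key_for_window(now)
    return hmac.new(key, value.encode(), hashlib.sha256).hexdigest()[:24]

now = time.time()
print(pseudonymize("integration-test", now))          # same output all day
print(pseudonymize("integration-test", now + 86400))  # different tomorrow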

This post uses a debugging output of the stream data based on server-sent events. Production-level output streams require hooking up our internal Apache Kafka streams; AWS S3 and Google Cloud Storage buckets can be used for batch processing. These will be explored in a future blog post.

The plan

I'm going to build a Python application that mimics users clicking around on a dummy web-shop and sends the resulting click stream to Stream Machine. I want to retrieve the anonymized data from a Google Cloud bucket and show it in a Jupyter notebook. I also want to verify that I can only retrieve decrypted click stream events for those simulated users who have given full personalized-marketing permission. This first post just gets the basics working, i.e. sending events to Stream Machine and retrieving them.

The steps

An account

I went to streammachine.io to register an account, and after confirming my email, I was shown this page:

[Screenshot: Stream Machine portal login page]

Let's create a new stream

OK, I'm going to create a stream named clickies. It turns out there's currently just one schema, named 'clickstream', so let's pick that. Once you click the view credentials button, you get a Stream Machine credentials JSON. It's important to store this in a file somewhere; otherwise you can't get at your data.

credentials.json

{
  "IN": {
    "billingId": "bart-strm5",
    "clientId": "....",
    "secret": "..."
  },
  "OUT": {
    "0": {
      "consentLevels": [
        0
      ],
      "clientId": ".....",
      "secret": "......"
    },
    "1": {
      "consentLevels": [
        1
      ],
      "clientId": "....",
      "secret": "........."
    }
  }
}

The clientId and secret fields above are your secrets; the Stream Machine driver uses them to authenticate and retrieve a JWT, which is then used for each subsequent communication with the Stream Machine endpoints. For the remainder of this post, I assume you've saved this JSON in a file named credentials.json in your working directory.
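
As a quick sanity check (assuming the credentials.json structure shown above), you can load the file and list which consent levels each output stream is allowed to decrypt:

# Inspect credentials.json (structure as shown above).
import json

with open("credentials.json") as f:
    creds = json.load(f)

print("billingId:", creds["IN"]["billingId"])
for name, out in creds["OUT"].items():
    print(f"output stream {name} decrypts consent levels {out['consentLevels']}")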

Let's send an event!

I'm following along with the Python example in the documentation. I'm going to use the SyncSender because I want to play with it interactively in IPython.

python3 -m venv venv
. venv/bin/activate
pip install ipython streammachine-driver streammachine-schemas-catalog-clickstream-avro
pip freeze | grep streammachine
  streammachine-avro==0.0.1
  streammachine-driver==0.0.8
  streammachine-schemas-catalog-clickstream-avro==0.0.3
  streammachine-schemas-common==0.0.3

OK, let's start IPython:

$> ipython
Python 3.9.0 (default, Oct 10 2020, 14:22:51)

# import a class that matches the structure of the clickstream schema
from clickstream.io.streammachine.schemas.strmcatalog.clickstream \
    import ClickstreamEvent
from streammachine.driver import StreamMachineEvent, current_time_millis
from streammachine.driver.client.syncsender import SyncSender
event = ClickstreamEvent()
import json
creds = json.load(open("credentials.json"))
sender = SyncSender(creds['IN']['billingId'], creds['IN']['clientId'], creds['IN']['secret'])
sender.start()
sender.wait_ready()
sender.send_event(event)

After the last line I got an HTML error that basically said 'no schema', with a 500 status code.

Let's dig a little deeper into the example. Ah, I need to set the name of the schema inside the event:

event.strmMeta.schemaId = "clickstream"
sender.send_event(event)
# Error while sending event to Stream Machine (https://in.strm.services/event),
#    response status = 400, response:
#    regex clickstream/customer/id '' doesnt match '^.+$'

Nice, I'm talking to Stream Machine! So the customer id needs at least one character? Weird...

event.customer.id="hi there"
sender.send_event(event)

# Error while sending event to Stream Machine (https://in.strm.services/event),
#           response status = 400, response: regex
#           clickstream/url '' doesnt match '^(https?|ftp|file)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]'

So why is that? The Stream Machine endpoint does not just validate the form of the event data, but also its contents. These rules are defined per schema (in this case the clickstream schema); the error messages above show the validations that are applied to the event after deserialization.
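
If you want a feel for these constraints before sending anything, you can check candidate values locally against the regexes from the error messages. A quick sketch; the patterns are copied verbatim from the 400 responses above:

# Local preflight check of field values against the patterns reported
# by the Stream Machine endpoint (copied from the 400 responses above).
import re

CUSTOMER_ID_RE = re.compile(r"^.+$")
URL_RE = re.compile(
    r"^(https?|ftp|file)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]"
)

print(bool(CUSTOMER_ID_RE.match("integration-test")))         # True
print(bool(URL_RE.match("https://portal.streammachine.io")))  # True
print(bool(URL_RE.match("not-a-url")))                        # False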

So let's make a valid event from the example:

from random import randint

def create_avro_event(sessionid) -> StreamMachineEvent:
    event = ClickstreamEvent()
    event.abTests = ["abc"]
    event.eventType = "button x clicked"
    event.customer.id = "integration-test"
    event.referrer = "https://www.streammachine.io"
    event.userAgent = "Mozilla/5.0"
    event.producerSessionId = sessionid
    event.conversion = 1

    event.strmMeta.timestamp = current_time_millis()
    event.strmMeta.schemaId = "clickstream"
    event.strmMeta.nonce = 0
    event.strmMeta.consentLevels = [0, 1, 2]
    event.url = "https://portal.streammachine.io"
    return event

sessionid = f"session-{randint(1000,10000)}"
event = create_avro_event(sessionid)

r = sender.send_event(event)
print(r)
None

Oh well, it's not complaining, so let's see about the output. For this we use a server-sent events endpoint that will provide JSON output of the event. Note that this is not suitable for production loads! It's merely intended for testing and debugging purposes. In another terminal in the same directory:

import asyncio, json, logging, sys
from streammachine.driver import StreamMachineClient, ClientConfig

logging.basicConfig(stream=sys.stderr)
creds = json.load(open("credentials.json"))

async def event_handler(event):
    print(json.loads(event))

async def main():
    config = ClientConfig(log_level=logging.DEBUG)
    client = StreamMachineClient(creds['IN']['billingId'], creds['IN']['clientId'],
        creds['IN']['secret'], config)
    await client.start_timers()
    await client.start_receiving_sse(True, event_handler)
asyncio.run(main())

Ok back to the first terminal, and send an event:

sender.send_event(create_avro_event(sessionid))

Hmmm, nothing happens. And again! Well, it turns out that the receiver uses server-sent events, and there's some batching going on somewhere in a cloud load balancer.
That's easy to work around: send events in a loop, and you get a lot of data in your receiver!

import time
while True:
   sender.send_event(create_avro_event(sessionid))
   time.sleep(0.2)

You'll get an occasional warning in the output; there are still some improvements to be made to the Python asyncio client.

Looking at one event in the receiving terminal, I get this:

{
  "strmMeta": {
    "schemaId": "clickstream",
    "nonce": 190417502,
    "timestamp": 1603288736020,
    "keyLink": 44384987,
    "billingId": "bart-strm5",
    "consentLevels": [
      0,
      1,
      2
    ]
  },
  "producerSessionId": "AS7qq8aWID2mAnEISXK4Qwz+JhykZK3xGEjYu7oI+A==",
  "url": "https://portal.streammachine.io",
  "eventType": "button x clicked",
  "referrer": "https://www.streammachine.io",
  "userAgent": "Mozilla/5.0",
  "conversion": 1,
  "customer": {
    "id": "AS7qq8YNkAEw2r4NuqJTEdoc/xCIUya1wzu/djO3XPbN6qcRNA=="
  },
  "abTests": [
    "abc"
  ]
}

Just looking at it, I can see that producerSessionId and customer/id have been encrypted, which fits the clickstream schema definition. Because the events we create include the maximum consent level, we should be able to get a decrypted stream as well.

Let's modify the receiver handler:

async def main():
    config = ClientConfig(log_level=logging.DEBUG)
    client = StreamMachineClient(creds['IN']['billingId'], creds['OUT']['1']['clientId'],
      creds['OUT']['1']['secret'], config)
    await client.start_timers()
    await client.start_receiving_sse(True, event_handler)
asyncio.run(main())

Start sending data again, and we get decrypted events:

{
  "strmMeta": {
    "schemaId": "clickstream",
    "nonce": 190417502,
    "timestamp": 1603288736020,
    "keyLink": 44384987,
    "billingId": "bart-strm5",
    "consentLevels": [
      0,
      1,
      2
    ]
  },
  "producerSessionId": "session-432",
  "url": "https://portal.streammachine.io",
  "eventType": "button x clicked",
  "referrer": "https://www.streammachine.io",
  "userAgent": "Mozilla/5.0",
  "conversion": 1,
  "customer": {
    "id": "integration-test"
  },
  "abTests": [
    "abc"
  ]
}
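
To convince yourself that the consent gating works, you could send events that only carry consent level 0 and watch the receiver that uses the OUT "1" credentials. A quick sketch, reusing create_avro_event from above; the expectation (hedged) is that such events no longer arrive decrypted there:

# Send an event that only carries consent level 0, then watch the OUT "1"
# receiver: it should no longer show this event in decrypted form.
low_consent_event = create_avro_event(sessionid)
low_consent_event.strmMeta.consentLevels = [0]  # drop the higher consent levels
sender.send_event(low_consent_event)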

Conclusions

We can send data into Stream Machine and receive it back privacy-safe and with guaranteed quality.

The next post in this series will show how to get all this data into a Google Cloud bucket, and how to use it in a Jupyter notebook.
