Bart van Deenen

Streammachine export to Pandas

This post is the second in a series on how to use the Stream Machine platform. The first post covered getting started.

This time, I'll show how to get Stream Machine events into a Google Cloud Storage bucket, and from there into a Pandas dataframe. Let's get started.

I assume you have a Google Cloud Storage bucket somewhere, and that you have created a service account.

gcloud permissions

  • IAM -> Service Accounts -> Create Service Account
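If you prefer the command line over the console, roughly the same can be done with gcloud and gsutil. This is a sketch only; the service account name strm-demo-sa is made up here, so substitute your own project, bucket and account names:

```shell
# create the service account (same as the console steps above)
gcloud iam service-accounts create strm-demo-sa \
  --project=stream-machine-development

# download a JSON key for it into the working directory
gcloud iam service-accounts keys create credentials.json \
  --iam-account=strm-demo-sa@stream-machine-development.iam.gserviceaccount.com

# grant it read/write access on the bucket
gsutil iam ch \
  serviceAccount:strm-demo-sa@stream-machine-development.iam.gserviceaccount.com:objectAdmin \
  gs://strm-demo
```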

The post assumes you've stored the Service Account credentials in a file credentials.json in the working directory. The file contents are something like this:

{
  "type": "service_account",
  "project_id": "stream-machine-development",
  "private_key_id": "bae39......d6efda",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG......3kS+0=\n-----END PRIVATE KEY-----\n",
  ...
}

Creating a sink and an exporter

In order to store event data in a cloud bucket, you need to

  • configure the bucket (name, credentials and path) in a so-called sink entity.
  • create a batch-exporter from a stream to that sink.
strm create sink demo strm-demo --credentials-file=credentials.json
{
  "ref": { "billingId": "strmbart5986941267", "name": "demo" },
  "sinkType": "GCLOUD",
  "bucket": { "bucketName": "strm-demo", "credentials": "..." }
}

# batch exporter for the encrypted stream at path demo-in on the bucket
strm create batch-exporter demo --interval 30 --path-prefix demo-in --sink demo

# batch exporter for the decrypted level 2 stream at path demo-2 on the bucket
strm create batch-exporter demo-2 --interval 30 --sink demo --path-prefix demo-2
{
  "ref": {
    "billingId": "strmbart5986941267",
    "name": "demo-demo-2"
  },
  "streamRef": {
    "billingId": "strmbart5986941267",
    "name": "demo-2"
  },
  "interval": "30s",
  "sinkName": "demo",
  "pathPrefix": "demo-2"
}

Start sending some data

The CLI has a built-in simulator. Use version 1.4.0 of the CLI; other versions generate different simulated events.

strm sim run-random demo --interval 100
Starting to simulate random streammachine/demo/1.0.2 events to stream demo.
Sending one event every 100 ms.
Sent 50 events
Sent 100 events
Sent 150 events
Sent 200 events
...

We should see some data in our bucket:

gsutil ls gs://strm-demo/demo-in | head -5
gs://strm-demo/demo-in/2021-08-12T09:38:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T09:38:30-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T11:09:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T11:09:30-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl
gs://strm-demo/demo-in/2021-08-12T11:10:00-stream-69c9bbcd-98c2-40e6-b041-fe1af8498752---0-1-2-3-4.jsonl

gsutil ls gs://strm-demo/demo-2 | head -5
gs://strm-demo/demo-2/2021-08-12T09:38:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T09:38:30-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T09:39:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T11:09:00-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl
gs://strm-demo/demo-2/2021-08-12T11:09:30-stream-4932a907-1972-4982-8018-45a5cd648003---0-1-2-3-4.jsonl

In each file we find newline-delimited JSON events (for readability, I've formatted an example event below).

gsutil cat gs://strm-demo/demo-in/.....jsonl | head -1

{
  "strmMeta": {
    "eventContractRef": "streammachine/example/1.3.0",
    "nonce": -1557054268,
    "timestamp": 1628761079144,
    "keyLink": "1d960e7a-4169-4bcf-bece-e8fcc3243c06",
    "billingId": "strmbart5986941267",
    "consentLevels": [ 0, 1 ]
  },
  "uniqueIdentifier": "AQq8Ihq3DBOahkZNXpBfdky8m04pb6c02RIUNOHo",
  "consistentValue": "AQq8IhqopLqWCpIDd1+xZUw/KCtXObL7irK5NbgE1I4=",
  "someSensitiveValue": "AQq8Ihq62mxCv1fqEZ+0bcijMEFZ/VFnpA4EEs8XRp0P",
  "notSensitiveValue": "not-sensitive-30"
}

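Before wiring this into a notebook, it helps to see what pandas' `json_normalize` does with one such event: the nested `strmMeta` object becomes dotted column names, and the epoch-millisecond timestamp can be turned into a real datetime. A minimal, self-contained sketch (the event is a shortened copy of the example above):

```python
import json
import pandas as pd
from pandas import json_normalize

# one event line as it appears in the .jsonl files (shortened)
line = json.dumps({
    "strmMeta": {
        "eventContractRef": "streammachine/example/1.3.0",
        "timestamp": 1628761079144,
        "consentLevels": [0, 1],
    },
    "uniqueIdentifier": "AQq8Ihq3...",
    "notSensitiveValue": "not-sensitive-30",
})

# json_normalize flattens strmMeta into columns like "strmMeta.timestamp"
df = json_normalize([json.loads(line)])
print(df.columns.tolist())

# the epoch-millisecond timestamp becomes a proper datetime
df["strmMeta.timestamp"] = pd.to_datetime(df["strmMeta.timestamp"], unit="ms")
print(df["strmMeta.timestamp"].iloc[0])
```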

The demo-2 directory will contain decrypted data.

gsutil cat gs://strm-demo/demo-2/.....jsonl | head -1
{
  "strmMeta": {
    "eventContractRef": "streammachine/example/1.3.0",
    "nonce": -62169113,
    "timestamp": 1628761077936,
    "keyLink": "1f118159-a27a-4468-a540-23a4938bce14",
    "billingId": "strmbart5986941267",
    "consentLevels": [ 0, 1, 2 ]
  },
  "uniqueIdentifier": "unique-25",
  "consistentValue": "session-488",
  "someSensitiveValue": "AXHCR9j0KLyYy7Bivvrk+xfU0D4pRJkIlHAE/PtvtsPx",
  "notSensitiveValue": "not-sensitive-19"
}

To Jupyter and Pandas

The next step is to get these files into a Pandas dataframe in a Jupyter notebook. You'll need the credentials.json file with the Google Cloud service account credentials again.

The essence of the Jupyter notebook is:

pip install jupyter gcsfs pandas

import pandas as pd
from pandas import json_normalize
import gcsfs
import json

# set these to your own project and bucket
bucket = "strm-demo"
project = "stream-machine-development"

Load the data

fs = gcsfs.GCSFileSystem(project=project, token="credentials.json")

def one_object_to_df(path):
    # parse one newline-delimited JSON file into a flat dataframe
    with fs.open(path, "r") as _f:
        df = json_normalize([json.loads(line) for line in _f.readlines()])
    # convert the epoch-millisecond timestamp and use it as a sorted index
    df['strmMeta.timestamp'] = pd.to_datetime(df['strmMeta.timestamp'], unit='ms')
    return df.set_index('strmMeta.timestamp').sort_index()

def make_df(folder):
    print(folder)
    # fs.find lists every object under the folder; load and concatenate them all
    # (df.append was removed in pandas 2.0, so use pd.concat instead)
    frames = [one_object_to_df(f) for f in fs.find(f"{bucket}/{folder}")]
    return pd.concat(frames) if frames else None

demo = make_df("demo-in")
demo_2 = make_df("demo-2")

Show some data

demo.consistentValue.value_counts()
AScCSOgnaoVZw2nqRtTSBlwV8pWe5R7SXXJcL1tXC5M=    55
AV3MqegMMQ3Bikr1klqdNN+X+6rykyNtftZizyRKA6U=    52
AQMdVpGbhAScpImKh1I5Lx1FLhb19N97/1reQhZd0ig=    50
AV4eJeE0kNrUw5svSpJTUryc+C4ZZ/zCdZL++VEgY6g=    48
AX2OWCN2zha+odokuX9rTDwOkM47lbNBGnMDPbJZieU=    48
                                                ..
AV8dQPY3mopfROkxlMDeLqcYAxA3qqbYG89J0SSEDio=    19
AVn0ykPCKcX5vtT07DjoV+tcTgUeLmknOytDWzcPAQ==    19
ATKFsJ5+fXeCnsLOc1k+IA+VSYtWR+wT7Iq/4IfwPjA=    19
AXRHjzm9RTN60ocpLNVgMvdW8mSEKKg0/fezhT/We78=    19
ARtzjnb15acL/xVvsS6dslv3M7A8WHUkCmy94LmK5g==    19
Name: consistentValue, Length: 1000, dtype: int64
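Because the dataframes are indexed by timestamp, time-based aggregations come almost for free. A small sketch on synthetic data (the values are made up, but the shape matches what one_object_to_df produces):

```python
import pandas as pd

# synthetic stand-in for the exported events: a millisecond-epoch
# timestamp index and a consistentValue column
ts = pd.to_datetime([1628761079144, 1628761079244, 1628761109144], unit="ms")
df = pd.DataFrame({"consistentValue": ["session-1", "session-1", "session-2"]},
                  index=ts)

# with the timestamp as index, resampling counts events per 30-second
# window, matching the exporter's --interval 30
per_30s = df.resample("30s").size()
print(per_30s)
```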
