Michael Aglietti

Originally published at thatdot.com

Standing Queries: Turning Event-Driven Data Into Data-Driven Events

Quine's superpower is the ability to store and execute business logic within the graph as a query. That query can then operate directly on data as it streams in. We call this type of query a standing query.

A standing query incrementally matches some graph structure while new data is ingested into the graph. Quine’s special design makes this process extremely fast and efficient. When a full pattern match is found, a standing query takes action.

A standing query is defined in two parts: a pattern and an output. The pattern defines what we want to match, expressed in Cypher using the form MATCH … WHERE … RETURN …. The output defines the action(s) to take for each result produced by the RETURN in the pattern query.

Each result of a standing query is passed to a series of actions that process it. The result can be logged, passed to other systems (via Kafka, Kinesis, HTTP POST, and more), or used to trigger additional actions such as running new queries or even rewriting parts of the graph, whatever logic your application needs.

How nodes match patterns

Each node in Quine is backed by an actor, which makes each graph node act like its own little CPU. Actors function as lightweight, single-threaded logical computation units that maintain state and communicate with each other by passing messages.

The actor model enables you to execute a standing query that is stored in the graph and remembered automatically. When you issue a DistinctId standing query, the query is broken into individual steps that can be tested one at a time on individual nodes. Quine stores the result of each successive decomposition of a query (smaller and smaller queries) internally on the node issuing that portion of the query. The previous node's query is essentially a subscription to the next node's status as either matching the query or not.

Quine Streaming Graph Actor Model

Any changes in the next node’s pattern match state result in a notification to the querying node. In this way, a complex query is relayed through the graph, where each node subscribes to whether or not the next node fulfills its part of the query. When a complete match is made, or unmade, the chain is notified with results and an output action is triggered.
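The subscription chain described above can be sketched in a few lines of Python. This is only an illustrative toy, not Quine's implementation: the Node class, its methods, and the predicates are hypothetical names, and the real system uses actors and message passing rather than direct method calls.

```python
class Node:
    """Toy stand-in for a graph node holding one step of a decomposed pattern."""

    def __init__(self, name, local_test, next_node=None):
        self.name = name
        self.props = {}
        self.local_test = local_test   # predicate on this node's own properties
        self.next_node = next_node     # node responsible for the rest of the pattern
        self.subscribers = []          # callbacks of whoever is watching this node
        self.matches = False           # last match state that was reported
        if next_node is not None:
            # Subscribe to the next node's match state.
            next_node.subscribers.append(self._reevaluate)

    def set_prop(self, key, value):
        self.props[key] = value
        self._reevaluate()

    def remove_prop(self, key):
        self.props.pop(key, None)
        self._reevaluate()

    def _reevaluate(self, *_):
        downstream_ok = self.next_node.matches if self.next_node else True
        now = self.local_test(self.props) and downstream_ok
        if now != self.matches:        # notify only when the state changes
            self.matches = now
            for notify in self.subscribers:
                notify(self.name, now)


# Pattern (a)-->(b)-->(c) WHERE exists(c.prop), one step per node.
events = []
c = Node("c", lambda props: "prop" in props)
b = Node("b", lambda props: True, next_node=c)
a = Node("a", lambda props: True, next_node=b)
a.subscribers.append(lambda name, state: events.append((name, state)))

c.set_prop("prop", "61mTZqH8")   # match is made, the change ripples back to a
c.remove_prop("prop")            # match is unmade, a is notified again
print(events)                    # [('a', True), ('a', False)]
```

Each node only knows about the next node in the chain, which is why a change deep in the pattern reaches the head of the chain without any node re-running the whole query.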

There are two pattern match modes: DistinctId and MultipleValues.

Creating a standing query

The first step to making a Standing Query is determining the graph pattern you want to watch for. You may have deployed Quine in your data pipeline to perform a series of tasks to isolate data, implement a specific feature, or monitor the stream to find a specific pattern in real time. In any case, Quine will implement your logic using Cypher. The recipe for this example is included in the Quine repo if you'd like to follow along.

Let's demonstrate this concept using Quine's built-in synthetic data generator that was introduced in v1.3.0. Say that you need to relate every number on a number line to the next number and to the result of dividing it by 10 using integer division (where dividing always returns a whole number; the remainder is discarded).

ingestStreams:
  - format:
      query: |-
        WITH gen.node.from(toInteger($that)) AS n,
             toInteger($that) AS i
        MATCH (thisNode), (nextNode), (divNode) 
        WHERE id(thisNode) = id(n) 
          AND id(nextNode) = idFrom(i + 1) 
          AND id(divNode) = idFrom(i / 10) 
        SET thisNode.i = i,
            thisNode.prop = gen.string.from(i)
        CREATE (thisNode)-[:next]->(nextNode), 
               (thisNode)-[:div_by_ten]->(divNode)
      type: CypherLine
    type: NumberIteratorIngest
    ingestLimit: 100000

This ingest creates a graph with 100,000 nodes and a shape that we can use for our example.

Graph Shape

In the example above, I want to count the unique times that a pattern like the one visualized above occurs in a sample of 100,000 numbers. A key to our pattern is the existence of the prop property on a node, which is generated by the gen.string.from() function.

To detect a pattern in our data, we can write a Cypher query in the pattern section:

standingQueries:
  - pattern:
      query: |-
        MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)
        WHERE exists(c.prop)
        RETURN DISTINCT id(c) as id
      type: Cypher
    outputs:
      count-1000-results:
        type: Drop

The pattern looks for a number (c) that is the ten-divisor of another number (b), which is in turn the ten-divisor of a number (a) in the graph. That basically means it's looking for one of the first 1000 nodes created by our "number iterator" ingest.
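The figure of 1000 can be checked with plain arithmetic, independent of Quine. The sketch below is a rough model of the recipe, assuming the number iterator starts at 0 so that the ingested values are 0 through 99999; the helper name div_by_ten is made up for this illustration.

```python
INGEST_LIMIT = 100_000  # same value as ingestLimit in the recipe

# Assumption: the number iterator starts at 0, so the ingested values are 0..99999.
ingested = range(INGEST_LIMIT)


def div_by_ten(i):
    """Target of the :div_by_ten edge; integer division discards the remainder."""
    return i // 10


# Follow two :div_by_ten edges from every ingested number: a -> b -> c.
# c must itself have been ingested, otherwise it would have no `prop`.
distinct_c = {
    div_by_ten(div_by_ten(a))
    for a in ingested
    if div_by_ten(div_by_ten(a)) in ingested
}

print(len(distinct_c))                    # 1000
print(min(distinct_c), max(distinct_c))   # 0 999
```

Two integer divisions by 10 are the same as one integer division by 100, so the 100,000 starting numbers collapse onto the 1000 distinct values 0 through 999.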

❯ java -jar quine -r sq-test.yaml
Graph is ready
Running Recipe Standing Query Test Recipe
Using 1 node appearances
Using 11 quick queries 
Running Standing Query STANDING-1
Running Ingest Stream INGEST-1
Quine web server available at http://0.0.0.0:8080
INGEST-1 status is completed and ingested 100000

 | => STANDING-1 count 1000

This example simply counts how many matches are detected, using the standing query output variant type: Drop.

Standing query result output

Say that instead of just counting the number of times that the pattern matches, we need to output the match for debugging or inspection. We can replace the Drop output with a CypherQuery that uses the matched result and then prints information to the console. When issuing a DistinctId standing query, the result of a match is a payload that looks like:

{
    "meta": {
        "isPositiveMatch": true,
        "resultId": "2a757517-1225-7fe2-0d0e-22625ad3be37"
    },
    "data": {
        "a.id": 45110,
        "a.prop": "YH32SISr",
        "b.id": 4511,
        "b.prop": "fqx8aVAU",
        "c.id": 451,
        "c.prop": "61mTZqH8"
    }
}

This payload includes the ID of the node that initially matched in the data field, so we can write a new Cypher query to go fetch additional information triggered by this match:

MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)
WHERE id(c) = $that.data.id
RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop

The MATCH portion looks similar to our standing query, but this time we're not monitoring the graph, we're fetching data from the three-node pattern rooted at (c).

Replacing the count-1000-results output with inspect-results from below would accomplish just that.

inspect-results:
  type: CypherQuery
  query: |-
    MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)
    WHERE id(c) = $that.data.id
    RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop
  andThen:
    type: PrintToStandardOut

The outputs stage of a standing query is where you can express your business logic and put Quine to work for you in your data pipeline. Take some time to review all of the possible output types in our API documentation located on https://quine.io.

Modifying standing queries

Modify a Standing Query Output

You need to notify Quine of changes when you modify the outputs section of an existing standing query. The Quine API supports two methods on the /api/v1/query/standing/{standing-query-name}/output/{standing-query-output-name} endpoint: DELETE removes an output from an existing standing query, and POST adds a new one.

From above, let's change the original standing query output type from Drop to a new CypherQuery that outputs the matches to the console. We will use two API calls to accomplish the change.

Delete the existing output:

curl --request DELETE \
  --url http://0.0.0.0:8080/api/v1/query/standing/STANDING-1/output/count-1000-results \
  --header 'Content-Type: application/json'

Create the new output:

curl --request POST \
  --url http://0.0.0.0:8080/api/v1/query/standing/STANDING-1/output/inspect-results \
  --header 'Content-Type: application/json' \
  --data '{
    "type": "CypherQuery",
    "query": "MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)\nWHERE id(c) = $that.data.id\nRETURN a.i, a.prop, b.i, b.prop, c.i, c.prop",
    "andThen": {
      "type": "PrintToStandardOut"
    }
}'
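If you would rather script the swap than run curl by hand, the same two requests can be built with Python's standard library. This is a sketch only: the endpoint, output names, and body come from the curl examples, while the helper names output_url and build_swap_requests are made up here, and the requests are constructed but not sent.

```python
import json
import urllib.request

# Host, port, and path taken from the curl examples.
BASE = "http://0.0.0.0:8080/api/v1/query/standing"


def output_url(standing_query, output_name):
    return f"{BASE}/{standing_query}/output/{output_name}"


def build_swap_requests(standing_query, old_output, new_output, definition):
    """Build (but do not send) the DELETE and POST requests."""
    delete_req = urllib.request.Request(
        output_url(standing_query, old_output), method="DELETE"
    )
    post_req = urllib.request.Request(
        output_url(standing_query, new_output),
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    return delete_req, post_req


inspect_results = {
    "type": "CypherQuery",
    "query": (
        "MATCH (a)-[:div_by_ten]->(b)-[:div_by_ten]->(c)\n"
        "WHERE id(c) = $that.data.id\n"
        "RETURN a.i, a.prop, b.i, b.prop, c.i, c.prop"
    ),
    "andThen": {"type": "PrintToStandardOut"},
}

delete_req, post_req = build_swap_requests(
    "STANDING-1", "count-1000-results", "inspect-results", inspect_results
)

# To apply the change against a running Quine instance, send them in order:
# for req in (delete_req, post_req):
#     urllib.request.urlopen(req).close()
```

Letting json.dumps serialize the body avoids hand-escaping the newlines in the Cypher query.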

Propagate a New Standing Query

When a new standing query is registered in the system, it is automatically registered only on new nodes (or on old nodes that are loaded back into the cache). This behavior is the default because proactively setting the standing query on all existing data might be quite costly, depending on how much historical data there is. So Quine defaults to the most efficient option.

However, sometimes there is a need to actively propagate standing queries across all previously ingested data as well. You can use the API to request that Quine propagate a new standing query to all nodes in the existing graph. Here's how the request looks in curl.

curl --request POST \
  --url 'http://0.0.0.0:8080/api/v1/query/standing/control/propagate?include-sleeping=true' \
  --header 'Content-Type: application/json'

Review the in-product API documentation via the Quine web interface for additional code snippets.

Conclusion

In this blog post, we looked at the different types of standing queries that you can create in Quine. A standing query is a powerful tool for data processing because it allows you to express your business logic as part of your data pipeline. We also looked at how you can modify an existing standing query output type and propagate a new standing query across the graph.

Quine is open source if you want to explore standing queries for yourself using your own data. Download a precompiled version or build it yourself from the Quine GitHub codebase.

Have a question, suggestion, or improvement? I welcome your feedback! Please drop into Quine Slack and let me know. I'm always happy to discuss Quine or answer questions.


Research

From: https://drive.google.com/file/d/17uw36E3juptE2QEEwKt-WLRhdXLM9r_R/view?usp=sharing

Standing Query

Quine implements a facility for executing any query type as a “Standing Query” which is persisted in the graph in this fashion. When a query is issued as a Standing Query, it includes callback functions describing what to do in each of the four cases where a node:

1) Initially matches the query 
2) Initially does not match the query 
3) Initially did not match, but the node data changed so that it now does match the query
4) Initially did match, but the node data changed so that it no longer matches the query 

To implement standing queries, Quine stores the result of each successive decomposition of a query (into smaller and smaller branches) on the node issuing that portion of the query. The query issued to the next node is essentially a subscription to the next node's status as either matching the query or not. Changes in the next node’s state result in a notification to the querying node. In this way, a complex query is relayed through the graph, where each node subscribes to whether the next node fulfills the smaller query. When a complete match is made, a special actor is notified with the results and the appropriate callback (established with the original query) is called. These callbacks can simply return results, but can also execute arbitrary functionality, like initiating new queries, or even rewriting parts of the graph when certain patterns are matched.
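The four callback cases listed above amount to a small state transition table. The following Python sketch shows one way to model them; the Case enum, classify function, and StandingQueryState class are hypothetical names for illustration and do not reflect Quine's internal types.

```python
from enum import Enum


class Case(Enum):
    INITIAL_MATCH = 1      # 1) initially matches the query
    INITIAL_NO_MATCH = 2   # 2) initially does not match the query
    BECAME_MATCH = 3       # 3) did not match, data changed, now matches
    BECAME_NO_MATCH = 4    # 4) did match, data changed, no longer matches


def classify(previous, current):
    """Return the case that applies, or None when the state did not change.

    previous is None the first time a node is evaluated.
    """
    if previous is None:
        return Case.INITIAL_MATCH if current else Case.INITIAL_NO_MATCH
    if previous == current:
        return None
    return Case.BECAME_MATCH if current else Case.BECAME_NO_MATCH


class StandingQueryState:
    def __init__(self, callbacks):
        self.callbacks = callbacks   # dict: Case -> callable(node_id)
        self.last = {}               # node_id -> last known match state

    def observe(self, node_id, matches):
        case = classify(self.last.get(node_id), matches)
        self.last[node_id] = matches
        if case is not None and case in self.callbacks:
            self.callbacks[case](node_id)
        return case


log = []
callbacks = {
    case: (lambda node_id, c=case: log.append((c.name, node_id)))
    for case in Case
}
sq = StandingQueryState(callbacks)
sq.observe(451, False)   # first evaluation, no match
sq.observe(451, True)    # data changed, now matches
sq.observe(451, True)    # no change, no callback
sq.observe(451, False)   # data changed, match is unmade
print(log)
# [('INITIAL_NO_MATCH', 451), ('BECAME_MATCH', 451), ('BECAME_NO_MATCH', 451)]
```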


From: Quine Innovations, Part II

Query Execution

Quine is implemented as a graph interpreter. A query is dropped into the graph and turned into a result by a recursive process: evaluate the first part of the query locally and, if it matches the requirements, relay the remainder to more nodes connected to the first (as relevant for the query definition), aggregating and processing the results returned. The relayed remainder of the query is smaller than the initial payload processed by the node in question. The process is repeated until the entire query is “consumed” and the relevant results returned and aggregated.

The process of resolving a query happens in two directions:

  1. the query is consumed while extending the remaining query parts to the next relevant set of nodes,
  2. results are returned from all relevant participating nodes in the query.

The first component of query resolution is about exploration of the existing data graph to determine which nodes are responsible for resolving which portion of the query. The second component focuses on how results are returned to the requesting node recursively to produce the full and final result.
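The two directions of query resolution can be illustrated with a small recursive function. This is a toy sketch under loud assumptions: the GRAPH dictionary, the resolve function, and the representation of a query as a list of (predicate, edge label) steps are all invented for illustration, and the node values are borrowed from the sample payload earlier in the post.

```python
# Toy graph: node id -> properties and labelled outgoing edges.
GRAPH = {
    45110: {"props": {"i": 45110}, "edges": {"div_by_ten": [4511]}},
    4511: {"props": {"i": 4511}, "edges": {"div_by_ten": [451]}},
    451: {"props": {"i": 451, "prop": "61mTZqH8"}, "edges": {"div_by_ten": [45]}},
}


def resolve(node_id, query):
    """Evaluate the first step on this node, then relay the smaller remainder.

    query is a list of (predicate, edge_label) steps; edge_label names the edge
    to follow to reach the nodes that handle the rest of the query.
    Returns a list of matched paths (lists of node ids).
    """
    node = GRAPH.get(node_id)
    if node is None:
        return []
    (predicate, edge_label), remainder = query[0], query[1:]
    if not predicate(node["props"]):
        return []                  # local part fails, nothing to relay
    if not remainder:
        return [[node_id]]         # query fully consumed
    results = []
    for neighbour in node["edges"].get(edge_label, []):
        # Direction 1: relay the remainder outward.
        # Direction 2: aggregate the results that come back.
        for path in resolve(neighbour, remainder):
            results.append([node_id] + path)
    return results


def always(props):
    return True


query = [
    (always, "div_by_ten"),                   # (a)-[:div_by_ten]->
    (always, "div_by_ten"),                   # (b)-[:div_by_ten]->
    (lambda props: "prop" in props, None),    # (c) WHERE exists(c.prop)
]

print(resolve(45110, query))   # [[45110, 4511, 451]]
```

Note how each recursive call receives a shorter query than the one before it, mirroring the way the relayed remainder shrinks until the query is consumed.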

Quine enables the unique opportunity to perform these steps separately so that the result of performing the first part (exploration) can result in a back-pressured stream of results delivered only when the consumer is ready to consume each of the next results. This allows the Quine system to achieve maximal memory and computational efficiency by computing a “recipe for results” in phase 1 which does not execute until the optimal moment when a consumer is ready to receive the results in phase 2. The back-pressure technique to slow a stream of data processing is well-known in the streaming data community; however, applying it in a granular node-by-node fashion when resolving graph queries is novel.
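A loose analogy for the "recipe for results" idea is a Python generator, which describes how to produce values but does no work until a consumer pulls the next one. The sketch below is only an analogy, assuming nothing about how Quine's streams are built; the explore function and the computed list are hypothetical names.

```python
computed = []   # records when work actually happens


def explore(start, stop):
    """Phase 1: build the 'recipe for results' as a lazy generator.

    Nothing in this body runs until the consumer asks for a value.
    """
    for i in range(start, stop):
        computed.append(i)
        yield i // 100   # e.g. the number two :div_by_ten hops away


recipe = explore(45100, 45200)   # the recipe exists, but no work has been done
work_before_pull = len(computed)

# Phase 2: the consumer pulls results one at a time, at its own pace.
first = next(recipe)
second = next(recipe)
print(work_before_pull, first, second, len(computed))   # 0 451 451 2
```

Only two of the hundred possible values were computed, because the consumer asked for only two. That pull-driven pacing is the essence of back-pressure.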
