Weaverbird python package
The python module provides a way to turn pipelines into transformation functions or queries.
Purpose
This package is meant as a building block to create servers capable of understanding and executing such pipelines, and returning results to clients. It provides several ways to understand and run weaverbird’s pipelines, called backends.
Backends can either provide:
- a way to execute pipelines directly (let’s call them executor backends).
- a way to translate pipelines into queries meant to be run against a database (let’s call them translator backends).
Installation
pip install weaverbird
Usage
:warning: This doc is provisional, implementation pending
The package exposes:
- a pydantic model Pipeline, which mirrors the pipeline definition used by the front-end
- several weaverbird.backends.xxxx sub-modules, each exposing:
  - either a translate_pipeline function (for translator backends),
  - or an execute_pipeline function (for executor backends).
Pipeline model: validation
Using the pydantic model, one can check that a series of pipeline steps is valid:
from weaverbird.pipeline import Pipeline
pipeline_steps = [{'name': 'domain', 'domain': 'example'}]
pipeline = Pipeline(steps=pipeline_steps)
A ValidationError is raised when the provided steps are not valid:
> Pipeline()
ValidationError: 1 validation error for Pipeline
steps
  field required (type=value_error.missing)
> Pipeline(steps=[{'name': 'domain', 'domain': 'example'}, {'name': 'invalid'}])
ValidationError: 130 validation errors for Pipeline
steps -> 1 -> name
  unexpected value; permitted: 'addmissingdates' (type=value_error.const; given=invalid; permitted=['addmissingdates'])
[...]
Pipeline combinations: references
Pipelines can reference other pipelines in certain steps: domain, append, and join.
A method resolve_references is provided to recursively find and replace all references.
It must be called before trying to execute or translate a pipeline.
Pipeline: variables
Some fields can contain variables instead of the actual value.
They are accepted only in the model PipelineWithVariables.
This model provides a method to replace all variables by their value.
It must be called before trying to execute or translate a pipeline.
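As an illustration of what replacing variables can look like, the sketch below walks a nested structure of steps and substitutes placeholders. The {{ name }} placeholder syntax and the render_variables function are assumptions made for this example; the source does not specify the variable syntax used by PipelineWithVariables.

```python
import re
from typing import Any, Dict

# Hypothetical placeholder syntax: {{ variable_name }}
PLACEHOLDER = re.compile(r'\{\{\s*(\w+)\s*\}\}')


def render_variables(value: Any, variables: Dict[str, Any]) -> Any:
    """Recursively replace placeholders found in dicts, lists and strings."""
    if isinstance(value, dict):
        return {key: render_variables(item, variables) for key, item in value.items()}
    if isinstance(value, list):
        return [render_variables(item, variables) for item in value]
    if isinstance(value, str):
        whole = PLACEHOLDER.fullmatch(value)
        if whole:
            # A field made of a single placeholder keeps the variable's type
            return variables[whole.group(1)]
        # Otherwise placeholders are interpolated inside the string
        return PLACEHOLDER.sub(lambda match: str(variables[match.group(1)]), value)
    return value


steps_with_variables = [
    {'name': 'filter', 'condition': {'column': 'planet', 'operator': 'eq', 'value': '{{ planet }}'}},
    {'name': 'filter', 'condition': {'column': 'moons', 'operator': 'eq', 'value': '{{ moon_count }}'}},
]
rendered = render_variables(steps_with_variables, {'planet': 'Earth', 'moon_count': 1})
```

In this sketch an unknown variable raises a KeyError, which makes a forgotten variable fail loudly before the pipeline is executed or translated.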
Executor backends: execute a pipeline
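import pandas as pd

from weaverbird.backends.pandas_executor import execute_pipeline
from weaverbird.pipeline import Pipeline


def domain_retriever(domain_name: str) -> pd.DataFrame:
    # Load the dataset backing a domain from a local CSV file
    return pd.read_csv(f'./datasets/{domain_name}.csv')


pipeline = Pipeline(steps=[
    {'name': 'domain', 'domain': 'example'},
    {'name': 'filter', 'condition': {
        'column': 'planet',
        'operator': 'eq',
        'value': 'Earth',
    }},
])

# The result is a tuple: the transformed DataFrame and an execution report
df, report = execute_pipeline(pipeline, domain_retriever)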
where:
- pipeline is an instance of the Pipeline model
- domain_retriever is a function that, from an identifier, returns the corresponding pandas DataFrame
The result of execute_pipeline is a tuple formed by:
- the transformed DataFrame,
- a PipelineExecutionReport with details about time and memory usage for each of its steps.
As of today, only one executor backend exists for python, based on pandas.
Translator backends: translate a pipeline into a query
from weaverbird.backends.sqlite_translator import translate_pipeline
from weaverbird.pipeline import Pipeline


def domain_to_table_identifier(domain_name: str) -> str:
    # Map a domain name to the name of the table in the target database
    return domain_name


pipeline = Pipeline(steps=[
    {'name': 'domain', 'domain': 'example'},
    {'name': 'filter', 'condition': {
        'column': 'planet',
        'operator': 'eq',
        'value': 'Earth',
    }},
])

translate_pipeline(pipeline, domain_to_table_identifier)
# SELECT * FROM example WHERE planet='Earth'
where:
- pipeline is an instance of the Pipeline model
- domain_to_table_identifier is an optional function that, from an identifier, returns the corresponding identifier of the table in the targeted database
The result of translate_pipeline is a query, generally a str (but other types could be possible, like a list or dict for MongoDB queries).
As of today, no translator backend exists for python. We plan to implement one for MongoDB, and one for Snowflake SQL.
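Since no translator backend is available yet, the following is only a sketch of what translating the two steps of the example above into SQL could look like. It works on plain dictionaries, handles only the domain and filter steps with the eq operator, and all names in it (translate_steps, quote_literal, SQL_OPERATORS) are hypothetical. Table and column names are inserted as-is, so it is not safe for untrusted input.

```python
import sqlite3
from typing import Callable, List

# Only the operator shown in the example is handled; extend as needed
SQL_OPERATORS = {'eq': '='}


def quote_literal(value) -> str:
    """Render a Python value as a SQL literal (strings are single-quoted)."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def translate_steps(
    steps: List[dict],
    domain_to_table_identifier: Callable[[str], str] = lambda name: name,
) -> str:
    """Translate 'domain' and 'filter' steps into a single SELECT query."""
    table = None
    conditions = []
    for step in steps:
        if step['name'] == 'domain':
            table = domain_to_table_identifier(step['domain'])
        elif step['name'] == 'filter':
            condition = step['condition']
            operator = SQL_OPERATORS[condition['operator']]
            conditions.append(
                f"{condition['column']}{operator}{quote_literal(condition['value'])}"
            )
        else:
            raise NotImplementedError(f"Unsupported step: {step['name']}")
    if table is None:
        raise ValueError('The pipeline must contain a domain step')
    query = f'SELECT * FROM {table}'
    if conditions:
        query += ' WHERE ' + ' AND '.join(conditions)
    return query


steps = [
    {'name': 'domain', 'domain': 'example'},
    {'name': 'filter', 'condition': {'column': 'planet', 'operator': 'eq', 'value': 'Earth'}},
]
query = translate_steps(steps)

# Run the generated query against an in-memory SQLite database
connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE example (planet TEXT, moon TEXT)')
connection.executemany(
    'INSERT INTO example VALUES (?, ?)',
    [('Earth', 'Moon'), ('Mars', 'Phobos'), ('Mars', 'Deimos')],
)
rows = connection.execute(query).fetchall()
```

Running the query against a throwaway in-memory database is a cheap way to check that the generated SQL is at least syntactically valid.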
Summary
┌────────────────────────────┐
│                            │
│  pipeline with variables   │
│                            │
└──────────────┬─────────────┘
               │
               │ PipelineWithVariables.render
               │
               ▼
┌────────────────────────────┐
│                            │
│          pipeline          │
│                            │
└──────────────┬─────────────┘
               │
               │                                    input dataframes
               │                                            │
               │                                            │
               OR──────────────────────────────────┐        │
               │                                   │        ▼
               │                                   │
               │ translate_pipeline                │ execute_pipeline
               │                                   │
               ▼                                   ▼
          ┌─────────┐                    ┌────────────────────┐
          │  query  │                    │  output dataframe  │
          └─────────┘                    └────────────────────┘
How to: add a new translator
The only requirement is to create a dedicated sub-package inside the weaverbird/backends/ directory, exposing a translate_pipeline function that follows the signature explained in the previous section.
Here is what a basic mongo translator implementing weaverbird’s steps domain, select, lowercase and join could look like:
# weaverbird/backends/mongo_translator/__init__.py
import re
from typing import Callable, List

from weaverbird.pipeline import Pipeline, steps


def domain_to_table_identifier(domain_name: str) -> str:
    return domain_name


def slugify(name: str) -> str:
    """Illustrative helper: turn a column name into a valid mongo variable name"""
    return 'vqb_' + re.sub(r'[^a-zA-Z0-9_]', '_', name)


def translate_pipeline(
    pipeline: Pipeline,
    domain_to_collection_identifier: Callable[[str], str] = domain_to_table_identifier,
) -> List[dict]:
    """Translate a weaverbird pipeline to a mongo aggregation pipeline"""
    mongo_pipeline = []
    # Iterate on all the steps of the pipeline, and translate them
    # one by one:
    for step in pipeline.steps:
        if isinstance(step, steps.DomainStep):
            mongo_step = {"$match": {"domain": step.domain}}  # specific to toucan toco
            mongo_pipeline.append(mongo_step)
        elif isinstance(step, steps.SelectStep):
            mongo_step = {"$project": {col: 1 for col in step.columns}}
            mongo_pipeline.append(mongo_step)
        elif isinstance(step, steps.LowercaseStep):
            mongo_step = {"$addFields": {step.column: {"$toLower": f'${step.column}'}}}
            mongo_pipeline.append(mongo_step)
        elif isinstance(step, steps.JoinStep):
            # Expose the left columns as variables usable inside the $lookup pipeline
            mongo_let = {}
            mongo_expr_and = []
            for (left_on, right_on) in step.on:
                mongo_let[slugify(left_on)] = f'${left_on}'
                mongo_expr_and.append(
                    {
                        '$eq': [f'${right_on}', f'$${slugify(left_on)}'],
                    }
                )
            # Assumes references are resolved: the right pipeline is a list of
            # steps starting with a domain step
            right_domain = step.right_pipeline[0].domain
            right_without_domain = step.right_pipeline[1:]
            right_mongo_pipeline = translate_pipeline(
                Pipeline(steps=right_without_domain), domain_to_collection_identifier
            )
            right_mongo_pipeline.append({"$match": {"$expr": {"$and": mongo_expr_and}}})
            mongo_step = {
                "$lookup": {
                    "from": domain_to_collection_identifier(right_domain),
                    "let": mongo_let,
                    "pipeline": right_mongo_pipeline,
                    "as": '_vqbJoinKey',
                },
            }
            mongo_pipeline.append(mongo_step)
            mongo_pipeline.append({'$unwind': '$_vqbJoinKey'})
            mongo_pipeline.append({
                '$replaceRoot': {'newRoot': {'$mergeObjects': ['$_vqbJoinKey', '$$ROOT']}},
            })
            mongo_pipeline.append({'$project': {'_vqbJoinKey': 0}})
        else:
            raise NotImplementedError(f'Unsupported step: {step.name}')
    return mongo_pipeline
Of course, in a real case you would split the work across several functions and files.
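One possible way to organize that split, sketched here on plain dictionaries, is a registry that maps each step name to its own translation function. The register decorator, the TRANSLATORS registry and translate_steps are hypothetical names chosen for this example, not part of the package.

```python
from typing import Callable, Dict, List

StepTranslator = Callable[[dict], List[dict]]

# Registry of supported steps: step name -> translation function
TRANSLATORS: Dict[str, StepTranslator] = {}


def register(step_name: str) -> Callable[[StepTranslator], StepTranslator]:
    """Decorator adding a translation function to the registry."""
    def decorator(func: StepTranslator) -> StepTranslator:
        TRANSLATORS[step_name] = func
        return func
    return decorator


@register('select')
def translate_select(step: dict) -> List[dict]:
    return [{'$project': {col: 1 for col in step['columns']}}]


@register('lowercase')
def translate_lowercase(step: dict) -> List[dict]:
    return [{'$addFields': {step['column']: {'$toLower': f"${step['column']}"}}}]


def translate_steps(steps: List[dict]) -> List[dict]:
    """Translate each step with its registered function, in order."""
    mongo_pipeline: List[dict] = []
    for step in steps:
        try:
            translator = TRANSLATORS[step['name']]
        except KeyError:
            raise NotImplementedError(f"Unsupported step: {step['name']}") from None
        mongo_pipeline.extend(translator(step))
    return mongo_pipeline


mongo_query = translate_steps([
    {'name': 'select', 'columns': ['planet', 'moons']},
    {'name': 'lowercase', 'column': 'planet'},
])
supported_steps = sorted(TRANSLATORS)
```

With this layout each step translator can live in its own file, and the registry keys double as the list of supported steps.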
Test your translator
(work in progress)
For each weaverbird step, we provide one or several JSON fixtures containing:
- some input data
- the configuration of the step
- the expected output
It is up to you to write a test executor which will read the input, execute the step, and check that the output matches the expected one.
If your translator does not implement all weaverbird steps, you must declare which ones are supported.
For example, testing a mongo translator would require:
- spawning a mongodb server
- storing the input data in a collection
- translating the step with translate_pipeline
- running the resulting query against the mongodb collection
- comparing the output with the expected one
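The workflow above can be sketched with a tiny fixture runner. The fixture layout (step, input and expected keys), the run_fixture helper and the in-memory run_lowercase stand-in are all assumptions made for illustration; a real test executor would translate the step and run the resulting query against an actual database instead.

```python
import json
from typing import Callable, List, Set

Rows = List[dict]

# Hypothetical fixture content; real fixtures would be read from JSON files
FIXTURE = json.loads('''
{
  "step": {"name": "lowercase", "column": "planet"},
  "input": [{"planet": "EARTH"}, {"planet": "Mars"}],
  "expected": [{"planet": "earth"}, {"planet": "mars"}]
}
''')


def run_fixture(
    fixture: dict,
    run_step: Callable[[dict, Rows], Rows],
    supported_steps: Set[str],
) -> str:
    """Run one fixture and return 'passed' or 'skipped'; raise on mismatch."""
    step = fixture['step']
    if step['name'] not in supported_steps:
        # The backend declared that it does not implement this step
        return 'skipped'
    output = run_step(step, fixture['input'])
    assert output == fixture['expected'], f"Unexpected output for step {step['name']}: {output}"
    return 'passed'


def run_lowercase(step: dict, rows: Rows) -> Rows:
    """In-memory stand-in for 'translate the step, then run the query'."""
    column = step['column']
    return [{**row, column: row[column].lower()} for row in rows]


status = run_fixture(FIXTURE, run_lowercase, supported_steps={'lowercase'})
```

Passing the set of supported steps explicitly lets the same fixtures be shared by backends that implement different subsets of weaverbird steps.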
TODO: show some utils for spawning containers, reading input, comparing output
Playground server
See playground.py. It provides a simple server that showcases how to use the module and test it.