Integrations

vs-warehouse is a single, maintained source of truth for NZ, AU and OECD statistics. Pull once from one API and it flows into whatever tools your team already uses: data warehouses, orchestration pipelines, BI tools, notebooks. All integrations authenticate with your API key. Get one free →
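Every integration below reduces to the same call: a GET on a series endpoint with your key in the X-API-Key header. A minimal sketch using the nz_cpi series that appears throughout this page (the key is a placeholder):

import requests

# The request every integration on this page builds on (placeholder key)
resp = requests.get(
    "https://api.virtus-solutions.io/v1/series/nz_cpi/data",
    headers={"X-API-Key": "your_key_here"},
    timeout=60,
)
resp.raise_for_status()
rows = resp.json()   # the examples below treat this as a list of date/value records
print(rows[:3])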


Fivetran

Connector SDK

Build a custom Fivetran connector that syncs any vs-warehouse series into your destination warehouse on a schedule.

1. Install the SDK
pip install fivetran-connector-sdk requests
2. Create connector.py
from fivetran_connector_sdk import Connector, Operations as op
import requests

BASE = "https://api.virtus-solutions.io/v1/series"

# List the series you want to sync
SERIES = ["nz_cpi", "nz_gdp", "nz_unemployment"]

def schema(configuration):
    return [{"table": name, "primary_key": ["date"]} for name in SERIES]

def update(configuration, state):
    api_key = configuration["api_key"]
    for name in SERIES:
        rows = requests.get(
            f"{BASE}/{name}/data",
            headers={"X-API-Key": api_key}
        ).json()
        for row in rows:
            yield op.upsert(name, row)
    yield op.checkpoint({})

connector = Connector(update=update, schema=schema)

if __name__ == "__main__":
    connector.debug()
3. Test locally
python connector.py debug
4. Deploy to Fivetran

In the Fivetran dashboard: Connectors → Add connector → Connector SDK, upload connector.py and set api_key in the configuration.
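Before deploying, you can also exercise the connector's functions directly; a rough sketch that calls schema() and update() outside the Fivetran runtime (assumes connector.py from step 2 and a placeholder key):

# smoke_test.py - exercise the connector outside the Fivetran runtime
from connector import schema, update

config = {"api_key": "your_key_here"}      # placeholder; Fivetran passes the real one

print(schema(config))                      # the tables the connector declares

# Drain the generator: each yielded item is an upsert or checkpoint operation
ops = list(update(config, state={}))
print(f"update() produced {len(ops)} operations")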

dbt

Snowflake share

If you're on the Enterprise plan, mount the vs-warehouse Snowflake share and reference the tables directly in your dbt models; no ingestion needed.

1. Mount the share (one-time, in Snowflake)
CREATE DATABASE vs_warehouse
  FROM SHARE MRWTYQY-WZ79363.vs_warehouse_share;
2. Add as a source in sources.yml
sources:
  - name: vs_warehouse
    database: vs_warehouse
    schema: stats_nz
    tables:
      - name: nz_cpi
      - name: nz_gdp_production_annual

  - name: vs_warehouse_oecd
    database: vs_warehouse
    schema: oecd
    tables:
      - name: nz_cpi
3. Reference in a model
-- models/inflation_trend.sql
SELECT
    date,
    value AS cpi_annual_change_pct
FROM {{ source('vs_warehouse_oecd', 'nz_cpi') }}
WHERE date >= '2015-01-01'
ORDER BY date
4. Run
dbt run --select inflation_trend

Apache Airflow

DAG

Schedule a DAG to pull vs-warehouse series into your data warehouse on a recurring basis.

DAG: daily sync to a Postgres table
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
import requests, pandas as pd
from sqlalchemy import create_engine

SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE   = "https://api.virtus-solutions.io/v1/series"

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def vs_warehouse_sync():

    @task
    def fetch_and_load(name: str):
        api_key = Variable.get("VS_WAREHOUSE_API_KEY")
        rows = requests.get(
            f"{BASE}/{name}/data",
            headers={"X-API-Key": api_key},
            timeout=60,
        ).json()
        df = pd.DataFrame(rows)
        engine = create_engine(Variable.get("WAREHOUSE_CONN"))
        df.to_sql(name, engine, schema="vs_warehouse",
                  if_exists="replace", index=False)

    for s in SERIES:
        fetch_and_load(s)

vs_warehouse_sync()

Set VS_WAREHOUSE_API_KEY and WAREHOUSE_CONN as Airflow Variables in Admin → Variables.
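If you'd rather seed them from code than from the UI, one possible sketch, run inside the Airflow environment (both values are placeholders):

# seed_variables.py - set the two Variables the DAG above reads (placeholder values)
from airflow.models import Variable

Variable.set("VS_WAREHOUSE_API_KEY", "your_key_here")
Variable.set("WAREHOUSE_CONN", "postgresql+psycopg2://user:pass@host:5432/dbname")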

Prefect

Flow

A lightweight Prefect flow to sync series on a schedule. Works with Prefect Cloud or self-hosted.

Install
pip install prefect requests pandas
Flow
from prefect import flow, task
import requests, pandas as pd

BASE    = "https://api.virtus-solutions.io/v1/series"
API_KEY = "your_key_here"
SERIES  = ["nz_cpi", "nz_gdp_production_annual"]

@task(retries=3, retry_delay_seconds=10)
def fetch_series(name: str) -> pd.DataFrame:
    rows = requests.get(
        f"{BASE}/{name}/data",
        headers={"X-API-Key": API_KEY},
        timeout=60,
    ).json()
    return pd.DataFrame(rows)

@task
def save(df: pd.DataFrame, name: str):
    df.to_parquet(f"{name}.parquet", index=False)
    print(f"Saved {len(df)} rows โ†’ {name}.parquet")

@flow(name="vs-warehouse-sync")
def sync():
    for name in SERIES:
        df = fetch_series(name)
        save(df, name)

if __name__ == "__main__":
    sync.serve(name="daily-sync", cron="0 6 * * *")
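If you'd rather not hard-code API_KEY, one option is a Prefect Secret block; a sketch assuming a block named vs-warehouse-api-key has already been created in your workspace:

# Resolve the API key from a Prefect Secret block at run time
# (assumes a Secret block named "vs-warehouse-api-key" exists in your workspace)
from prefect.blocks.system import Secret

def get_api_key() -> str:
    return Secret.load("vs-warehouse-api-key").get()

fetch_series can then call get_api_key() instead of reading the module-level constant.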

Jupyter

Notebook

Pull data directly into a notebook for analysis and visualisation.

Install
pip install vswarehouse pandas matplotlib
Fetch and plot
from vswarehouse import Client
import matplotlib.pyplot as plt

client = Client("your_key_here")

cpi  = client.get("nz_cpi",  start="2010-01-01")
gdp  = client.get("nz_gdp_production_annual", start="2010-01-01")

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))
ax1.plot(cpi["date"], cpi["value"])
ax1.set_title("NZ CPI Annual Change (%)")
ax2.plot(gdp["date"], gdp["value"])
ax2.set_title("NZ GDP Growth (%)")
plt.tight_layout()
plt.show()
Combine multiple series into one DataFrame
import pandas as pd
from vswarehouse import Client

client = Client("your_key_here")
series = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]

df = pd.concat(
    [client.get(s, start="2015-01-01").assign(series=s) for s in series]
).pivot(index="date", columns="series", values="value")

print(df.tail())
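With everything in one frame you can compare series directly; a small follow-on using the df built above:

# Simple comparisons on the combined frame from the previous snippet
print(df.describe())       # summary statistics per series
print(df.corr())           # pairwise correlation (dates missing from one series are skipped)
print(df.diff().tail())    # change since the previous observation of each series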

DuckDB

SQL

Query vs-warehouse directly from DuckDB, with no ingestion step: read_json_auto fetches and parses the response in a single SQL expression.

Install
pip install duckdb
Query in Python
import duckdb

API_KEY = "your_key_here"

con = duckdb.connect()

df = con.execute("""
    SELECT *
    FROM read_json_auto(
        'https://api.virtus-solutions.io/v1/series/nz_cpi/data?limit=1000',
        headers = {'X-API-Key': '""" + API_KEY + """'}
    )
    WHERE date >= '2015-01-01'
    ORDER BY date
""").df()

print(df.tail())
Load multiple series into one table
import duckdb

API_KEY = "your_key_here"
SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE   = "https://api.virtus-solutions.io/v1/series"

con = duckdb.connect()
con.execute("CREATE TABLE economic_data AS SELECT * FROM range(0) LIMIT 0")

for name in SERIES:
    con.execute(f"""
        INSERT INTO economic_data
        SELECT *, '{name}' AS series_name
        FROM read_json_auto(
            '{BASE}/{name}/data?limit=1000',
            headers = {{'X-API-Key': '{API_KEY}'}}
        )
    """)

print(con.execute("SELECT series_name, COUNT(*) FROM economic_data GROUP BY 1").df())
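Since the combined table lives in an in-memory database here, you may want to persist it; DuckDB's COPY writes it straight to Parquet (the output filename is just an example):

# Write the combined table out for other tools (example filename)
con.execute("COPY economic_data TO 'economic_data.parquet' (FORMAT PARQUET)")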

Amazon Redshift

psycopg2 / SQLAlchemy

Fetch series via the API and load them into Redshift with SQLAlchemy and redshift-connector (psycopg2 also works). Suitable for one-off loads or for scheduling with Lambda or Airflow.

Install
pip install requests pandas sqlalchemy redshift-connector
Load to Redshift
import requests, pandas as pd
from sqlalchemy import create_engine

API_KEY  = "your_key_here"
REDSHIFT = "redshift+redshift_connector://user:pass@host:5439/dbname"
SERIES   = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE     = "https://api.virtus-solutions.io/v1/series"

engine = create_engine(REDSHIFT)

for name in SERIES:
    rows = requests.get(
        f"{BASE}/{name}/data",
        headers={"X-API-Key": API_KEY},
        timeout=60,
    ).json()
    df = pd.DataFrame(rows)
    df.to_sql(name, engine, schema="vs_warehouse",
              if_exists="replace", index=False, method="multi")
    print(f"Loaded {len(df)} rows โ†’ vs_warehouse.{name}")

For large datasets, consider staging via S3 and using COPY instead of row-by-row inserts; it's significantly faster on Redshift.
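A rough sketch of that staging pattern, assuming an S3 bucket and IAM role you control; the bucket name, role ARN and the nz_cpi table are placeholders, and the target table must already exist (for example from an earlier to_sql load):

# Stage one series to S3, then COPY it into Redshift
# (placeholders: bucket name, IAM role ARN; target table must already exist)
import io
import boto3
import requests
import pandas as pd
from sqlalchemy import create_engine

API_KEY  = "your_key_here"
REDSHIFT = "redshift+redshift_connector://user:pass@host:5439/dbname"
bucket, key = "your-staging-bucket", "vs_warehouse/nz_cpi.csv"

rows = requests.get(
    "https://api.virtus-solutions.io/v1/series/nz_cpi/data",
    headers={"X-API-Key": API_KEY}, timeout=60,
).json()
buf = io.StringIO()
pd.DataFrame(rows).to_csv(buf, index=False)
boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=buf.getvalue())

engine = create_engine(REDSHIFT)
with engine.begin() as conn:
    conn.exec_driver_sql(f"""
        COPY vs_warehouse.nz_cpi
        FROM 's3://{bucket}/{key}'
        IAM_ROLE 'arn:aws:iam::123456789012:role/your-redshift-role'
        CSV IGNOREHEADER 1
    """)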

Google BigQuery

google-cloud-bigquery

Fetch series and load them into BigQuery using the official Python client. Each series lands in its own table under a vs_warehouse dataset.

Install
pip install requests pandas google-cloud-bigquery pyarrow
Load to BigQuery
import requests, pandas as pd
from google.cloud import bigquery

API_KEY  = "your_key_here"
PROJECT  = "your-gcp-project"
DATASET  = "vs_warehouse"
SERIES   = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE     = "https://api.virtus-solutions.io/v1/series"

client = bigquery.Client(project=PROJECT)

# Create dataset if it doesn't exist
client.create_dataset(bigquery.Dataset(f"{PROJECT}.{DATASET}"), exists_ok=True)

for name in SERIES:
    rows = requests.get(
        f"{BASE}/{name}/data",
        headers={"X-API-Key": API_KEY},
        timeout=60,
    ).json()
    df = pd.DataFrame(rows)

    table_id = f"{PROJECT}.{DATASET}.{name}"
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()
    print(f"Loaded {len(df)} rows โ†’ {table_id}")

Authenticate with gcloud auth application-default login locally, or set GOOGLE_APPLICATION_CREDENTIALS to a service account key file in production.
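To confirm a load, query the table back through the same client (reusing client, PROJECT and DATASET from the script above):

# Sanity-check one of the loaded tables
table_id = f"{PROJECT}.{DATASET}.nz_cpi"
query = f"SELECT COUNT(*) AS n, MIN(date) AS first, MAX(date) AS last FROM `{table_id}`"
for row in client.query(query).result():
    print(row.n, row.first, row.last)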

Databricks

Spark / SQL connector

Load vs-warehouse series into a Delta table on Databricks. Works in a notebook or as a job.

Option A: notebook (requests + Spark)
import requests, pandas as pd

API_KEY = dbutils.secrets.get(scope="vs-warehouse", key="api-key")
SERIES  = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE    = "https://api.virtus-solutions.io/v1/series"

for name in SERIES:
    rows = requests.get(
        f"{BASE}/{name}/data",
        headers={"X-API-Key": API_KEY},
        timeout=60,
    ).json()
    pdf = pd.DataFrame(rows)
    sdf = spark.createDataFrame(pdf)
    sdf.write.format("delta").mode("overwrite").saveAsTable(f"vs_warehouse.{name}")
    print(f"Saved {pdf.shape[0]} rows โ†’ vs_warehouse.{name}")
Option B: from outside a cluster (SQL connector)
pip install requests pandas databricks-sql-connector
import requests, pandas as pd
from databricks import sql

API_KEY    = "your_vs_warehouse_key"
DB_HOST    = "your-workspace.azuredatabricks.net"
HTTP_PATH  = "/sql/1.0/warehouses/your_warehouse_id"
DB_TOKEN   = "your_databricks_token"

conn = sql.connect(server_hostname=DB_HOST, http_path=HTTP_PATH, access_token=DB_TOKEN)
cursor = conn.cursor()

SERIES = ["nz_cpi", "nz_gdp_production_annual"]
BASE   = "https://api.virtus-solutions.io/v1/series"

for name in SERIES:
    rows = requests.get(f"{BASE}/{name}/data", headers={"X-API-Key": API_KEY}, timeout=60).json()
    df = pd.DataFrame(rows)

    cols = ", ".join(f"{c} STRING" for c in df.columns)
    cursor.execute(f"CREATE TABLE IF NOT EXISTS vs_warehouse.{name} ({cols})")
    cursor.execute(f"DELETE FROM vs_warehouse.{name}")

    # Build one multi-row INSERT with literal values; fine at the size of these series
    values = ", ".join(
        "(" + ", ".join("'" + str(v).replace("'", "''") + "'" for v in row) + ")"
        for row in df.itertuples(index=False)
    )
    cursor.execute(f"INSERT INTO vs_warehouse.{name} VALUES {values}")

conn.close()

Missing an integration?

Let us know and we'll add it, or contribute it yourself; the client libraries are open source.

Request an integration →