vs-warehouse is a single, maintained source of truth for NZ, AU and OECD statistics. Pull once from one API and it flows into whatever tools your team already uses: data warehouses, orchestration pipelines, BI tools, notebooks. All integrations authenticate with your API key. Get one free →
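Every integration below reduces to the same call: an HTTP GET against the series endpoint with your key in the X-API-Key header. A minimal sketch with requests, using the endpoint and a series id that appear in the examples below:
import requests
# Fetch one series as JSON rows (replace your_key_here with your API key)
rows = requests.get(
    "https://api.virtus-solutions.io/v1/series/nz_cpi/data",
    headers={"X-API-Key": "your_key_here"},
    timeout=60,
).json()
print(rows[:3])  # rows carry at least a date and a value, as the examples below assume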
Build a custom Fivetran connector that syncs any vs-warehouse series into your destination warehouse on a schedule.
pip install fivetran-connector-sdk requests
connector.py
from fivetran_connector_sdk import Connector, Operations as op
import requests
BASE = "https://api.virtus-solutions.io/v1/series"
# List the series you want to sync
SERIES = ["nz_cpi", "nz_gdp", "nz_unemployment"]
def schema(configuration):
    # One table per series, keyed on the observation date
    return [{"table": name, "primary_key": ["date"]} for name in SERIES]

def update(configuration, state):
    api_key = configuration["api_key"]
    for name in SERIES:
        # Full refresh each run: fetch every row and upsert it by date
        rows = requests.get(
            f"{BASE}/{name}/data",
            headers={"X-API-Key": api_key},
            timeout=60,
        ).json()
        for row in rows:
            yield op.upsert(name, row)
    yield op.checkpoint({})
connector = Connector(update=update, schema=schema)
if __name__ == "__main__":
connector.debug()
python connector.py debug
In the Fivetran dashboard: Connectors → Add connector → Connector SDK, upload connector.py and set api_key in the configuration.
If you're on the Enterprise plan, mount the vs-warehouse Snowflake share and reference the tables directly in your dbt models; no ingestion needed.
CREATE DATABASE vs_warehouse FROM SHARE MRWTYQY-WZ79363.vs_warehouse_share;
sources.yml
sources:
- name: vs_warehouse
database: vs_warehouse
schema: stats_nz
tables:
- name: nz_cpi
- name: nz_gdp_production_annual
- name: vs_warehouse_oecd
database: vs_warehouse
schema: oecd
tables:
- name: nz_cpi
-- models/inflation_trend.sql
SELECT
date,
value AS cpi_annual_change_pct
FROM {{ source('vs_warehouse_oecd', 'nz_cpi') }}
WHERE date >= '2015-01-01'
ORDER BY date
dbt run --select inflation_trend
Schedule an Airflow DAG that pulls vs-warehouse series into your data warehouse on a recurring basis.
from airflow.decorators import dag, task
from airflow.models import Variable
from datetime import datetime
import requests, pandas as pd
from sqlalchemy import create_engine
SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE = "https://api.virtus-solutions.io/v1/series"
@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def vs_warehouse_sync():
@task
def fetch_and_load(name: str):
api_key = Variable.get("VS_WAREHOUSE_API_KEY")
rows = requests.get(
f"{BASE}/{name}/data",
headers={"X-API-Key": api_key},
timeout=60,
).json()
df = pd.DataFrame(rows)
engine = create_engine(Variable.get("WAREHOUSE_CONN"))
df.to_sql(name, engine, schema="vs_warehouse",
if_exists="replace", index=False)
for s in SERIES:
fetch_and_load(s)
vs_warehouse_sync()
Set VS_WAREHOUSE_API_KEY and WAREHOUSE_CONN as Airflow Variables in Admin → Variables.
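The same variables can also be set from the CLI on the scheduler host; both values below are placeholders, and WAREHOUSE_CONN is whatever SQLAlchemy URL your warehouse accepts:
airflow variables set VS_WAREHOUSE_API_KEY your_key_here
airflow variables set WAREHOUSE_CONN postgresql+psycopg2://user:pass@host:5432/warehouse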
A lightweight Prefect flow to sync series on a schedule. Works with Prefect Cloud or a self-hosted Prefect server.
pip install prefect requests pandas
from prefect import flow, task
import requests, pandas as pd
BASE = "https://api.virtus-solutions.io/v1/series"
API_KEY = "your_key_here"
SERIES = ["nz_cpi", "nz_gdp_production_annual"]
@task(retries=3, retry_delay_seconds=10)
def fetch_series(name: str) -> pd.DataFrame:
rows = requests.get(
f"{BASE}/{name}/data",
headers={"X-API-Key": API_KEY},
timeout=60,
).json()
return pd.DataFrame(rows)
@task
def save(df: pd.DataFrame, name: str):
df.to_parquet(f"{name}.parquet", index=False)
print(f"Saved {len(df)} rows โ {name}.parquet")
@flow(name="vs-warehouse-sync")
def sync():
for name in SERIES:
df = fetch_series(name)
save(df, name)
if __name__ == "__main__":
sync.serve(name="daily-sync", cron="0 6 * * *")
Pull data directly into a notebook for analysis and visualisation.
pip install vswarehouse pandas matplotlib
from vswarehouse import Client
import matplotlib.pyplot as plt
client = Client("your_key_here")
cpi = client.get("nz_cpi", start="2010-01-01")
gdp = client.get("nz_gdp_production_annual", start="2010-01-01")
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))
ax1.plot(cpi["date"], cpi["value"])
ax1.set_title("NZ CPI Annual Change (%)")
ax2.plot(gdp["date"], gdp["value"])
ax2.set_title("NZ GDP Growth (%)")
plt.tight_layout()
plt.show()
import pandas as pd
from vswarehouse import Client
client = Client("your_key_here")
series = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
df = pd.concat(
[client.get(s, start="2015-01-01").assign(series=s) for s in series]
).pivot(index="date", columns="series", values="value")
print(df.tail())
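Because the pivoted frame puts every series on one date index, ordinary pandas operations work across them. A small follow-on sketch, assuming the df built above:
# Pairwise correlation between the series over the common window
print(df.corr().round(2))
# Period-on-period change in each indicator
print(df.diff().tail())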
Query vs-warehouse directly from DuckDB; no ingestion step needed. read_json_auto fetches and parses the response in a single SQL expression.
pip install duckdb
import duckdb
API_KEY = "your_key_here"
con = duckdb.connect()
df = con.execute("""
SELECT *
FROM read_json_auto(
'https://api.virtus-solutions.io/v1/series/nz_cpi/data?limit=1000',
headers = {'X-API-Key': '""" + API_KEY + """'}
)
WHERE date >= '2015-01-01'
ORDER BY date
""").df()
print(df.tail())
import duckdb
API_KEY = "your_key_here"
SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE = "https://api.virtus-solutions.io/v1/series"
con = duckdb.connect()
con.execute("CREATE TABLE economic_data AS SELECT * FROM range(0) LIMIT 0")
for name in SERIES:
con.execute(f"""
INSERT INTO economic_data
SELECT *, '{name}' AS series_name
FROM read_json_auto(
'{BASE}/{name}/data?limit=1000',
headers = {{'X-API-Key': '{API_KEY}'}}
)
""")
print(con.execute("SELECT series_name, COUNT(*) FROM economic_data GROUP BY 1").df())
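To reshape the combined table into one column per series, DuckDB's PIVOT statement works directly on it. A sketch assuming the date and value columns used throughout these examples:
wide = con.execute("""
    PIVOT economic_data
    ON series_name
    USING first(value)
    GROUP BY date
    ORDER BY date
""").df()
print(wide.tail())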
Fetch series via the API and load them into Redshift using SQLAlchemy with the redshift-connector driver. Suitable for one-off loads or for scheduling with Lambda or Airflow.
pip install requests pandas sqlalchemy redshift-connector
import requests, pandas as pd
from sqlalchemy import create_engine
API_KEY = "your_key_here"
REDSHIFT = "redshift+redshift_connector://user:pass@host:5439/dbname"
SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE = "https://api.virtus-solutions.io/v1/series"
engine = create_engine(REDSHIFT)
for name in SERIES:
rows = requests.get(
f"{BASE}/{name}/data",
headers={"X-API-Key": API_KEY},
timeout=60,
).json()
df = pd.DataFrame(rows)
df.to_sql(name, engine, schema="vs_warehouse",
if_exists="replace", index=False, method="multi")
print(f"Loaded {len(df)} rows โ vs_warehouse.{name}")
For large datasets, consider staging via S3 and using COPY instead of row-by-row inserts; it's significantly faster on Redshift.
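A rough sketch of that pattern with boto3, assuming an existing staging bucket and an IAM role attached to the cluster (the bucket name, key prefix, and role ARN below are placeholders):
import io
import boto3
import pandas as pd
from sqlalchemy import text

def load_via_copy(df: pd.DataFrame, table: str, engine):
    # Stage the frame in S3 as CSV, then bulk-load it with COPY (appends to an existing table)
    s3 = boto3.client("s3")
    buf = io.StringIO()
    df.to_csv(buf, index=False)
    key = f"vs_warehouse/{table}.csv"
    s3.put_object(Bucket="your-staging-bucket", Key=key, Body=buf.getvalue())
    with engine.begin() as conn:
        conn.execute(text(
            f"COPY vs_warehouse.{table} "
            f"FROM 's3://your-staging-bucket/{key}' "
            f"IAM_ROLE 'arn:aws:iam::123456789012:role/your-redshift-role' "
            f"CSV IGNOREHEADER 1"
        ))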
Fetch series and load them into BigQuery using the official Python client. Each series lands in its own table under a vs_warehouse dataset.
pip install requests pandas google-cloud-bigquery pyarrow
import requests, pandas as pd
from google.cloud import bigquery
API_KEY = "your_key_here"
PROJECT = "your-gcp-project"
DATASET = "vs_warehouse"
SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE = "https://api.virtus-solutions.io/v1/series"
client = bigquery.Client(project=PROJECT)
# Create dataset if it doesn't exist
client.create_dataset(bigquery.Dataset(f"{PROJECT}.{DATASET}"), exists_ok=True)
for name in SERIES:
rows = requests.get(
f"{BASE}/{name}/data",
headers={"X-API-Key": API_KEY},
timeout=60,
).json()
df = pd.DataFrame(rows)
table_id = f"{PROJECT}.{DATASET}.{name}"
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result()
print(f"Loaded {len(df)} rows โ {table_id}")
Authenticate with gcloud auth application-default login locally, or set GOOGLE_APPLICATION_CREDENTIALS to a service account key file in production.
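To sanity-check the load (or use the tables downstream), query them with the same client. A small sketch reusing PROJECT and DATASET from above, with the date and value columns the other examples assume:
query = f"""
    SELECT date, value
    FROM `{PROJECT}.{DATASET}.nz_cpi`
    ORDER BY date DESC
    LIMIT 5
"""
for row in client.query(query).result():
    print(row["date"], row["value"])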
Load vs-warehouse series into Delta tables on Databricks. Works in a notebook or as a job.
import requests, pandas as pd
API_KEY = dbutils.secrets.get(scope="vs-warehouse", key="api-key")
SERIES = ["nz_cpi", "nz_gdp_production_annual", "nz_unemployment"]
BASE = "https://api.virtus-solutions.io/v1/series"
for name in SERIES:
rows = requests.get(
f"{BASE}/{name}/data",
headers={"X-API-Key": API_KEY},
timeout=60,
).json()
pdf = pd.DataFrame(rows)
sdf = spark.createDataFrame(pdf)
sdf.write.format("delta").mode("overwrite").saveAsTable(f"vs_warehouse.{name}")
print(f"Saved {pdf.shape[0]} rows โ vs_warehouse.{name}")
To load from outside a Databricks notebook, connect to a SQL warehouse with the Databricks SQL connector:
pip install requests pandas databricks-sql-connector
import requests, pandas as pd
from databricks import sql
API_KEY = "your_vs_warehouse_key"
DB_HOST = "your-workspace.azuredatabricks.net"
HTTP_PATH = "/sql/1.0/warehouses/your_warehouse_id"
DB_TOKEN = "your_databricks_token"
conn = sql.connect(server_hostname=DB_HOST, http_path=HTTP_PATH, access_token=DB_TOKEN)
cursor = conn.cursor()
SERIES = ["nz_cpi", "nz_gdp_production_annual"]
BASE = "https://api.virtus-solutions.io/v1/series"
for name in SERIES:
rows = requests.get(f"{BASE}/{name}/data", headers={"X-API-Key": API_KEY}, timeout=60).json()
df = pd.DataFrame(rows)
cols = ", ".join(f"{c} STRING" for c in df.columns)
cursor.execute(f"CREATE TABLE IF NOT EXISTS vs_warehouse.{name} ({cols})")
cursor.execute(f"DELETE FROM vs_warehouse.{name}")
data = [tuple(str(v) for v in row) for row in df.itertuples(index=False)]
placeholders = ", ".join(["?"] * len(df.columns))
cursor.executemany(f"INSERT INTO vs_warehouse.{name} VALUES ({placeholders})", data)
conn.close()
Let us know and we'll add it, or contribute it yourself; the client libraries are open source.
Request an integration →