Transparent schema evolution is a hard problem to solve. At Lightspeed, we have tens of thousands of customers and each of them has their own database schemas that evolve as new features are added to the platform. The events from those databases are streamed in real-time to our data lake where they are immediately available for further processing and analysis.

By abstracting the underlying structural changes, our objective is to make the consumption of data straightforward and easy for users. Most data engineers and consumers will surely remember a time when some hidden upstream table change broke a processing job or at least led to incorrect results. When table structures are constantly being modified, processing and analyzing datasets becomes challenging.

We want to share the solution that we—the data team at Lightspeed—found to address some aspects of schema evolution and its implementation in BigQuery (BQ). All the code is open source and contributions are welcome and encouraged. We intentionally left out security and permissions from this article to keep the focus on database schema evolution challenges.

Source code:
https://github.com/lightspeed/bigquery-uniview

The initial building block of our change data capture (CDC) pipeline is a homebrew MySQL Binlog Parser & Pusher (BP&P). It is a process that reads all database changes and generates JSON records containing data objects (representations of rows that were inserted, updated or deleted). The process also generates several helpful metadata fields such as the database name, table name, operation type and table schema version, to name a few. It also keeps track of MySQL column types and their mapping to their BigQuery counterparts. We will talk about those components in a separate blog post, so stay tuned.

Change Event Flow

Example of process flow:

  1. Table customer is created in MySQL
  2. Records are inserted in the customer table
  3. The Binlog Parser & Pusher detects data movement (DDL and DML) on a new, previously untracked table
    1. BP&P generates a unique but consistent hash to identify the records’ origin table (5b6ffe5fb, for instance)
    2. BP&P initiates a process that creates the table customer_v_5b6ffe5fb in BigQuery, matching its MySQL customer counterpart (see the sketch after this list)
    3. BP&P initiates the creation of a unified view that includes the newly created customer_v_5b6ffe5fb table
    4. BP&P generates a JSON record with the customer data and pushes it to the message queue
  4. The sink process retrieves the records from the queue and streams them into the customer_v_5b6ffe5fb table in BigQuery (a sketch of this step follows the example record below)
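Step 3.b can be pictured as a call to the BigQuery API with a schema derived from the MySQL column definitions. The snippet below is a minimal sketch using the google-cloud-bigquery client; the project id, dataset name and column list are assumptions made for illustration, not BP&P's actual code.

from google.cloud import bigquery

# Hypothetical sketch of step 3.b: create the versioned table in BigQuery.
# The project id, dataset name and schema are placeholders for this example.
client = bigquery.Client()

table_version = "5b6ffe5fb"
table_id = f"my-project.customer1.customer_v_{table_version}"

schema = [
    bigquery.SchemaField("id", "INT64"),
    bigquery.SchemaField("first_name", "STRING"),
    bigquery.SchemaField("last_name", "STRING"),
    bigquery.SchemaField("full_name", "STRING"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table, exists_ok=True)  # no-op if this version already exists
print(f"Created {table.full_table_id}")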

A simplified JSON record produced by our Binlog Parser & Pusher:

{
    "data": {
        "id": 1,
        "first_name" : "John",
        "last_name" : "Doe",
        "full_name" : "John Doe"
    },
    "metadata": {
        "ingestion_method": "BINLOG",
        "table_name": "customer",
        "table_version": "5b6ffe5fb",
        "database_name": "customer1"
    }
}
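Step 4, the sink, can then be imagined as a consumer that reads such records off the queue and streams them into the versioned table named in the metadata. This is a minimal sketch using the BigQuery streaming insert API, not our actual sink code; the project id is a placeholder.

from google.cloud import bigquery

# Hypothetical sketch of the sink (step 4): route a record to its versioned table.
client = bigquery.Client()

def sink(record: dict) -> None:
    meta = record["metadata"]
    table_id = (
        f"my-project.{meta['database_name']}"               # placeholder project id
        f".{meta['table_name']}_v_{meta['table_version']}"  # e.g. customer1.customer_v_5b6ffe5fb
    )
    errors = client.insert_rows_json(table_id, [record["data"]])  # streaming insert
    if errors:
        raise RuntimeError(f"Streaming insert failed: {errors}")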

A particularly important aspect of the flow is the table hash, also known as the table version. The table version is based on the table name and the column names, as well as their types and type sizes. The same table version guarantees the same table structure, while different table versions indicate structural differences between the tables.
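As an illustration only (BP&P's actual hashing is not shown here), a deterministic table version could be derived along these lines: serialize the table name together with the ordered column definitions, hash the result and keep a short prefix of the digest.

import hashlib

# Illustrative only: one way to derive a stable table version from the table
# name plus column names, types and sizes. BP&P's real serialization and
# hashing algorithm may differ.
def table_version(table_name, columns):
    serialized = table_name + "|" + "|".join(
        f"{name}:{col_type}:{size}" for name, col_type, size in columns
    )
    return hashlib.sha1(serialized.encode("utf-8")).hexdigest()[:9]

# Example: the customer table from the flow above.
print(table_version("customer", [
    ("id", "int", 11),
    ("first_name", "varchar", 255),
    ("last_name", "varchar", 255),
    ("full_name", "varchar", 255),
]))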

Let’s ignore step 3.c in the process flow for a moment and think about the challenge we are now facing. The Binlog Parser & Pusher process has just detected a new table version and created its counterpart in BQ. This may be the first version of that table, which corresponds to a newly created MySQL table. Alternatively, it may be a new hash for a table we already have in BigQuery; that case represents a table alteration. After the records are inserted into their designated versioned tables, we still need to address how to select the records from all the versions.

BigQuery supports querying multiple tables with a single statement by using a wildcard character at the end of a table selector. Given tables customer_abc and customer_def, the query below returns results from both tables, provided their schemas are compatible.

SELECT
    *
FROM
    customer_*;

The result set from this query uses the schema of the most recently created table among those matched by the wildcard.

If the definition of the latest table is not union-compatible with all of the other tables matched by the wildcard, the query will fail. This means that if we have two versions of the same table, where one version has an integer column and the other version has that same column typed as a string, querying across those two tables using a wildcard will result in an error. BigQuery can automatically coerce values to correct for minor type discrepancies, as specified by the data conversion rules, but this mechanism is quite limited.

Let’s consider a simple example, using a few sample records to compare expected and actual results, that highlights two problems that can occur with even a basic table schema.

To simplify the examples below, the hashes abc and def will be used instead of actual hash values, making the steps easier to follow.

Customers Table version 1 — customer_v_abc

id (integer) | first_name (string) | last_name (string) | full_name (string)
1            | John                | Doe                 | John Doe
2            | Tom                 | Smith               | Tom Smith

Customers Table version 2 — customer_v_def

id (string) | first_name (string) | last_name (string) | email (string)
1           | John                | Doe                 | j.doe@example.com
2           | Tom                 | Smith               | t.smith@example.com

SELECT
    id,
    first_name,
    last_name,
    full_name,
    email
FROM
    customer_v_*

In our ideal scenario, the query above would return rows from both table versions and include a superset of the columns, with the integer values of the id column automatically cast to strings.

id (string) | first_name (string) | last_name (string) | full_name (string) | email (string)
1           | John                | Doe                 |                    | j.doe@example.com
2           | Tom                 | Smith               |                    | t.smith@example.com
1           | John                | Doe                 | John Doe           |
2           | Tom                 | Smith               | Tom Smith          |

Instead, BigQuery returns the error “Cannot read field ‘id’ of type INT64 as STRING”, because it does not perform an implicit conversion between integer and string. Explicitly casting the id field to string does not help either: the cast happens too late in the query execution phase, so the query still fails.

Even setting the type conflict aside, the statement result would also be missing the full_name column, because the most recently created version, customer_v_def, only contains the id, first_name, last_name and email columns.

id (string) | first_name (string) | last_name (string) | email (string)
1           | John                | Doe                 | j.doe@example.com
2           | Tom                 | Smith               | t.smith@example.com
1           | John                | Doe                 |
2           | Tom                 | Smith               |

Because the BigQuery wildcard feature doesn’t handle implicit casting and only exposes the columns of the most recently created table, we had to come up with a different approach.

That approach is to create one unifying view per shared table prefix that selects records from all of its associated versions. A static list of conversion rules determines how values with conflicting data types are automatically cast.

CREATE OR REPLACE VIEW customer_unified AS
SELECT
    CAST(id AS STRING) AS id, first_name, last_name, full_name, NULL AS email
FROM
    customer_v_abc
UNION ALL
SELECT
    CAST(id AS STRING) AS id, first_name, last_name, NULL AS full_name, email
FROM
    customer_v_def;

Example of unified view for customer table

BigQuery recently added INFORMATION_SCHEMA views, which facilitate the introspection of table structures. Using this metadata, we can create views with behavior similar to the wildcard queries described above.

Querying INFORMATION_SCHEMA for the structure of the versioned customer table (versions abc and def), we can obtain the following descriptive result set:

Row | table_name     | cols.name  | cols.types
1   | customer_v_abc | id         | INT64
    |                | first_name | STRING
    |                | last_name  | STRING
    |                | full_name  | STRING
2   | customer_v_def | id         | STRING
    |                | first_name | STRING
    |                | last_name  | STRING
    |                | email      | STRING
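A result set of this shape can be collected with a query against the dataset’s INFORMATION_SCHEMA.COLUMNS view. The snippet below is a sketch of one way to gather it from Python; the grouping into a dictionary keyed by versioned table name is our own illustrative choice, not the utility’s actual code.

from google.cloud import bigquery

# Sketch: collect column names and types for every version of a base table
# from INFORMATION_SCHEMA.COLUMNS. The dataset is passed in by the caller.
client = bigquery.Client()

def versioned_schemas(dataset, base_table):
    query = f"""
        SELECT table_name, column_name, data_type
        FROM `{dataset}`.INFORMATION_SCHEMA.COLUMNS
        WHERE table_name LIKE @prefix
        ORDER BY table_name, ordinal_position
    """
    job_config = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("prefix", "STRING", f"{base_table}_v_%"),
    ])
    schemas = {}
    for row in client.query(query, job_config=job_config).result():
        schemas.setdefault(row.table_name, []).append((row.column_name, row.data_type))
    return schemas

# e.g. versioned_schemas("customer1", "customer") ->
# {"customer_v_abc": [("id", "INT64"), ...], "customer_v_def": [("id", "STRING"), ...]}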

With the structure of both table versions, we generate the view customer_unified, which unifies all the customer versions under a single schema.

Under this unification strategy, columns that conflict in their data type are cast to a common representation. The specific cast depends on the conflicting data types: the goal is to widen the target data type, going from a more specific type to a more generic one, which mitigates information loss. As such, STRING is our most generic type, since it can represent any other BQ type.

The rules for selecting a cast to resolve a type conflict are:

Conflicting Types   | Cast To
DATETIME, TIMESTAMP | TIMESTAMP
ANY, ANY            | STRING
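In other words, a DATETIME/TIMESTAMP conflict widens to TIMESTAMP, and any other conflicting pair falls back to STRING. A tiny helper expressing that rule could look like this (illustrative only, not the utility’s actual code):

# Illustrative helper for the conflict rules above: DATETIME vs TIMESTAMP
# widens to TIMESTAMP, any other mismatch widens to STRING.
def resolve_conflict(type_a, type_b):
    if type_a == type_b:
        return type_a
    if {type_a, type_b} == {"DATETIME", "TIMESTAMP"}:
        return "TIMESTAMP"
    return "STRING"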

The view creation is handled by a Cloud Function triggered by a Pub/Sub topic. Our BigQuery data sink process sends a message to the topic after it creates a new BigQuery table. The message contains the dataset name, table name and the table version. Once triggered, the function reads the metadata for all of the versions of the table base name present in the INFORMATION_SCHEMA and creates or replaces the unified view.

{
    "dataset": "...",
    "table": {
        "name": "customer",
        "version": "abc"
    }
}
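Put together, the Cloud Function can be sketched as: decode the Pub/Sub message, read the schemas of every version from INFORMATION_SCHEMA (as in the earlier sketch), compute the unified column list with its casts, and issue a CREATE OR REPLACE VIEW statement. The outline below assumes the versioned_schemas and resolve_conflict helpers from the previous sketches; it illustrates the approach rather than the utility’s actual code.

import base64
import json
from google.cloud import bigquery

client = bigquery.Client()

def create_unified_view(event, context):
    """Pub/Sub-triggered sketch: rebuild <table>_unified from all versioned tables."""
    message = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    dataset, base_table = message["dataset"], message["table"]["name"]

    # e.g. {"customer_v_abc": [("id", "INT64"), ...], "customer_v_def": [("id", "STRING"), ...]}
    schemas = versioned_schemas(dataset, base_table)

    # Unified schema: union of all column names, conflicts widened per the rules above.
    unified = {}
    for columns in schemas.values():
        for name, col_type in columns:
            unified[name] = resolve_conflict(unified.get(name, col_type), col_type)

    selects = []
    for table_name, columns in schemas.items():
        present = dict(columns)
        exprs = []
        for name, target_type in unified.items():
            if name not in present:
                exprs.append(f"CAST(NULL AS {target_type}) AS {name}")
            elif present[name] != target_type:
                exprs.append(f"CAST({name} AS {target_type}) AS {name}")
            else:
                exprs.append(name)
        selects.append(f"SELECT {', '.join(exprs)} FROM `{dataset}`.`{table_name}`")

    ddl = (
        f"CREATE OR REPLACE VIEW `{dataset}`.`{base_table}_unified` AS\n"
        + "\nUNION ALL\n".join(selects)
    )
    client.query(ddl).result()  # run the DDL and wait for it to finish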

Querying this view returns records from both table versions as we expected. The view is automatically updated every time a new table version is added. We haven’t observed any query interruption during the view alteration.

SELECT *
  FROM customer_unified

Querying the unified view now yields the desired result set.

Unified View Query Result
id (string) | first_name (string) | last_name (string) | full_name (string) | email (string)
1           | John                | Doe                 |                    | j.doe@example.com
2           | Tom                 | Smith               |                    | t.smith@example.com
1           | John                | Doe                 | John Doe           |
2           | Tom                 | Smith               | Tom Smith          |

What’s Next

We would like to invite anyone interested to fork or contribute to this utility:

https://github.com/lightspeed/bigquery-uniview

Some areas for improvement include:

  • Extended data type conversions
  • Two-level views to bypass the 250K-character limit on view definitions in BQ
  • Support for complex type mapping (objects/JSON)

Final thoughts

Most commercial ETL solutions address the presented problem by altering the destination table to match the structural changes of the source table. This approach has several drawbacks: because BQ only supports appending new columns, adding a column anywhere other than at the end of the table means creating a new table and copying the original data into it. The solution we present avoids manipulating the data itself and focuses on the metadata layer, thus reducing such hurdles.

Seeing the high rate at which new features are added to BigQuery, we expect that sooner or later a feature similar to the one presented in this post will become native.