Migrating to a Partitioned Native Google BigQuery Table from External GCS Files

Skip to the solution if you just want to know how to migrate GCS data to a partitioned BigQuery table.

I've recently been working on an Apache Kafka/Confluent data pipeline to analyse event streams. I decided to use Google Cloud BigQuery for the analysis as it seemed easy to get set up with and extremely powerful. But to get up and running I'd need to backfill all my existing data, and I wanted to load it into a time-partitioned table to improve query performance and reduce costs.

Troubleshooting

I found that WePay had already developed a connector to push Kafka topics to BigQuery. And bonus: it had recently been updated to support timestamp partitioning. I set up the connector and chose my timestamp field. All seemed to be going well until I started seeing odd errors in the logs and the connector was constantly retrying to send batches.

Value 1624291503000000 for field timestamp of the destination table is outside the allowed bounds. You can only stream to date range within 365 days in the past and 183 days in the future relative to the current date.

BigQuery won't accept streamed rows into a timestamp-partitioned table when the partitioning value is more than a year in the past or roughly six months in the future. If I were to backfill the data, I'd need to find another way to do it.

(The eagle-eyed among you might have noticed that's a future timestamp, not a past one. It turns out some of the events Apple sends us can be in the future, presumably because they trust users' system clocks!)

With a bit more digging, I found that BigQuery can read from external sources, such as Google Cloud Storage (GCS). It wouldn't perform as well, but it would work temporarily while I filled in the old data. There's also a GCS connector for Kafka, and it can be configured to hive-partition the data, as in the listing below.
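
For reference, a hive-partitioned layout just encodes field values into the object paths, something like the listing below. The bucket name, topic name and partition key here are made up for illustration, and the exact file names depend on how the connector is configured.

gs://<bucket>/topics/events/dt=2020-05-01/events+0+0000000000.avro
gs://<bucket>/topics/events/dt=2020-05-02/events+0+0000100000.avro
gs://<bucket>/topics/events/dt=2020-05-03/events+0+0000200000.avro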

Another Snag

And then another snag! While I could create a native table from my GCS source, I wouldn't be able to partition it on a field. That's because BigQuery doesn't read the schema until it creates the table, so it only offers partitioning on ingestion time, which is useless in this case.

The Solution

A quick conversation with Kevin, the angel from Google support, and he suggested it's possible to create partitioned tables from query results.

First, create a BigQuery table from the external GCS data so you're in a position to query it. I used AVRO format files as the source, but CSV, Parquet and JSON are all supported. This table can be partitioned, but doesn't need to be; a sketch of one way to create it follows.
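
For example, using the bq tool from the Cloud SDK (installed in the next step), the external table can be defined over the GCS files along these lines. The bucket, dataset and table names are placeholders, and the hive-partitioning flags only apply if your objects are laid out that way.

# Build a table definition pointing at the Avro files in GCS.
bq mkdef --source_format=AVRO \
    --hive_partitioning_mode=AUTO \
    --hive_partitioning_source_uri_prefix=gs://<bucket>/topics/events \
    "gs://<bucket>/topics/events/*" > external_table_def.json

# Create a BigQuery table backed by that definition.
bq mk --external_table_definition=external_table_def.json <dataset>.<external_table>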

Then install the google-cloud-sdk (which includes the bq and gcloud command-line tools), authenticate, and set the project.

gcloud auth login
gcloud config set project <project_id>

And finally, run the query into a new partitioned table:

bq query --destination_table <project_id>:<dataset>.<new_partitioned_table> \
    --time_partitioning_field <partition_field> \
    --use_legacy_sql=false \
    'SELECT * FROM `<project_id>.<dataset>.<external_table>`;'

The --destination_table flag names your new table and --time_partitioning_field the column you're partitioning on; I used a DATE column, but TIMESTAMP columns are also supported. (INTEGER columns can be partitioned too, though via integer range partitioning rather than the time-partitioning flag.)
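
To sanity-check the result, you can inspect the new table's metadata (the output should include a timePartitioning section) and run a query that filters on the partition column. The names below are the same placeholders as above.

# Confirm the table is partitioned on the expected field.
bq show --format=prettyjson <project_id>:<dataset>.<new_partitioned_table>

# A filter on the partition column should only scan the matching partitions.
bq query --use_legacy_sql=false \
    'SELECT COUNT(*) FROM `<project_id>.<dataset>.<new_partitioned_table>`
     WHERE <partition_field> BETWEEN DATE "2020-01-01" AND DATE "2020-01-31";'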

And that's it, incredibly simple.
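
Once you've checked the backfilled data in the new partitioned table, the temporary external table can be dropped if you no longer need it (same placeholder names as above):

bq rm -t <dataset>.<external_table>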

Posted on May 22, 2020
